自定义服务发现与负载均衡

服务发现

Discover trait 提供了自定义服务发现的能力,其支持自定义静态或可订阅的服务发现能力。

Trait 定义

/// [`Instance`] contains information of an instance from the target service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Instance {
    pub address: Address,
    pub weight: u32,
    pub tags: HashMap<Cow<'static, str>, Cow<'static, str>>,
}

/// Change indicates the change of the service discover.
///
/// Change contains the difference between the current discovery result and the previous one.
/// It is designed for providing detail information when dispatching an event for service
/// discovery result change.
///
/// Since the loadbalancer may rely on caching the result of discover to improve performance,
/// the discover implementation should dispatch an event when result changes.
#[derive(Debug, Clone)]
pub struct Change<K> {
    /// `key` should be the same as the output of `WatchableDiscover::key`,
    /// which is often used by cache.
    pub key: K,
    pub all: Vec<Arc<Instance>>,
    pub added: Vec<Arc<Instance>>,
    pub updated: Vec<Arc<Instance>>,
    pub removed: Vec<Arc<Instance>>,
}

/// [`Discover`] is the most basic trait for Discover.
pub trait Discover: Send + Sync + 'static {
    /// `Key` identifies a set of instances, such as the cluster name.
    type Key: Hash + PartialEq + Eq + Send + Sync + Clone + 'static;
    /// `Error` is the discovery error.
    type Error: std::error::Error + Send + Sync;
    /// `DiscFut` is a Future object which returns a discovery result.
    type DiscFut<'future>: Future<Output = Result<Vec<Arc<Instance>>, Self::Error>> + Send + 'future;

    /// `discover` allows to request an endpoint and return a discover future.
    fn discover(&self, endpoint: &Endpoint) -> Self::DiscFut<'_>;
    /// `key` should return a key suitable for cache.
    fn key(&self, endpoint: &Endpoint) -> Self::Key;
    /// `watch` should return a [`async_broadcast::Receiver`] which can be used to subscribe
    /// [`Change`].
    fn watch(&self) -> Option<Receiver<Change<Self::Key>>>;
}

示例

pub struct StaticDiscover {
    instances: Vec<Arc<Instance>>,
}

impl Discover for StaticDiscover {
    type Key = ();
    type Error = Infallible;
    type DiscFut<'a> = impl Future<Output = Result<Vec<Arc<Instance>>, Self::Error>> + 'a;

    fn discover(&self, _: &Endpoint) -> Self::DiscFut<'_> {
        async { Ok(self.instances.clone()) }
    }

    fn key(&self, _: &Endpoint) -> Self::Key {}

    fn watch(&self) -> Option<async_broadcast::Receiver<Change<Self::Key>>> {
        None
    }
}

负载均衡

Volo 提供基于 LoadBalance trait 自定义负载均衡策略的能力:

/// [`LoadBalance`] promise the feature of the load balance policy.
pub trait LoadBalance<D>: Send + Sync + 'static
where
    D: Discover,
{
    /// `InstanceIter` is an iterator of [`crate::discovery::Instance`].
    type InstanceIter: Iterator<Item = Address> + Send;
    /// `GetFut` is the return type of `get_picker`.
    type GetFut<'future>: Future<Output = Result<Self::InstanceIter, LoadBalanceError>>
        + Send
        + 'future
    where
        Self: 'future;

    /// `get_picker` allows to get an instance iterator of a specified endpoint from self or
    /// service discovery.
    fn get_picker<'future>(
        &'future self,
        endpoint: &'future Endpoint,
        discover: &'future D,
    ) -> Self::GetFut<'future>
    where
        Self: 'future;
    /// `rebalance` is the callback method be used in service discovering subscription.
    fn rebalance(&self, changes: Change<D::Key>);
}

示例

pub struct InstancePicker {
    instances: Vec<Arc<Instance>>,
    index: usize
}

impl Iterator for InstancePicker {
    type Item = Address;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.instances.get(self.index);
        self.index += 1;
        i.map(|i| i.clone().address.clone())
    }
}

#[derive(Clone)]
pub struct RoundRobin<K>
where
    K: Hash + PartialEq + Eq + Send + Sync + 'static,
{
    router: DashMap<K, Arc<Vec<Arc<Instance>>>>,
}

impl<D> LoadBalance<D> for RoundRobin<D::Key>
where
    D: Discover,
{
    type InstanceIter = InstancePicker;
    type GetFut<'future> =
        impl Future<Output = Result<Self::InstanceIter, LoadBalanceError>> + Send + 'future
        where
            Self: 'future;

    fn get_picker<'future>(
        &'future self,
        endpoint: &'future Endpoint,
        discover: &'future D,
    ) -> Self::GetFut<'future>
    where
        Self: 'future, 
    {
        async {
            let key = discover.key(endpoint);
            let list = match self.router.entry(key) {
                Entry::Occupied(e) => e.get().clone(),
                Entry::Vacant(e) => {
                    let instances =
                        Arc::new(discover.discover(endpoint).await?);
                    e.insert(instances).value().clone()
                }
            };
            Ok(InstancePicker {
                instances: list.to_vec(),
                index: 0
            })
        }
    }

    fn rebalance(&self, changes: Change<D::Key>) {
        if let Entry::Occupied(entry) = self.router.entry(changes.key.clone()) {
            entry.replace_entry(Arc::new(changes.all));
        }
    }
}

最后修改 July 30, 2023 : fix type error (#735) (50c91bf)