1 use super::super::{Connection, Endpoint};
2 
3 use std::{
4     hash::Hash,
5     pin::Pin,
6     task::{Context, Poll},
7 };
8 use tokio::sync::mpsc::Receiver;
9 use tokio_stream::Stream;
10 use tower::discover::Change as TowerChange;
11 
/// One update to the set of known services.
///
/// `K` is the key that identifies a service and `V` is the service value
/// carried by an insertion.
#[derive(Clone, Debug)]
pub enum Change<K, V> {
    /// A service `V`, identified by key `K`, became available.
    Insert(K, V),
    /// The service identified by key `K` is no longer available.
    Remove(K),
}
20 
21 pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
22     changes: Receiver<Change<K, Endpoint>>,
23 }
24 
25 impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
new(changes: Receiver<Change<K, Endpoint>>) -> Self26     pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
27         Self { changes }
28     }
29 }
30 
31 impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
32     type Item = Result<TowerChange<K, Connection>, crate::BoxError>;
33 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>34     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35         match Pin::new(&mut self.changes).poll_recv(cx) {
36             Poll::Pending | Poll::Ready(None) => Poll::Pending,
37             Poll::Ready(Some(change)) => match change {
38                 Change::Insert(k, endpoint) => {
39                     let connection = Connection::lazy(endpoint.http_connector(), endpoint);
40                     Poll::Ready(Some(Ok(TowerChange::Insert(k, connection))))
41                 }
42                 Change::Remove(k) => Poll::Ready(Some(Ok(TowerChange::Remove(k)))),
43             },
44         }
45     }
46 }
47 
48 impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}
49