1 use super::super::{Connection, Endpoint}; 2 3 use std::{ 4 hash::Hash, 5 pin::Pin, 6 task::{Context, Poll}, 7 }; 8 use tokio::sync::mpsc::Receiver; 9 use tokio_stream::Stream; 10 use tower::discover::Change as TowerChange; 11 12 /// A change in the service set. 13 #[derive(Debug, Clone)] 14 pub enum Change<K, V> { 15 /// A new service identified by key `K` was identified. 16 Insert(K, V), 17 /// The service identified by key `K` disappeared. 18 Remove(K), 19 } 20 21 pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> { 22 changes: Receiver<Change<K, Endpoint>>, 23 } 24 25 impl<K: Hash + Eq + Clone> DynamicServiceStream<K> { new(changes: Receiver<Change<K, Endpoint>>) -> Self26 pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self { 27 Self { changes } 28 } 29 } 30 31 impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> { 32 type Item = Result<TowerChange<K, Connection>, crate::BoxError>; 33 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>34 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 35 match Pin::new(&mut self.changes).poll_recv(cx) { 36 Poll::Pending | Poll::Ready(None) => Poll::Pending, 37 Poll::Ready(Some(change)) => match change { 38 Change::Insert(k, endpoint) => { 39 let connection = Connection::lazy(endpoint.http_connector(), endpoint); 40 Poll::Ready(Some(Ok(TowerChange::Insert(k, connection)))) 41 } 42 Change::Remove(k) => Poll::Ready(Some(Ok(TowerChange::Remove(k)))), 43 }, 44 } 45 } 46 } 47 48 impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {} 49