1 pub mod pb { 2 tonic::include_proto!("grpc.examples.echo"); 3 } 4 5 use pb::{echo_client::EchoClient, EchoRequest}; 6 use tonic::transport::Channel; 7 8 use tonic::transport::Endpoint; 9 10 use std::sync::Arc; 11 12 use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; 13 use tokio::time::timeout; 14 use tower::discover::Change; 15 16 #[tokio::main] 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { 18 let e1 = Endpoint::from_static("http://[::1]:50051"); 19 let e2 = Endpoint::from_static("http://[::1]:50052"); 20 21 let (channel, rx) = Channel::balance_channel(10); 22 let mut client = EchoClient::new(channel); 23 24 let done = Arc::new(AtomicBool::new(false)); 25 let demo_done = done.clone(); 26 tokio::spawn(async move { 27 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 28 println!("Added first endpoint"); 29 let change = Change::Insert("1", e1); 30 let res = rx.send(change).await; 31 println!("{:?}", res); 32 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 33 println!("Added second endpoint"); 34 let change = Change::Insert("2", e2); 35 let res = rx.send(change).await; 36 println!("{:?}", res); 37 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 38 println!("Removed first endpoint"); 39 let change = Change::Remove("1"); 40 let res = rx.send(change).await; 41 println!("{:?}", res); 42 43 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 44 println!("Removed second endpoint"); 45 let change = Change::Remove("2"); 46 let res = rx.send(change).await; 47 println!("{:?}", res); 48 49 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 50 println!("Added third endpoint"); 51 let e3 = Endpoint::from_static("http://[::1]:50051"); 52 let change = Change::Insert("3", e3); 53 let res = rx.send(change).await; 54 println!("{:?}", res); 55 56 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 57 println!("Removed third endpoint"); 58 let change = Change::Remove("3"); 59 let res = rx.send(change).await; 60 println!("{:?}", res); 61 demo_done.swap(true, SeqCst); 62 }); 63 64 while !done.load(SeqCst) { 65 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; 66 let request = tonic::Request::new(EchoRequest { 67 message: "hello".into(), 68 }); 69 70 let rx = client.unary_echo(request); 71 if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await { 72 println!("RESPONSE={:?}", resp); 73 } else { 74 println!("did not receive value within 10 secs"); 75 } 76 } 77 78 println!("... Bye"); 79 80 Ok(()) 81 } 82