1 pub mod pb {
2 tonic::include_proto!("grpc.examples.unaryecho");
3 }
4
5 use pb::{echo_client::EchoClient, EchoRequest};
6 use tonic::transport::channel::Change;
7 use tonic::transport::Channel;
8 use tonic::transport::Endpoint;
9
10 use std::sync::Arc;
11
12 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
13 use tokio::time::timeout;
14
15 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>16 async fn main() -> Result<(), Box<dyn std::error::Error>> {
17 let e1 = Endpoint::from_static("http://[::1]:50051");
18 let e2 = Endpoint::from_static("http://[::1]:50052");
19
20 let (channel, rx) = Channel::balance_channel(10);
21 let mut client = EchoClient::new(channel);
22
23 let done = Arc::new(AtomicBool::new(false));
24 let demo_done = done.clone();
25 tokio::spawn(async move {
26 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
27 println!("Added first endpoint");
28 let change = Change::Insert("1", e1);
29 let res = rx.send(change).await;
30 println!("{:?}", res);
31 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
32 println!("Added second endpoint");
33 let change = Change::Insert("2", e2);
34 let res = rx.send(change).await;
35 println!("{:?}", res);
36 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
37 println!("Removed first endpoint");
38 let change = Change::Remove("1");
39 let res = rx.send(change).await;
40 println!("{:?}", res);
41
42 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
43 println!("Removed second endpoint");
44 let change = Change::Remove("2");
45 let res = rx.send(change).await;
46 println!("{:?}", res);
47
48 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
49 println!("Added third endpoint");
50 let e3 = Endpoint::from_static("http://[::1]:50051");
51 let change = Change::Insert("3", e3);
52 let res = rx.send(change).await;
53 println!("{:?}", res);
54
55 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
56 println!("Removed third endpoint");
57 let change = Change::Remove("3");
58 let res = rx.send(change).await;
59 println!("{:?}", res);
60 demo_done.swap(true, SeqCst);
61 });
62
63 while !done.load(SeqCst) {
64 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
65 let request = tonic::Request::new(EchoRequest {
66 message: "hello".into(),
67 });
68
69 let rx = client.unary_echo(request);
70 if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await {
71 println!("RESPONSE={:?}", resp);
72 } else {
73 println!("did not receive value within 10 secs");
74 }
75 }
76
77 println!("... Bye");
78
79 Ok(())
80 }
81