1 pub mod pb {
2 tonic::include_proto!("grpc.examples.echo");
3 }
4
5 use std::time::Duration;
6 use tokio_stream::{Stream, StreamExt};
7 use tonic::transport::Channel;
8
9 use pb::{echo_client::EchoClient, EchoRequest};
10
echo_requests_iter() -> impl Stream<Item = EchoRequest>11 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
12 tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
13 message: format!("msg {:02}", i),
14 })
15 }
16
streaming_echo(client: &mut EchoClient<Channel>, num: usize)17 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
18 let stream = client
19 .server_streaming_echo(EchoRequest {
20 message: "foo".into(),
21 })
22 .await
23 .unwrap()
24 .into_inner();
25
26 // stream is infinite - take just 5 elements and then disconnect
27 let mut stream = stream.take(num);
28 while let Some(item) = stream.next().await {
29 println!("\treceived: {}", item.unwrap().message);
30 }
31 // stream is dropped here and the disconnect info is sent to server
32 }
33
bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize)34 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
35 let in_stream = echo_requests_iter().take(num);
36
37 let response = client
38 .bidirectional_streaming_echo(in_stream)
39 .await
40 .unwrap();
41
42 let mut resp_stream = response.into_inner();
43
44 while let Some(received) = resp_stream.next().await {
45 let received = received.unwrap();
46 println!("\treceived message: `{}`", received.message);
47 }
48 }
49
bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration)50 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
51 let in_stream = echo_requests_iter().throttle(dur);
52
53 let response = client
54 .bidirectional_streaming_echo(in_stream)
55 .await
56 .unwrap();
57
58 let mut resp_stream = response.into_inner();
59
60 while let Some(received) = resp_stream.next().await {
61 let received = received.unwrap();
62 println!("\treceived message: `{}`", received.message);
63 }
64 }
65
66 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>67 async fn main() -> Result<(), Box<dyn std::error::Error>> {
68 let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
69
70 println!("Streaming echo:");
71 streaming_echo(&mut client, 5).await;
72 tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
73
74 // Echo stream that sends 17 requests then graceful end that connection
75 println!("\r\nBidirectional stream echo:");
76 bidirectional_streaming_echo(&mut client, 17).await;
77
78 // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
79 // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
80 // graceful client disconnection (above example) on the server side.
81 println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
82 bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
83
84 Ok(())
85 }
86