xref: /tonic/examples/src/streaming/client.rs (revision 5c8a5d70)
1 pub mod pb {
2     tonic::include_proto!("grpc.examples.echo");
3 }
4 
5 use futures::stream::Stream;
6 use std::time::Duration;
7 use tokio_stream::StreamExt;
8 use tonic::transport::Channel;
9 
10 use pb::{echo_client::EchoClient, EchoRequest};
11 
12 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
13     tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
14         message: format!("msg {:02}", i),
15     })
16 }
17 
18 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
19     let stream = client
20         .server_streaming_echo(EchoRequest {
21             message: "foo".into(),
22         })
23         .await
24         .unwrap()
25         .into_inner();
26 
27     // stream is infinite - take just 5 elements and then disconnect
28     let mut stream = stream.take(num);
29     while let Some(item) = stream.next().await {
30         println!("\treceived: {}", item.unwrap().message);
31     }
32     // stream is droped here and the disconnect info is send to server
33 }
34 
35 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
36     let in_stream = echo_requests_iter().take(num);
37 
38     let response = client
39         .bidirectional_streaming_echo(in_stream)
40         .await
41         .unwrap();
42 
43     let mut resp_stream = response.into_inner();
44 
45     while let Some(received) = resp_stream.next().await {
46         let received = received.unwrap();
47         println!("\treceived message: `{}`", received.message);
48     }
49 }
50 
51 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
52     let in_stream = echo_requests_iter().throttle(dur);
53 
54     let response = client
55         .bidirectional_streaming_echo(in_stream)
56         .await
57         .unwrap();
58 
59     let mut resp_stream = response.into_inner();
60 
61     while let Some(received) = resp_stream.next().await {
62         let received = received.unwrap();
63         println!("\treceived message: `{}`", received.message);
64     }
65 }
66 
67 #[tokio::main]
68 async fn main() -> Result<(), Box<dyn std::error::Error>> {
69     let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
70 
71     println!("Streaming echo:");
72     streaming_echo(&mut client, 5).await;
73     tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
74 
75     // Echo stream that sends 17 requests then graceful end that connection
76     println!("\r\nBidirectional stream echo:");
77     bidirectional_streaming_echo(&mut client, 17).await;
78 
79     // Echo stream that sends up to `usize::MAX` requets. One request each 2s.
80     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
81     //graceful client disconnection (above example) on the server side.
82     println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
83     bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
84 
85     Ok(())
86 }
87