xref: /tonic/examples/src/streaming/client.rs (revision d1dbd031)
1 pub mod pb {
2     tonic::include_proto!("grpc.examples.echo");
3 }
4 
5 use std::time::Duration;
6 use tokio_stream::{Stream, StreamExt};
7 use tonic::transport::Channel;
8 
9 use pb::{echo_client::EchoClient, EchoRequest};
10 
echo_requests_iter() -> impl Stream<Item = EchoRequest>11 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
12     tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
13         message: format!("msg {:02}", i),
14     })
15 }
16 
streaming_echo(client: &mut EchoClient<Channel>, num: usize)17 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
18     let stream = client
19         .server_streaming_echo(EchoRequest {
20             message: "foo".into(),
21         })
22         .await
23         .unwrap()
24         .into_inner();
25 
26     // stream is infinite - take just 5 elements and then disconnect
27     let mut stream = stream.take(num);
28     while let Some(item) = stream.next().await {
29         println!("\treceived: {}", item.unwrap().message);
30     }
31     // stream is dropped here and the disconnect info is sent to server
32 }
33 
bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize)34 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
35     let in_stream = echo_requests_iter().take(num);
36 
37     let response = client
38         .bidirectional_streaming_echo(in_stream)
39         .await
40         .unwrap();
41 
42     let mut resp_stream = response.into_inner();
43 
44     while let Some(received) = resp_stream.next().await {
45         let received = received.unwrap();
46         println!("\treceived message: `{}`", received.message);
47     }
48 }
49 
bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration)50 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
51     let in_stream = echo_requests_iter().throttle(dur);
52 
53     let response = client
54         .bidirectional_streaming_echo(in_stream)
55         .await
56         .unwrap();
57 
58     let mut resp_stream = response.into_inner();
59 
60     while let Some(received) = resp_stream.next().await {
61         let received = received.unwrap();
62         println!("\treceived message: `{}`", received.message);
63     }
64 }
65 
66 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>67 async fn main() -> Result<(), Box<dyn std::error::Error>> {
68     let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
69 
70     println!("Streaming echo:");
71     streaming_echo(&mut client, 5).await;
72     tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
73 
74     // Echo stream that sends 17 requests then graceful end that connection
75     println!("\r\nBidirectional stream echo:");
76     bidirectional_streaming_echo(&mut client, 17).await;
77 
78     // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
79     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
80     // graceful client disconnection (above example) on the server side.
81     println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
82     bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
83 
84     Ok(())
85 }
86