1 pub mod pb { 2 tonic::include_proto!("grpc.examples.echo"); 3 } 4 5 use std::time::Duration; 6 use tokio_stream::{Stream, StreamExt}; 7 use tonic::transport::Channel; 8 9 use pb::{echo_client::EchoClient, EchoRequest}; 10 11 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> { 12 tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { 13 message: format!("msg {:02}", i), 14 }) 15 } 16 17 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) { 18 let stream = client 19 .server_streaming_echo(EchoRequest { 20 message: "foo".into(), 21 }) 22 .await 23 .unwrap() 24 .into_inner(); 25 26 // stream is infinite - take just 5 elements and then disconnect 27 let mut stream = stream.take(num); 28 while let Some(item) = stream.next().await { 29 println!("\treceived: {}", item.unwrap().message); 30 } 31 // stream is dropped here and the disconnect info is sent to server 32 } 33 34 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) { 35 let in_stream = echo_requests_iter().take(num); 36 37 let response = client 38 .bidirectional_streaming_echo(in_stream) 39 .await 40 .unwrap(); 41 42 let mut resp_stream = response.into_inner(); 43 44 while let Some(received) = resp_stream.next().await { 45 let received = received.unwrap(); 46 println!("\treceived message: `{}`", received.message); 47 } 48 } 49 50 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) { 51 let in_stream = echo_requests_iter().throttle(dur); 52 53 let response = client 54 .bidirectional_streaming_echo(in_stream) 55 .await 56 .unwrap(); 57 58 let mut resp_stream = response.into_inner(); 59 60 while let Some(received) = resp_stream.next().await { 61 let received = received.unwrap(); 62 println!("\treceived message: `{}`", received.message); 63 } 64 } 65 66 #[tokio::main] 67 async fn main() -> Result<(), Box<dyn std::error::Error>> { 68 let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap(); 69 70 println!("Streaming echo:"); 71 streaming_echo(&mut client, 5).await; 72 tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions 73 74 // Echo stream that sends 17 requests then graceful end that connection 75 println!("\r\nBidirectional stream echo:"); 76 bidirectional_streaming_echo(&mut client, 17).await; 77 78 // Echo stream that sends up to `usize::MAX` requests. One request each 2s. 79 // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from 80 // graceful client disconnection (above example) on the server side. 81 println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); 82 bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; 83 84 Ok(()) 85 } 86