1 pub mod pb { 2 tonic::include_proto!("grpc.examples.echo"); 3 } 4 5 use futures::stream::Stream; 6 use std::time::Duration; 7 use tokio_stream::StreamExt; 8 use tonic::transport::Channel; 9 10 use pb::{echo_client::EchoClient, EchoRequest}; 11 12 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> { 13 tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { 14 message: format!("msg {:02}", i), 15 }) 16 } 17 18 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) { 19 let stream = client 20 .server_streaming_echo(EchoRequest { 21 message: "foo".into(), 22 }) 23 .await 24 .unwrap() 25 .into_inner(); 26 27 // stream is infinite - take just 5 elements and then disconnect 28 let mut stream = stream.take(num); 29 while let Some(item) = stream.next().await { 30 println!("\treceived: {}", item.unwrap().message); 31 } 32 // stream is droped here and the disconnect info is send to server 33 } 34 35 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) { 36 let in_stream = echo_requests_iter().take(num); 37 38 let response = client 39 .bidirectional_streaming_echo(in_stream) 40 .await 41 .unwrap(); 42 43 let mut resp_stream = response.into_inner(); 44 45 while let Some(received) = resp_stream.next().await { 46 let received = received.unwrap(); 47 println!("\treceived message: `{}`", received.message); 48 } 49 } 50 51 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) { 52 let in_stream = echo_requests_iter().throttle(dur); 53 54 let response = client 55 .bidirectional_streaming_echo(in_stream) 56 .await 57 .unwrap(); 58 59 let mut resp_stream = response.into_inner(); 60 61 while let Some(received) = resp_stream.next().await { 62 let received = received.unwrap(); 63 println!("\treceived message: `{}`", received.message); 64 } 65 } 66 67 #[tokio::main] 68 async fn main() -> Result<(), Box<dyn std::error::Error>> { 69 let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap(); 70 71 println!("Streaming echo:"); 72 streaming_echo(&mut client, 5).await; 73 tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions 74 75 // Echo stream that sends 17 requests then graceful end that connection 76 println!("\r\nBidirectional stream echo:"); 77 bidirectional_streaming_echo(&mut client, 17).await; 78 79 // Echo stream that sends up to `usize::MAX` requets. One request each 2s. 80 // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from 81 //graceful client disconnection (above example) on the server side. 82 println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); 83 bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; 84 85 Ok(()) 86 } 87