xref: /tonic/examples/src/streaming/client.rs (revision d1dbd031)
146040429SLucio Franco pub mod pb {
246040429SLucio Franco     tonic::include_proto!("grpc.examples.echo");
346040429SLucio Franco }
446040429SLucio Franco 
501e5be50SSylwester Rąpała use std::time::Duration;
6c9c4acbcStottoto use tokio_stream::{Stream, StreamExt};
701e5be50SSylwester Rąpała use tonic::transport::Channel;
801e5be50SSylwester Rąpała 
946040429SLucio Franco use pb::{echo_client::EchoClient, EchoRequest};
1046040429SLucio Franco 
echo_requests_iter() -> impl Stream<Item = EchoRequest>1101e5be50SSylwester Rąpała fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
1201e5be50SSylwester Rąpała     tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
1301e5be50SSylwester Rąpała         message: format!("msg {:02}", i),
1401e5be50SSylwester Rąpała     })
1501e5be50SSylwester Rąpała }
1646040429SLucio Franco 
streaming_echo(client: &mut EchoClient<Channel>, num: usize)1701e5be50SSylwester Rąpała async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
1846040429SLucio Franco     let stream = client
1946040429SLucio Franco         .server_streaming_echo(EchoRequest {
2046040429SLucio Franco             message: "foo".into(),
2146040429SLucio Franco         })
2246040429SLucio Franco         .await
2301e5be50SSylwester Rąpała         .unwrap()
2401e5be50SSylwester Rąpała         .into_inner();
2501e5be50SSylwester Rąpała 
2601e5be50SSylwester Rąpała     // stream is infinite - take just 5 elements and then disconnect
2701e5be50SSylwester Rąpała     let mut stream = stream.take(num);
2801e5be50SSylwester Rąpała     while let Some(item) = stream.next().await {
296d954738Scui fliter         println!("\treceived: {}", item.unwrap().message);
3001e5be50SSylwester Rąpała     }
31*d1dbd031Smwtian     // stream is dropped here and the disconnect info is sent to server
3201e5be50SSylwester Rąpała }
3301e5be50SSylwester Rąpała 
bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize)3401e5be50SSylwester Rąpała async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
3501e5be50SSylwester Rąpała     let in_stream = echo_requests_iter().take(num);
3601e5be50SSylwester Rąpała 
3701e5be50SSylwester Rąpała     let response = client
3801e5be50SSylwester Rąpała         .bidirectional_streaming_echo(in_stream)
3901e5be50SSylwester Rąpała         .await
4046040429SLucio Franco         .unwrap();
4146040429SLucio Franco 
4201e5be50SSylwester Rąpała     let mut resp_stream = response.into_inner();
4346040429SLucio Franco 
446d954738Scui fliter     while let Some(received) = resp_stream.next().await {
456d954738Scui fliter         let received = received.unwrap();
466d954738Scui fliter         println!("\treceived message: `{}`", received.message);
4701e5be50SSylwester Rąpała     }
4801e5be50SSylwester Rąpała }
4946040429SLucio Franco 
bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration)5001e5be50SSylwester Rąpała async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
5101e5be50SSylwester Rąpała     let in_stream = echo_requests_iter().throttle(dur);
5246040429SLucio Franco 
5301e5be50SSylwester Rąpała     let response = client
5401e5be50SSylwester Rąpała         .bidirectional_streaming_echo(in_stream)
5501e5be50SSylwester Rąpała         .await
5601e5be50SSylwester Rąpała         .unwrap();
5701e5be50SSylwester Rąpała 
5801e5be50SSylwester Rąpała     let mut resp_stream = response.into_inner();
5901e5be50SSylwester Rąpała 
606d954738Scui fliter     while let Some(received) = resp_stream.next().await {
616d954738Scui fliter         let received = received.unwrap();
626d954738Scui fliter         println!("\treceived message: `{}`", received.message);
6301e5be50SSylwester Rąpała     }
6401e5be50SSylwester Rąpała }
6501e5be50SSylwester Rąpała 
6601e5be50SSylwester Rąpała #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>6701e5be50SSylwester Rąpała async fn main() -> Result<(), Box<dyn std::error::Error>> {
6801e5be50SSylwester Rąpała     let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
6901e5be50SSylwester Rąpała 
7001e5be50SSylwester Rąpała     println!("Streaming echo:");
7101e5be50SSylwester Rąpała     streaming_echo(&mut client, 5).await;
7201e5be50SSylwester Rąpała     tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
7301e5be50SSylwester Rąpała 
746d954738Scui fliter     // Echo stream that sends 17 requests then graceful end that connection
7501e5be50SSylwester Rąpała     println!("\r\nBidirectional stream echo:");
7601e5be50SSylwester Rąpała     bidirectional_streaming_echo(&mut client, 17).await;
7701e5be50SSylwester Rąpała 
78c0d1397aSLuca Pette     // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
796d954738Scui fliter     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
806d954738Scui fliter     // graceful client disconnection (above example) on the server side.
8101e5be50SSylwester Rąpała     println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
8201e5be50SSylwester Rąpała     bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
8346040429SLucio Franco 
8446040429SLucio Franco     Ok(())
8546040429SLucio Franco }
86