146040429SLucio Franco pub mod pb {
246040429SLucio Franco tonic::include_proto!("grpc.examples.echo");
346040429SLucio Franco }
446040429SLucio Franco
501e5be50SSylwester Rąpała use std::time::Duration;
6c9c4acbcStottoto use tokio_stream::{Stream, StreamExt};
701e5be50SSylwester Rąpała use tonic::transport::Channel;
801e5be50SSylwester Rąpała
946040429SLucio Franco use pb::{echo_client::EchoClient, EchoRequest};
1046040429SLucio Franco
echo_requests_iter() -> impl Stream<Item = EchoRequest>1101e5be50SSylwester Rąpała fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
1201e5be50SSylwester Rąpała tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
1301e5be50SSylwester Rąpała message: format!("msg {:02}", i),
1401e5be50SSylwester Rąpała })
1501e5be50SSylwester Rąpała }
1646040429SLucio Franco
streaming_echo(client: &mut EchoClient<Channel>, num: usize)1701e5be50SSylwester Rąpała async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
1846040429SLucio Franco let stream = client
1946040429SLucio Franco .server_streaming_echo(EchoRequest {
2046040429SLucio Franco message: "foo".into(),
2146040429SLucio Franco })
2246040429SLucio Franco .await
2301e5be50SSylwester Rąpała .unwrap()
2401e5be50SSylwester Rąpała .into_inner();
2501e5be50SSylwester Rąpała
2601e5be50SSylwester Rąpała // stream is infinite - take just 5 elements and then disconnect
2701e5be50SSylwester Rąpała let mut stream = stream.take(num);
2801e5be50SSylwester Rąpała while let Some(item) = stream.next().await {
296d954738Scui fliter println!("\treceived: {}", item.unwrap().message);
3001e5be50SSylwester Rąpała }
31*d1dbd031Smwtian // stream is dropped here and the disconnect info is sent to server
3201e5be50SSylwester Rąpała }
3301e5be50SSylwester Rąpała
bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize)3401e5be50SSylwester Rąpała async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
3501e5be50SSylwester Rąpała let in_stream = echo_requests_iter().take(num);
3601e5be50SSylwester Rąpała
3701e5be50SSylwester Rąpała let response = client
3801e5be50SSylwester Rąpała .bidirectional_streaming_echo(in_stream)
3901e5be50SSylwester Rąpała .await
4046040429SLucio Franco .unwrap();
4146040429SLucio Franco
4201e5be50SSylwester Rąpała let mut resp_stream = response.into_inner();
4346040429SLucio Franco
446d954738Scui fliter while let Some(received) = resp_stream.next().await {
456d954738Scui fliter let received = received.unwrap();
466d954738Scui fliter println!("\treceived message: `{}`", received.message);
4701e5be50SSylwester Rąpała }
4801e5be50SSylwester Rąpała }
4946040429SLucio Franco
bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration)5001e5be50SSylwester Rąpała async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
5101e5be50SSylwester Rąpała let in_stream = echo_requests_iter().throttle(dur);
5246040429SLucio Franco
5301e5be50SSylwester Rąpała let response = client
5401e5be50SSylwester Rąpała .bidirectional_streaming_echo(in_stream)
5501e5be50SSylwester Rąpała .await
5601e5be50SSylwester Rąpała .unwrap();
5701e5be50SSylwester Rąpała
5801e5be50SSylwester Rąpała let mut resp_stream = response.into_inner();
5901e5be50SSylwester Rąpała
606d954738Scui fliter while let Some(received) = resp_stream.next().await {
616d954738Scui fliter let received = received.unwrap();
626d954738Scui fliter println!("\treceived message: `{}`", received.message);
6301e5be50SSylwester Rąpała }
6401e5be50SSylwester Rąpała }
6501e5be50SSylwester Rąpała
6601e5be50SSylwester Rąpała #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>6701e5be50SSylwester Rąpała async fn main() -> Result<(), Box<dyn std::error::Error>> {
6801e5be50SSylwester Rąpała let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
6901e5be50SSylwester Rąpała
7001e5be50SSylwester Rąpała println!("Streaming echo:");
7101e5be50SSylwester Rąpała streaming_echo(&mut client, 5).await;
7201e5be50SSylwester Rąpała tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
7301e5be50SSylwester Rąpała
746d954738Scui fliter // Echo stream that sends 17 requests then graceful end that connection
7501e5be50SSylwester Rąpała println!("\r\nBidirectional stream echo:");
7601e5be50SSylwester Rąpała bidirectional_streaming_echo(&mut client, 17).await;
7701e5be50SSylwester Rąpała
78c0d1397aSLuca Pette // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
796d954738Scui fliter // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
806d954738Scui fliter // graceful client disconnection (above example) on the server side.
8101e5be50SSylwester Rąpała println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
8201e5be50SSylwester Rąpała bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
8346040429SLucio Franco
8446040429SLucio Franco Ok(())
8546040429SLucio Franco }
86