xref: /tonic/tests/web/tests/grpc.rs (revision 99b663e3)
1 use std::future::Future;
2 use std::net::SocketAddr;
3 
4 use tokio::net::TcpListener;
5 use tokio::time::Duration;
6 use tokio::{join, try_join};
7 use tokio_stream::wrappers::TcpListenerStream;
8 use tokio_stream::{self as stream, StreamExt};
9 use tonic::transport::{Channel, Error, Server};
10 use tonic::{Response, Streaming};
11 
12 use test_web::pb::{test_client::TestClient, test_server::TestServer, Input};
13 use test_web::Svc;
14 use tonic_web::GrpcWebLayer;
15 
16 #[tokio::test]
smoke_unary()17 async fn smoke_unary() {
18     let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
19 
20     let (r1, r2, r3, r4) = try_join!(
21         c1.unary_call(input()),
22         c2.unary_call(input()),
23         c3.unary_call(input()),
24         c4.unary_call(input()),
25     )
26     .expect("responses");
27 
28     assert!(meta(&r1) == meta(&r2) && meta(&r2) == meta(&r3) && meta(&r3) == meta(&r4));
29     assert!(data(&r1) == data(&r2) && data(&r2) == data(&r3) && data(&r3) == data(&r4));
30 }
31 
32 #[tokio::test]
smoke_client_stream()33 async fn smoke_client_stream() {
34     let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
35 
36     let input_stream = || stream::iter(vec![input(), input()]);
37 
38     let (r1, r2, r3, r4) = try_join!(
39         c1.client_stream(input_stream()),
40         c2.client_stream(input_stream()),
41         c3.client_stream(input_stream()),
42         c4.client_stream(input_stream()),
43     )
44     .expect("responses");
45 
46     assert!(meta(&r1) == meta(&r2) && meta(&r2) == meta(&r3) && meta(&r3) == meta(&r4));
47     assert!(data(&r1) == data(&r2) && data(&r2) == data(&r3) && data(&r3) == data(&r4));
48 }
49 
50 #[tokio::test]
smoke_server_stream()51 async fn smoke_server_stream() {
52     let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
53 
54     let (r1, r2, r3, r4) = try_join!(
55         c1.server_stream(input()),
56         c2.server_stream(input()),
57         c3.server_stream(input()),
58         c4.server_stream(input()),
59     )
60     .expect("responses");
61 
62     assert!(meta(&r1) == meta(&r2) && meta(&r2) == meta(&r3) && meta(&r3) == meta(&r4));
63 
64     let r1 = stream(r1).await;
65     let r2 = stream(r2).await;
66     let r3 = stream(r3).await;
67     let r4 = stream(r4).await;
68 
69     assert!(r1 == r2 && r2 == r3 && r3 == r4);
70 }
71 #[tokio::test]
smoke_error()72 async fn smoke_error() {
73     let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
74 
75     let boom = Input {
76         id: 1,
77         desc: "boom".to_owned(),
78     };
79 
80     let (r1, r2, r3, r4) = join!(
81         c1.unary_call(boom.clone()),
82         c2.unary_call(boom.clone()),
83         c3.unary_call(boom.clone()),
84         c4.unary_call(boom.clone()),
85     );
86 
87     let s1 = r1.unwrap_err();
88     let s2 = r2.unwrap_err();
89     let s3 = r3.unwrap_err();
90     let s4 = r4.unwrap_err();
91 
92     assert!(status(&s1) == status(&s2) && status(&s2) == status(&s3) && status(&s3) == status(&s4))
93 }
94 
bind() -> (TcpListener, String)95 async fn bind() -> (TcpListener, String) {
96     let addr = SocketAddr::from(([127, 0, 0, 1], 0));
97     let lis = TcpListener::bind(addr).await.expect("listener");
98     let url = format!("http://{}", lis.local_addr().unwrap());
99 
100     (lis, url)
101 }
102 
grpc(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String)103 async fn grpc(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String) {
104     let (listener, url) = bind().await;
105 
106     let fut = Server::builder()
107         .accept_http1(accept_h1)
108         .add_service(TestServer::new(Svc))
109         .serve_with_incoming(TcpListenerStream::new(listener));
110 
111     (fut, url)
112 }
113 
grpc_web(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String)114 async fn grpc_web(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String) {
115     let (listener, url) = bind().await;
116 
117     let fut = Server::builder()
118         .accept_http1(accept_h1)
119         .layer(GrpcWebLayer::new())
120         .add_service(TestServer::new(Svc))
121         .serve_with_incoming(TcpListenerStream::new(listener));
122 
123     (fut, url)
124 }
125 
126 type Client = TestClient<Channel>;
127 
spawn() -> Result<(Client, Client, Client, Client), Error>128 async fn spawn() -> Result<(Client, Client, Client, Client), Error> {
129     let ((s1, u1), (s2, u2), (s3, u3), (s4, u4)) =
130         join!(grpc(true), grpc(false), grpc_web(true), grpc_web(false));
131 
132     drop(tokio::spawn(async move { join!(s1, s2, s3, s4) }));
133 
134     tokio::time::sleep(Duration::from_millis(30)).await;
135 
136     try_join!(
137         TestClient::connect(u1),
138         TestClient::connect(u2),
139         TestClient::connect(u3),
140         TestClient::connect(u4)
141     )
142 }
143 
input() -> Input144 fn input() -> Input {
145     Input {
146         id: 1,
147         desc: "one".to_owned(),
148     }
149 }
150 
meta<T>(r: &Response<T>) -> String151 fn meta<T>(r: &Response<T>) -> String {
152     format!("{:?}", r.metadata())
153 }
154 
data<T>(r: &Response<T>) -> &T155 fn data<T>(r: &Response<T>) -> &T {
156     r.get_ref()
157 }
158 
stream<T>(r: Response<Streaming<T>>) -> Vec<T>159 async fn stream<T>(r: Response<Streaming<T>>) -> Vec<T> {
160     r.into_inner().collect::<Result<Vec<_>, _>>().await.unwrap()
161 }
162 
status(s: &tonic::Status) -> (String, tonic::Code)163 fn status(s: &tonic::Status) -> (String, tonic::Code) {
164     (format!("{:?}", s.metadata()), s.code())
165 }
166