1 use std::future::Future;
2 use std::net::SocketAddr;
3
4 use tokio::net::TcpListener;
5 use tokio::time::Duration;
6 use tokio::{join, try_join};
7 use tokio_stream::wrappers::TcpListenerStream;
8 use tokio_stream::{self as stream, StreamExt};
9 use tonic::transport::{Channel, Error, Server};
10 use tonic::{Response, Streaming};
11
12 use test_web::pb::{test_client::TestClient, test_server::TestServer, Input};
13 use test_web::Svc;
14 use tonic_web::GrpcWebLayer;
15
16 #[tokio::test]
smoke_unary()17 async fn smoke_unary() {
18 let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
19
20 let (r1, r2, r3, r4) = try_join!(
21 c1.unary_call(input()),
22 c2.unary_call(input()),
23 c3.unary_call(input()),
24 c4.unary_call(input()),
25 )
26 .expect("responses");
27
28 assert!(meta(&r1) == meta(&r2) && meta(&r2) == meta(&r3) && meta(&r3) == meta(&r4));
29 assert!(data(&r1) == data(&r2) && data(&r2) == data(&r3) && data(&r3) == data(&r4));
30 }
31
32 #[tokio::test]
smoke_client_stream()33 async fn smoke_client_stream() {
34 let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
35
36 let input_stream = || stream::iter(vec![input(), input()]);
37
38 let (r1, r2, r3, r4) = try_join!(
39 c1.client_stream(input_stream()),
40 c2.client_stream(input_stream()),
41 c3.client_stream(input_stream()),
42 c4.client_stream(input_stream()),
43 )
44 .expect("responses");
45
46 assert!(meta(&r1) == meta(&r2) && meta(&r2) == meta(&r3) && meta(&r3) == meta(&r4));
47 assert!(data(&r1) == data(&r2) && data(&r2) == data(&r3) && data(&r3) == data(&r4));
48 }
49
50 #[tokio::test]
smoke_server_stream()51 async fn smoke_server_stream() {
52 let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
53
54 let (r1, r2, r3, r4) = try_join!(
55 c1.server_stream(input()),
56 c2.server_stream(input()),
57 c3.server_stream(input()),
58 c4.server_stream(input()),
59 )
60 .expect("responses");
61
62 assert!(meta(&r1) == meta(&r2) && meta(&r2) == meta(&r3) && meta(&r3) == meta(&r4));
63
64 let r1 = stream(r1).await;
65 let r2 = stream(r2).await;
66 let r3 = stream(r3).await;
67 let r4 = stream(r4).await;
68
69 assert!(r1 == r2 && r2 == r3 && r3 == r4);
70 }
71 #[tokio::test]
smoke_error()72 async fn smoke_error() {
73 let (mut c1, mut c2, mut c3, mut c4) = spawn().await.expect("clients");
74
75 let boom = Input {
76 id: 1,
77 desc: "boom".to_owned(),
78 };
79
80 let (r1, r2, r3, r4) = join!(
81 c1.unary_call(boom.clone()),
82 c2.unary_call(boom.clone()),
83 c3.unary_call(boom.clone()),
84 c4.unary_call(boom.clone()),
85 );
86
87 let s1 = r1.unwrap_err();
88 let s2 = r2.unwrap_err();
89 let s3 = r3.unwrap_err();
90 let s4 = r4.unwrap_err();
91
92 assert!(status(&s1) == status(&s2) && status(&s2) == status(&s3) && status(&s3) == status(&s4))
93 }
94
bind() -> (TcpListener, String)95 async fn bind() -> (TcpListener, String) {
96 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
97 let lis = TcpListener::bind(addr).await.expect("listener");
98 let url = format!("http://{}", lis.local_addr().unwrap());
99
100 (lis, url)
101 }
102
grpc(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String)103 async fn grpc(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String) {
104 let (listener, url) = bind().await;
105
106 let fut = Server::builder()
107 .accept_http1(accept_h1)
108 .add_service(TestServer::new(Svc))
109 .serve_with_incoming(TcpListenerStream::new(listener));
110
111 (fut, url)
112 }
113
grpc_web(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String)114 async fn grpc_web(accept_h1: bool) -> (impl Future<Output = Result<(), Error>>, String) {
115 let (listener, url) = bind().await;
116
117 let fut = Server::builder()
118 .accept_http1(accept_h1)
119 .layer(GrpcWebLayer::new())
120 .add_service(TestServer::new(Svc))
121 .serve_with_incoming(TcpListenerStream::new(listener));
122
123 (fut, url)
124 }
125
126 type Client = TestClient<Channel>;
127
spawn() -> Result<(Client, Client, Client, Client), Error>128 async fn spawn() -> Result<(Client, Client, Client, Client), Error> {
129 let ((s1, u1), (s2, u2), (s3, u3), (s4, u4)) =
130 join!(grpc(true), grpc(false), grpc_web(true), grpc_web(false));
131
132 drop(tokio::spawn(async move { join!(s1, s2, s3, s4) }));
133
134 tokio::time::sleep(Duration::from_millis(30)).await;
135
136 try_join!(
137 TestClient::connect(u1),
138 TestClient::connect(u2),
139 TestClient::connect(u3),
140 TestClient::connect(u4)
141 )
142 }
143
input() -> Input144 fn input() -> Input {
145 Input {
146 id: 1,
147 desc: "one".to_owned(),
148 }
149 }
150
meta<T>(r: &Response<T>) -> String151 fn meta<T>(r: &Response<T>) -> String {
152 format!("{:?}", r.metadata())
153 }
154
data<T>(r: &Response<T>) -> &T155 fn data<T>(r: &Response<T>) -> &T {
156 r.get_ref()
157 }
158
stream<T>(r: Response<Streaming<T>>) -> Vec<T>159 async fn stream<T>(r: Response<Streaming<T>>) -> Vec<T> {
160 r.into_inner().collect::<Result<Vec<_>, _>>().await.unwrap()
161 }
162
status(s: &tonic::Status) -> (String, tonic::Code)163 fn status(s: &tonic::Status) -> (String, tonic::Code) {
164 (format!("{:?}", s.metadata()), s.code())
165 }
166