1 use crate::error::Result;
2
3 use super::*;
4
5 use util::conn::conn_bridge::*;
6 use util::conn::*;
7
8 use tokio::io::AsyncReadExt;
9 use tokio::io::AsyncWriteExt;
10 use tokio::sync::{broadcast, mpsc};
11 use tokio::time::Duration;
12
bridge_process_at_least_one(br: &Arc<Bridge>)13 async fn bridge_process_at_least_one(br: &Arc<Bridge>) {
14 let mut n_sum = 0;
15 loop {
16 tokio::time::sleep(Duration::from_millis(10)).await;
17 n_sum += br.tick().await;
18 if br.len(0).await == 0 && br.len(1).await == 0 && n_sum > 0 {
19 break;
20 }
21 }
22 }
23
create_new_association_pair( br: &Arc<Bridge>, ca: Arc<dyn Conn + Send + Sync>, cb: Arc<dyn Conn + Send + Sync>, ) -> Result<(Arc<Association>, Arc<Association>)>24 async fn create_new_association_pair(
25 br: &Arc<Bridge>,
26 ca: Arc<dyn Conn + Send + Sync>,
27 cb: Arc<dyn Conn + Send + Sync>,
28 ) -> Result<(Arc<Association>, Arc<Association>)> {
29 let (handshake0ch_tx, mut handshake0ch_rx) = mpsc::channel(1);
30 let (handshake1ch_tx, mut handshake1ch_rx) = mpsc::channel(1);
31 let (closed_tx, mut closed_rx0) = broadcast::channel::<()>(1);
32 let mut closed_rx1 = closed_tx.subscribe();
33
34 // Setup client
35 tokio::spawn(async move {
36 let client = Association::client(sctp::association::Config {
37 net_conn: ca,
38 max_receive_buffer_size: 0,
39 max_message_size: 0,
40 name: "client".to_owned(),
41 })
42 .await;
43
44 let _ = handshake0ch_tx.send(client).await;
45 let _ = closed_rx0.recv().await;
46
47 Result::<()>::Ok(())
48 });
49
50 // Setup server
51 tokio::spawn(async move {
52 let server = Association::server(sctp::association::Config {
53 net_conn: cb,
54 max_receive_buffer_size: 0,
55 max_message_size: 0,
56 name: "server".to_owned(),
57 })
58 .await;
59
60 let _ = handshake1ch_tx.send(server).await;
61 let _ = closed_rx1.recv().await;
62
63 Result::<()>::Ok(())
64 });
65
66 let mut client = None;
67 let mut server = None;
68 let mut a0handshake_done = false;
69 let mut a1handshake_done = false;
70 let mut i = 0;
71 while (!a0handshake_done || !a1handshake_done) && i < 100 {
72 br.tick().await;
73
74 let timer = tokio::time::sleep(Duration::from_millis(10));
75 tokio::pin!(timer);
76
77 tokio::select! {
78 _ = timer.as_mut() =>{},
79 r0 = handshake0ch_rx.recv() => {
80 if let Ok(c) = r0.unwrap() {
81 client = Some(c);
82 }
83 a0handshake_done = true;
84 },
85 r1 = handshake1ch_rx.recv() => {
86 if let Ok(s) = r1.unwrap() {
87 server = Some(s);
88 }
89 a1handshake_done = true;
90 },
91 };
92 i += 1;
93 }
94
95 if !a0handshake_done || !a1handshake_done {
96 return Err(Error::new("handshake failed".to_owned()));
97 }
98
99 drop(closed_tx);
100
101 Ok((Arc::new(client.unwrap()), Arc::new(server.unwrap())))
102 }
103
close_association_pair( br: &Arc<Bridge>, client: Arc<Association>, server: Arc<Association>, )104 async fn close_association_pair(
105 br: &Arc<Bridge>,
106 client: Arc<Association>,
107 server: Arc<Association>,
108 ) {
109 let (handshake0ch_tx, mut handshake0ch_rx) = mpsc::channel(1);
110 let (handshake1ch_tx, mut handshake1ch_rx) = mpsc::channel(1);
111 let (closed_tx, mut closed_rx0) = broadcast::channel::<()>(1);
112 let mut closed_rx1 = closed_tx.subscribe();
113
114 // Close client
115 tokio::spawn(async move {
116 client.close().await?;
117 let _ = handshake0ch_tx.send(()).await;
118 let _ = closed_rx0.recv().await;
119
120 Result::<()>::Ok(())
121 });
122
123 // Close server
124 tokio::spawn(async move {
125 server.close().await?;
126 let _ = handshake1ch_tx.send(()).await;
127 let _ = closed_rx1.recv().await;
128
129 Result::<()>::Ok(())
130 });
131
132 let mut a0handshake_done = false;
133 let mut a1handshake_done = false;
134 let mut i = 0;
135 while (!a0handshake_done || !a1handshake_done) && i < 100 {
136 br.tick().await;
137
138 let timer = tokio::time::sleep(Duration::from_millis(10));
139 tokio::pin!(timer);
140
141 tokio::select! {
142 _ = timer.as_mut() =>{},
143 _ = handshake0ch_rx.recv() => {
144 a0handshake_done = true;
145 },
146 _ = handshake1ch_rx.recv() => {
147 a1handshake_done = true;
148 },
149 };
150 i += 1;
151 }
152
153 drop(closed_tx);
154 }
155
156 //use std::io::Write;
157
pr_ordered_unordered_test(channel_type: ChannelType, is_ordered: bool) -> Result<()>158 async fn pr_ordered_unordered_test(channel_type: ChannelType, is_ordered: bool) -> Result<()> {
159 /*env_logger::Builder::new()
160 .format(|buf, record| {
161 writeln!(
162 buf,
163 "{}:{} [{}] {} - {}",
164 record.file().unwrap_or("unknown"),
165 record.line().unwrap_or(0),
166 record.level(),
167 chrono::Local::now().format("%H:%M:%S.%6f"),
168 record.args()
169 )
170 })
171 .filter(None, log::LevelFilter::Trace)
172 .init();*/
173
174 let mut sbuf = vec![0u8; 1000];
175 let mut rbuf = vec![0u8; 2000];
176
177 let (br, ca, cb) = Bridge::new(0, None, None);
178
179 let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
180
181 let cfg = Config {
182 channel_type,
183 reliability_parameter: 0,
184 label: "data".to_string(),
185 ..Default::default()
186 };
187
188 let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
189 bridge_process_at_least_one(&br).await;
190
191 let existing_data_channels: Vec<DataChannel> = Vec::new();
192 let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
193 bridge_process_at_least_one(&br).await;
194
195 assert_eq!(dc0.config, cfg, "local config should match");
196 assert_eq!(dc1.config, cfg, "remote config should match");
197
198 dc0.commit_reliability_params();
199 dc1.commit_reliability_params();
200
201 sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
202 let n = dc0
203 .write_data_channel(&Bytes::from(sbuf.clone()), true)
204 .await?;
205 assert_eq!(sbuf.len(), n, "data length should match");
206
207 sbuf[0..4].copy_from_slice(&2u32.to_be_bytes());
208 let n = dc0
209 .write_data_channel(&Bytes::from(sbuf.clone()), true)
210 .await?;
211 assert_eq!(sbuf.len(), n, "data length should match");
212
213 if !is_ordered {
214 sbuf[0..4].copy_from_slice(&3u32.to_be_bytes());
215 let n = dc0
216 .write_data_channel(&Bytes::from(sbuf.clone()), true)
217 .await?;
218 assert_eq!(sbuf.len(), n, "data length should match");
219 }
220
221 tokio::time::sleep(Duration::from_millis(100)).await;
222 br.drop_offset(0, 0, 1).await; // drop the first packet on the wire
223 if !is_ordered {
224 br.reorder(0).await;
225 } else {
226 tokio::time::sleep(Duration::from_millis(100)).await;
227 }
228 bridge_process_at_least_one(&br).await;
229
230 if !is_ordered {
231 let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
232 assert!(is_string, "should return isString being true");
233 assert_eq!(sbuf.len(), n, "data length should match");
234 assert_eq!(
235 3,
236 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
237 "data should match"
238 );
239 }
240
241 let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
242 assert!(is_string, "should return isString being true");
243 assert_eq!(sbuf.len(), n, "data length should match");
244 assert_eq!(
245 2,
246 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
247 "data should match"
248 );
249
250 dc0.close().await?;
251 dc1.close().await?;
252 bridge_process_at_least_one(&br).await;
253
254 close_association_pair(&br, a0, a1).await;
255
256 Ok(())
257 }
258
259 #[tokio::test]
test_data_channel_channel_type_reliable_ordered() -> Result<()>260 async fn test_data_channel_channel_type_reliable_ordered() -> Result<()> {
261 let mut sbuf = vec![0u8; 1000];
262 let mut rbuf = vec![0u8; 1500];
263
264 let (br, ca, cb) = Bridge::new(0, None, None);
265
266 let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
267
268 let cfg = Config {
269 channel_type: ChannelType::Reliable,
270 reliability_parameter: 123,
271 label: "data".to_string(),
272 ..Default::default()
273 };
274
275 let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
276 bridge_process_at_least_one(&br).await;
277
278 let existing_data_channels: Vec<DataChannel> = Vec::new();
279 let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
280 bridge_process_at_least_one(&br).await;
281
282 assert_eq!(dc0.config, cfg, "local config should match");
283 assert_eq!(dc1.config, cfg, "remote config should match");
284
285 br.reorder_next_nwrites(0, 2); // reordering on the wire
286
287 sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
288 let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
289 assert_eq!(sbuf.len(), n, "data length should match");
290
291 sbuf[0..4].copy_from_slice(&2u32.to_be_bytes());
292 let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
293 assert_eq!(sbuf.len(), n, "data length should match");
294
295 bridge_process_at_least_one(&br).await;
296
297 let n = dc1.read(&mut rbuf[..]).await?;
298 assert_eq!(sbuf.len(), n, "data length should match");
299 assert_eq!(
300 1,
301 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
302 "data should match"
303 );
304
305 let n = dc1.read(&mut rbuf[..]).await?;
306 assert_eq!(sbuf.len(), n, "data length should match");
307 assert_eq!(
308 2,
309 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
310 "data should match"
311 );
312
313 dc0.close().await?;
314 dc1.close().await?;
315 bridge_process_at_least_one(&br).await;
316
317 close_association_pair(&br, a0, a1).await;
318
319 Ok(())
320 }
321
322 #[tokio::test]
test_data_channel_channel_type_reliable_unordered() -> Result<()>323 async fn test_data_channel_channel_type_reliable_unordered() -> Result<()> {
324 let mut sbuf = vec![0u8; 1000];
325 let mut rbuf = vec![0u8; 1500];
326
327 let (br, ca, cb) = Bridge::new(0, None, None);
328
329 let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
330
331 let cfg = Config {
332 channel_type: ChannelType::ReliableUnordered,
333 reliability_parameter: 123,
334 label: "data".to_string(),
335 ..Default::default()
336 };
337
338 let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
339 bridge_process_at_least_one(&br).await;
340
341 let existing_data_channels: Vec<DataChannel> = Vec::new();
342 let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
343 bridge_process_at_least_one(&br).await;
344
345 assert_eq!(dc0.config, cfg, "local config should match");
346 assert_eq!(dc1.config, cfg, "remote config should match");
347
348 dc0.commit_reliability_params();
349 dc1.commit_reliability_params();
350
351 sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
352 let n = dc0
353 .write_data_channel(&Bytes::from(sbuf.clone()), true)
354 .await?;
355 assert_eq!(sbuf.len(), n, "data length should match");
356
357 sbuf[0..4].copy_from_slice(&2u32.to_be_bytes());
358 let n = dc0
359 .write_data_channel(&Bytes::from(sbuf.clone()), true)
360 .await?;
361 assert_eq!(sbuf.len(), n, "data length should match");
362
363 tokio::time::sleep(Duration::from_millis(100)).await;
364 br.reorder(0).await; // reordering on the wire
365 bridge_process_at_least_one(&br).await;
366
367 let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
368 assert!(is_string, "should return isString being true");
369 assert_eq!(sbuf.len(), n, "data length should match");
370 assert_eq!(
371 2,
372 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
373 "data should match"
374 );
375
376 let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
377 assert!(is_string, "should return isString being true");
378 assert_eq!(sbuf.len(), n, "data length should match");
379 assert_eq!(
380 1,
381 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
382 "data should match"
383 );
384
385 dc0.close().await?;
386 dc1.close().await?;
387 bridge_process_at_least_one(&br).await;
388
389 close_association_pair(&br, a0, a1).await;
390
391 Ok(())
392 }
393
394 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
395 #[tokio::test]
test_data_channel_channel_type_partial_reliable_rexmit() -> Result<()>396 async fn test_data_channel_channel_type_partial_reliable_rexmit() -> Result<()> {
397 pr_ordered_unordered_test(ChannelType::PartialReliableRexmit, true).await
398 }
399
400 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
401 #[tokio::test]
test_data_channel_channel_type_partial_reliable_rexmit_unordered() -> Result<()>402 async fn test_data_channel_channel_type_partial_reliable_rexmit_unordered() -> Result<()> {
403 pr_ordered_unordered_test(ChannelType::PartialReliableRexmitUnordered, false).await
404 }
405
406 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
407 #[tokio::test]
test_data_channel_channel_type_partial_reliable_timed() -> Result<()>408 async fn test_data_channel_channel_type_partial_reliable_timed() -> Result<()> {
409 pr_ordered_unordered_test(ChannelType::PartialReliableTimed, true).await
410 }
411
412 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
413 #[tokio::test]
test_data_channel_channel_type_partial_reliable_timed_unordered() -> Result<()>414 async fn test_data_channel_channel_type_partial_reliable_timed_unordered() -> Result<()> {
415 pr_ordered_unordered_test(ChannelType::PartialReliableTimedUnordered, false).await
416 }
417
418 //TODO: remove this conditional test
419 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
420 #[tokio::test]
test_data_channel_buffered_amount() -> Result<()>421 async fn test_data_channel_buffered_amount() -> Result<()> {
422 let sbuf = vec![0u8; 1000];
423 let mut rbuf = vec![0u8; 1000];
424
425 let n_cbs = Arc::new(AtomicUsize::new(0));
426
427 let (br, ca, cb) = Bridge::new(0, None, None);
428
429 let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
430
431 let dc0 = Arc::new(
432 DataChannel::dial(
433 &a0,
434 100,
435 Config {
436 label: "data".to_owned(),
437 ..Default::default()
438 },
439 )
440 .await?,
441 );
442 bridge_process_at_least_one(&br).await;
443
444 let existing_data_channels: Vec<DataChannel> = Vec::new();
445 let dc1 = Arc::new(DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?);
446 bridge_process_at_least_one(&br).await;
447
448 while dc0.buffered_amount() > 0 {
449 bridge_process_at_least_one(&br).await;
450 }
451
452 let n = dc0.write(&Bytes::new()).await?;
453 assert_eq!(n, 0, "data length should match");
454 assert_eq!(dc0.buffered_amount(), 1, "incorrect bufferedAmount");
455
456 let n = dc0.write(&Bytes::from_static(&[0])).await?;
457 assert_eq!(n, 1, "data length should match");
458 assert_eq!(dc0.buffered_amount(), 2, "incorrect bufferedAmount");
459
460 bridge_process_at_least_one(&br).await;
461
462 let n = dc1.read(&mut rbuf[..]).await?;
463 assert_eq!(n, 0, "received length should match");
464
465 let n = dc1.read(&mut rbuf[..]).await?;
466 assert_eq!(n, 1, "received length should match");
467
468 dc0.set_buffered_amount_low_threshold(1500);
469 assert_eq!(
470 dc0.buffered_amount_low_threshold(),
471 1500,
472 "incorrect bufferedAmountLowThreshold"
473 );
474 let n_cbs2 = Arc::clone(&n_cbs);
475 dc0.on_buffered_amount_low(Box::new(move || {
476 n_cbs2.fetch_add(1, Ordering::SeqCst);
477 Box::pin(async {})
478 }));
479
480 // Write 10 1000-byte packets (total 10,000 bytes)
481 for i in 0..10 {
482 let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
483 assert_eq!(sbuf.len(), n, "data length should match");
484 assert_eq!(
485 sbuf.len() * (i + 1) + 2,
486 dc0.buffered_amount(),
487 "incorrect bufferedAmount"
488 );
489 }
490
491 let dc1_cloned = Arc::clone(&dc1);
492 tokio::spawn(async move {
493 while let Ok(n) = dc1_cloned.read(&mut rbuf[..]).await {
494 if n == 0 {
495 break;
496 }
497 assert_eq!(n, rbuf.len(), "received length should match");
498 }
499 });
500
501 let since = tokio::time::Instant::now();
502 loop {
503 br.tick().await;
504 tokio::time::sleep(Duration::from_millis(10)).await;
505 if tokio::time::Instant::now().duration_since(since) > Duration::from_millis(500) {
506 break;
507 }
508 }
509
510 dc0.close().await?;
511 dc1.close().await?;
512 bridge_process_at_least_one(&br).await;
513
514 assert!(
515 n_cbs.load(Ordering::SeqCst) > 0,
516 "should make at least one callback"
517 );
518
519 close_association_pair(&br, a0, a1).await;
520
521 Ok(())
522 }
523
524 //TODO: remove this conditional test
525 #[cfg(not(any(target_os = "macos", target_os = "windows")))] // this times out in CI on windows.
526 #[tokio::test]
test_stats() -> Result<()>527 async fn test_stats() -> Result<()> {
528 let sbuf = vec![0u8; 1000];
529 let mut rbuf = vec![0u8; 1500];
530
531 let (br, ca, cb) = Bridge::new(0, None, None);
532
533 let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
534
535 let cfg = Config {
536 channel_type: ChannelType::Reliable,
537 reliability_parameter: 123,
538 label: "data".to_owned(),
539 ..Default::default()
540 };
541
542 let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
543 bridge_process_at_least_one(&br).await;
544
545 let existing_data_channels: Vec<DataChannel> = Vec::new();
546 let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
547 bridge_process_at_least_one(&br).await;
548
549 let mut bytes_sent = 0;
550
551 let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
552 assert_eq!(n, sbuf.len(), "data length should match");
553 bytes_sent += n;
554
555 assert_eq!(dc0.bytes_sent(), bytes_sent);
556 assert_eq!(dc0.messages_sent(), 1);
557
558 let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
559 assert_eq!(n, sbuf.len(), "data length should match");
560 bytes_sent += n;
561
562 assert_eq!(dc0.bytes_sent(), bytes_sent);
563 assert_eq!(dc0.messages_sent(), 2);
564
565 let n = dc0.write(&Bytes::from_static(&[0])).await?;
566 assert_eq!(n, 1, "data length should match");
567 bytes_sent += n;
568
569 assert_eq!(dc0.bytes_sent(), bytes_sent);
570 assert_eq!(dc0.messages_sent(), 3);
571
572 let n = dc0.write(&Bytes::from_static(&[])).await?;
573 assert_eq!(n, 0, "data length should match");
574 bytes_sent += n;
575
576 assert_eq!(dc0.bytes_sent(), bytes_sent);
577 assert_eq!(dc0.messages_sent(), 4);
578
579 bridge_process_at_least_one(&br).await;
580
581 let mut bytes_read = 0;
582
583 let n = dc1.read(&mut rbuf[..]).await?;
584 assert_eq!(n, sbuf.len(), "data length should match");
585 bytes_read += n;
586
587 assert_eq!(dc1.bytes_received(), bytes_read);
588 assert_eq!(dc1.messages_received(), 1);
589
590 let n = dc1.read(&mut rbuf[..]).await?;
591 assert_eq!(n, sbuf.len(), "data length should match");
592 bytes_read += n;
593
594 assert_eq!(dc1.bytes_received(), bytes_read);
595 assert_eq!(dc1.messages_received(), 2);
596
597 let n = dc1.read(&mut rbuf[..]).await?;
598 assert_eq!(n, 1, "data length should match");
599 bytes_read += n;
600
601 assert_eq!(dc1.bytes_received(), bytes_read);
602 assert_eq!(dc1.messages_received(), 3);
603
604 let n = dc1.read(&mut rbuf[..]).await?;
605 assert_eq!(n, 0, "data length should match");
606 bytes_read += n;
607
608 assert_eq!(dc1.bytes_received(), bytes_read);
609 assert_eq!(dc1.messages_received(), 4);
610
611 dc0.close().await?;
612 dc1.close().await?;
613 bridge_process_at_least_one(&br).await;
614
615 close_association_pair(&br, a0, a1).await;
616
617 Ok(())
618 }
619
620 #[tokio::test]
test_poll_data_channel() -> Result<()>621 async fn test_poll_data_channel() -> Result<()> {
622 let mut sbuf = vec![0u8; 1000];
623 let mut rbuf = vec![0u8; 1500];
624
625 let (br, ca, cb) = Bridge::new(0, None, None);
626
627 let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
628
629 let cfg = Config {
630 channel_type: ChannelType::Reliable,
631 reliability_parameter: 123,
632 label: "data".to_string(),
633 ..Default::default()
634 };
635
636 let dc0 = Arc::new(DataChannel::dial(&a0, 100, cfg.clone()).await?);
637 bridge_process_at_least_one(&br).await;
638
639 let existing_data_channels: Vec<DataChannel> = Vec::new();
640 let dc1 = Arc::new(DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?);
641 bridge_process_at_least_one(&br).await;
642
643 let mut poll_dc0 = PollDataChannel::new(dc0);
644 let mut poll_dc1 = PollDataChannel::new(dc1);
645
646 sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
647 let n = poll_dc0
648 .write(&Bytes::from(sbuf.clone()))
649 .await
650 .map_err(|e| Error::new(e.to_string()))?;
651 assert_eq!(sbuf.len(), n, "data length should match");
652
653 bridge_process_at_least_one(&br).await;
654
655 let n = poll_dc1
656 .read(&mut rbuf[..])
657 .await
658 .map_err(|e| Error::new(e.to_string()))?;
659 assert_eq!(sbuf.len(), n, "data length should match");
660 assert_eq!(
661 1,
662 u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
663 "data should match"
664 );
665
666 poll_dc0.into_inner().close().await?;
667 poll_dc1.into_inner().close().await?;
668 bridge_process_at_least_one(&br).await;
669
670 close_association_pair(&br, a0, a1).await;
671
672 Ok(())
673 }
674