1 use crate::error::Result;
2 
3 use super::*;
4 
5 use util::conn::conn_bridge::*;
6 use util::conn::*;
7 
8 use tokio::io::AsyncReadExt;
9 use tokio::io::AsyncWriteExt;
10 use tokio::sync::{broadcast, mpsc};
11 use tokio::time::Duration;
12 
bridge_process_at_least_one(br: &Arc<Bridge>)13 async fn bridge_process_at_least_one(br: &Arc<Bridge>) {
14     let mut n_sum = 0;
15     loop {
16         tokio::time::sleep(Duration::from_millis(10)).await;
17         n_sum += br.tick().await;
18         if br.len(0).await == 0 && br.len(1).await == 0 && n_sum > 0 {
19             break;
20         }
21     }
22 }
23 
create_new_association_pair( br: &Arc<Bridge>, ca: Arc<dyn Conn + Send + Sync>, cb: Arc<dyn Conn + Send + Sync>, ) -> Result<(Arc<Association>, Arc<Association>)>24 async fn create_new_association_pair(
25     br: &Arc<Bridge>,
26     ca: Arc<dyn Conn + Send + Sync>,
27     cb: Arc<dyn Conn + Send + Sync>,
28 ) -> Result<(Arc<Association>, Arc<Association>)> {
29     let (handshake0ch_tx, mut handshake0ch_rx) = mpsc::channel(1);
30     let (handshake1ch_tx, mut handshake1ch_rx) = mpsc::channel(1);
31     let (closed_tx, mut closed_rx0) = broadcast::channel::<()>(1);
32     let mut closed_rx1 = closed_tx.subscribe();
33 
34     // Setup client
35     tokio::spawn(async move {
36         let client = Association::client(sctp::association::Config {
37             net_conn: ca,
38             max_receive_buffer_size: 0,
39             max_message_size: 0,
40             name: "client".to_owned(),
41         })
42         .await;
43 
44         let _ = handshake0ch_tx.send(client).await;
45         let _ = closed_rx0.recv().await;
46 
47         Result::<()>::Ok(())
48     });
49 
50     // Setup server
51     tokio::spawn(async move {
52         let server = Association::server(sctp::association::Config {
53             net_conn: cb,
54             max_receive_buffer_size: 0,
55             max_message_size: 0,
56             name: "server".to_owned(),
57         })
58         .await;
59 
60         let _ = handshake1ch_tx.send(server).await;
61         let _ = closed_rx1.recv().await;
62 
63         Result::<()>::Ok(())
64     });
65 
66     let mut client = None;
67     let mut server = None;
68     let mut a0handshake_done = false;
69     let mut a1handshake_done = false;
70     let mut i = 0;
71     while (!a0handshake_done || !a1handshake_done) && i < 100 {
72         br.tick().await;
73 
74         let timer = tokio::time::sleep(Duration::from_millis(10));
75         tokio::pin!(timer);
76 
77         tokio::select! {
78             _ = timer.as_mut() =>{},
79             r0 = handshake0ch_rx.recv() => {
80                 if let Ok(c) = r0.unwrap() {
81                     client = Some(c);
82                 }
83                 a0handshake_done = true;
84             },
85             r1 = handshake1ch_rx.recv() => {
86                 if let Ok(s) = r1.unwrap() {
87                     server = Some(s);
88                 }
89                 a1handshake_done = true;
90             },
91         };
92         i += 1;
93     }
94 
95     if !a0handshake_done || !a1handshake_done {
96         return Err(Error::new("handshake failed".to_owned()));
97     }
98 
99     drop(closed_tx);
100 
101     Ok((Arc::new(client.unwrap()), Arc::new(server.unwrap())))
102 }
103 
close_association_pair( br: &Arc<Bridge>, client: Arc<Association>, server: Arc<Association>, )104 async fn close_association_pair(
105     br: &Arc<Bridge>,
106     client: Arc<Association>,
107     server: Arc<Association>,
108 ) {
109     let (handshake0ch_tx, mut handshake0ch_rx) = mpsc::channel(1);
110     let (handshake1ch_tx, mut handshake1ch_rx) = mpsc::channel(1);
111     let (closed_tx, mut closed_rx0) = broadcast::channel::<()>(1);
112     let mut closed_rx1 = closed_tx.subscribe();
113 
114     // Close client
115     tokio::spawn(async move {
116         client.close().await?;
117         let _ = handshake0ch_tx.send(()).await;
118         let _ = closed_rx0.recv().await;
119 
120         Result::<()>::Ok(())
121     });
122 
123     // Close server
124     tokio::spawn(async move {
125         server.close().await?;
126         let _ = handshake1ch_tx.send(()).await;
127         let _ = closed_rx1.recv().await;
128 
129         Result::<()>::Ok(())
130     });
131 
132     let mut a0handshake_done = false;
133     let mut a1handshake_done = false;
134     let mut i = 0;
135     while (!a0handshake_done || !a1handshake_done) && i < 100 {
136         br.tick().await;
137 
138         let timer = tokio::time::sleep(Duration::from_millis(10));
139         tokio::pin!(timer);
140 
141         tokio::select! {
142             _ = timer.as_mut() =>{},
143             _ = handshake0ch_rx.recv() => {
144                 a0handshake_done = true;
145             },
146             _ = handshake1ch_rx.recv() => {
147                 a1handshake_done = true;
148             },
149         };
150         i += 1;
151     }
152 
153     drop(closed_tx);
154 }
155 
156 //use std::io::Write;
157 
pr_ordered_unordered_test(channel_type: ChannelType, is_ordered: bool) -> Result<()>158 async fn pr_ordered_unordered_test(channel_type: ChannelType, is_ordered: bool) -> Result<()> {
159     /*env_logger::Builder::new()
160     .format(|buf, record| {
161         writeln!(
162             buf,
163             "{}:{} [{}] {} - {}",
164             record.file().unwrap_or("unknown"),
165             record.line().unwrap_or(0),
166             record.level(),
167             chrono::Local::now().format("%H:%M:%S.%6f"),
168             record.args()
169         )
170     })
171     .filter(None, log::LevelFilter::Trace)
172     .init();*/
173 
174     let mut sbuf = vec![0u8; 1000];
175     let mut rbuf = vec![0u8; 2000];
176 
177     let (br, ca, cb) = Bridge::new(0, None, None);
178 
179     let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
180 
181     let cfg = Config {
182         channel_type,
183         reliability_parameter: 0,
184         label: "data".to_string(),
185         ..Default::default()
186     };
187 
188     let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
189     bridge_process_at_least_one(&br).await;
190 
191     let existing_data_channels: Vec<DataChannel> = Vec::new();
192     let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
193     bridge_process_at_least_one(&br).await;
194 
195     assert_eq!(dc0.config, cfg, "local config should match");
196     assert_eq!(dc1.config, cfg, "remote config should match");
197 
198     dc0.commit_reliability_params();
199     dc1.commit_reliability_params();
200 
201     sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
202     let n = dc0
203         .write_data_channel(&Bytes::from(sbuf.clone()), true)
204         .await?;
205     assert_eq!(sbuf.len(), n, "data length should match");
206 
207     sbuf[0..4].copy_from_slice(&2u32.to_be_bytes());
208     let n = dc0
209         .write_data_channel(&Bytes::from(sbuf.clone()), true)
210         .await?;
211     assert_eq!(sbuf.len(), n, "data length should match");
212 
213     if !is_ordered {
214         sbuf[0..4].copy_from_slice(&3u32.to_be_bytes());
215         let n = dc0
216             .write_data_channel(&Bytes::from(sbuf.clone()), true)
217             .await?;
218         assert_eq!(sbuf.len(), n, "data length should match");
219     }
220 
221     tokio::time::sleep(Duration::from_millis(100)).await;
222     br.drop_offset(0, 0, 1).await; // drop the first packet on the wire
223     if !is_ordered {
224         br.reorder(0).await;
225     } else {
226         tokio::time::sleep(Duration::from_millis(100)).await;
227     }
228     bridge_process_at_least_one(&br).await;
229 
230     if !is_ordered {
231         let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
232         assert!(is_string, "should return isString being true");
233         assert_eq!(sbuf.len(), n, "data length should match");
234         assert_eq!(
235             3,
236             u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
237             "data should match"
238         );
239     }
240 
241     let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
242     assert!(is_string, "should return isString being true");
243     assert_eq!(sbuf.len(), n, "data length should match");
244     assert_eq!(
245         2,
246         u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
247         "data should match"
248     );
249 
250     dc0.close().await?;
251     dc1.close().await?;
252     bridge_process_at_least_one(&br).await;
253 
254     close_association_pair(&br, a0, a1).await;
255 
256     Ok(())
257 }
258 
259 #[tokio::test]
test_data_channel_channel_type_reliable_ordered() -> Result<()>260 async fn test_data_channel_channel_type_reliable_ordered() -> Result<()> {
261     let mut sbuf = vec![0u8; 1000];
262     let mut rbuf = vec![0u8; 1500];
263 
264     let (br, ca, cb) = Bridge::new(0, None, None);
265 
266     let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
267 
268     let cfg = Config {
269         channel_type: ChannelType::Reliable,
270         reliability_parameter: 123,
271         label: "data".to_string(),
272         ..Default::default()
273     };
274 
275     let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
276     bridge_process_at_least_one(&br).await;
277 
278     let existing_data_channels: Vec<DataChannel> = Vec::new();
279     let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
280     bridge_process_at_least_one(&br).await;
281 
282     assert_eq!(dc0.config, cfg, "local config should match");
283     assert_eq!(dc1.config, cfg, "remote config should match");
284 
285     br.reorder_next_nwrites(0, 2); // reordering on the wire
286 
287     sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
288     let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
289     assert_eq!(sbuf.len(), n, "data length should match");
290 
291     sbuf[0..4].copy_from_slice(&2u32.to_be_bytes());
292     let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
293     assert_eq!(sbuf.len(), n, "data length should match");
294 
295     bridge_process_at_least_one(&br).await;
296 
297     let n = dc1.read(&mut rbuf[..]).await?;
298     assert_eq!(sbuf.len(), n, "data length should match");
299     assert_eq!(
300         1,
301         u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
302         "data should match"
303     );
304 
305     let n = dc1.read(&mut rbuf[..]).await?;
306     assert_eq!(sbuf.len(), n, "data length should match");
307     assert_eq!(
308         2,
309         u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
310         "data should match"
311     );
312 
313     dc0.close().await?;
314     dc1.close().await?;
315     bridge_process_at_least_one(&br).await;
316 
317     close_association_pair(&br, a0, a1).await;
318 
319     Ok(())
320 }
321 
322 #[tokio::test]
test_data_channel_channel_type_reliable_unordered() -> Result<()>323 async fn test_data_channel_channel_type_reliable_unordered() -> Result<()> {
324     let mut sbuf = vec![0u8; 1000];
325     let mut rbuf = vec![0u8; 1500];
326 
327     let (br, ca, cb) = Bridge::new(0, None, None);
328 
329     let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
330 
331     let cfg = Config {
332         channel_type: ChannelType::ReliableUnordered,
333         reliability_parameter: 123,
334         label: "data".to_string(),
335         ..Default::default()
336     };
337 
338     let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
339     bridge_process_at_least_one(&br).await;
340 
341     let existing_data_channels: Vec<DataChannel> = Vec::new();
342     let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
343     bridge_process_at_least_one(&br).await;
344 
345     assert_eq!(dc0.config, cfg, "local config should match");
346     assert_eq!(dc1.config, cfg, "remote config should match");
347 
348     dc0.commit_reliability_params();
349     dc1.commit_reliability_params();
350 
351     sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
352     let n = dc0
353         .write_data_channel(&Bytes::from(sbuf.clone()), true)
354         .await?;
355     assert_eq!(sbuf.len(), n, "data length should match");
356 
357     sbuf[0..4].copy_from_slice(&2u32.to_be_bytes());
358     let n = dc0
359         .write_data_channel(&Bytes::from(sbuf.clone()), true)
360         .await?;
361     assert_eq!(sbuf.len(), n, "data length should match");
362 
363     tokio::time::sleep(Duration::from_millis(100)).await;
364     br.reorder(0).await; // reordering on the wire
365     bridge_process_at_least_one(&br).await;
366 
367     let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
368     assert!(is_string, "should return isString being true");
369     assert_eq!(sbuf.len(), n, "data length should match");
370     assert_eq!(
371         2,
372         u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
373         "data should match"
374     );
375 
376     let (n, is_string) = dc1.read_data_channel(&mut rbuf[..]).await?;
377     assert!(is_string, "should return isString being true");
378     assert_eq!(sbuf.len(), n, "data length should match");
379     assert_eq!(
380         1,
381         u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
382         "data should match"
383     );
384 
385     dc0.close().await?;
386     dc1.close().await?;
387     bridge_process_at_least_one(&br).await;
388 
389     close_association_pair(&br, a0, a1).await;
390 
391     Ok(())
392 }
393 
394 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
395 #[tokio::test]
test_data_channel_channel_type_partial_reliable_rexmit() -> Result<()>396 async fn test_data_channel_channel_type_partial_reliable_rexmit() -> Result<()> {
397     pr_ordered_unordered_test(ChannelType::PartialReliableRexmit, true).await
398 }
399 
400 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
401 #[tokio::test]
test_data_channel_channel_type_partial_reliable_rexmit_unordered() -> Result<()>402 async fn test_data_channel_channel_type_partial_reliable_rexmit_unordered() -> Result<()> {
403     pr_ordered_unordered_test(ChannelType::PartialReliableRexmitUnordered, false).await
404 }
405 
406 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
407 #[tokio::test]
test_data_channel_channel_type_partial_reliable_timed() -> Result<()>408 async fn test_data_channel_channel_type_partial_reliable_timed() -> Result<()> {
409     pr_ordered_unordered_test(ChannelType::PartialReliableTimed, true).await
410 }
411 
412 #[cfg(not(target_os = "windows"))] // this times out in CI on windows.
413 #[tokio::test]
test_data_channel_channel_type_partial_reliable_timed_unordered() -> Result<()>414 async fn test_data_channel_channel_type_partial_reliable_timed_unordered() -> Result<()> {
415     pr_ordered_unordered_test(ChannelType::PartialReliableTimedUnordered, false).await
416 }
417 
418 //TODO: remove this conditional test
419 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
420 #[tokio::test]
test_data_channel_buffered_amount() -> Result<()>421 async fn test_data_channel_buffered_amount() -> Result<()> {
422     let sbuf = vec![0u8; 1000];
423     let mut rbuf = vec![0u8; 1000];
424 
425     let n_cbs = Arc::new(AtomicUsize::new(0));
426 
427     let (br, ca, cb) = Bridge::new(0, None, None);
428 
429     let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
430 
431     let dc0 = Arc::new(
432         DataChannel::dial(
433             &a0,
434             100,
435             Config {
436                 label: "data".to_owned(),
437                 ..Default::default()
438             },
439         )
440         .await?,
441     );
442     bridge_process_at_least_one(&br).await;
443 
444     let existing_data_channels: Vec<DataChannel> = Vec::new();
445     let dc1 = Arc::new(DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?);
446     bridge_process_at_least_one(&br).await;
447 
448     while dc0.buffered_amount() > 0 {
449         bridge_process_at_least_one(&br).await;
450     }
451 
452     let n = dc0.write(&Bytes::new()).await?;
453     assert_eq!(n, 0, "data length should match");
454     assert_eq!(dc0.buffered_amount(), 1, "incorrect bufferedAmount");
455 
456     let n = dc0.write(&Bytes::from_static(&[0])).await?;
457     assert_eq!(n, 1, "data length should match");
458     assert_eq!(dc0.buffered_amount(), 2, "incorrect bufferedAmount");
459 
460     bridge_process_at_least_one(&br).await;
461 
462     let n = dc1.read(&mut rbuf[..]).await?;
463     assert_eq!(n, 0, "received length should match");
464 
465     let n = dc1.read(&mut rbuf[..]).await?;
466     assert_eq!(n, 1, "received length should match");
467 
468     dc0.set_buffered_amount_low_threshold(1500);
469     assert_eq!(
470         dc0.buffered_amount_low_threshold(),
471         1500,
472         "incorrect bufferedAmountLowThreshold"
473     );
474     let n_cbs2 = Arc::clone(&n_cbs);
475     dc0.on_buffered_amount_low(Box::new(move || {
476         n_cbs2.fetch_add(1, Ordering::SeqCst);
477         Box::pin(async {})
478     }));
479 
480     // Write 10 1000-byte packets (total 10,000 bytes)
481     for i in 0..10 {
482         let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
483         assert_eq!(sbuf.len(), n, "data length should match");
484         assert_eq!(
485             sbuf.len() * (i + 1) + 2,
486             dc0.buffered_amount(),
487             "incorrect bufferedAmount"
488         );
489     }
490 
491     let dc1_cloned = Arc::clone(&dc1);
492     tokio::spawn(async move {
493         while let Ok(n) = dc1_cloned.read(&mut rbuf[..]).await {
494             if n == 0 {
495                 break;
496             }
497             assert_eq!(n, rbuf.len(), "received length should match");
498         }
499     });
500 
501     let since = tokio::time::Instant::now();
502     loop {
503         br.tick().await;
504         tokio::time::sleep(Duration::from_millis(10)).await;
505         if tokio::time::Instant::now().duration_since(since) > Duration::from_millis(500) {
506             break;
507         }
508     }
509 
510     dc0.close().await?;
511     dc1.close().await?;
512     bridge_process_at_least_one(&br).await;
513 
514     assert!(
515         n_cbs.load(Ordering::SeqCst) > 0,
516         "should make at least one callback"
517     );
518 
519     close_association_pair(&br, a0, a1).await;
520 
521     Ok(())
522 }
523 
524 //TODO: remove this conditional test
525 #[cfg(not(any(target_os = "macos", target_os = "windows")))] // this times out in CI on windows.
526 #[tokio::test]
test_stats() -> Result<()>527 async fn test_stats() -> Result<()> {
528     let sbuf = vec![0u8; 1000];
529     let mut rbuf = vec![0u8; 1500];
530 
531     let (br, ca, cb) = Bridge::new(0, None, None);
532 
533     let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
534 
535     let cfg = Config {
536         channel_type: ChannelType::Reliable,
537         reliability_parameter: 123,
538         label: "data".to_owned(),
539         ..Default::default()
540     };
541 
542     let dc0 = DataChannel::dial(&a0, 100, cfg.clone()).await?;
543     bridge_process_at_least_one(&br).await;
544 
545     let existing_data_channels: Vec<DataChannel> = Vec::new();
546     let dc1 = DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?;
547     bridge_process_at_least_one(&br).await;
548 
549     let mut bytes_sent = 0;
550 
551     let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
552     assert_eq!(n, sbuf.len(), "data length should match");
553     bytes_sent += n;
554 
555     assert_eq!(dc0.bytes_sent(), bytes_sent);
556     assert_eq!(dc0.messages_sent(), 1);
557 
558     let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
559     assert_eq!(n, sbuf.len(), "data length should match");
560     bytes_sent += n;
561 
562     assert_eq!(dc0.bytes_sent(), bytes_sent);
563     assert_eq!(dc0.messages_sent(), 2);
564 
565     let n = dc0.write(&Bytes::from_static(&[0])).await?;
566     assert_eq!(n, 1, "data length should match");
567     bytes_sent += n;
568 
569     assert_eq!(dc0.bytes_sent(), bytes_sent);
570     assert_eq!(dc0.messages_sent(), 3);
571 
572     let n = dc0.write(&Bytes::from_static(&[])).await?;
573     assert_eq!(n, 0, "data length should match");
574     bytes_sent += n;
575 
576     assert_eq!(dc0.bytes_sent(), bytes_sent);
577     assert_eq!(dc0.messages_sent(), 4);
578 
579     bridge_process_at_least_one(&br).await;
580 
581     let mut bytes_read = 0;
582 
583     let n = dc1.read(&mut rbuf[..]).await?;
584     assert_eq!(n, sbuf.len(), "data length should match");
585     bytes_read += n;
586 
587     assert_eq!(dc1.bytes_received(), bytes_read);
588     assert_eq!(dc1.messages_received(), 1);
589 
590     let n = dc1.read(&mut rbuf[..]).await?;
591     assert_eq!(n, sbuf.len(), "data length should match");
592     bytes_read += n;
593 
594     assert_eq!(dc1.bytes_received(), bytes_read);
595     assert_eq!(dc1.messages_received(), 2);
596 
597     let n = dc1.read(&mut rbuf[..]).await?;
598     assert_eq!(n, 1, "data length should match");
599     bytes_read += n;
600 
601     assert_eq!(dc1.bytes_received(), bytes_read);
602     assert_eq!(dc1.messages_received(), 3);
603 
604     let n = dc1.read(&mut rbuf[..]).await?;
605     assert_eq!(n, 0, "data length should match");
606     bytes_read += n;
607 
608     assert_eq!(dc1.bytes_received(), bytes_read);
609     assert_eq!(dc1.messages_received(), 4);
610 
611     dc0.close().await?;
612     dc1.close().await?;
613     bridge_process_at_least_one(&br).await;
614 
615     close_association_pair(&br, a0, a1).await;
616 
617     Ok(())
618 }
619 
620 #[tokio::test]
test_poll_data_channel() -> Result<()>621 async fn test_poll_data_channel() -> Result<()> {
622     let mut sbuf = vec![0u8; 1000];
623     let mut rbuf = vec![0u8; 1500];
624 
625     let (br, ca, cb) = Bridge::new(0, None, None);
626 
627     let (a0, a1) = create_new_association_pair(&br, Arc::new(ca), Arc::new(cb)).await?;
628 
629     let cfg = Config {
630         channel_type: ChannelType::Reliable,
631         reliability_parameter: 123,
632         label: "data".to_string(),
633         ..Default::default()
634     };
635 
636     let dc0 = Arc::new(DataChannel::dial(&a0, 100, cfg.clone()).await?);
637     bridge_process_at_least_one(&br).await;
638 
639     let existing_data_channels: Vec<DataChannel> = Vec::new();
640     let dc1 = Arc::new(DataChannel::accept(&a1, Config::default(), &existing_data_channels).await?);
641     bridge_process_at_least_one(&br).await;
642 
643     let mut poll_dc0 = PollDataChannel::new(dc0);
644     let mut poll_dc1 = PollDataChannel::new(dc1);
645 
646     sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
647     let n = poll_dc0
648         .write(&Bytes::from(sbuf.clone()))
649         .await
650         .map_err(|e| Error::new(e.to_string()))?;
651     assert_eq!(sbuf.len(), n, "data length should match");
652 
653     bridge_process_at_least_one(&br).await;
654 
655     let n = poll_dc1
656         .read(&mut rbuf[..])
657         .await
658         .map_err(|e| Error::new(e.to_string()))?;
659     assert_eq!(sbuf.len(), n, "data length should match");
660     assert_eq!(
661         1,
662         u32::from_be_bytes([rbuf[0], rbuf[1], rbuf[2], rbuf[3]]),
663         "data should match"
664     );
665 
666     poll_dc0.into_inner().close().await?;
667     poll_dc1.into_inner().close().await?;
668     bridge_process_at_least_one(&br).await;
669 
670     close_association_pair(&br, a0, a1).await;
671 
672     Ok(())
673 }
674