xref: /webrtc/sctp/src/queue/pending_queue.rs (revision 5b79f08a)
1 use crate::chunk::chunk_payload_data::ChunkPayloadData;
2 
3 use std::collections::VecDeque;
4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5 use tokio::sync::Mutex;
6 
7 /// pendingBaseQueue
8 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
9 
10 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>
11 
12 /// pendingQueue
13 #[derive(Debug, Default)]
14 pub(crate) struct PendingQueue {
15     unordered_queue: Mutex<PendingBaseQueue>,
16     ordered_queue: Mutex<PendingBaseQueue>,
17     queue_len: AtomicUsize,
18     n_bytes: AtomicUsize,
19     selected: AtomicBool,
20     unordered_is_selected: AtomicBool,
21 }
22 
23 impl PendingQueue {
24     pub(crate) fn new() -> Self {
25         PendingQueue::default()
26     }
27 
28     /// Appends a chunk to the back of the pending queue.
29     pub(crate) async fn push(&self, c: ChunkPayloadData) {
30         let user_data_len = c.user_data.len();
31 
32         if c.unordered {
33             let mut unordered_queue = self.unordered_queue.lock().await;
34             unordered_queue.push_back(c);
35         } else {
36             let mut ordered_queue = self.ordered_queue.lock().await;
37             ordered_queue.push_back(c);
38         }
39 
40         self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
41         self.queue_len.fetch_add(1, Ordering::SeqCst);
42     }
43 
44     /// Appends chunks to the back of the pending queue.
45     ///
46     /// # Panics
47     ///
48     /// If it's a mix of unordered and ordered chunks.
49     pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) {
50         if chunks.is_empty() {
51             return;
52         }
53 
54         let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len());
55         let chunks_len = chunks.len();
56 
57         let unordered = chunks
58             .first()
59             .expect("chunks to not be empty because of the above check")
60             .unordered;
61         if unordered {
62             let mut unordered_queue = self.unordered_queue.lock().await;
63             for c in chunks {
64                 assert!(c.unordered, "expected all chunks to be unordered");
65                 unordered_queue.push_back(c);
66             }
67         } else {
68             let mut ordered_queue = self.ordered_queue.lock().await;
69             for c in chunks {
70                 assert!(!c.unordered, "expected all chunks to be ordered");
71                 ordered_queue.push_back(c);
72             }
73         }
74 
75         self.n_bytes
76             .fetch_add(total_user_data_len, Ordering::SeqCst);
77         self.queue_len.fetch_add(chunks_len, Ordering::SeqCst);
78     }
79 
80     pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> {
81         if self.selected.load(Ordering::SeqCst) {
82             if self.unordered_is_selected.load(Ordering::SeqCst) {
83                 let unordered_queue = self.unordered_queue.lock().await;
84                 return unordered_queue.get(0).cloned();
85             } else {
86                 let ordered_queue = self.ordered_queue.lock().await;
87                 return ordered_queue.get(0).cloned();
88             }
89         }
90 
91         let c = {
92             let unordered_queue = self.unordered_queue.lock().await;
93             unordered_queue.get(0).cloned()
94         };
95 
96         if c.is_some() {
97             return c;
98         }
99 
100         let ordered_queue = self.ordered_queue.lock().await;
101         ordered_queue.get(0).cloned()
102     }
103 
104     pub(crate) async fn pop(
105         &self,
106         beginning_fragment: bool,
107         unordered: bool,
108     ) -> Option<ChunkPayloadData> {
109         let popped = if self.selected.load(Ordering::SeqCst) {
110             let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
111                 let mut unordered_queue = self.unordered_queue.lock().await;
112                 unordered_queue.pop_front()
113             } else {
114                 let mut ordered_queue = self.ordered_queue.lock().await;
115                 ordered_queue.pop_front()
116             };
117             if let Some(p) = &popped {
118                 if p.ending_fragment {
119                     self.selected.store(false, Ordering::SeqCst);
120                 }
121             }
122             popped
123         } else {
124             if !beginning_fragment {
125                 return None;
126             }
127             if unordered {
128                 let popped = {
129                     let mut unordered_queue = self.unordered_queue.lock().await;
130                     unordered_queue.pop_front()
131                 };
132                 if let Some(p) = &popped {
133                     if !p.ending_fragment {
134                         self.selected.store(true, Ordering::SeqCst);
135                         self.unordered_is_selected.store(true, Ordering::SeqCst);
136                     }
137                 }
138                 popped
139             } else {
140                 let popped = {
141                     let mut ordered_queue = self.ordered_queue.lock().await;
142                     ordered_queue.pop_front()
143                 };
144                 if let Some(p) = &popped {
145                     if !p.ending_fragment {
146                         self.selected.store(true, Ordering::SeqCst);
147                         self.unordered_is_selected.store(false, Ordering::SeqCst);
148                     }
149                 }
150                 popped
151             }
152         };
153 
154         if let Some(p) = &popped {
155             self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst);
156             self.queue_len.fetch_sub(1, Ordering::SeqCst);
157         }
158 
159         popped
160     }
161 
162     pub(crate) fn get_num_bytes(&self) -> usize {
163         self.n_bytes.load(Ordering::SeqCst)
164     }
165 
166     pub(crate) fn len(&self) -> usize {
167         self.queue_len.load(Ordering::SeqCst)
168     }
169 
170     pub(crate) fn is_empty(&self) -> bool {
171         self.len() == 0
172     }
173 }
174