xref: /webrtc/sctp/src/queue/pending_queue.rs (revision 529e41be)
use tokio::sync::{Mutex, Semaphore};
use util::sync::RwLock;

use std::{
    collections::VecDeque,
    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};

use crate::chunk::chunk_payload_data::ChunkPayloadData;

// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>

// Some tests push a lot of data before starting to process any data...
#[cfg(test)]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
/// Maximum size of the pending queue, in bytes.
#[cfg(not(test))]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024;
/// Total user data size beyond which a message's chunks are appended to the
/// pending queue one by one instead of all at once.
const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;

/// Basic queue for either ordered or unordered chunks.
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;

/// A queue for both ordered and unordered chunks.
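///
/// A minimal round-trip sketch (`chunk` is a hypothetical, fully populated,
/// ordered `ChunkPayloadData`; it is not constructed here):
///
/// ```ignore
/// let queue = PendingQueue::new();
/// queue.push(chunk).await; // suspends while the queue is over QUEUE_BYTES_LIMIT
/// assert_eq!(queue.len(), 1);
/// // `beginning_fragment` must be true to start popping a new message.
/// let popped = queue.pop(true, /* unordered: */ false);
/// ```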
#[derive(Debug)]
pub(crate) struct PendingQueue {
    // These two fields limit appending bytes to the queue.
    // This two-step process is necessary because:
    // A) we need backpressure, which the semaphore applies by limiting the total amount of bytes via its permits;
    // B) the chunks of one fragmented message need to be put into the queue in direct sequence, which the lock guarantees.
    //
    // The semaphore is not inside the lock because permits need to be returned without taking the lock.
    semaphore_lock: Mutex<()>,
    semaphore: Semaphore,

    unordered_queue: RwLock<PendingBaseQueue>,
    ordered_queue: RwLock<PendingBaseQueue>,
    queue_len: AtomicUsize,
    n_bytes: AtomicUsize,
    selected: AtomicBool,
    unordered_is_selected: AtomicBool,
}

impl Default for PendingQueue {
    fn default() -> Self {
        PendingQueue::new()
    }
}

impl PendingQueue {
    pub(crate) fn new() -> Self {
        Self {
            semaphore_lock: Mutex::default(),
            semaphore: Semaphore::new(QUEUE_BYTES_LIMIT),
            unordered_queue: Default::default(),
            ordered_queue: Default::default(),
            queue_len: Default::default(),
            n_bytes: Default::default(),
            selected: Default::default(),
            unordered_is_selected: Default::default(),
        }
    }

    /// Appends a chunk to the back of the pending queue.
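    ///
    /// This applies backpressure: the call suspends until the semaphore can
    /// grant `user_data.len()` permits, i.e. until enough queued bytes have
    /// been popped. A sketch, with hypothetical `small_chunk` and `big_chunk`:
    ///
    /// ```ignore
    /// queue.push(small_chunk).await; // returns immediately while under the limit
    /// queue.push(big_chunk).await;   // may suspend until pop() returns permits
    /// ```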
    pub(crate) async fn push(&self, c: ChunkPayloadData) {
        let user_data_len = c.user_data.len();

        {
            let _sem_lock = self.semaphore_lock.lock().await;
            let permits = self.semaphore.acquire_many(user_data_len as u32).await;
            // unwrap ok because we never close the semaphore unless we have dropped self
            permits.unwrap().forget();

            if c.unordered {
                let mut unordered_queue = self.unordered_queue.write();
                unordered_queue.push_back(c);
            } else {
                let mut ordered_queue = self.ordered_queue.write();
                ordered_queue.push_back(c);
            }
        }

        self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
        self.queue_len.fetch_add(1, Ordering::SeqCst);
    }

    /// Appends chunks to the back of the pending queue.
    ///
    /// # Panics
    ///
    /// Panics if `chunks` contains a mix of unordered and ordered chunks.
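    ///
    /// All chunks of one fragmented message should be appended in a single
    /// call so they end up contiguous in the queue. A sketch, assuming a
    /// hypothetical `fragments: Vec<ChunkPayloadData>` produced by splitting
    /// one user message:
    ///
    /// ```ignore
    /// // Appends everything under one semaphore acquisition, or, for messages
    /// // of QUEUE_APPEND_LARGE bytes and above, chunk by chunk.
    /// queue.append(fragments).await;
    /// ```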
    pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) {
        if chunks.is_empty() {
            return;
        }

        let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len());

        if total_user_data_len >= QUEUE_APPEND_LARGE {
            self.append_large(chunks).await
        } else {
            let _sem_lock = self.semaphore_lock.lock().await;
            let permits = self
                .semaphore
                .acquire_many(total_user_data_len as u32)
                .await;
            // unwrap ok because we never close the semaphore unless we have dropped self
            permits.unwrap().forget();
            self.append_unlimited(chunks, total_user_data_len);
        }
    }

    // If this is a very large message, we append the chunks one by one to
    // allow progress (pops can keep returning permits) while we are appending.
    async fn append_large(&self, chunks: Vec<ChunkPayloadData>) {
        // hold the lock for the whole duration so the chunks stay in direct sequence
        let _sem_lock = self.semaphore_lock.lock().await;

        for chunk in chunks.into_iter() {
            let user_data_len = chunk.user_data.len();
            let permits = self.semaphore.acquire_many(user_data_len as u32).await;
            // unwrap ok because we never close the semaphore unless we have dropped self
            permits.unwrap().forget();

            if chunk.unordered {
                let mut unordered_queue = self.unordered_queue.write();
                unordered_queue.push_back(chunk);
            } else {
                let mut ordered_queue = self.ordered_queue.write();
                ordered_queue.push_back(chunk);
            }
            self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
            self.queue_len.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Assumes that A) enough permits have been acquired from, and forgotten on,
    /// the semaphore, and B) that `semaphore_lock` is held.
    fn append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize) {
        let chunks_len = chunks.len();
        let unordered = chunks
            .first()
            .expect("chunks to not be empty because of the above check")
            .unordered;
        if unordered {
            let mut unordered_queue = self.unordered_queue.write();
            assert!(
                chunks.iter().all(|c| c.unordered),
                "expected all chunks to be unordered"
            );
            unordered_queue.extend(chunks);
        } else {
            let mut ordered_queue = self.ordered_queue.write();
            assert!(
                chunks.iter().all(|c| !c.unordered),
                "expected all chunks to be ordered"
            );
            ordered_queue.extend(chunks);
        }

        self.n_bytes
            .fetch_add(total_user_data_len, Ordering::SeqCst);
        self.queue_len.fetch_add(chunks_len, Ordering::SeqCst);
    }

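    /// Returns a clone of the chunk at the front of the queue without removing it.
    ///
    /// While a fragmented message is selected, this peeks at the selected
    /// queue; otherwise unordered chunks take precedence over ordered ones.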
    pub(crate) fn peek(&self) -> Option<ChunkPayloadData> {
        if self.selected.load(Ordering::SeqCst) {
            if self.unordered_is_selected.load(Ordering::SeqCst) {
                let unordered_queue = self.unordered_queue.read();
                return unordered_queue.front().cloned();
            } else {
                let ordered_queue = self.ordered_queue.read();
                return ordered_queue.front().cloned();
            }
        }

        let c = {
            let unordered_queue = self.unordered_queue.read();
            unordered_queue.front().cloned()
        };

        if c.is_some() {
            return c;
        }

        let ordered_queue = self.ordered_queue.read();
        ordered_queue.front().cloned()
    }

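    /// Pops the next chunk, honoring fragmented-message boundaries.
    ///
    /// Once a fragment that is not an ending fragment is popped, the queue
    /// becomes "selected": subsequent pops come from the same (ordered or
    /// unordered) queue until the ending fragment has been popped. When
    /// nothing is selected, only a chunk with `beginning_fragment` set can
    /// start a new pop sequence.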
    pub(crate) fn pop(
        &self,
        beginning_fragment: bool,
        unordered: bool,
    ) -> Option<ChunkPayloadData> {
        let popped = if self.selected.load(Ordering::SeqCst) {
            let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
                let mut unordered_queue = self.unordered_queue.write();
                unordered_queue.pop_front()
            } else {
                let mut ordered_queue = self.ordered_queue.write();
                ordered_queue.pop_front()
            };
            if let Some(p) = &popped {
                if p.ending_fragment {
                    self.selected.store(false, Ordering::SeqCst);
                }
            }
            popped
        } else {
            if !beginning_fragment {
                return None;
            }
            if unordered {
                let popped = {
                    let mut unordered_queue = self.unordered_queue.write();
                    unordered_queue.pop_front()
                };
                if let Some(p) = &popped {
                    if !p.ending_fragment {
                        self.selected.store(true, Ordering::SeqCst);
                        self.unordered_is_selected.store(true, Ordering::SeqCst);
                    }
                }
                popped
            } else {
                let popped = {
                    let mut ordered_queue = self.ordered_queue.write();
                    ordered_queue.pop_front()
                };
                if let Some(p) = &popped {
                    if !p.ending_fragment {
                        self.selected.store(true, Ordering::SeqCst);
                        self.unordered_is_selected.store(false, Ordering::SeqCst);
                    }
                }
                popped
            }
        };

        if let Some(p) = &popped {
            let user_data_len = p.user_data.len();
            self.n_bytes.fetch_sub(user_data_len, Ordering::SeqCst);
            self.queue_len.fetch_sub(1, Ordering::SeqCst);
            // return the permits so blocked push/append calls can proceed
            self.semaphore.add_permits(user_data_len);
        }

        popped
    }

    /// Returns the total size, in bytes, of all queued user data.
    pub(crate) fn get_num_bytes(&self) -> usize {
        self.n_bytes.load(Ordering::SeqCst)
    }

    /// Returns the number of queued chunks across both queues.
    pub(crate) fn len(&self) -> usize {
        self.queue_len.load(Ordering::SeqCst)
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }
}