xref: /webrtc/sctp/src/queue/pending_queue.rs (revision daaf05d1)
use std::{
    collections::VecDeque,
    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};

use tokio::sync::{Mutex, Semaphore};
use util::sync::RwLock;

use crate::chunk::chunk_payload_data::ChunkPayloadData;

/// Basic queue for either ordered or unordered chunks.
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;

// TODO: benchmark multiple Atomic+Mutex fields against a single Mutex<PendingQueueInternal>

/// A queue for both ordered and unordered chunks.
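///
/// Writers acquire one semaphore permit per byte of payload before a chunk is
/// enqueued, and `pop` returns those permits, so at most `QUEUE_BYTES_LIMIT`
/// bytes are pending at any time. A minimal round-trip sketch, assuming an
/// ordered, single-fragment `chunk` built elsewhere:
///
/// ```ignore
/// let queue = PendingQueue::new();
/// queue.push(chunk).await;
/// let popped = queue.pop(true, false); // beginning fragment, ordered queue
/// assert!(queue.is_empty());
/// ```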
#[derive(Debug)]
pub(crate) struct PendingQueue {
    // These two fields limit how many bytes can be appended to the queue.
    // The two-step process is necessary because:
    // A) we need backpressure, which the semaphore provides by capping the
    //    total number of queued bytes via its permits, and
    // B) the chunks of a fragmented message must be placed into the queue in
    //    direct sequence, which the lock guarantees.
    //
    // The semaphore lives outside the lock because permits must be returnable
    // (when chunks are popped) without first acquiring the lock.
    semaphore_lock: Mutex<()>,
    semaphore: Semaphore,

    unordered_queue: RwLock<PendingBaseQueue>,
    ordered_queue: RwLock<PendingBaseQueue>,
    queue_len: AtomicUsize,
    n_bytes: AtomicUsize,
    selected: AtomicBool,
    unordered_is_selected: AtomicBool,
}

impl Default for PendingQueue {
    fn default() -> Self {
        PendingQueue::new()
    }
}

// Some tests push a lot of data before starting to process any of it,
// so the byte limit is raised substantially under cfg(test).
#[cfg(test)]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
#[cfg(not(test))]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024;

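// Messages at or above two thirds of the byte limit (≈ 85 KiB outside of
// tests: 128 * 1024 * 2 / 3 = 87_381 bytes) are appended chunk by chunk;
// see `append_large` below.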
const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3;

impl PendingQueue {
    pub(crate) fn new() -> Self {
        Self {
            semaphore_lock: Mutex::default(),
            semaphore: Semaphore::new(QUEUE_BYTES_LIMIT),
            unordered_queue: Default::default(),
            ordered_queue: Default::default(),
            queue_len: Default::default(),
            n_bytes: Default::default(),
            selected: Default::default(),
            unordered_is_selected: Default::default(),
        }
    }

    /// Appends a chunk to the back of the pending queue.
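    ///
    /// Waits (without blocking the thread) for semaphore permits when the
    /// queue already holds `QUEUE_BYTES_LIMIT` bytes, which is how
    /// backpressure reaches the caller. A minimal sketch, assuming `chunk`
    /// is a `ChunkPayloadData` built elsewhere:
    ///
    /// ```ignore
    /// queue.push(chunk).await; // resolves once enough permits are free
    /// ```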
    pub(crate) async fn push(&self, c: ChunkPayloadData) {
        let user_data_len = c.user_data.len();

        {
            let sem_lock = self.semaphore_lock.lock().await;
            let permits = self.semaphore.acquire_many(user_data_len as u32).await;
            // unwrap ok because we never close the semaphore unless we have dropped self
            permits.unwrap().forget();

            if c.unordered {
                let mut unordered_queue = self.unordered_queue.write();
                unordered_queue.push_back(c);
            } else {
                let mut ordered_queue = self.ordered_queue.write();
                ordered_queue.push_back(c);
            }
            drop(sem_lock);
        }

        self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
        self.queue_len.fetch_add(1, Ordering::SeqCst);
    }

    /// Appends chunks to the back of the pending queue.
    ///
    /// # Panics
    ///
    /// Panics if `chunks` mixes unordered and ordered chunks.
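    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `fragments` holds the chunks of a single
    /// fragmented message (they all share one `unordered` flag, so the panic
    /// cannot trigger):
    ///
    /// ```ignore
    /// queue.append(fragments).await;
    /// ```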
    pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) {
        if chunks.is_empty() {
            return;
        }

        let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len());

        if total_user_data_len >= QUEUE_APPEND_LARGE {
            self.append_large(chunks).await
        } else {
            let sem_lock = self.semaphore_lock.lock().await;
            let permits = self
                .semaphore
                .acquire_many(total_user_data_len as u32)
                .await;
            // unwrap ok because we never close the semaphore unless we have dropped self
            permits.unwrap().forget();
            self.append_unlimited(chunks, total_user_data_len);
            drop(sem_lock);
        }
    }

    // For very large messages, append the chunks one by one so that permits
    // are acquired incrementally and consumers can make progress (returning
    // permits via `pop`) while we are still appending.
    async fn append_large(&self, chunks: Vec<ChunkPayloadData>) {
        // hold the lock for the whole duration so the fragments stay in direct sequence
        let sem_lock = self.semaphore_lock.lock().await;

        for chunk in chunks.into_iter() {
            let user_data_len = chunk.user_data.len();
            let permits = self.semaphore.acquire_many(user_data_len as u32).await;
            // unwrap ok because we never close the semaphore unless we have dropped self
            permits.unwrap().forget();

            if chunk.unordered {
                let mut unordered_queue = self.unordered_queue.write();
                unordered_queue.push_back(chunk);
            } else {
                let mut ordered_queue = self.ordered_queue.write();
                ordered_queue.push_back(chunk);
            }
            self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
            self.queue_len.fetch_add(1, Ordering::SeqCst);
        }

        drop(sem_lock);
    }

    /// Assumes that enough permits have already been acquired and forgotten
    /// from the semaphore, and that `semaphore_lock` is held by the caller.
    fn append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize) {
        let chunks_len = chunks.len();
        let unordered = chunks
            .first()
            .expect("chunks to not be empty because of the above check")
            .unordered;
        if unordered {
            let mut unordered_queue = self.unordered_queue.write();
            assert!(
                chunks.iter().all(|c| c.unordered),
                "expected all chunks to be unordered"
            );
            unordered_queue.extend(chunks);
        } else {
            let mut ordered_queue = self.ordered_queue.write();
            assert!(
                chunks.iter().all(|c| !c.unordered),
                "expected all chunks to be ordered"
            );
            ordered_queue.extend(chunks);
        }

        self.n_bytes
            .fetch_add(total_user_data_len, Ordering::SeqCst);
        self.queue_len.fetch_add(chunks_len, Ordering::SeqCst);
    }

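    /// Returns a clone of the chunk that `pop` would yield next: the front of
    /// the currently selected queue if a fragmented message is in flight,
    /// otherwise the front of the unordered queue, falling back to the front
    /// of the ordered queue.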
    pub(crate) fn peek(&self) -> Option<ChunkPayloadData> {
        if self.selected.load(Ordering::SeqCst) {
            if self.unordered_is_selected.load(Ordering::SeqCst) {
                let unordered_queue = self.unordered_queue.read();
                return unordered_queue.front().cloned();
            } else {
                let ordered_queue = self.ordered_queue.read();
                return ordered_queue.front().cloned();
            }
        }

        let c = {
            let unordered_queue = self.unordered_queue.read();
            unordered_queue.front().cloned()
        };

        if c.is_some() {
            return c;
        }

        let ordered_queue = self.ordered_queue.read();
        ordered_queue.front().cloned()
    }

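    /// Pops the next chunk, keeping the fragments of one message in sequence:
    /// popping a beginning fragment that is not also an ending fragment
    /// "selects" its queue, and subsequent pops stay on that queue until the
    /// ending fragment is popped. When nothing is selected, only a beginning
    /// fragment can be popped. The popped chunk's bytes are returned to the
    /// semaphore so writers blocked in `push`/`append` can proceed.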
    pub(crate) fn pop(
        &self,
        beginning_fragment: bool,
        unordered: bool,
    ) -> Option<ChunkPayloadData> {
        let popped = if self.selected.load(Ordering::SeqCst) {
            let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
                let mut unordered_queue = self.unordered_queue.write();
                unordered_queue.pop_front()
            } else {
                let mut ordered_queue = self.ordered_queue.write();
                ordered_queue.pop_front()
            };
            if let Some(p) = &popped {
                if p.ending_fragment {
                    self.selected.store(false, Ordering::SeqCst);
                }
            }
            popped
        } else {
            if !beginning_fragment {
                return None;
            }
            if unordered {
                let popped = {
                    let mut unordered_queue = self.unordered_queue.write();
                    unordered_queue.pop_front()
                };
                if let Some(p) = &popped {
                    if !p.ending_fragment {
                        self.selected.store(true, Ordering::SeqCst);
                        self.unordered_is_selected.store(true, Ordering::SeqCst);
                    }
                }
                popped
            } else {
                let popped = {
                    let mut ordered_queue = self.ordered_queue.write();
                    ordered_queue.pop_front()
                };
                if let Some(p) = &popped {
                    if !p.ending_fragment {
                        self.selected.store(true, Ordering::SeqCst);
                        self.unordered_is_selected.store(false, Ordering::SeqCst);
                    }
                }
                popped
            }
        };

        if let Some(p) = &popped {
            let user_data_len = p.user_data.len();
            self.n_bytes.fetch_sub(user_data_len, Ordering::SeqCst);
            self.queue_len.fetch_sub(1, Ordering::SeqCst);
            self.semaphore.add_permits(user_data_len);
        }

        popped
    }

    pub(crate) fn get_num_bytes(&self) -> usize {
        self.n_bytes.load(Ordering::SeqCst)
    }

    pub(crate) fn len(&self) -> usize {
        self.queue_len.load(Ordering::SeqCst)
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
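
// Illustrative sketch, not part of the original file: exercises the
// fragment-sequencing contract of `pop`. It assumes `ChunkPayloadData`
// implements `Default` and exposes `unordered`, `beginning_fragment`,
// `ending_fragment`, and `user_data` as used below, and that tokio's test
// macro is available; adjust the construction if the real type differs.
#[cfg(test)]
mod pending_queue_sketch {
    use bytes::Bytes;

    use super::*;

    // hypothetical helper: builds a chunk with `len` bytes of payload
    fn chunk(unordered: bool, begin: bool, end: bool, len: usize) -> ChunkPayloadData {
        ChunkPayloadData {
            unordered,
            beginning_fragment: begin,
            ending_fragment: end,
            user_data: Bytes::from(vec![0u8; len]),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn fragmented_message_is_popped_in_sequence() {
        let q = PendingQueue::new();
        // one ordered message split into two fragments
        q.append(vec![chunk(false, true, false, 8), chunk(false, false, true, 8)])
            .await;
        assert_eq!(q.len(), 2);
        assert_eq!(q.get_num_bytes(), 16);

        // popping the beginning fragment selects the ordered queue
        let first = q.pop(true, false).expect("beginning fragment");
        assert!(first.beginning_fragment);

        // while selected, the arguments are ignored and the same queue is drained
        let second = q.pop(false, true).expect("ending fragment");
        assert!(second.ending_fragment);
        assert!(q.is_empty());
        assert_eq!(q.get_num_bytes(), 0);
    }
}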