xref: /webrtc/sctp/src/queue/pending_queue.rs (revision 6bc4a8f1)
1*6bc4a8f1SAnton use tokio::sync::{Mutex, Semaphore};
2*6bc4a8f1SAnton use util::sync::RwLock;
3*6bc4a8f1SAnton 
40acb5a49SAnton Kaliaev use std::{
50acb5a49SAnton Kaliaev     collections::VecDeque,
60acb5a49SAnton Kaliaev     sync::atomic::{AtomicBool, AtomicUsize, Ordering},
70acb5a49SAnton Kaliaev };
80acb5a49SAnton Kaliaev 
9ffe74184SMartin Algesten use crate::chunk::chunk_payload_data::ChunkPayloadData;
10ffe74184SMartin Algesten 
// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>

/// Maximum size of the pending queue, in bytes.
#[cfg(not(test))]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024;
/// Maximum size of the pending queue, in bytes.
///
/// Test builds get a far larger limit because some tests push a lot of data before they start
/// to process any of it.
#[cfg(test)]
const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024;
/// Total user data size at or beyond which a batch of chunks is no longer added to the pending
/// queue in a single step; the chunks are added one by one instead.
const QUEUE_APPEND_LARGE: usize = QUEUE_BYTES_LIMIT * 2 / 3;
22*6bc4a8f1SAnton 
/// Basic queue for either ordered or unordered chunks.
///
/// Chunks are appended at the back and taken from the front.
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
25ffe74184SMartin Algesten 
/// A queue for both ordered and unordered chunks.
///
/// Ordered and unordered chunks are stored in separate base queues; the total amount of queued
/// user data is bounded by a semaphore whose permits represent bytes.
#[derive(Debug)]
pub(crate) struct PendingQueue {
    // These two fields limit appending bytes to the queue.
    // This two-step process is necessary because
    // A) we need backpressure, which the semaphore applies by limiting the total amount of bytes via the permits, and
    // B) the chunks of one fragmented message need to be put in direct sequence into the queue, which the lock guarantees.
    //
    // The semaphore is not inside the lock because the permits need to be returned without needing a lock on the semaphore.
    semaphore_lock: Mutex<()>,
    semaphore: Semaphore,

    // Pending chunks, kept in one queue per ordering mode.
    unordered_queue: RwLock<PendingBaseQueue>,
    ordered_queue: RwLock<PendingBaseQueue>,
    // Number of chunks accounted for across both queues.
    queue_len: AtomicUsize,
    // Sum of the user data lengths of the accounted chunks, in bytes.
    n_bytes: AtomicUsize,
    // Set when a popped chunk was not an ending fragment, cleared again once an ending fragment
    // has been popped; while set, `peek` and `pop` stick to a single queue.
    selected: AtomicBool,
    // Which queue is being drained while `selected` is set (true = unordered); it is only read
    // when `selected` is true.
    unordered_is_selected: AtomicBool,
}
45ffe74184SMartin Algesten 
46daaf05d1SMoritz Borcherding impl Default for PendingQueue {
default() -> Self47daaf05d1SMoritz Borcherding     fn default() -> Self {
48daaf05d1SMoritz Borcherding         PendingQueue::new()
49daaf05d1SMoritz Borcherding     }
50daaf05d1SMoritz Borcherding }
51daaf05d1SMoritz Borcherding 
52ffe74184SMartin Algesten impl PendingQueue {
new() -> Self53ffe74184SMartin Algesten     pub(crate) fn new() -> Self {
54daaf05d1SMoritz Borcherding         Self {
55daaf05d1SMoritz Borcherding             semaphore_lock: Mutex::default(),
56daaf05d1SMoritz Borcherding             semaphore: Semaphore::new(QUEUE_BYTES_LIMIT),
57daaf05d1SMoritz Borcherding             unordered_queue: Default::default(),
58daaf05d1SMoritz Borcherding             ordered_queue: Default::default(),
59daaf05d1SMoritz Borcherding             queue_len: Default::default(),
60daaf05d1SMoritz Borcherding             n_bytes: Default::default(),
61daaf05d1SMoritz Borcherding             selected: Default::default(),
62daaf05d1SMoritz Borcherding             unordered_is_selected: Default::default(),
63daaf05d1SMoritz Borcherding         }
64ffe74184SMartin Algesten     }
65ffe74184SMartin Algesten 
664b638fa6SAnton Kaliaev     /// Appends a chunk to the back of the pending queue.
push(&self, c: ChunkPayloadData)67daaf05d1SMoritz Borcherding     pub(crate) async fn push(&self, c: ChunkPayloadData) {
684b638fa6SAnton Kaliaev         let user_data_len = c.user_data.len();
694b638fa6SAnton Kaliaev 
70daaf05d1SMoritz Borcherding         {
71*6bc4a8f1SAnton             let _sem_lock = self.semaphore_lock.lock().await;
72daaf05d1SMoritz Borcherding             let permits = self.semaphore.acquire_many(user_data_len as u32).await;
73daaf05d1SMoritz Borcherding             // unwrap ok because we never close the semaphore unless we have dropped self
74daaf05d1SMoritz Borcherding             permits.unwrap().forget();
75daaf05d1SMoritz Borcherding 
76ffe74184SMartin Algesten             if c.unordered {
770acb5a49SAnton Kaliaev                 let mut unordered_queue = self.unordered_queue.write();
78ffe74184SMartin Algesten                 unordered_queue.push_back(c);
79ffe74184SMartin Algesten             } else {
800acb5a49SAnton Kaliaev                 let mut ordered_queue = self.ordered_queue.write();
81ffe74184SMartin Algesten                 ordered_queue.push_back(c);
82ffe74184SMartin Algesten             }
83daaf05d1SMoritz Borcherding         }
844b638fa6SAnton Kaliaev 
854b638fa6SAnton Kaliaev         self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
86ffe74184SMartin Algesten         self.queue_len.fetch_add(1, Ordering::SeqCst);
87ffe74184SMartin Algesten     }
88ffe74184SMartin Algesten 
894b638fa6SAnton Kaliaev     /// Appends chunks to the back of the pending queue.
904b638fa6SAnton Kaliaev     ///
914b638fa6SAnton Kaliaev     /// # Panics
924b638fa6SAnton Kaliaev     ///
934b638fa6SAnton Kaliaev     /// If it's a mix of unordered and ordered chunks.
append(&self, chunks: Vec<ChunkPayloadData>)94daaf05d1SMoritz Borcherding     pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) {
954b638fa6SAnton Kaliaev         if chunks.is_empty() {
964b638fa6SAnton Kaliaev             return;
974b638fa6SAnton Kaliaev         }
984b638fa6SAnton Kaliaev 
994b638fa6SAnton Kaliaev         let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len());
1004b638fa6SAnton Kaliaev 
101daaf05d1SMoritz Borcherding         if total_user_data_len >= QUEUE_APPEND_LARGE {
102daaf05d1SMoritz Borcherding             self.append_large(chunks).await
103daaf05d1SMoritz Borcherding         } else {
104*6bc4a8f1SAnton             let _sem_lock = self.semaphore_lock.lock().await;
105daaf05d1SMoritz Borcherding             let permits = self
106daaf05d1SMoritz Borcherding                 .semaphore
107daaf05d1SMoritz Borcherding                 .acquire_many(total_user_data_len as u32)
108daaf05d1SMoritz Borcherding                 .await;
109daaf05d1SMoritz Borcherding             // unwrap ok because we never close the semaphore unless we have dropped self
110daaf05d1SMoritz Borcherding             permits.unwrap().forget();
111daaf05d1SMoritz Borcherding             self.append_unlimited(chunks, total_user_data_len);
112daaf05d1SMoritz Borcherding         }
113daaf05d1SMoritz Borcherding     }
114daaf05d1SMoritz Borcherding 
115daaf05d1SMoritz Borcherding     // If this is a very large message we append chunks one by one to allow progress while we are appending
append_large(&self, chunks: Vec<ChunkPayloadData>)116daaf05d1SMoritz Borcherding     async fn append_large(&self, chunks: Vec<ChunkPayloadData>) {
117daaf05d1SMoritz Borcherding         // lock this for the whole duration
118*6bc4a8f1SAnton         let _sem_lock = self.semaphore_lock.lock().await;
119daaf05d1SMoritz Borcherding 
120daaf05d1SMoritz Borcherding         for chunk in chunks.into_iter() {
121daaf05d1SMoritz Borcherding             let user_data_len = chunk.user_data.len();
122daaf05d1SMoritz Borcherding             let permits = self.semaphore.acquire_many(user_data_len as u32).await;
123daaf05d1SMoritz Borcherding             // unwrap ok because we never close the semaphore unless we have dropped self
124daaf05d1SMoritz Borcherding             permits.unwrap().forget();
125daaf05d1SMoritz Borcherding 
126daaf05d1SMoritz Borcherding             if chunk.unordered {
127daaf05d1SMoritz Borcherding                 let mut unordered_queue = self.unordered_queue.write();
128daaf05d1SMoritz Borcherding                 unordered_queue.push_back(chunk);
129daaf05d1SMoritz Borcherding             } else {
130daaf05d1SMoritz Borcherding                 let mut ordered_queue = self.ordered_queue.write();
131daaf05d1SMoritz Borcherding                 ordered_queue.push_back(chunk);
132daaf05d1SMoritz Borcherding             }
133daaf05d1SMoritz Borcherding             self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
134daaf05d1SMoritz Borcherding             self.queue_len.fetch_add(1, Ordering::SeqCst);
135daaf05d1SMoritz Borcherding         }
136daaf05d1SMoritz Borcherding     }
137daaf05d1SMoritz Borcherding 
138daaf05d1SMoritz Borcherding     /// Assumes that A) enough permits have been acquired and forget from the semaphore and that the semaphore_lock is held
append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize)139daaf05d1SMoritz Borcherding     fn append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize) {
140daaf05d1SMoritz Borcherding         let chunks_len = chunks.len();
1414b638fa6SAnton Kaliaev         let unordered = chunks
1424b638fa6SAnton Kaliaev             .first()
1434b638fa6SAnton Kaliaev             .expect("chunks to not be empty because of the above check")
1444b638fa6SAnton Kaliaev             .unordered;
1454b638fa6SAnton Kaliaev         if unordered {
1460acb5a49SAnton Kaliaev             let mut unordered_queue = self.unordered_queue.write();
147daaf05d1SMoritz Borcherding             assert!(
148daaf05d1SMoritz Borcherding                 chunks.iter().all(|c| c.unordered),
149daaf05d1SMoritz Borcherding                 "expected all chunks to be unordered"
150daaf05d1SMoritz Borcherding             );
151daaf05d1SMoritz Borcherding             unordered_queue.extend(chunks);
1524b638fa6SAnton Kaliaev         } else {
1530acb5a49SAnton Kaliaev             let mut ordered_queue = self.ordered_queue.write();
154daaf05d1SMoritz Borcherding             assert!(
155daaf05d1SMoritz Borcherding                 chunks.iter().all(|c| !c.unordered),
156daaf05d1SMoritz Borcherding                 "expected all chunks to be ordered"
157daaf05d1SMoritz Borcherding             );
158daaf05d1SMoritz Borcherding             ordered_queue.extend(chunks);
1594b638fa6SAnton Kaliaev         }
1604b638fa6SAnton Kaliaev 
1614b638fa6SAnton Kaliaev         self.n_bytes
1624b638fa6SAnton Kaliaev             .fetch_add(total_user_data_len, Ordering::SeqCst);
1634b638fa6SAnton Kaliaev         self.queue_len.fetch_add(chunks_len, Ordering::SeqCst);
1644b638fa6SAnton Kaliaev     }
1654b638fa6SAnton Kaliaev 
peek(&self) -> Option<ChunkPayloadData>1660acb5a49SAnton Kaliaev     pub(crate) fn peek(&self) -> Option<ChunkPayloadData> {
167ffe74184SMartin Algesten         if self.selected.load(Ordering::SeqCst) {
168ffe74184SMartin Algesten             if self.unordered_is_selected.load(Ordering::SeqCst) {
1690acb5a49SAnton Kaliaev                 let unordered_queue = self.unordered_queue.read();
170ffe74184SMartin Algesten                 return unordered_queue.get(0).cloned();
171ffe74184SMartin Algesten             } else {
1720acb5a49SAnton Kaliaev                 let ordered_queue = self.ordered_queue.read();
173ffe74184SMartin Algesten                 return ordered_queue.get(0).cloned();
174ffe74184SMartin Algesten             }
175ffe74184SMartin Algesten         }
176ffe74184SMartin Algesten 
177ffe74184SMartin Algesten         let c = {
1780acb5a49SAnton Kaliaev             let unordered_queue = self.unordered_queue.read();
179ffe74184SMartin Algesten             unordered_queue.get(0).cloned()
180ffe74184SMartin Algesten         };
181ffe74184SMartin Algesten 
182ffe74184SMartin Algesten         if c.is_some() {
183ffe74184SMartin Algesten             return c;
184ffe74184SMartin Algesten         }
185ffe74184SMartin Algesten 
1860acb5a49SAnton Kaliaev         let ordered_queue = self.ordered_queue.read();
187ffe74184SMartin Algesten         ordered_queue.get(0).cloned()
188ffe74184SMartin Algesten     }
189ffe74184SMartin Algesten 
pop( &self, beginning_fragment: bool, unordered: bool, ) -> Option<ChunkPayloadData>1900acb5a49SAnton Kaliaev     pub(crate) fn pop(
191ffe74184SMartin Algesten         &self,
192ffe74184SMartin Algesten         beginning_fragment: bool,
193ffe74184SMartin Algesten         unordered: bool,
194ffe74184SMartin Algesten     ) -> Option<ChunkPayloadData> {
195ffe74184SMartin Algesten         let popped = if self.selected.load(Ordering::SeqCst) {
196ffe74184SMartin Algesten             let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
1970acb5a49SAnton Kaliaev                 let mut unordered_queue = self.unordered_queue.write();
198ffe74184SMartin Algesten                 unordered_queue.pop_front()
199ffe74184SMartin Algesten             } else {
2000acb5a49SAnton Kaliaev                 let mut ordered_queue = self.ordered_queue.write();
201ffe74184SMartin Algesten                 ordered_queue.pop_front()
202ffe74184SMartin Algesten             };
203ffe74184SMartin Algesten             if let Some(p) = &popped {
204ffe74184SMartin Algesten                 if p.ending_fragment {
205ffe74184SMartin Algesten                     self.selected.store(false, Ordering::SeqCst);
206ffe74184SMartin Algesten                 }
207ffe74184SMartin Algesten             }
208ffe74184SMartin Algesten             popped
209ffe74184SMartin Algesten         } else {
210ffe74184SMartin Algesten             if !beginning_fragment {
211ffe74184SMartin Algesten                 return None;
212ffe74184SMartin Algesten             }
213ffe74184SMartin Algesten             if unordered {
214ffe74184SMartin Algesten                 let popped = {
2150acb5a49SAnton Kaliaev                     let mut unordered_queue = self.unordered_queue.write();
216ffe74184SMartin Algesten                     unordered_queue.pop_front()
217ffe74184SMartin Algesten                 };
218ffe74184SMartin Algesten                 if let Some(p) = &popped {
219ffe74184SMartin Algesten                     if !p.ending_fragment {
220ffe74184SMartin Algesten                         self.selected.store(true, Ordering::SeqCst);
221ffe74184SMartin Algesten                         self.unordered_is_selected.store(true, Ordering::SeqCst);
222ffe74184SMartin Algesten                     }
223ffe74184SMartin Algesten                 }
224ffe74184SMartin Algesten                 popped
225ffe74184SMartin Algesten             } else {
226ffe74184SMartin Algesten                 let popped = {
2270acb5a49SAnton Kaliaev                     let mut ordered_queue = self.ordered_queue.write();
228ffe74184SMartin Algesten                     ordered_queue.pop_front()
229ffe74184SMartin Algesten                 };
230ffe74184SMartin Algesten                 if let Some(p) = &popped {
231ffe74184SMartin Algesten                     if !p.ending_fragment {
232ffe74184SMartin Algesten                         self.selected.store(true, Ordering::SeqCst);
233ffe74184SMartin Algesten                         self.unordered_is_selected.store(false, Ordering::SeqCst);
234ffe74184SMartin Algesten                     }
235ffe74184SMartin Algesten                 }
236ffe74184SMartin Algesten                 popped
237ffe74184SMartin Algesten             }
238ffe74184SMartin Algesten         };
239ffe74184SMartin Algesten 
240ffe74184SMartin Algesten         if let Some(p) = &popped {
241daaf05d1SMoritz Borcherding             let user_data_len = p.user_data.len();
242daaf05d1SMoritz Borcherding             self.n_bytes.fetch_sub(user_data_len, Ordering::SeqCst);
243ffe74184SMartin Algesten             self.queue_len.fetch_sub(1, Ordering::SeqCst);
244daaf05d1SMoritz Borcherding             self.semaphore.add_permits(user_data_len);
245ffe74184SMartin Algesten         }
246ffe74184SMartin Algesten 
247ffe74184SMartin Algesten         popped
248ffe74184SMartin Algesten     }
249ffe74184SMartin Algesten 
get_num_bytes(&self) -> usize250ffe74184SMartin Algesten     pub(crate) fn get_num_bytes(&self) -> usize {
251ffe74184SMartin Algesten         self.n_bytes.load(Ordering::SeqCst)
252ffe74184SMartin Algesten     }
253ffe74184SMartin Algesten 
len(&self) -> usize254ffe74184SMartin Algesten     pub(crate) fn len(&self) -> usize {
255ffe74184SMartin Algesten         self.queue_len.load(Ordering::SeqCst)
256ffe74184SMartin Algesten     }
257ffe74184SMartin Algesten 
is_empty(&self) -> bool258ffe74184SMartin Algesten     pub(crate) fn is_empty(&self) -> bool {
259ffe74184SMartin Algesten         self.len() == 0
260ffe74184SMartin Algesten     }
261ffe74184SMartin Algesten }
262