1*6bc4a8f1SAnton use tokio::sync::{Mutex, Semaphore}; 2*6bc4a8f1SAnton use util::sync::RwLock; 3*6bc4a8f1SAnton 40acb5a49SAnton Kaliaev use std::{ 50acb5a49SAnton Kaliaev collections::VecDeque, 60acb5a49SAnton Kaliaev sync::atomic::{AtomicBool, AtomicUsize, Ordering}, 70acb5a49SAnton Kaliaev }; 80acb5a49SAnton Kaliaev 9ffe74184SMartin Algesten use crate::chunk::chunk_payload_data::ChunkPayloadData; 10ffe74184SMartin Algesten 11*6bc4a8f1SAnton // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal> 12*6bc4a8f1SAnton 13*6bc4a8f1SAnton // Some tests push a lot of data before starting to process any data... 14*6bc4a8f1SAnton #[cfg(test)] 15*6bc4a8f1SAnton const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024; 16*6bc4a8f1SAnton /// Maximum size of the pending queue, in bytes. 17*6bc4a8f1SAnton #[cfg(not(test))] 18*6bc4a8f1SAnton const QUEUE_BYTES_LIMIT: usize = 128 * 1024; 19*6bc4a8f1SAnton /// Total user data size, beyound which the packet will be split into chunks. The chunks will be 20*6bc4a8f1SAnton /// added to the pending queue one by one. 21*6bc4a8f1SAnton const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3; 22*6bc4a8f1SAnton 230acb5a49SAnton Kaliaev /// Basic queue for either ordered or unordered chunks. 24ffe74184SMartin Algesten pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>; 25ffe74184SMartin Algesten 260acb5a49SAnton Kaliaev /// A queue for both ordered and unordered chunks. 27daaf05d1SMoritz Borcherding #[derive(Debug)] 28ffe74184SMartin Algesten pub(crate) struct PendingQueue { 29daaf05d1SMoritz Borcherding // These two fields limit appending bytes to the queue 30daaf05d1SMoritz Borcherding // This two step process is necessary because 31daaf05d1SMoritz Borcherding // A) We need backpressure which the semaphore applies by limiting the total amount of bytes via the permits 32daaf05d1SMoritz Borcherding // B) The chunks of one fragmented message need to be put in direct sequence into the queue which the lock guarantees 33daaf05d1SMoritz Borcherding // 34daaf05d1SMoritz Borcherding // The semaphore is not inside the lock because the permits need to be returned without needing a lock on the semaphore 35daaf05d1SMoritz Borcherding semaphore_lock: Mutex<()>, 36daaf05d1SMoritz Borcherding semaphore: Semaphore, 37daaf05d1SMoritz Borcherding 380acb5a49SAnton Kaliaev unordered_queue: RwLock<PendingBaseQueue>, 390acb5a49SAnton Kaliaev ordered_queue: RwLock<PendingBaseQueue>, 40ffe74184SMartin Algesten queue_len: AtomicUsize, 41ffe74184SMartin Algesten n_bytes: AtomicUsize, 42ffe74184SMartin Algesten selected: AtomicBool, 43ffe74184SMartin Algesten unordered_is_selected: AtomicBool, 44ffe74184SMartin Algesten } 45ffe74184SMartin Algesten 46daaf05d1SMoritz Borcherding impl Default for PendingQueue { default() -> Self47daaf05d1SMoritz Borcherding fn default() -> Self { 48daaf05d1SMoritz Borcherding PendingQueue::new() 49daaf05d1SMoritz Borcherding } 50daaf05d1SMoritz Borcherding } 51daaf05d1SMoritz Borcherding 52ffe74184SMartin Algesten impl PendingQueue { new() -> Self53ffe74184SMartin Algesten pub(crate) fn new() -> Self { 54daaf05d1SMoritz Borcherding Self { 55daaf05d1SMoritz Borcherding semaphore_lock: Mutex::default(), 56daaf05d1SMoritz Borcherding semaphore: Semaphore::new(QUEUE_BYTES_LIMIT), 57daaf05d1SMoritz Borcherding unordered_queue: Default::default(), 58daaf05d1SMoritz Borcherding ordered_queue: Default::default(), 59daaf05d1SMoritz Borcherding queue_len: Default::default(), 60daaf05d1SMoritz Borcherding n_bytes: Default::default(), 61daaf05d1SMoritz Borcherding selected: Default::default(), 62daaf05d1SMoritz Borcherding unordered_is_selected: Default::default(), 63daaf05d1SMoritz Borcherding } 64ffe74184SMartin Algesten } 65ffe74184SMartin Algesten 664b638fa6SAnton Kaliaev /// Appends a chunk to the back of the pending queue. push(&self, c: ChunkPayloadData)67daaf05d1SMoritz Borcherding pub(crate) async fn push(&self, c: ChunkPayloadData) { 684b638fa6SAnton Kaliaev let user_data_len = c.user_data.len(); 694b638fa6SAnton Kaliaev 70daaf05d1SMoritz Borcherding { 71*6bc4a8f1SAnton let _sem_lock = self.semaphore_lock.lock().await; 72daaf05d1SMoritz Borcherding let permits = self.semaphore.acquire_many(user_data_len as u32).await; 73daaf05d1SMoritz Borcherding // unwrap ok because we never close the semaphore unless we have dropped self 74daaf05d1SMoritz Borcherding permits.unwrap().forget(); 75daaf05d1SMoritz Borcherding 76ffe74184SMartin Algesten if c.unordered { 770acb5a49SAnton Kaliaev let mut unordered_queue = self.unordered_queue.write(); 78ffe74184SMartin Algesten unordered_queue.push_back(c); 79ffe74184SMartin Algesten } else { 800acb5a49SAnton Kaliaev let mut ordered_queue = self.ordered_queue.write(); 81ffe74184SMartin Algesten ordered_queue.push_back(c); 82ffe74184SMartin Algesten } 83daaf05d1SMoritz Borcherding } 844b638fa6SAnton Kaliaev 854b638fa6SAnton Kaliaev self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 86ffe74184SMartin Algesten self.queue_len.fetch_add(1, Ordering::SeqCst); 87ffe74184SMartin Algesten } 88ffe74184SMartin Algesten 894b638fa6SAnton Kaliaev /// Appends chunks to the back of the pending queue. 904b638fa6SAnton Kaliaev /// 914b638fa6SAnton Kaliaev /// # Panics 924b638fa6SAnton Kaliaev /// 934b638fa6SAnton Kaliaev /// If it's a mix of unordered and ordered chunks. append(&self, chunks: Vec<ChunkPayloadData>)94daaf05d1SMoritz Borcherding pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) { 954b638fa6SAnton Kaliaev if chunks.is_empty() { 964b638fa6SAnton Kaliaev return; 974b638fa6SAnton Kaliaev } 984b638fa6SAnton Kaliaev 994b638fa6SAnton Kaliaev let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len()); 1004b638fa6SAnton Kaliaev 101daaf05d1SMoritz Borcherding if total_user_data_len >= QUEUE_APPEND_LARGE { 102daaf05d1SMoritz Borcherding self.append_large(chunks).await 103daaf05d1SMoritz Borcherding } else { 104*6bc4a8f1SAnton let _sem_lock = self.semaphore_lock.lock().await; 105daaf05d1SMoritz Borcherding let permits = self 106daaf05d1SMoritz Borcherding .semaphore 107daaf05d1SMoritz Borcherding .acquire_many(total_user_data_len as u32) 108daaf05d1SMoritz Borcherding .await; 109daaf05d1SMoritz Borcherding // unwrap ok because we never close the semaphore unless we have dropped self 110daaf05d1SMoritz Borcherding permits.unwrap().forget(); 111daaf05d1SMoritz Borcherding self.append_unlimited(chunks, total_user_data_len); 112daaf05d1SMoritz Borcherding } 113daaf05d1SMoritz Borcherding } 114daaf05d1SMoritz Borcherding 115daaf05d1SMoritz Borcherding // If this is a very large message we append chunks one by one to allow progress while we are appending append_large(&self, chunks: Vec<ChunkPayloadData>)116daaf05d1SMoritz Borcherding async fn append_large(&self, chunks: Vec<ChunkPayloadData>) { 117daaf05d1SMoritz Borcherding // lock this for the whole duration 118*6bc4a8f1SAnton let _sem_lock = self.semaphore_lock.lock().await; 119daaf05d1SMoritz Borcherding 120daaf05d1SMoritz Borcherding for chunk in chunks.into_iter() { 121daaf05d1SMoritz Borcherding let user_data_len = chunk.user_data.len(); 122daaf05d1SMoritz Borcherding let permits = self.semaphore.acquire_many(user_data_len as u32).await; 123daaf05d1SMoritz Borcherding // unwrap ok because we never close the semaphore unless we have dropped self 124daaf05d1SMoritz Borcherding permits.unwrap().forget(); 125daaf05d1SMoritz Borcherding 126daaf05d1SMoritz Borcherding if chunk.unordered { 127daaf05d1SMoritz Borcherding let mut unordered_queue = self.unordered_queue.write(); 128daaf05d1SMoritz Borcherding unordered_queue.push_back(chunk); 129daaf05d1SMoritz Borcherding } else { 130daaf05d1SMoritz Borcherding let mut ordered_queue = self.ordered_queue.write(); 131daaf05d1SMoritz Borcherding ordered_queue.push_back(chunk); 132daaf05d1SMoritz Borcherding } 133daaf05d1SMoritz Borcherding self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 134daaf05d1SMoritz Borcherding self.queue_len.fetch_add(1, Ordering::SeqCst); 135daaf05d1SMoritz Borcherding } 136daaf05d1SMoritz Borcherding } 137daaf05d1SMoritz Borcherding 138daaf05d1SMoritz Borcherding /// Assumes that A) enough permits have been acquired and forget from the semaphore and that the semaphore_lock is held append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize)139daaf05d1SMoritz Borcherding fn append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize) { 140daaf05d1SMoritz Borcherding let chunks_len = chunks.len(); 1414b638fa6SAnton Kaliaev let unordered = chunks 1424b638fa6SAnton Kaliaev .first() 1434b638fa6SAnton Kaliaev .expect("chunks to not be empty because of the above check") 1444b638fa6SAnton Kaliaev .unordered; 1454b638fa6SAnton Kaliaev if unordered { 1460acb5a49SAnton Kaliaev let mut unordered_queue = self.unordered_queue.write(); 147daaf05d1SMoritz Borcherding assert!( 148daaf05d1SMoritz Borcherding chunks.iter().all(|c| c.unordered), 149daaf05d1SMoritz Borcherding "expected all chunks to be unordered" 150daaf05d1SMoritz Borcherding ); 151daaf05d1SMoritz Borcherding unordered_queue.extend(chunks); 1524b638fa6SAnton Kaliaev } else { 1530acb5a49SAnton Kaliaev let mut ordered_queue = self.ordered_queue.write(); 154daaf05d1SMoritz Borcherding assert!( 155daaf05d1SMoritz Borcherding chunks.iter().all(|c| !c.unordered), 156daaf05d1SMoritz Borcherding "expected all chunks to be ordered" 157daaf05d1SMoritz Borcherding ); 158daaf05d1SMoritz Borcherding ordered_queue.extend(chunks); 1594b638fa6SAnton Kaliaev } 1604b638fa6SAnton Kaliaev 1614b638fa6SAnton Kaliaev self.n_bytes 1624b638fa6SAnton Kaliaev .fetch_add(total_user_data_len, Ordering::SeqCst); 1634b638fa6SAnton Kaliaev self.queue_len.fetch_add(chunks_len, Ordering::SeqCst); 1644b638fa6SAnton Kaliaev } 1654b638fa6SAnton Kaliaev peek(&self) -> Option<ChunkPayloadData>1660acb5a49SAnton Kaliaev pub(crate) fn peek(&self) -> Option<ChunkPayloadData> { 167ffe74184SMartin Algesten if self.selected.load(Ordering::SeqCst) { 168ffe74184SMartin Algesten if self.unordered_is_selected.load(Ordering::SeqCst) { 1690acb5a49SAnton Kaliaev let unordered_queue = self.unordered_queue.read(); 170ffe74184SMartin Algesten return unordered_queue.get(0).cloned(); 171ffe74184SMartin Algesten } else { 1720acb5a49SAnton Kaliaev let ordered_queue = self.ordered_queue.read(); 173ffe74184SMartin Algesten return ordered_queue.get(0).cloned(); 174ffe74184SMartin Algesten } 175ffe74184SMartin Algesten } 176ffe74184SMartin Algesten 177ffe74184SMartin Algesten let c = { 1780acb5a49SAnton Kaliaev let unordered_queue = self.unordered_queue.read(); 179ffe74184SMartin Algesten unordered_queue.get(0).cloned() 180ffe74184SMartin Algesten }; 181ffe74184SMartin Algesten 182ffe74184SMartin Algesten if c.is_some() { 183ffe74184SMartin Algesten return c; 184ffe74184SMartin Algesten } 185ffe74184SMartin Algesten 1860acb5a49SAnton Kaliaev let ordered_queue = self.ordered_queue.read(); 187ffe74184SMartin Algesten ordered_queue.get(0).cloned() 188ffe74184SMartin Algesten } 189ffe74184SMartin Algesten pop( &self, beginning_fragment: bool, unordered: bool, ) -> Option<ChunkPayloadData>1900acb5a49SAnton Kaliaev pub(crate) fn pop( 191ffe74184SMartin Algesten &self, 192ffe74184SMartin Algesten beginning_fragment: bool, 193ffe74184SMartin Algesten unordered: bool, 194ffe74184SMartin Algesten ) -> Option<ChunkPayloadData> { 195ffe74184SMartin Algesten let popped = if self.selected.load(Ordering::SeqCst) { 196ffe74184SMartin Algesten let popped = if self.unordered_is_selected.load(Ordering::SeqCst) { 1970acb5a49SAnton Kaliaev let mut unordered_queue = self.unordered_queue.write(); 198ffe74184SMartin Algesten unordered_queue.pop_front() 199ffe74184SMartin Algesten } else { 2000acb5a49SAnton Kaliaev let mut ordered_queue = self.ordered_queue.write(); 201ffe74184SMartin Algesten ordered_queue.pop_front() 202ffe74184SMartin Algesten }; 203ffe74184SMartin Algesten if let Some(p) = &popped { 204ffe74184SMartin Algesten if p.ending_fragment { 205ffe74184SMartin Algesten self.selected.store(false, Ordering::SeqCst); 206ffe74184SMartin Algesten } 207ffe74184SMartin Algesten } 208ffe74184SMartin Algesten popped 209ffe74184SMartin Algesten } else { 210ffe74184SMartin Algesten if !beginning_fragment { 211ffe74184SMartin Algesten return None; 212ffe74184SMartin Algesten } 213ffe74184SMartin Algesten if unordered { 214ffe74184SMartin Algesten let popped = { 2150acb5a49SAnton Kaliaev let mut unordered_queue = self.unordered_queue.write(); 216ffe74184SMartin Algesten unordered_queue.pop_front() 217ffe74184SMartin Algesten }; 218ffe74184SMartin Algesten if let Some(p) = &popped { 219ffe74184SMartin Algesten if !p.ending_fragment { 220ffe74184SMartin Algesten self.selected.store(true, Ordering::SeqCst); 221ffe74184SMartin Algesten self.unordered_is_selected.store(true, Ordering::SeqCst); 222ffe74184SMartin Algesten } 223ffe74184SMartin Algesten } 224ffe74184SMartin Algesten popped 225ffe74184SMartin Algesten } else { 226ffe74184SMartin Algesten let popped = { 2270acb5a49SAnton Kaliaev let mut ordered_queue = self.ordered_queue.write(); 228ffe74184SMartin Algesten ordered_queue.pop_front() 229ffe74184SMartin Algesten }; 230ffe74184SMartin Algesten if let Some(p) = &popped { 231ffe74184SMartin Algesten if !p.ending_fragment { 232ffe74184SMartin Algesten self.selected.store(true, Ordering::SeqCst); 233ffe74184SMartin Algesten self.unordered_is_selected.store(false, Ordering::SeqCst); 234ffe74184SMartin Algesten } 235ffe74184SMartin Algesten } 236ffe74184SMartin Algesten popped 237ffe74184SMartin Algesten } 238ffe74184SMartin Algesten }; 239ffe74184SMartin Algesten 240ffe74184SMartin Algesten if let Some(p) = &popped { 241daaf05d1SMoritz Borcherding let user_data_len = p.user_data.len(); 242daaf05d1SMoritz Borcherding self.n_bytes.fetch_sub(user_data_len, Ordering::SeqCst); 243ffe74184SMartin Algesten self.queue_len.fetch_sub(1, Ordering::SeqCst); 244daaf05d1SMoritz Borcherding self.semaphore.add_permits(user_data_len); 245ffe74184SMartin Algesten } 246ffe74184SMartin Algesten 247ffe74184SMartin Algesten popped 248ffe74184SMartin Algesten } 249ffe74184SMartin Algesten get_num_bytes(&self) -> usize250ffe74184SMartin Algesten pub(crate) fn get_num_bytes(&self) -> usize { 251ffe74184SMartin Algesten self.n_bytes.load(Ordering::SeqCst) 252ffe74184SMartin Algesten } 253ffe74184SMartin Algesten len(&self) -> usize254ffe74184SMartin Algesten pub(crate) fn len(&self) -> usize { 255ffe74184SMartin Algesten self.queue_len.load(Ordering::SeqCst) 256ffe74184SMartin Algesten } 257ffe74184SMartin Algesten is_empty(&self) -> bool258ffe74184SMartin Algesten pub(crate) fn is_empty(&self) -> bool { 259ffe74184SMartin Algesten self.len() == 0 260ffe74184SMartin Algesten } 261ffe74184SMartin Algesten } 262