1 use tokio::sync::{Mutex, Semaphore}; 2 use util::sync::RwLock; 3 4 use std::{ 5 collections::VecDeque, 6 sync::atomic::{AtomicBool, AtomicUsize, Ordering}, 7 }; 8 9 use crate::chunk::chunk_payload_data::ChunkPayloadData; 10 11 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal> 12 13 // Some tests push a lot of data before starting to process any data... 14 #[cfg(test)] 15 const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024; 16 /// Maximum size of the pending queue, in bytes. 17 #[cfg(not(test))] 18 const QUEUE_BYTES_LIMIT: usize = 128 * 1024; 19 /// Total user data size, beyound which the packet will be split into chunks. The chunks will be 20 /// added to the pending queue one by one. 21 const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3; 22 23 /// Basic queue for either ordered or unordered chunks. 24 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>; 25 26 /// A queue for both ordered and unordered chunks. 27 #[derive(Debug)] 28 pub(crate) struct PendingQueue { 29 // These two fields limit appending bytes to the queue 30 // This two step process is necessary because 31 // A) We need backpressure which the semaphore applies by limiting the total amount of bytes via the permits 32 // B) The chunks of one fragmented message need to be put in direct sequence into the queue which the lock guarantees 33 // 34 // The semaphore is not inside the lock because the permits need to be returned without needing a lock on the semaphore 35 semaphore_lock: Mutex<()>, 36 semaphore: Semaphore, 37 38 unordered_queue: RwLock<PendingBaseQueue>, 39 ordered_queue: RwLock<PendingBaseQueue>, 40 queue_len: AtomicUsize, 41 n_bytes: AtomicUsize, 42 selected: AtomicBool, 43 unordered_is_selected: AtomicBool, 44 } 45 46 impl Default for PendingQueue { 47 fn default() -> Self { 48 PendingQueue::new() 49 } 50 } 51 52 impl PendingQueue { 53 pub(crate) fn new() -> Self { 54 Self { 55 semaphore_lock: Mutex::default(), 56 semaphore: Semaphore::new(QUEUE_BYTES_LIMIT), 57 unordered_queue: Default::default(), 58 ordered_queue: Default::default(), 59 queue_len: Default::default(), 60 n_bytes: Default::default(), 61 selected: Default::default(), 62 unordered_is_selected: Default::default(), 63 } 64 } 65 66 /// Appends a chunk to the back of the pending queue. 67 pub(crate) async fn push(&self, c: ChunkPayloadData) { 68 let user_data_len = c.user_data.len(); 69 70 { 71 let _sem_lock = self.semaphore_lock.lock().await; 72 let permits = self.semaphore.acquire_many(user_data_len as u32).await; 73 // unwrap ok because we never close the semaphore unless we have dropped self 74 permits.unwrap().forget(); 75 76 if c.unordered { 77 let mut unordered_queue = self.unordered_queue.write(); 78 unordered_queue.push_back(c); 79 } else { 80 let mut ordered_queue = self.ordered_queue.write(); 81 ordered_queue.push_back(c); 82 } 83 } 84 85 self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 86 self.queue_len.fetch_add(1, Ordering::SeqCst); 87 } 88 89 /// Appends chunks to the back of the pending queue. 90 /// 91 /// # Panics 92 /// 93 /// If it's a mix of unordered and ordered chunks. 94 pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) { 95 if chunks.is_empty() { 96 return; 97 } 98 99 let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len()); 100 101 if total_user_data_len >= QUEUE_APPEND_LARGE { 102 self.append_large(chunks).await 103 } else { 104 let _sem_lock = self.semaphore_lock.lock().await; 105 let permits = self 106 .semaphore 107 .acquire_many(total_user_data_len as u32) 108 .await; 109 // unwrap ok because we never close the semaphore unless we have dropped self 110 permits.unwrap().forget(); 111 self.append_unlimited(chunks, total_user_data_len); 112 } 113 } 114 115 // If this is a very large message we append chunks one by one to allow progress while we are appending 116 async fn append_large(&self, chunks: Vec<ChunkPayloadData>) { 117 // lock this for the whole duration 118 let _sem_lock = self.semaphore_lock.lock().await; 119 120 for chunk in chunks.into_iter() { 121 let user_data_len = chunk.user_data.len(); 122 let permits = self.semaphore.acquire_many(user_data_len as u32).await; 123 // unwrap ok because we never close the semaphore unless we have dropped self 124 permits.unwrap().forget(); 125 126 if chunk.unordered { 127 let mut unordered_queue = self.unordered_queue.write(); 128 unordered_queue.push_back(chunk); 129 } else { 130 let mut ordered_queue = self.ordered_queue.write(); 131 ordered_queue.push_back(chunk); 132 } 133 self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 134 self.queue_len.fetch_add(1, Ordering::SeqCst); 135 } 136 } 137 138 /// Assumes that A) enough permits have been acquired and forget from the semaphore and that the semaphore_lock is held 139 fn append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize) { 140 let chunks_len = chunks.len(); 141 let unordered = chunks 142 .first() 143 .expect("chunks to not be empty because of the above check") 144 .unordered; 145 if unordered { 146 let mut unordered_queue = self.unordered_queue.write(); 147 assert!( 148 chunks.iter().all(|c| c.unordered), 149 "expected all chunks to be unordered" 150 ); 151 unordered_queue.extend(chunks); 152 } else { 153 let mut ordered_queue = self.ordered_queue.write(); 154 assert!( 155 chunks.iter().all(|c| !c.unordered), 156 "expected all chunks to be ordered" 157 ); 158 ordered_queue.extend(chunks); 159 } 160 161 self.n_bytes 162 .fetch_add(total_user_data_len, Ordering::SeqCst); 163 self.queue_len.fetch_add(chunks_len, Ordering::SeqCst); 164 } 165 166 pub(crate) fn peek(&self) -> Option<ChunkPayloadData> { 167 if self.selected.load(Ordering::SeqCst) { 168 if self.unordered_is_selected.load(Ordering::SeqCst) { 169 let unordered_queue = self.unordered_queue.read(); 170 return unordered_queue.get(0).cloned(); 171 } else { 172 let ordered_queue = self.ordered_queue.read(); 173 return ordered_queue.get(0).cloned(); 174 } 175 } 176 177 let c = { 178 let unordered_queue = self.unordered_queue.read(); 179 unordered_queue.get(0).cloned() 180 }; 181 182 if c.is_some() { 183 return c; 184 } 185 186 let ordered_queue = self.ordered_queue.read(); 187 ordered_queue.get(0).cloned() 188 } 189 190 pub(crate) fn pop( 191 &self, 192 beginning_fragment: bool, 193 unordered: bool, 194 ) -> Option<ChunkPayloadData> { 195 let popped = if self.selected.load(Ordering::SeqCst) { 196 let popped = if self.unordered_is_selected.load(Ordering::SeqCst) { 197 let mut unordered_queue = self.unordered_queue.write(); 198 unordered_queue.pop_front() 199 } else { 200 let mut ordered_queue = self.ordered_queue.write(); 201 ordered_queue.pop_front() 202 }; 203 if let Some(p) = &popped { 204 if p.ending_fragment { 205 self.selected.store(false, Ordering::SeqCst); 206 } 207 } 208 popped 209 } else { 210 if !beginning_fragment { 211 return None; 212 } 213 if unordered { 214 let popped = { 215 let mut unordered_queue = self.unordered_queue.write(); 216 unordered_queue.pop_front() 217 }; 218 if let Some(p) = &popped { 219 if !p.ending_fragment { 220 self.selected.store(true, Ordering::SeqCst); 221 self.unordered_is_selected.store(true, Ordering::SeqCst); 222 } 223 } 224 popped 225 } else { 226 let popped = { 227 let mut ordered_queue = self.ordered_queue.write(); 228 ordered_queue.pop_front() 229 }; 230 if let Some(p) = &popped { 231 if !p.ending_fragment { 232 self.selected.store(true, Ordering::SeqCst); 233 self.unordered_is_selected.store(false, Ordering::SeqCst); 234 } 235 } 236 popped 237 } 238 }; 239 240 if let Some(p) = &popped { 241 let user_data_len = p.user_data.len(); 242 self.n_bytes.fetch_sub(user_data_len, Ordering::SeqCst); 243 self.queue_len.fetch_sub(1, Ordering::SeqCst); 244 self.semaphore.add_permits(user_data_len); 245 } 246 247 popped 248 } 249 250 pub(crate) fn get_num_bytes(&self) -> usize { 251 self.n_bytes.load(Ordering::SeqCst) 252 } 253 254 pub(crate) fn len(&self) -> usize { 255 self.queue_len.load(Ordering::SeqCst) 256 } 257 258 pub(crate) fn is_empty(&self) -> bool { 259 self.len() == 0 260 } 261 } 262