1 use std::{ 2 collections::VecDeque, 3 sync::atomic::{AtomicBool, AtomicUsize, Ordering}, 4 }; 5 6 use tokio::sync::{Mutex, Semaphore}; 7 use util::sync::RwLock; 8 9 use crate::chunk::chunk_payload_data::ChunkPayloadData; 10 11 /// Basic queue for either ordered or unordered chunks. 12 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>; 13 14 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal> 15 16 /// A queue for both ordered and unordered chunks. 17 #[derive(Debug)] 18 pub(crate) struct PendingQueue { 19 // These two fields limit appending bytes to the queue 20 // This two step process is necessary because 21 // A) We need backpressure which the semaphore applies by limiting the total amount of bytes via the permits 22 // B) The chunks of one fragmented message need to be put in direct sequence into the queue which the lock guarantees 23 // 24 // The semaphore is not inside the lock because the permits need to be returned without needing a lock on the semaphore 25 semaphore_lock: Mutex<()>, 26 semaphore: Semaphore, 27 28 unordered_queue: RwLock<PendingBaseQueue>, 29 ordered_queue: RwLock<PendingBaseQueue>, 30 queue_len: AtomicUsize, 31 n_bytes: AtomicUsize, 32 selected: AtomicBool, 33 unordered_is_selected: AtomicBool, 34 } 35 36 impl Default for PendingQueue { 37 fn default() -> Self { 38 PendingQueue::new() 39 } 40 } 41 42 // Some tests push a lot of data before starting to process any data... 43 #[cfg(test)] 44 const QUEUE_BYTES_LIMIT: usize = 128 * 1024 * 1024; 45 #[cfg(not(test))] 46 const QUEUE_BYTES_LIMIT: usize = 128 * 1024; 47 48 const QUEUE_APPEND_LARGE: usize = (QUEUE_BYTES_LIMIT * 2) / 3; 49 50 impl PendingQueue { 51 pub(crate) fn new() -> Self { 52 Self { 53 semaphore_lock: Mutex::default(), 54 semaphore: Semaphore::new(QUEUE_BYTES_LIMIT), 55 unordered_queue: Default::default(), 56 ordered_queue: Default::default(), 57 queue_len: Default::default(), 58 n_bytes: Default::default(), 59 selected: Default::default(), 60 unordered_is_selected: Default::default(), 61 } 62 } 63 64 /// Appends a chunk to the back of the pending queue. 65 pub(crate) async fn push(&self, c: ChunkPayloadData) { 66 let user_data_len = c.user_data.len(); 67 68 { 69 let sem_lock = self.semaphore_lock.lock().await; 70 let permits = self.semaphore.acquire_many(user_data_len as u32).await; 71 // unwrap ok because we never close the semaphore unless we have dropped self 72 permits.unwrap().forget(); 73 74 if c.unordered { 75 let mut unordered_queue = self.unordered_queue.write(); 76 unordered_queue.push_back(c); 77 } else { 78 let mut ordered_queue = self.ordered_queue.write(); 79 ordered_queue.push_back(c); 80 } 81 drop(sem_lock); 82 } 83 84 self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 85 self.queue_len.fetch_add(1, Ordering::SeqCst); 86 } 87 88 /// Appends chunks to the back of the pending queue. 89 /// 90 /// # Panics 91 /// 92 /// If it's a mix of unordered and ordered chunks. 93 pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) { 94 if chunks.is_empty() { 95 return; 96 } 97 98 let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len()); 99 100 if total_user_data_len >= QUEUE_APPEND_LARGE { 101 self.append_large(chunks).await 102 } else { 103 let sem_lock = self.semaphore_lock.lock().await; 104 let permits = self 105 .semaphore 106 .acquire_many(total_user_data_len as u32) 107 .await; 108 // unwrap ok because we never close the semaphore unless we have dropped self 109 permits.unwrap().forget(); 110 self.append_unlimited(chunks, total_user_data_len); 111 drop(sem_lock); 112 } 113 } 114 115 // If this is a very large message we append chunks one by one to allow progress while we are appending 116 async fn append_large(&self, chunks: Vec<ChunkPayloadData>) { 117 // lock this for the whole duration 118 let sem_lock = self.semaphore_lock.lock().await; 119 120 for chunk in chunks.into_iter() { 121 let user_data_len = chunk.user_data.len(); 122 let permits = self.semaphore.acquire_many(user_data_len as u32).await; 123 // unwrap ok because we never close the semaphore unless we have dropped self 124 permits.unwrap().forget(); 125 126 if chunk.unordered { 127 let mut unordered_queue = self.unordered_queue.write(); 128 unordered_queue.push_back(chunk); 129 } else { 130 let mut ordered_queue = self.ordered_queue.write(); 131 ordered_queue.push_back(chunk); 132 } 133 self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 134 self.queue_len.fetch_add(1, Ordering::SeqCst); 135 } 136 137 drop(sem_lock); 138 } 139 140 /// Assumes that A) enough permits have been acquired and forget from the semaphore and that the semaphore_lock is held 141 fn append_unlimited(&self, chunks: Vec<ChunkPayloadData>, total_user_data_len: usize) { 142 let chunks_len = chunks.len(); 143 let unordered = chunks 144 .first() 145 .expect("chunks to not be empty because of the above check") 146 .unordered; 147 if unordered { 148 let mut unordered_queue = self.unordered_queue.write(); 149 assert!( 150 chunks.iter().all(|c| c.unordered), 151 "expected all chunks to be unordered" 152 ); 153 unordered_queue.extend(chunks); 154 } else { 155 let mut ordered_queue = self.ordered_queue.write(); 156 assert!( 157 chunks.iter().all(|c| !c.unordered), 158 "expected all chunks to be ordered" 159 ); 160 ordered_queue.extend(chunks); 161 } 162 163 self.n_bytes 164 .fetch_add(total_user_data_len, Ordering::SeqCst); 165 self.queue_len.fetch_add(chunks_len, Ordering::SeqCst); 166 } 167 168 pub(crate) fn peek(&self) -> Option<ChunkPayloadData> { 169 if self.selected.load(Ordering::SeqCst) { 170 if self.unordered_is_selected.load(Ordering::SeqCst) { 171 let unordered_queue = self.unordered_queue.read(); 172 return unordered_queue.get(0).cloned(); 173 } else { 174 let ordered_queue = self.ordered_queue.read(); 175 return ordered_queue.get(0).cloned(); 176 } 177 } 178 179 let c = { 180 let unordered_queue = self.unordered_queue.read(); 181 unordered_queue.get(0).cloned() 182 }; 183 184 if c.is_some() { 185 return c; 186 } 187 188 let ordered_queue = self.ordered_queue.read(); 189 ordered_queue.get(0).cloned() 190 } 191 192 pub(crate) fn pop( 193 &self, 194 beginning_fragment: bool, 195 unordered: bool, 196 ) -> Option<ChunkPayloadData> { 197 let popped = if self.selected.load(Ordering::SeqCst) { 198 let popped = if self.unordered_is_selected.load(Ordering::SeqCst) { 199 let mut unordered_queue = self.unordered_queue.write(); 200 unordered_queue.pop_front() 201 } else { 202 let mut ordered_queue = self.ordered_queue.write(); 203 ordered_queue.pop_front() 204 }; 205 if let Some(p) = &popped { 206 if p.ending_fragment { 207 self.selected.store(false, Ordering::SeqCst); 208 } 209 } 210 popped 211 } else { 212 if !beginning_fragment { 213 return None; 214 } 215 if unordered { 216 let popped = { 217 let mut unordered_queue = self.unordered_queue.write(); 218 unordered_queue.pop_front() 219 }; 220 if let Some(p) = &popped { 221 if !p.ending_fragment { 222 self.selected.store(true, Ordering::SeqCst); 223 self.unordered_is_selected.store(true, Ordering::SeqCst); 224 } 225 } 226 popped 227 } else { 228 let popped = { 229 let mut ordered_queue = self.ordered_queue.write(); 230 ordered_queue.pop_front() 231 }; 232 if let Some(p) = &popped { 233 if !p.ending_fragment { 234 self.selected.store(true, Ordering::SeqCst); 235 self.unordered_is_selected.store(false, Ordering::SeqCst); 236 } 237 } 238 popped 239 } 240 }; 241 242 if let Some(p) = &popped { 243 let user_data_len = p.user_data.len(); 244 self.n_bytes.fetch_sub(user_data_len, Ordering::SeqCst); 245 self.queue_len.fetch_sub(1, Ordering::SeqCst); 246 self.semaphore.add_permits(user_data_len); 247 } 248 249 popped 250 } 251 252 pub(crate) fn get_num_bytes(&self) -> usize { 253 self.n_bytes.load(Ordering::SeqCst) 254 } 255 256 pub(crate) fn len(&self) -> usize { 257 self.queue_len.load(Ordering::SeqCst) 258 } 259 260 pub(crate) fn is_empty(&self) -> bool { 261 self.len() == 0 262 } 263 } 264