1 use std::{ 2 collections::VecDeque, 3 sync::atomic::{AtomicBool, AtomicUsize, Ordering}, 4 }; 5 6 use util::sync::RwLock; 7 8 use crate::chunk::chunk_payload_data::ChunkPayloadData; 9 10 /// Basic queue for either ordered or unordered chunks. 11 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>; 12 13 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal> 14 15 /// A queue for both ordered and unordered chunks. 16 #[derive(Debug, Default)] 17 pub(crate) struct PendingQueue { 18 unordered_queue: RwLock<PendingBaseQueue>, 19 ordered_queue: RwLock<PendingBaseQueue>, 20 queue_len: AtomicUsize, 21 n_bytes: AtomicUsize, 22 selected: AtomicBool, 23 unordered_is_selected: AtomicBool, 24 } 25 26 impl PendingQueue { 27 pub(crate) fn new() -> Self { 28 PendingQueue::default() 29 } 30 31 /// Appends a chunk to the back of the pending queue. 32 pub(crate) fn push(&self, c: ChunkPayloadData) { 33 let user_data_len = c.user_data.len(); 34 35 if c.unordered { 36 let mut unordered_queue = self.unordered_queue.write(); 37 unordered_queue.push_back(c); 38 } else { 39 let mut ordered_queue = self.ordered_queue.write(); 40 ordered_queue.push_back(c); 41 } 42 43 self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 44 self.queue_len.fetch_add(1, Ordering::SeqCst); 45 } 46 47 /// Appends chunks to the back of the pending queue. 48 /// 49 /// # Panics 50 /// 51 /// If it's a mix of unordered and ordered chunks. 52 pub(crate) fn append(&self, chunks: Vec<ChunkPayloadData>) { 53 if chunks.is_empty() { 54 return; 55 } 56 57 let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len()); 58 let chunks_len = chunks.len(); 59 60 let unordered = chunks 61 .first() 62 .expect("chunks to not be empty because of the above check") 63 .unordered; 64 if unordered { 65 let mut unordered_queue = self.unordered_queue.write(); 66 for c in chunks { 67 assert!(c.unordered, "expected all chunks to be unordered"); 68 unordered_queue.push_back(c); 69 } 70 } else { 71 let mut ordered_queue = self.ordered_queue.write(); 72 for c in chunks { 73 assert!(!c.unordered, "expected all chunks to be ordered"); 74 ordered_queue.push_back(c); 75 } 76 } 77 78 self.n_bytes 79 .fetch_add(total_user_data_len, Ordering::SeqCst); 80 self.queue_len.fetch_add(chunks_len, Ordering::SeqCst); 81 } 82 83 pub(crate) fn peek(&self) -> Option<ChunkPayloadData> { 84 if self.selected.load(Ordering::SeqCst) { 85 if self.unordered_is_selected.load(Ordering::SeqCst) { 86 let unordered_queue = self.unordered_queue.read(); 87 return unordered_queue.get(0).cloned(); 88 } else { 89 let ordered_queue = self.ordered_queue.read(); 90 return ordered_queue.get(0).cloned(); 91 } 92 } 93 94 let c = { 95 let unordered_queue = self.unordered_queue.read(); 96 unordered_queue.get(0).cloned() 97 }; 98 99 if c.is_some() { 100 return c; 101 } 102 103 let ordered_queue = self.ordered_queue.read(); 104 ordered_queue.get(0).cloned() 105 } 106 107 pub(crate) fn pop( 108 &self, 109 beginning_fragment: bool, 110 unordered: bool, 111 ) -> Option<ChunkPayloadData> { 112 let popped = if self.selected.load(Ordering::SeqCst) { 113 let popped = if self.unordered_is_selected.load(Ordering::SeqCst) { 114 let mut unordered_queue = self.unordered_queue.write(); 115 unordered_queue.pop_front() 116 } else { 117 let mut ordered_queue = self.ordered_queue.write(); 118 ordered_queue.pop_front() 119 }; 120 if let Some(p) = &popped { 121 if p.ending_fragment { 122 self.selected.store(false, Ordering::SeqCst); 123 } 124 } 125 popped 126 } else { 127 if !beginning_fragment { 128 return None; 129 } 130 if unordered { 131 let popped = { 132 let mut unordered_queue = self.unordered_queue.write(); 133 unordered_queue.pop_front() 134 }; 135 if let Some(p) = &popped { 136 if !p.ending_fragment { 137 self.selected.store(true, Ordering::SeqCst); 138 self.unordered_is_selected.store(true, Ordering::SeqCst); 139 } 140 } 141 popped 142 } else { 143 let popped = { 144 let mut ordered_queue = self.ordered_queue.write(); 145 ordered_queue.pop_front() 146 }; 147 if let Some(p) = &popped { 148 if !p.ending_fragment { 149 self.selected.store(true, Ordering::SeqCst); 150 self.unordered_is_selected.store(false, Ordering::SeqCst); 151 } 152 } 153 popped 154 } 155 }; 156 157 if let Some(p) = &popped { 158 self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst); 159 self.queue_len.fetch_sub(1, Ordering::SeqCst); 160 } 161 162 popped 163 } 164 165 pub(crate) fn get_num_bytes(&self) -> usize { 166 self.n_bytes.load(Ordering::SeqCst) 167 } 168 169 pub(crate) fn len(&self) -> usize { 170 self.queue_len.load(Ordering::SeqCst) 171 } 172 173 pub(crate) fn is_empty(&self) -> bool { 174 self.len() == 0 175 } 176 } 177