1 use crate::chunk::chunk_payload_data::ChunkPayloadData; 2 3 use std::collections::VecDeque; 4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 5 use tokio::sync::Mutex; 6 7 /// pendingBaseQueue 8 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>; 9 10 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal> 11 12 /// pendingQueue 13 #[derive(Debug, Default)] 14 pub(crate) struct PendingQueue { 15 unordered_queue: Mutex<PendingBaseQueue>, 16 ordered_queue: Mutex<PendingBaseQueue>, 17 queue_len: AtomicUsize, 18 n_bytes: AtomicUsize, 19 selected: AtomicBool, 20 unordered_is_selected: AtomicBool, 21 } 22 23 impl PendingQueue { 24 pub(crate) fn new() -> Self { 25 PendingQueue::default() 26 } 27 28 /// Appends a chunk to the back of the pending queue. 29 pub(crate) async fn push(&self, c: ChunkPayloadData) { 30 let user_data_len = c.user_data.len(); 31 32 if c.unordered { 33 let mut unordered_queue = self.unordered_queue.lock().await; 34 unordered_queue.push_back(c); 35 } else { 36 let mut ordered_queue = self.ordered_queue.lock().await; 37 ordered_queue.push_back(c); 38 } 39 40 self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst); 41 self.queue_len.fetch_add(1, Ordering::SeqCst); 42 } 43 44 /// Appends chunks to the back of the pending queue. 45 /// 46 /// # Panics 47 /// 48 /// If it's a mix of unordered and ordered chunks. 49 pub(crate) async fn append(&self, chunks: Vec<ChunkPayloadData>) { 50 if chunks.is_empty() { 51 return; 52 } 53 54 let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len()); 55 let chunks_len = chunks.len(); 56 57 let unordered = chunks 58 .first() 59 .expect("chunks to not be empty because of the above check") 60 .unordered; 61 if unordered { 62 let mut unordered_queue = self.unordered_queue.lock().await; 63 for c in chunks { 64 assert!(c.unordered, "expected all chunks to be unordered"); 65 unordered_queue.push_back(c); 66 } 67 } else { 68 let mut ordered_queue = self.ordered_queue.lock().await; 69 for c in chunks { 70 assert!(!c.unordered, "expected all chunks to be ordered"); 71 ordered_queue.push_back(c); 72 } 73 } 74 75 self.n_bytes 76 .fetch_add(total_user_data_len, Ordering::SeqCst); 77 self.queue_len.fetch_add(chunks_len, Ordering::SeqCst); 78 } 79 80 pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> { 81 if self.selected.load(Ordering::SeqCst) { 82 if self.unordered_is_selected.load(Ordering::SeqCst) { 83 let unordered_queue = self.unordered_queue.lock().await; 84 return unordered_queue.get(0).cloned(); 85 } else { 86 let ordered_queue = self.ordered_queue.lock().await; 87 return ordered_queue.get(0).cloned(); 88 } 89 } 90 91 let c = { 92 let unordered_queue = self.unordered_queue.lock().await; 93 unordered_queue.get(0).cloned() 94 }; 95 96 if c.is_some() { 97 return c; 98 } 99 100 let ordered_queue = self.ordered_queue.lock().await; 101 ordered_queue.get(0).cloned() 102 } 103 104 pub(crate) async fn pop( 105 &self, 106 beginning_fragment: bool, 107 unordered: bool, 108 ) -> Option<ChunkPayloadData> { 109 let popped = if self.selected.load(Ordering::SeqCst) { 110 let popped = if self.unordered_is_selected.load(Ordering::SeqCst) { 111 let mut unordered_queue = self.unordered_queue.lock().await; 112 unordered_queue.pop_front() 113 } else { 114 let mut ordered_queue = self.ordered_queue.lock().await; 115 ordered_queue.pop_front() 116 }; 117 if let Some(p) = &popped { 118 if p.ending_fragment { 119 self.selected.store(false, Ordering::SeqCst); 120 } 121 } 122 popped 123 } else { 124 if !beginning_fragment { 125 return None; 126 } 127 if unordered { 128 let popped = { 129 let mut unordered_queue = self.unordered_queue.lock().await; 130 unordered_queue.pop_front() 131 }; 132 if let Some(p) = &popped { 133 if !p.ending_fragment { 134 self.selected.store(true, Ordering::SeqCst); 135 self.unordered_is_selected.store(true, Ordering::SeqCst); 136 } 137 } 138 popped 139 } else { 140 let popped = { 141 let mut ordered_queue = self.ordered_queue.lock().await; 142 ordered_queue.pop_front() 143 }; 144 if let Some(p) = &popped { 145 if !p.ending_fragment { 146 self.selected.store(true, Ordering::SeqCst); 147 self.unordered_is_selected.store(false, Ordering::SeqCst); 148 } 149 } 150 popped 151 } 152 }; 153 154 if let Some(p) = &popped { 155 self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst); 156 self.queue_len.fetch_sub(1, Ordering::SeqCst); 157 } 158 159 popped 160 } 161 162 pub(crate) fn get_num_bytes(&self) -> usize { 163 self.n_bytes.load(Ordering::SeqCst) 164 } 165 166 pub(crate) fn len(&self) -> usize { 167 self.queue_len.load(Ordering::SeqCst) 168 } 169 170 pub(crate) fn is_empty(&self) -> bool { 171 self.len() == 0 172 } 173 } 174