1 use crate::chunk::chunk_payload_data::ChunkPayloadData; 2 3 use std::collections::VecDeque; 4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 5 use tokio::sync::Mutex; 6 7 /// pendingBaseQueue 8 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>; 9 10 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal> 11 12 /// pendingQueue 13 #[derive(Debug, Default)] 14 pub(crate) struct PendingQueue { 15 unordered_queue: Mutex<PendingBaseQueue>, 16 ordered_queue: Mutex<PendingBaseQueue>, 17 queue_len: AtomicUsize, 18 n_bytes: AtomicUsize, 19 selected: AtomicBool, 20 unordered_is_selected: AtomicBool, 21 } 22 23 impl PendingQueue { 24 pub(crate) fn new() -> Self { 25 PendingQueue::default() 26 } 27 28 pub(crate) async fn push(&self, c: ChunkPayloadData) { 29 self.n_bytes.fetch_add(c.user_data.len(), Ordering::SeqCst); 30 if c.unordered { 31 let mut unordered_queue = self.unordered_queue.lock().await; 32 unordered_queue.push_back(c); 33 } else { 34 let mut ordered_queue = self.ordered_queue.lock().await; 35 ordered_queue.push_back(c); 36 } 37 self.queue_len.fetch_add(1, Ordering::SeqCst); 38 } 39 40 pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> { 41 if self.selected.load(Ordering::SeqCst) { 42 if self.unordered_is_selected.load(Ordering::SeqCst) { 43 let unordered_queue = self.unordered_queue.lock().await; 44 return unordered_queue.get(0).cloned(); 45 } else { 46 let ordered_queue = self.ordered_queue.lock().await; 47 return ordered_queue.get(0).cloned(); 48 } 49 } 50 51 let c = { 52 let unordered_queue = self.unordered_queue.lock().await; 53 unordered_queue.get(0).cloned() 54 }; 55 56 if c.is_some() { 57 return c; 58 } 59 60 let ordered_queue = self.ordered_queue.lock().await; 61 ordered_queue.get(0).cloned() 62 } 63 64 pub(crate) async fn pop( 65 &self, 66 beginning_fragment: bool, 67 unordered: bool, 68 ) -> Option<ChunkPayloadData> { 69 let popped = if self.selected.load(Ordering::SeqCst) { 70 let popped = if self.unordered_is_selected.load(Ordering::SeqCst) { 71 let mut unordered_queue = self.unordered_queue.lock().await; 72 unordered_queue.pop_front() 73 } else { 74 let mut ordered_queue = self.ordered_queue.lock().await; 75 ordered_queue.pop_front() 76 }; 77 if let Some(p) = &popped { 78 if p.ending_fragment { 79 self.selected.store(false, Ordering::SeqCst); 80 } 81 } 82 popped 83 } else { 84 if !beginning_fragment { 85 return None; 86 } 87 if unordered { 88 let popped = { 89 let mut unordered_queue = self.unordered_queue.lock().await; 90 unordered_queue.pop_front() 91 }; 92 if let Some(p) = &popped { 93 if !p.ending_fragment { 94 self.selected.store(true, Ordering::SeqCst); 95 self.unordered_is_selected.store(true, Ordering::SeqCst); 96 } 97 } 98 popped 99 } else { 100 let popped = { 101 let mut ordered_queue = self.ordered_queue.lock().await; 102 ordered_queue.pop_front() 103 }; 104 if let Some(p) = &popped { 105 if !p.ending_fragment { 106 self.selected.store(true, Ordering::SeqCst); 107 self.unordered_is_selected.store(false, Ordering::SeqCst); 108 } 109 } 110 popped 111 } 112 }; 113 114 if let Some(p) = &popped { 115 self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst); 116 self.queue_len.fetch_sub(1, Ordering::SeqCst); 117 } 118 119 popped 120 } 121 122 pub(crate) fn get_num_bytes(&self) -> usize { 123 self.n_bytes.load(Ordering::SeqCst) 124 } 125 126 pub(crate) fn len(&self) -> usize { 127 self.queue_len.load(Ordering::SeqCst) 128 } 129 130 pub(crate) fn is_empty(&self) -> bool { 131 self.len() == 0 132 } 133 } 134