1 use crate::chunk::chunk_payload_data::ChunkPayloadData; 2 use crate::chunk::chunk_selective_ack::GapAckBlock; 3 use crate::util::*; 4 5 use std::collections::{HashMap, VecDeque}; 6 use std::sync::atomic::{AtomicUsize, Ordering}; 7 use std::sync::Arc; 8 9 #[derive(Default, Debug)] 10 pub(crate) struct PayloadQueue { 11 pub(crate) length: Arc<AtomicUsize>, 12 pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>, 13 pub(crate) sorted: VecDeque<u32>, 14 pub(crate) dup_tsn: Vec<u32>, 15 pub(crate) n_bytes: usize, 16 } 17 18 impl PayloadQueue { new(length: Arc<AtomicUsize>) -> Self19 pub(crate) fn new(length: Arc<AtomicUsize>) -> Self { 20 length.store(0, Ordering::SeqCst); 21 PayloadQueue { 22 length, 23 ..Default::default() 24 } 25 } 26 can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool27 pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool { 28 !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn)) 29 } 30 push_no_check(&mut self, p: ChunkPayloadData)31 pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) { 32 let tsn = p.tsn; 33 self.n_bytes += p.user_data.len(); 34 self.chunk_map.insert(tsn, p); 35 self.length.fetch_add(1, Ordering::SeqCst); 36 37 if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) { 38 self.sorted.push_back(tsn); 39 } else if sna32lt(tsn, *self.sorted.front().unwrap()) { 40 self.sorted.push_front(tsn); 41 } else { 42 fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering { 43 if sna32lt(a, b) { 44 std::cmp::Ordering::Less 45 } else { 46 std::cmp::Ordering::Greater 47 } 48 } 49 let pos = match self 50 .sorted 51 .binary_search_by(|element| compare_tsn(*element, tsn)) 52 { 53 Ok(pos) => pos, 54 Err(pos) => pos, 55 }; 56 self.sorted.insert(pos, tsn); 57 } 58 } 59 60 /// push pushes a payload data. If the payload data is already in our queue or 61 /// older than our cumulative_tsn marker, it will be recored as duplications, 62 /// which can later be retrieved using popDuplicates. push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool63 pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool { 64 let ok = self.chunk_map.contains_key(&p.tsn); 65 if ok || sna32lte(p.tsn, cumulative_tsn) { 66 // Found the packet, log in dups 67 self.dup_tsn.push(p.tsn); 68 return false; 69 } 70 71 self.push_no_check(p); 72 true 73 } 74 75 /// pop pops only if the oldest chunk's TSN matches the given TSN. pop(&mut self, tsn: u32) -> Option<ChunkPayloadData>76 pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> { 77 if Some(&tsn) == self.sorted.front() { 78 self.sorted.pop_front(); 79 if let Some(c) = self.chunk_map.remove(&tsn) { 80 self.length.fetch_sub(1, Ordering::SeqCst); 81 self.n_bytes -= c.user_data.len(); 82 return Some(c); 83 } 84 } 85 86 None 87 } 88 89 /// get returns reference to chunkPayloadData with the given TSN value. get(&self, tsn: u32) -> Option<&ChunkPayloadData>90 pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> { 91 self.chunk_map.get(&tsn) 92 } get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData>93 pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> { 94 self.chunk_map.get_mut(&tsn) 95 } 96 97 /// popDuplicates returns an array of TSN values that were found duplicate. pop_duplicates(&mut self) -> Vec<u32>98 pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> { 99 self.dup_tsn.drain(..).collect() 100 } 101 get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock>102 pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> { 103 if self.chunk_map.is_empty() { 104 return vec![]; 105 } 106 107 let mut b = GapAckBlock::default(); 108 let mut gap_ack_blocks = vec![]; 109 for (i, tsn) in self.sorted.iter().enumerate() { 110 let diff = if *tsn >= cumulative_tsn { 111 (*tsn - cumulative_tsn) as u16 112 } else { 113 0 114 }; 115 116 if i == 0 { 117 b.start = diff; 118 b.end = b.start; 119 } else if b.end + 1 == diff { 120 b.end += 1; 121 } else { 122 gap_ack_blocks.push(b); 123 124 b.start = diff; 125 b.end = diff; 126 } 127 } 128 129 gap_ack_blocks.push(b); 130 131 gap_ack_blocks 132 } 133 get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String134 pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String { 135 let mut s = format!("cumTSN={cumulative_tsn}"); 136 for b in self.get_gap_ack_blocks(cumulative_tsn) { 137 s += format!(",{}-{}", b.start, b.end).as_str(); 138 } 139 s 140 } 141 mark_as_acked(&mut self, tsn: u32) -> usize142 pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize { 143 let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) { 144 c.acked = true; 145 c.retransmit = false; 146 let n = c.user_data.len(); 147 self.n_bytes -= n; 148 c.user_data.clear(); 149 n 150 } else { 151 0 152 }; 153 154 n_bytes_acked 155 } 156 get_last_tsn_received(&self) -> Option<&u32>157 pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> { 158 self.sorted.back() 159 } 160 mark_all_to_retrasmit(&mut self)161 pub(crate) fn mark_all_to_retrasmit(&mut self) { 162 for c in self.chunk_map.values_mut() { 163 if c.acked || c.abandoned() { 164 continue; 165 } 166 c.retransmit = true; 167 } 168 } 169 get_num_bytes(&self) -> usize170 pub(crate) fn get_num_bytes(&self) -> usize { 171 self.n_bytes 172 } 173 len(&self) -> usize174 pub(crate) fn len(&self) -> usize { 175 assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst)); 176 self.chunk_map.len() 177 } 178 is_empty(&self) -> bool179 pub(crate) fn is_empty(&self) -> bool { 180 self.len() == 0 181 } 182 } 183