1 use crate::chunk::chunk_payload_data::ChunkPayloadData; 2 use crate::chunk::chunk_selective_ack::GapAckBlock; 3 use crate::util::*; 4 5 use std::collections::HashMap; 6 use std::sync::atomic::{AtomicUsize, Ordering}; 7 use std::sync::Arc; 8 9 #[derive(Default, Debug)] 10 pub(crate) struct PayloadQueue { 11 pub(crate) length: Arc<AtomicUsize>, 12 pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>, 13 pub(crate) sorted: Vec<u32>, 14 pub(crate) dup_tsn: Vec<u32>, 15 pub(crate) n_bytes: usize, 16 } 17 18 impl PayloadQueue { 19 pub(crate) fn new(length: Arc<AtomicUsize>) -> Self { 20 length.store(0, Ordering::SeqCst); 21 PayloadQueue { 22 length, 23 ..Default::default() 24 } 25 } 26 27 pub(crate) fn update_sorted_keys(&mut self) { 28 self.sorted.sort_by(|a, b| { 29 if sna32lt(*a, *b) { 30 std::cmp::Ordering::Less 31 } else { 32 std::cmp::Ordering::Greater 33 } 34 }); 35 } 36 37 pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool { 38 !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn)) 39 } 40 41 pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) { 42 self.n_bytes += p.user_data.len(); 43 self.sorted.push(p.tsn); 44 self.chunk_map.insert(p.tsn, p); 45 self.length.fetch_add(1, Ordering::SeqCst); 46 self.update_sorted_keys(); 47 } 48 49 /// push pushes a payload data. If the payload data is already in our queue or 50 /// older than our cumulative_tsn marker, it will be recored as duplications, 51 /// which can later be retrieved using popDuplicates. 52 pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool { 53 let ok = self.chunk_map.contains_key(&p.tsn); 54 if ok || sna32lte(p.tsn, cumulative_tsn) { 55 // Found the packet, log in dups 56 self.dup_tsn.push(p.tsn); 57 return false; 58 } 59 60 self.n_bytes += p.user_data.len(); 61 self.sorted.push(p.tsn); 62 self.chunk_map.insert(p.tsn, p); 63 self.length.fetch_add(1, Ordering::SeqCst); 64 self.update_sorted_keys(); 65 66 true 67 } 68 69 /// pop pops only if the oldest chunk's TSN matches the given TSN. 70 pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> { 71 if !self.sorted.is_empty() && tsn == self.sorted[0] { 72 self.sorted.remove(0); 73 if let Some(c) = self.chunk_map.remove(&tsn) { 74 self.length.fetch_sub(1, Ordering::SeqCst); 75 self.n_bytes -= c.user_data.len(); 76 return Some(c); 77 } 78 } 79 80 None 81 } 82 83 /// get returns reference to chunkPayloadData with the given TSN value. 84 pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> { 85 self.chunk_map.get(&tsn) 86 } 87 pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> { 88 self.chunk_map.get_mut(&tsn) 89 } 90 91 /// popDuplicates returns an array of TSN values that were found duplicate. 92 pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> { 93 self.dup_tsn.drain(..).collect() 94 } 95 96 pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> { 97 if self.chunk_map.is_empty() { 98 return vec![]; 99 } 100 101 let mut b = GapAckBlock::default(); 102 let mut gap_ack_blocks = vec![]; 103 for (i, tsn) in self.sorted.iter().enumerate() { 104 let diff = if *tsn >= cumulative_tsn { 105 (*tsn - cumulative_tsn) as u16 106 } else { 107 0 108 }; 109 110 if i == 0 { 111 b.start = diff; 112 b.end = b.start; 113 } else if b.end + 1 == diff { 114 b.end += 1; 115 } else { 116 gap_ack_blocks.push(b); 117 118 b.start = diff; 119 b.end = diff; 120 } 121 } 122 123 gap_ack_blocks.push(b); 124 125 gap_ack_blocks 126 } 127 128 pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String { 129 let mut s = format!("cumTSN={}", cumulative_tsn); 130 for b in self.get_gap_ack_blocks(cumulative_tsn) { 131 s += format!(",{}-{}", b.start, b.end).as_str(); 132 } 133 s 134 } 135 136 pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize { 137 let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) { 138 c.acked = true; 139 c.retransmit = false; 140 let n = c.user_data.len(); 141 self.n_bytes -= n; 142 c.user_data.clear(); 143 n 144 } else { 145 0 146 }; 147 148 n_bytes_acked 149 } 150 151 pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> { 152 self.sorted.last() 153 } 154 155 pub(crate) fn mark_all_to_retrasmit(&mut self) { 156 for c in self.chunk_map.values_mut() { 157 if c.acked || c.abandoned() { 158 continue; 159 } 160 c.retransmit = true; 161 } 162 } 163 164 pub(crate) fn get_num_bytes(&self) -> usize { 165 self.n_bytes 166 } 167 168 pub(crate) fn len(&self) -> usize { 169 assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst)); 170 self.chunk_map.len() 171 } 172 173 pub(crate) fn is_empty(&self) -> bool { 174 self.len() == 0 175 } 176 } 177