1ffe74184SMartin Algesten use crate::chunk::chunk_payload_data::ChunkPayloadData; 2ffe74184SMartin Algesten use crate::chunk::chunk_selective_ack::GapAckBlock; 3ffe74184SMartin Algesten use crate::util::*; 4ffe74184SMartin Algesten 5225cec03SMoritz Borcherding use std::collections::{HashMap, VecDeque}; 6ffe74184SMartin Algesten use std::sync::atomic::{AtomicUsize, Ordering}; 7ffe74184SMartin Algesten use std::sync::Arc; 8ffe74184SMartin Algesten 9ffe74184SMartin Algesten #[derive(Default, Debug)] 10ffe74184SMartin Algesten pub(crate) struct PayloadQueue { 11ffe74184SMartin Algesten pub(crate) length: Arc<AtomicUsize>, 12ffe74184SMartin Algesten pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>, 13225cec03SMoritz Borcherding pub(crate) sorted: VecDeque<u32>, 14ffe74184SMartin Algesten pub(crate) dup_tsn: Vec<u32>, 15ffe74184SMartin Algesten pub(crate) n_bytes: usize, 16ffe74184SMartin Algesten } 17ffe74184SMartin Algesten 18ffe74184SMartin Algesten impl PayloadQueue { new(length: Arc<AtomicUsize>) -> Self19ffe74184SMartin Algesten pub(crate) fn new(length: Arc<AtomicUsize>) -> Self { 20ffe74184SMartin Algesten length.store(0, Ordering::SeqCst); 21ffe74184SMartin Algesten PayloadQueue { 22ffe74184SMartin Algesten length, 23ffe74184SMartin Algesten ..Default::default() 24ffe74184SMartin Algesten } 25ffe74184SMartin Algesten } 26ffe74184SMartin Algesten can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool27ffe74184SMartin Algesten pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool { 28ffe74184SMartin Algesten !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn)) 29ffe74184SMartin Algesten } 30ffe74184SMartin Algesten push_no_check(&mut self, p: ChunkPayloadData)31ffe74184SMartin Algesten pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) { 32225cec03SMoritz Borcherding let tsn = p.tsn; 33ffe74184SMartin Algesten self.n_bytes += p.user_data.len(); 34225cec03SMoritz Borcherding self.chunk_map.insert(tsn, p); 35ffe74184SMartin Algesten self.length.fetch_add(1, Ordering::SeqCst); 36225cec03SMoritz Borcherding 37225cec03SMoritz Borcherding if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) { 38225cec03SMoritz Borcherding self.sorted.push_back(tsn); 39225cec03SMoritz Borcherding } else if sna32lt(tsn, *self.sorted.front().unwrap()) { 40225cec03SMoritz Borcherding self.sorted.push_front(tsn); 41225cec03SMoritz Borcherding } else { 42225cec03SMoritz Borcherding fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering { 43225cec03SMoritz Borcherding if sna32lt(a, b) { 44225cec03SMoritz Borcherding std::cmp::Ordering::Less 45225cec03SMoritz Borcherding } else { 46225cec03SMoritz Borcherding std::cmp::Ordering::Greater 47225cec03SMoritz Borcherding } 48225cec03SMoritz Borcherding } 49225cec03SMoritz Borcherding let pos = match self 50225cec03SMoritz Borcherding .sorted 51225cec03SMoritz Borcherding .binary_search_by(|element| compare_tsn(*element, tsn)) 52225cec03SMoritz Borcherding { 53225cec03SMoritz Borcherding Ok(pos) => pos, 54225cec03SMoritz Borcherding Err(pos) => pos, 55225cec03SMoritz Borcherding }; 56225cec03SMoritz Borcherding self.sorted.insert(pos, tsn); 57225cec03SMoritz Borcherding } 58ffe74184SMartin Algesten } 59ffe74184SMartin Algesten 60ffe74184SMartin Algesten /// push pushes a payload data. If the payload data is already in our queue or 61ffe74184SMartin Algesten /// older than our cumulative_tsn marker, it will be recored as duplications, 62ffe74184SMartin Algesten /// which can later be retrieved using popDuplicates. push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool63ffe74184SMartin Algesten pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool { 64ffe74184SMartin Algesten let ok = self.chunk_map.contains_key(&p.tsn); 65ffe74184SMartin Algesten if ok || sna32lte(p.tsn, cumulative_tsn) { 66ffe74184SMartin Algesten // Found the packet, log in dups 67ffe74184SMartin Algesten self.dup_tsn.push(p.tsn); 68ffe74184SMartin Algesten return false; 69ffe74184SMartin Algesten } 70ffe74184SMartin Algesten 71225cec03SMoritz Borcherding self.push_no_check(p); 72ffe74184SMartin Algesten true 73ffe74184SMartin Algesten } 74ffe74184SMartin Algesten 75ffe74184SMartin Algesten /// pop pops only if the oldest chunk's TSN matches the given TSN. pop(&mut self, tsn: u32) -> Option<ChunkPayloadData>76ffe74184SMartin Algesten pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> { 77225cec03SMoritz Borcherding if Some(&tsn) == self.sorted.front() { 78225cec03SMoritz Borcherding self.sorted.pop_front(); 79ffe74184SMartin Algesten if let Some(c) = self.chunk_map.remove(&tsn) { 80ffe74184SMartin Algesten self.length.fetch_sub(1, Ordering::SeqCst); 81ffe74184SMartin Algesten self.n_bytes -= c.user_data.len(); 82ffe74184SMartin Algesten return Some(c); 83ffe74184SMartin Algesten } 84ffe74184SMartin Algesten } 85ffe74184SMartin Algesten 86ffe74184SMartin Algesten None 87ffe74184SMartin Algesten } 88ffe74184SMartin Algesten 89ffe74184SMartin Algesten /// get returns reference to chunkPayloadData with the given TSN value. get(&self, tsn: u32) -> Option<&ChunkPayloadData>90ffe74184SMartin Algesten pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> { 91ffe74184SMartin Algesten self.chunk_map.get(&tsn) 92ffe74184SMartin Algesten } get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData>93ffe74184SMartin Algesten pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> { 94ffe74184SMartin Algesten self.chunk_map.get_mut(&tsn) 95ffe74184SMartin Algesten } 96ffe74184SMartin Algesten 97ffe74184SMartin Algesten /// popDuplicates returns an array of TSN values that were found duplicate. pop_duplicates(&mut self) -> Vec<u32>98ffe74184SMartin Algesten pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> { 99ffe74184SMartin Algesten self.dup_tsn.drain(..).collect() 100ffe74184SMartin Algesten } 101ffe74184SMartin Algesten get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock>102ffe74184SMartin Algesten pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> { 103ffe74184SMartin Algesten if self.chunk_map.is_empty() { 104ffe74184SMartin Algesten return vec![]; 105ffe74184SMartin Algesten } 106ffe74184SMartin Algesten 107ffe74184SMartin Algesten let mut b = GapAckBlock::default(); 108ffe74184SMartin Algesten let mut gap_ack_blocks = vec![]; 109ffe74184SMartin Algesten for (i, tsn) in self.sorted.iter().enumerate() { 110ffe74184SMartin Algesten let diff = if *tsn >= cumulative_tsn { 111ffe74184SMartin Algesten (*tsn - cumulative_tsn) as u16 112ffe74184SMartin Algesten } else { 113ffe74184SMartin Algesten 0 114ffe74184SMartin Algesten }; 115ffe74184SMartin Algesten 116ffe74184SMartin Algesten if i == 0 { 117ffe74184SMartin Algesten b.start = diff; 118ffe74184SMartin Algesten b.end = b.start; 119ffe74184SMartin Algesten } else if b.end + 1 == diff { 120ffe74184SMartin Algesten b.end += 1; 121ffe74184SMartin Algesten } else { 122ffe74184SMartin Algesten gap_ack_blocks.push(b); 123ffe74184SMartin Algesten 124ffe74184SMartin Algesten b.start = diff; 125ffe74184SMartin Algesten b.end = diff; 126ffe74184SMartin Algesten } 127ffe74184SMartin Algesten } 128ffe74184SMartin Algesten 129ffe74184SMartin Algesten gap_ack_blocks.push(b); 130ffe74184SMartin Algesten 131ffe74184SMartin Algesten gap_ack_blocks 132ffe74184SMartin Algesten } 133ffe74184SMartin Algesten get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String134ffe74184SMartin Algesten pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String { 135*5d8fe953SJoão Oliveira let mut s = format!("cumTSN={cumulative_tsn}"); 136ffe74184SMartin Algesten for b in self.get_gap_ack_blocks(cumulative_tsn) { 137ffe74184SMartin Algesten s += format!(",{}-{}", b.start, b.end).as_str(); 138ffe74184SMartin Algesten } 139ffe74184SMartin Algesten s 140ffe74184SMartin Algesten } 141ffe74184SMartin Algesten mark_as_acked(&mut self, tsn: u32) -> usize142ffe74184SMartin Algesten pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize { 143ffe74184SMartin Algesten let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) { 144ffe74184SMartin Algesten c.acked = true; 145ffe74184SMartin Algesten c.retransmit = false; 146ffe74184SMartin Algesten let n = c.user_data.len(); 147ffe74184SMartin Algesten self.n_bytes -= n; 148ffe74184SMartin Algesten c.user_data.clear(); 149ffe74184SMartin Algesten n 150ffe74184SMartin Algesten } else { 151ffe74184SMartin Algesten 0 152ffe74184SMartin Algesten }; 153ffe74184SMartin Algesten 154ffe74184SMartin Algesten n_bytes_acked 155ffe74184SMartin Algesten } 156ffe74184SMartin Algesten get_last_tsn_received(&self) -> Option<&u32>157ffe74184SMartin Algesten pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> { 158225cec03SMoritz Borcherding self.sorted.back() 159ffe74184SMartin Algesten } 160ffe74184SMartin Algesten mark_all_to_retrasmit(&mut self)161ffe74184SMartin Algesten pub(crate) fn mark_all_to_retrasmit(&mut self) { 162ffe74184SMartin Algesten for c in self.chunk_map.values_mut() { 163ffe74184SMartin Algesten if c.acked || c.abandoned() { 164ffe74184SMartin Algesten continue; 165ffe74184SMartin Algesten } 166ffe74184SMartin Algesten c.retransmit = true; 167ffe74184SMartin Algesten } 168ffe74184SMartin Algesten } 169ffe74184SMartin Algesten get_num_bytes(&self) -> usize170ffe74184SMartin Algesten pub(crate) fn get_num_bytes(&self) -> usize { 171ffe74184SMartin Algesten self.n_bytes 172ffe74184SMartin Algesten } 173ffe74184SMartin Algesten len(&self) -> usize174ffe74184SMartin Algesten pub(crate) fn len(&self) -> usize { 175ffe74184SMartin Algesten assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst)); 176ffe74184SMartin Algesten self.chunk_map.len() 177ffe74184SMartin Algesten } 178ffe74184SMartin Algesten is_empty(&self) -> bool179ffe74184SMartin Algesten pub(crate) fn is_empty(&self) -> bool { 180ffe74184SMartin Algesten self.len() == 0 181ffe74184SMartin Algesten } 182ffe74184SMartin Algesten } 183