xref: /webrtc/sctp/src/queue/payload_queue.rs (revision 5d8fe953)
1ffe74184SMartin Algesten use crate::chunk::chunk_payload_data::ChunkPayloadData;
2ffe74184SMartin Algesten use crate::chunk::chunk_selective_ack::GapAckBlock;
3ffe74184SMartin Algesten use crate::util::*;
4ffe74184SMartin Algesten 
5225cec03SMoritz Borcherding use std::collections::{HashMap, VecDeque};
6ffe74184SMartin Algesten use std::sync::atomic::{AtomicUsize, Ordering};
7ffe74184SMartin Algesten use std::sync::Arc;
8ffe74184SMartin Algesten 
9ffe74184SMartin Algesten #[derive(Default, Debug)]
10ffe74184SMartin Algesten pub(crate) struct PayloadQueue {
11ffe74184SMartin Algesten     pub(crate) length: Arc<AtomicUsize>,
12ffe74184SMartin Algesten     pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
13225cec03SMoritz Borcherding     pub(crate) sorted: VecDeque<u32>,
14ffe74184SMartin Algesten     pub(crate) dup_tsn: Vec<u32>,
15ffe74184SMartin Algesten     pub(crate) n_bytes: usize,
16ffe74184SMartin Algesten }
17ffe74184SMartin Algesten 
18ffe74184SMartin Algesten impl PayloadQueue {
new(length: Arc<AtomicUsize>) -> Self19ffe74184SMartin Algesten     pub(crate) fn new(length: Arc<AtomicUsize>) -> Self {
20ffe74184SMartin Algesten         length.store(0, Ordering::SeqCst);
21ffe74184SMartin Algesten         PayloadQueue {
22ffe74184SMartin Algesten             length,
23ffe74184SMartin Algesten             ..Default::default()
24ffe74184SMartin Algesten         }
25ffe74184SMartin Algesten     }
26ffe74184SMartin Algesten 
can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool27ffe74184SMartin Algesten     pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
28ffe74184SMartin Algesten         !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
29ffe74184SMartin Algesten     }
30ffe74184SMartin Algesten 
push_no_check(&mut self, p: ChunkPayloadData)31ffe74184SMartin Algesten     pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
32225cec03SMoritz Borcherding         let tsn = p.tsn;
33ffe74184SMartin Algesten         self.n_bytes += p.user_data.len();
34225cec03SMoritz Borcherding         self.chunk_map.insert(tsn, p);
35ffe74184SMartin Algesten         self.length.fetch_add(1, Ordering::SeqCst);
36225cec03SMoritz Borcherding 
37225cec03SMoritz Borcherding         if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
38225cec03SMoritz Borcherding             self.sorted.push_back(tsn);
39225cec03SMoritz Borcherding         } else if sna32lt(tsn, *self.sorted.front().unwrap()) {
40225cec03SMoritz Borcherding             self.sorted.push_front(tsn);
41225cec03SMoritz Borcherding         } else {
42225cec03SMoritz Borcherding             fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
43225cec03SMoritz Borcherding                 if sna32lt(a, b) {
44225cec03SMoritz Borcherding                     std::cmp::Ordering::Less
45225cec03SMoritz Borcherding                 } else {
46225cec03SMoritz Borcherding                     std::cmp::Ordering::Greater
47225cec03SMoritz Borcherding                 }
48225cec03SMoritz Borcherding             }
49225cec03SMoritz Borcherding             let pos = match self
50225cec03SMoritz Borcherding                 .sorted
51225cec03SMoritz Borcherding                 .binary_search_by(|element| compare_tsn(*element, tsn))
52225cec03SMoritz Borcherding             {
53225cec03SMoritz Borcherding                 Ok(pos) => pos,
54225cec03SMoritz Borcherding                 Err(pos) => pos,
55225cec03SMoritz Borcherding             };
56225cec03SMoritz Borcherding             self.sorted.insert(pos, tsn);
57225cec03SMoritz Borcherding         }
58ffe74184SMartin Algesten     }
59ffe74184SMartin Algesten 
60ffe74184SMartin Algesten     /// push pushes a payload data. If the payload data is already in our queue or
61ffe74184SMartin Algesten     /// older than our cumulative_tsn marker, it will be recored as duplications,
62ffe74184SMartin Algesten     /// which can later be retrieved using popDuplicates.
push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool63ffe74184SMartin Algesten     pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool {
64ffe74184SMartin Algesten         let ok = self.chunk_map.contains_key(&p.tsn);
65ffe74184SMartin Algesten         if ok || sna32lte(p.tsn, cumulative_tsn) {
66ffe74184SMartin Algesten             // Found the packet, log in dups
67ffe74184SMartin Algesten             self.dup_tsn.push(p.tsn);
68ffe74184SMartin Algesten             return false;
69ffe74184SMartin Algesten         }
70ffe74184SMartin Algesten 
71225cec03SMoritz Borcherding         self.push_no_check(p);
72ffe74184SMartin Algesten         true
73ffe74184SMartin Algesten     }
74ffe74184SMartin Algesten 
75ffe74184SMartin Algesten     /// pop pops only if the oldest chunk's TSN matches the given TSN.
pop(&mut self, tsn: u32) -> Option<ChunkPayloadData>76ffe74184SMartin Algesten     pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
77225cec03SMoritz Borcherding         if Some(&tsn) == self.sorted.front() {
78225cec03SMoritz Borcherding             self.sorted.pop_front();
79ffe74184SMartin Algesten             if let Some(c) = self.chunk_map.remove(&tsn) {
80ffe74184SMartin Algesten                 self.length.fetch_sub(1, Ordering::SeqCst);
81ffe74184SMartin Algesten                 self.n_bytes -= c.user_data.len();
82ffe74184SMartin Algesten                 return Some(c);
83ffe74184SMartin Algesten             }
84ffe74184SMartin Algesten         }
85ffe74184SMartin Algesten 
86ffe74184SMartin Algesten         None
87ffe74184SMartin Algesten     }
88ffe74184SMartin Algesten 
89ffe74184SMartin Algesten     /// get returns reference to chunkPayloadData with the given TSN value.
get(&self, tsn: u32) -> Option<&ChunkPayloadData>90ffe74184SMartin Algesten     pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> {
91ffe74184SMartin Algesten         self.chunk_map.get(&tsn)
92ffe74184SMartin Algesten     }
get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData>93ffe74184SMartin Algesten     pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> {
94ffe74184SMartin Algesten         self.chunk_map.get_mut(&tsn)
95ffe74184SMartin Algesten     }
96ffe74184SMartin Algesten 
97ffe74184SMartin Algesten     /// popDuplicates returns an array of TSN values that were found duplicate.
pop_duplicates(&mut self) -> Vec<u32>98ffe74184SMartin Algesten     pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> {
99ffe74184SMartin Algesten         self.dup_tsn.drain(..).collect()
100ffe74184SMartin Algesten     }
101ffe74184SMartin Algesten 
get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock>102ffe74184SMartin Algesten     pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> {
103ffe74184SMartin Algesten         if self.chunk_map.is_empty() {
104ffe74184SMartin Algesten             return vec![];
105ffe74184SMartin Algesten         }
106ffe74184SMartin Algesten 
107ffe74184SMartin Algesten         let mut b = GapAckBlock::default();
108ffe74184SMartin Algesten         let mut gap_ack_blocks = vec![];
109ffe74184SMartin Algesten         for (i, tsn) in self.sorted.iter().enumerate() {
110ffe74184SMartin Algesten             let diff = if *tsn >= cumulative_tsn {
111ffe74184SMartin Algesten                 (*tsn - cumulative_tsn) as u16
112ffe74184SMartin Algesten             } else {
113ffe74184SMartin Algesten                 0
114ffe74184SMartin Algesten             };
115ffe74184SMartin Algesten 
116ffe74184SMartin Algesten             if i == 0 {
117ffe74184SMartin Algesten                 b.start = diff;
118ffe74184SMartin Algesten                 b.end = b.start;
119ffe74184SMartin Algesten             } else if b.end + 1 == diff {
120ffe74184SMartin Algesten                 b.end += 1;
121ffe74184SMartin Algesten             } else {
122ffe74184SMartin Algesten                 gap_ack_blocks.push(b);
123ffe74184SMartin Algesten 
124ffe74184SMartin Algesten                 b.start = diff;
125ffe74184SMartin Algesten                 b.end = diff;
126ffe74184SMartin Algesten             }
127ffe74184SMartin Algesten         }
128ffe74184SMartin Algesten 
129ffe74184SMartin Algesten         gap_ack_blocks.push(b);
130ffe74184SMartin Algesten 
131ffe74184SMartin Algesten         gap_ack_blocks
132ffe74184SMartin Algesten     }
133ffe74184SMartin Algesten 
get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String134ffe74184SMartin Algesten     pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String {
135*5d8fe953SJoão Oliveira         let mut s = format!("cumTSN={cumulative_tsn}");
136ffe74184SMartin Algesten         for b in self.get_gap_ack_blocks(cumulative_tsn) {
137ffe74184SMartin Algesten             s += format!(",{}-{}", b.start, b.end).as_str();
138ffe74184SMartin Algesten         }
139ffe74184SMartin Algesten         s
140ffe74184SMartin Algesten     }
141ffe74184SMartin Algesten 
mark_as_acked(&mut self, tsn: u32) -> usize142ffe74184SMartin Algesten     pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize {
143ffe74184SMartin Algesten         let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) {
144ffe74184SMartin Algesten             c.acked = true;
145ffe74184SMartin Algesten             c.retransmit = false;
146ffe74184SMartin Algesten             let n = c.user_data.len();
147ffe74184SMartin Algesten             self.n_bytes -= n;
148ffe74184SMartin Algesten             c.user_data.clear();
149ffe74184SMartin Algesten             n
150ffe74184SMartin Algesten         } else {
151ffe74184SMartin Algesten             0
152ffe74184SMartin Algesten         };
153ffe74184SMartin Algesten 
154ffe74184SMartin Algesten         n_bytes_acked
155ffe74184SMartin Algesten     }
156ffe74184SMartin Algesten 
get_last_tsn_received(&self) -> Option<&u32>157ffe74184SMartin Algesten     pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
158225cec03SMoritz Borcherding         self.sorted.back()
159ffe74184SMartin Algesten     }
160ffe74184SMartin Algesten 
mark_all_to_retrasmit(&mut self)161ffe74184SMartin Algesten     pub(crate) fn mark_all_to_retrasmit(&mut self) {
162ffe74184SMartin Algesten         for c in self.chunk_map.values_mut() {
163ffe74184SMartin Algesten             if c.acked || c.abandoned() {
164ffe74184SMartin Algesten                 continue;
165ffe74184SMartin Algesten             }
166ffe74184SMartin Algesten             c.retransmit = true;
167ffe74184SMartin Algesten         }
168ffe74184SMartin Algesten     }
169ffe74184SMartin Algesten 
get_num_bytes(&self) -> usize170ffe74184SMartin Algesten     pub(crate) fn get_num_bytes(&self) -> usize {
171ffe74184SMartin Algesten         self.n_bytes
172ffe74184SMartin Algesten     }
173ffe74184SMartin Algesten 
len(&self) -> usize174ffe74184SMartin Algesten     pub(crate) fn len(&self) -> usize {
175ffe74184SMartin Algesten         assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst));
176ffe74184SMartin Algesten         self.chunk_map.len()
177ffe74184SMartin Algesten     }
178ffe74184SMartin Algesten 
is_empty(&self) -> bool179ffe74184SMartin Algesten     pub(crate) fn is_empty(&self) -> bool {
180ffe74184SMartin Algesten         self.len() == 0
181ffe74184SMartin Algesten     }
182ffe74184SMartin Algesten }
183