xref: /webrtc/sctp/src/queue/payload_queue.rs (revision 83f2d1bb)
1 use crate::chunk::chunk_payload_data::ChunkPayloadData;
2 use crate::chunk::chunk_selective_ack::GapAckBlock;
3 use crate::util::*;
4 
5 use std::collections::{HashMap, VecDeque};
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::Arc;
8 
9 #[derive(Default, Debug)]
10 pub(crate) struct PayloadQueue {
11     pub(crate) length: Arc<AtomicUsize>,
12     pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
13     pub(crate) sorted: VecDeque<u32>,
14     pub(crate) dup_tsn: Vec<u32>,
15     pub(crate) n_bytes: usize,
16 }
17 
18 impl PayloadQueue {
19     pub(crate) fn new(length: Arc<AtomicUsize>) -> Self {
20         length.store(0, Ordering::SeqCst);
21         PayloadQueue {
22             length,
23             ..Default::default()
24         }
25     }
26 
27     pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
28         !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
29     }
30 
31     pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
32         let tsn = p.tsn;
33         self.n_bytes += p.user_data.len();
34         self.chunk_map.insert(tsn, p);
35         self.length.fetch_add(1, Ordering::SeqCst);
36 
37         if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
38             self.sorted.push_back(tsn);
39         } else if sna32lt(tsn, *self.sorted.front().unwrap()) {
40             self.sorted.push_front(tsn);
41         } else {
42             fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
43                 if sna32lt(a, b) {
44                     std::cmp::Ordering::Less
45                 } else {
46                     std::cmp::Ordering::Greater
47                 }
48             }
49             let pos = match self
50                 .sorted
51                 .binary_search_by(|element| compare_tsn(*element, tsn))
52             {
53                 Ok(pos) => pos,
54                 Err(pos) => pos,
55             };
56             self.sorted.insert(pos, tsn);
57         }
58     }
59 
60     /// push pushes a payload data. If the payload data is already in our queue or
61     /// older than our cumulative_tsn marker, it will be recored as duplications,
62     /// which can later be retrieved using popDuplicates.
63     pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool {
64         let ok = self.chunk_map.contains_key(&p.tsn);
65         if ok || sna32lte(p.tsn, cumulative_tsn) {
66             // Found the packet, log in dups
67             self.dup_tsn.push(p.tsn);
68             return false;
69         }
70 
71         self.push_no_check(p);
72         true
73     }
74 
75     /// pop pops only if the oldest chunk's TSN matches the given TSN.
76     pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
77         if Some(&tsn) == self.sorted.front() {
78             self.sorted.pop_front();
79             if let Some(c) = self.chunk_map.remove(&tsn) {
80                 self.length.fetch_sub(1, Ordering::SeqCst);
81                 self.n_bytes -= c.user_data.len();
82                 return Some(c);
83             }
84         }
85 
86         None
87     }
88 
89     /// get returns reference to chunkPayloadData with the given TSN value.
90     pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> {
91         self.chunk_map.get(&tsn)
92     }
93     pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> {
94         self.chunk_map.get_mut(&tsn)
95     }
96 
97     /// popDuplicates returns an array of TSN values that were found duplicate.
98     pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> {
99         self.dup_tsn.drain(..).collect()
100     }
101 
102     pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> {
103         if self.chunk_map.is_empty() {
104             return vec![];
105         }
106 
107         let mut b = GapAckBlock::default();
108         let mut gap_ack_blocks = vec![];
109         for (i, tsn) in self.sorted.iter().enumerate() {
110             let diff = if *tsn >= cumulative_tsn {
111                 (*tsn - cumulative_tsn) as u16
112             } else {
113                 0
114             };
115 
116             if i == 0 {
117                 b.start = diff;
118                 b.end = b.start;
119             } else if b.end + 1 == diff {
120                 b.end += 1;
121             } else {
122                 gap_ack_blocks.push(b);
123 
124                 b.start = diff;
125                 b.end = diff;
126             }
127         }
128 
129         gap_ack_blocks.push(b);
130 
131         gap_ack_blocks
132     }
133 
134     pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String {
135         let mut s = format!("cumTSN={}", cumulative_tsn);
136         for b in self.get_gap_ack_blocks(cumulative_tsn) {
137             s += format!(",{}-{}", b.start, b.end).as_str();
138         }
139         s
140     }
141 
142     pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize {
143         let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) {
144             c.acked = true;
145             c.retransmit = false;
146             let n = c.user_data.len();
147             self.n_bytes -= n;
148             c.user_data.clear();
149             n
150         } else {
151             0
152         };
153 
154         n_bytes_acked
155     }
156 
157     pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
158         self.sorted.back()
159     }
160 
161     pub(crate) fn mark_all_to_retrasmit(&mut self) {
162         for c in self.chunk_map.values_mut() {
163             if c.acked || c.abandoned() {
164                 continue;
165             }
166             c.retransmit = true;
167         }
168     }
169 
170     pub(crate) fn get_num_bytes(&self) -> usize {
171         self.n_bytes
172     }
173 
174     pub(crate) fn len(&self) -> usize {
175         assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst));
176         self.chunk_map.len()
177     }
178 
179     pub(crate) fn is_empty(&self) -> bool {
180         self.len() == 0
181     }
182 }
183