xref: /webrtc/sctp/src/queue/payload_queue.rs (revision ffe74184)
1 use crate::chunk::chunk_payload_data::ChunkPayloadData;
2 use crate::chunk::chunk_selective_ack::GapAckBlock;
3 use crate::util::*;
4 
5 use std::collections::HashMap;
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::Arc;
8 
9 #[derive(Default, Debug)]
10 pub(crate) struct PayloadQueue {
11     pub(crate) length: Arc<AtomicUsize>,
12     pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
13     pub(crate) sorted: Vec<u32>,
14     pub(crate) dup_tsn: Vec<u32>,
15     pub(crate) n_bytes: usize,
16 }
17 
18 impl PayloadQueue {
19     pub(crate) fn new(length: Arc<AtomicUsize>) -> Self {
20         length.store(0, Ordering::SeqCst);
21         PayloadQueue {
22             length,
23             ..Default::default()
24         }
25     }
26 
27     pub(crate) fn update_sorted_keys(&mut self) {
28         self.sorted.sort_by(|a, b| {
29             if sna32lt(*a, *b) {
30                 std::cmp::Ordering::Less
31             } else {
32                 std::cmp::Ordering::Greater
33             }
34         });
35     }
36 
37     pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
38         !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
39     }
40 
41     pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
42         self.n_bytes += p.user_data.len();
43         self.sorted.push(p.tsn);
44         self.chunk_map.insert(p.tsn, p);
45         self.length.fetch_add(1, Ordering::SeqCst);
46         self.update_sorted_keys();
47     }
48 
49     /// push pushes a payload data. If the payload data is already in our queue or
50     /// older than our cumulative_tsn marker, it will be recored as duplications,
51     /// which can later be retrieved using popDuplicates.
52     pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool {
53         let ok = self.chunk_map.contains_key(&p.tsn);
54         if ok || sna32lte(p.tsn, cumulative_tsn) {
55             // Found the packet, log in dups
56             self.dup_tsn.push(p.tsn);
57             return false;
58         }
59 
60         self.n_bytes += p.user_data.len();
61         self.sorted.push(p.tsn);
62         self.chunk_map.insert(p.tsn, p);
63         self.length.fetch_add(1, Ordering::SeqCst);
64         self.update_sorted_keys();
65 
66         true
67     }
68 
69     /// pop pops only if the oldest chunk's TSN matches the given TSN.
70     pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
71         if !self.sorted.is_empty() && tsn == self.sorted[0] {
72             self.sorted.remove(0);
73             if let Some(c) = self.chunk_map.remove(&tsn) {
74                 self.length.fetch_sub(1, Ordering::SeqCst);
75                 self.n_bytes -= c.user_data.len();
76                 return Some(c);
77             }
78         }
79 
80         None
81     }
82 
83     /// get returns reference to chunkPayloadData with the given TSN value.
84     pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> {
85         self.chunk_map.get(&tsn)
86     }
87     pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> {
88         self.chunk_map.get_mut(&tsn)
89     }
90 
91     /// popDuplicates returns an array of TSN values that were found duplicate.
92     pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> {
93         self.dup_tsn.drain(..).collect()
94     }
95 
96     pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> {
97         if self.chunk_map.is_empty() {
98             return vec![];
99         }
100 
101         let mut b = GapAckBlock::default();
102         let mut gap_ack_blocks = vec![];
103         for (i, tsn) in self.sorted.iter().enumerate() {
104             let diff = if *tsn >= cumulative_tsn {
105                 (*tsn - cumulative_tsn) as u16
106             } else {
107                 0
108             };
109 
110             if i == 0 {
111                 b.start = diff;
112                 b.end = b.start;
113             } else if b.end + 1 == diff {
114                 b.end += 1;
115             } else {
116                 gap_ack_blocks.push(b);
117 
118                 b.start = diff;
119                 b.end = diff;
120             }
121         }
122 
123         gap_ack_blocks.push(b);
124 
125         gap_ack_blocks
126     }
127 
128     pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String {
129         let mut s = format!("cumTSN={}", cumulative_tsn);
130         for b in self.get_gap_ack_blocks(cumulative_tsn) {
131             s += format!(",{}-{}", b.start, b.end).as_str();
132         }
133         s
134     }
135 
136     pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize {
137         let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) {
138             c.acked = true;
139             c.retransmit = false;
140             let n = c.user_data.len();
141             self.n_bytes -= n;
142             c.user_data.clear();
143             n
144         } else {
145             0
146         };
147 
148         n_bytes_acked
149     }
150 
151     pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
152         self.sorted.last()
153     }
154 
155     pub(crate) fn mark_all_to_retrasmit(&mut self) {
156         for c in self.chunk_map.values_mut() {
157             if c.acked || c.abandoned() {
158                 continue;
159             }
160             c.retransmit = true;
161         }
162     }
163 
164     pub(crate) fn get_num_bytes(&self) -> usize {
165         self.n_bytes
166     }
167 
168     pub(crate) fn len(&self) -> usize {
169         assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst));
170         self.chunk_map.len()
171     }
172 
173     pub(crate) fn is_empty(&self) -> bool {
174         self.len() == 0
175     }
176 }
177