xref: /webrtc/sctp/src/queue/pending_queue.rs (revision 254a17e2)
1 use std::{
2     collections::VecDeque,
3     sync::atomic::{AtomicBool, AtomicUsize, Ordering},
4 };
5 
6 use util::sync::RwLock;
7 
8 use crate::chunk::chunk_payload_data::ChunkPayloadData;
9 
/// Basic queue for either ordered or unordered chunks.
///
/// A double-ended queue of [`ChunkPayloadData`]; `PendingQueue` keeps one of
/// these per ordering mode, pushing at the back and popping from the front.
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
12 
13 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>
14 
/// A queue for both ordered and unordered chunks.
///
/// Chunks are stored in two separate FIFO queues, chosen by each chunk's
/// `unordered` flag. The chunk count and the payload byte total are tracked in
/// atomics so they can be read without taking either queue lock.
#[derive(Debug, Default)]
pub(crate) struct PendingQueue {
    /// Chunks whose `unordered` flag is set, in insertion order.
    unordered_queue: RwLock<PendingBaseQueue>,
    /// Chunks whose `unordered` flag is not set, in insertion order.
    ordered_queue: RwLock<PendingBaseQueue>,
    /// Number of chunks across both queues, maintained by `push`, `append`
    /// and `pop`.
    queue_len: AtomicUsize,
    /// Sum of `user_data.len()` across both queues, maintained by `push`,
    /// `append` and `pop`.
    n_bytes: AtomicUsize,
    /// Set by `pop` when it removes a chunk that is not an ending fragment and
    /// cleared again when it removes an ending fragment. While set, `peek` and
    /// `pop` only look at the queue named by `unordered_is_selected`.
    selected: AtomicBool,
    /// Which queue is selected: `true` for the unordered queue, `false` for
    /// the ordered one. Only consulted while `selected` is set.
    unordered_is_selected: AtomicBool,
}
25 
26 impl PendingQueue {
27     pub(crate) fn new() -> Self {
28         PendingQueue::default()
29     }
30 
31     /// Appends a chunk to the back of the pending queue.
32     pub(crate) fn push(&self, c: ChunkPayloadData) {
33         let user_data_len = c.user_data.len();
34 
35         if c.unordered {
36             let mut unordered_queue = self.unordered_queue.write();
37             unordered_queue.push_back(c);
38         } else {
39             let mut ordered_queue = self.ordered_queue.write();
40             ordered_queue.push_back(c);
41         }
42 
43         self.n_bytes.fetch_add(user_data_len, Ordering::SeqCst);
44         self.queue_len.fetch_add(1, Ordering::SeqCst);
45     }
46 
47     /// Appends chunks to the back of the pending queue.
48     ///
49     /// # Panics
50     ///
51     /// If it's a mix of unordered and ordered chunks.
52     pub(crate) fn append(&self, chunks: Vec<ChunkPayloadData>) {
53         if chunks.is_empty() {
54             return;
55         }
56 
57         let total_user_data_len = chunks.iter().fold(0, |acc, c| acc + c.user_data.len());
58         let chunks_len = chunks.len();
59 
60         let unordered = chunks
61             .first()
62             .expect("chunks to not be empty because of the above check")
63             .unordered;
64         if unordered {
65             let mut unordered_queue = self.unordered_queue.write();
66             for c in chunks {
67                 assert!(c.unordered, "expected all chunks to be unordered");
68                 unordered_queue.push_back(c);
69             }
70         } else {
71             let mut ordered_queue = self.ordered_queue.write();
72             for c in chunks {
73                 assert!(!c.unordered, "expected all chunks to be ordered");
74                 ordered_queue.push_back(c);
75             }
76         }
77 
78         self.n_bytes
79             .fetch_add(total_user_data_len, Ordering::SeqCst);
80         self.queue_len.fetch_add(chunks_len, Ordering::SeqCst);
81     }
82 
83     pub(crate) fn peek(&self) -> Option<ChunkPayloadData> {
84         if self.selected.load(Ordering::SeqCst) {
85             if self.unordered_is_selected.load(Ordering::SeqCst) {
86                 let unordered_queue = self.unordered_queue.read();
87                 return unordered_queue.get(0).cloned();
88             } else {
89                 let ordered_queue = self.ordered_queue.read();
90                 return ordered_queue.get(0).cloned();
91             }
92         }
93 
94         let c = {
95             let unordered_queue = self.unordered_queue.read();
96             unordered_queue.get(0).cloned()
97         };
98 
99         if c.is_some() {
100             return c;
101         }
102 
103         let ordered_queue = self.ordered_queue.read();
104         ordered_queue.get(0).cloned()
105     }
106 
107     pub(crate) fn pop(
108         &self,
109         beginning_fragment: bool,
110         unordered: bool,
111     ) -> Option<ChunkPayloadData> {
112         let popped = if self.selected.load(Ordering::SeqCst) {
113             let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
114                 let mut unordered_queue = self.unordered_queue.write();
115                 unordered_queue.pop_front()
116             } else {
117                 let mut ordered_queue = self.ordered_queue.write();
118                 ordered_queue.pop_front()
119             };
120             if let Some(p) = &popped {
121                 if p.ending_fragment {
122                     self.selected.store(false, Ordering::SeqCst);
123                 }
124             }
125             popped
126         } else {
127             if !beginning_fragment {
128                 return None;
129             }
130             if unordered {
131                 let popped = {
132                     let mut unordered_queue = self.unordered_queue.write();
133                     unordered_queue.pop_front()
134                 };
135                 if let Some(p) = &popped {
136                     if !p.ending_fragment {
137                         self.selected.store(true, Ordering::SeqCst);
138                         self.unordered_is_selected.store(true, Ordering::SeqCst);
139                     }
140                 }
141                 popped
142             } else {
143                 let popped = {
144                     let mut ordered_queue = self.ordered_queue.write();
145                     ordered_queue.pop_front()
146                 };
147                 if let Some(p) = &popped {
148                     if !p.ending_fragment {
149                         self.selected.store(true, Ordering::SeqCst);
150                         self.unordered_is_selected.store(false, Ordering::SeqCst);
151                     }
152                 }
153                 popped
154             }
155         };
156 
157         if let Some(p) = &popped {
158             self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst);
159             self.queue_len.fetch_sub(1, Ordering::SeqCst);
160         }
161 
162         popped
163     }
164 
165     pub(crate) fn get_num_bytes(&self) -> usize {
166         self.n_bytes.load(Ordering::SeqCst)
167     }
168 
169     pub(crate) fn len(&self) -> usize {
170         self.queue_len.load(Ordering::SeqCst)
171     }
172 
173     pub(crate) fn is_empty(&self) -> bool {
174         self.len() == 0
175     }
176 }
177