xref: /webrtc/sctp/src/queue/pending_queue.rs (revision ffe74184)
1 use crate::chunk::chunk_payload_data::ChunkPayloadData;
2 
3 use std::collections::VecDeque;
4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5 use tokio::sync::Mutex;
6 
7 /// pendingBaseQueue
8 pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
9 
10 // TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>
11 
12 /// pendingQueue
13 #[derive(Debug, Default)]
14 pub(crate) struct PendingQueue {
15     unordered_queue: Mutex<PendingBaseQueue>,
16     ordered_queue: Mutex<PendingBaseQueue>,
17     queue_len: AtomicUsize,
18     n_bytes: AtomicUsize,
19     selected: AtomicBool,
20     unordered_is_selected: AtomicBool,
21 }
22 
23 impl PendingQueue {
24     pub(crate) fn new() -> Self {
25         PendingQueue::default()
26     }
27 
28     pub(crate) async fn push(&self, c: ChunkPayloadData) {
29         self.n_bytes.fetch_add(c.user_data.len(), Ordering::SeqCst);
30         if c.unordered {
31             let mut unordered_queue = self.unordered_queue.lock().await;
32             unordered_queue.push_back(c);
33         } else {
34             let mut ordered_queue = self.ordered_queue.lock().await;
35             ordered_queue.push_back(c);
36         }
37         self.queue_len.fetch_add(1, Ordering::SeqCst);
38     }
39 
40     pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> {
41         if self.selected.load(Ordering::SeqCst) {
42             if self.unordered_is_selected.load(Ordering::SeqCst) {
43                 let unordered_queue = self.unordered_queue.lock().await;
44                 return unordered_queue.get(0).cloned();
45             } else {
46                 let ordered_queue = self.ordered_queue.lock().await;
47                 return ordered_queue.get(0).cloned();
48             }
49         }
50 
51         let c = {
52             let unordered_queue = self.unordered_queue.lock().await;
53             unordered_queue.get(0).cloned()
54         };
55 
56         if c.is_some() {
57             return c;
58         }
59 
60         let ordered_queue = self.ordered_queue.lock().await;
61         ordered_queue.get(0).cloned()
62     }
63 
64     pub(crate) async fn pop(
65         &self,
66         beginning_fragment: bool,
67         unordered: bool,
68     ) -> Option<ChunkPayloadData> {
69         let popped = if self.selected.load(Ordering::SeqCst) {
70             let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
71                 let mut unordered_queue = self.unordered_queue.lock().await;
72                 unordered_queue.pop_front()
73             } else {
74                 let mut ordered_queue = self.ordered_queue.lock().await;
75                 ordered_queue.pop_front()
76             };
77             if let Some(p) = &popped {
78                 if p.ending_fragment {
79                     self.selected.store(false, Ordering::SeqCst);
80                 }
81             }
82             popped
83         } else {
84             if !beginning_fragment {
85                 return None;
86             }
87             if unordered {
88                 let popped = {
89                     let mut unordered_queue = self.unordered_queue.lock().await;
90                     unordered_queue.pop_front()
91                 };
92                 if let Some(p) = &popped {
93                     if !p.ending_fragment {
94                         self.selected.store(true, Ordering::SeqCst);
95                         self.unordered_is_selected.store(true, Ordering::SeqCst);
96                     }
97                 }
98                 popped
99             } else {
100                 let popped = {
101                     let mut ordered_queue = self.ordered_queue.lock().await;
102                     ordered_queue.pop_front()
103                 };
104                 if let Some(p) = &popped {
105                     if !p.ending_fragment {
106                         self.selected.store(true, Ordering::SeqCst);
107                         self.unordered_is_selected.store(false, Ordering::SeqCst);
108                     }
109                 }
110                 popped
111             }
112         };
113 
114         if let Some(p) = &popped {
115             self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst);
116             self.queue_len.fetch_sub(1, Ordering::SeqCst);
117         }
118 
119         popped
120     }
121 
122     pub(crate) fn get_num_bytes(&self) -> usize {
123         self.n_bytes.load(Ordering::SeqCst)
124     }
125 
126     pub(crate) fn len(&self) -> usize {
127         self.queue_len.load(Ordering::SeqCst)
128     }
129 
130     pub(crate) fn is_empty(&self) -> bool {
131         self.len() == 0
132     }
133 }
134