xref: /webrtc/stun/src/agent.rs (revision ffe74184)
1 #[cfg(test)]
2 mod agent_test;
3 
4 use crate::client::ClientTransaction;
5 use crate::error::*;
6 use crate::message::*;
7 
8 use rand::Rng;
9 use std::collections::HashMap;
10 use std::sync::Arc;
11 use tokio::sync::mpsc;
12 use tokio::time::Instant;
13 
14 /// Handler handles state changes of transaction.
15 /// Handler is called on transaction state change.
16 /// Usage of e is valid only during call, user must
17 /// copy needed fields explicitly.
18 pub type Handler = Option<Arc<mpsc::UnboundedSender<Event>>>;
19 
20 /// noop_handler just discards any event.
noop_handler() -> Handler21 pub fn noop_handler() -> Handler {
22     None
23 }
24 
25 /// Agent is low-level abstraction over transaction list that
26 /// handles concurrency (all calls are goroutine-safe) and
27 /// time outs (via Collect call).
28 pub struct Agent {
29     /// transactions is map of transactions that are currently
30     /// in progress. Event handling is done in such way when
31     /// transaction is unregistered before AgentTransaction access,
32     /// minimizing mux lock and protecting AgentTransaction from
33     /// data races via unexpected concurrent access.
34     transactions: HashMap<TransactionId, AgentTransaction>,
35     /// all calls are invalid if true
36     closed: bool,
37     /// handles transactions
38     handler: Handler,
39 }
40 
41 #[derive(Debug, Clone)]
42 pub enum EventType {
43     Callback(TransactionId),
44     Insert(ClientTransaction),
45     Remove(TransactionId),
46     Close,
47 }
48 
49 impl Default for EventType {
default() -> Self50     fn default() -> Self {
51         EventType::Callback(TransactionId::default())
52     }
53 }
54 
55 /// Event is passed to Handler describing the transaction event.
56 /// Do not reuse outside Handler.
57 #[derive(Debug)] //Clone
58 pub struct Event {
59     pub event_type: EventType,
60     pub event_body: Result<Message>,
61 }
62 
63 impl Default for Event {
default() -> Self64     fn default() -> Self {
65         Event {
66             event_type: EventType::default(),
67             event_body: Ok(Message::default()),
68         }
69     }
70 }
71 
72 /// AgentTransaction represents transaction in progress.
73 /// Concurrent access is invalid.
74 pub(crate) struct AgentTransaction {
75     id: TransactionId,
76     deadline: Instant,
77 }
78 
79 /// AGENT_COLLECT_CAP is initial capacity for Agent.Collect slices,
80 /// sufficient to make function zero-alloc in most cases.
81 const AGENT_COLLECT_CAP: usize = 100;
82 
83 #[derive(PartialEq, Eq, Hash, Copy, Clone, Default, Debug)]
84 pub struct TransactionId(pub [u8; TRANSACTION_ID_SIZE]);
85 
86 impl TransactionId {
87     /// new returns new random transaction ID using crypto/rand
88     /// as source.
new() -> Self89     pub fn new() -> Self {
90         let mut b = TransactionId([0u8; TRANSACTION_ID_SIZE]);
91         rand::thread_rng().fill(&mut b.0);
92         b
93     }
94 }
95 
96 impl Setter for TransactionId {
add_to(&self, m: &mut Message) -> Result<()>97     fn add_to(&self, m: &mut Message) -> Result<()> {
98         m.transaction_id = *self;
99         m.write_transaction_id();
100         Ok(())
101     }
102 }
103 
104 /// ClientAgent is Agent implementation that is used by Client to
105 /// process transactions.
106 #[derive(Debug)]
107 pub enum ClientAgent {
108     Process(Message),
109     Collect(Instant),
110     Start(TransactionId, Instant),
111     Stop(TransactionId),
112     Close,
113 }
114 
115 impl Agent {
116     /// new initializes and returns new Agent with provided handler.
new(handler: Handler) -> Self117     pub fn new(handler: Handler) -> Self {
118         Agent {
119             transactions: HashMap::new(),
120             closed: false,
121             handler,
122         }
123     }
124 
125     /// stop_with_error removes transaction from list and calls handler with
126     /// provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()>127     pub fn stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()> {
128         if self.closed {
129             return Err(Error::ErrAgentClosed);
130         }
131 
132         let v = self.transactions.remove(&id);
133         if let Some(t) = v {
134             if let Some(handler) = &self.handler {
135                 handler.send(Event {
136                     event_type: EventType::Callback(t.id),
137                     event_body: Err(error),
138                 })?;
139             }
140             Ok(())
141         } else {
142             Err(Error::ErrTransactionNotExists)
143         }
144     }
145 
146     /// process incoming message, synchronously passing it to handler.
process(&mut self, message: Message) -> Result<()>147     pub fn process(&mut self, message: Message) -> Result<()> {
148         if self.closed {
149             return Err(Error::ErrAgentClosed);
150         }
151 
152         self.transactions.remove(&message.transaction_id);
153 
154         let e = Event {
155             event_type: EventType::Callback(message.transaction_id),
156             event_body: Ok(message),
157         };
158 
159         if let Some(handler) = &self.handler {
160             handler.send(e)?;
161         }
162 
163         Ok(())
164     }
165 
166     /// close terminates all transactions with ErrAgentClosed and renders Agent to
167     /// closed state.
close(&mut self) -> Result<()>168     pub fn close(&mut self) -> Result<()> {
169         if self.closed {
170             return Err(Error::ErrAgentClosed);
171         }
172 
173         for id in self.transactions.keys() {
174             let e = Event {
175                 event_type: EventType::Callback(*id),
176                 event_body: Err(Error::ErrAgentClosed),
177             };
178             if let Some(handler) = &self.handler {
179                 handler.send(e)?;
180             }
181         }
182         self.transactions = HashMap::new();
183         self.closed = true;
184         self.handler = noop_handler();
185 
186         Ok(())
187     }
188 
189     /// start registers transaction with provided id and deadline.
190     /// Could return ErrAgentClosed, ErrTransactionExists.
191     ///
192     /// Agent handler is guaranteed to be eventually called.
start(&mut self, id: TransactionId, deadline: Instant) -> Result<()>193     pub fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
194         if self.closed {
195             return Err(Error::ErrAgentClosed);
196         }
197         if self.transactions.contains_key(&id) {
198             return Err(Error::ErrTransactionExists);
199         }
200 
201         self.transactions
202             .insert(id, AgentTransaction { id, deadline });
203 
204         Ok(())
205     }
206 
207     /// stop stops transaction by id with ErrTransactionStopped, blocking
208     /// until handler returns.
stop(&mut self, id: TransactionId) -> Result<()>209     pub fn stop(&mut self, id: TransactionId) -> Result<()> {
210         self.stop_with_error(id, Error::ErrTransactionStopped)
211     }
212 
213     /// collect terminates all transactions that have deadline before provided
214     /// time, blocking until all handlers will process ErrTransactionTimeOut.
215     /// Will return ErrAgentClosed if agent is already closed.
216     ///
217     /// It is safe to call Collect concurrently but makes no sense.
collect(&mut self, deadline: Instant) -> Result<()>218     pub fn collect(&mut self, deadline: Instant) -> Result<()> {
219         if self.closed {
220             // Doing nothing if agent is closed.
221             // All transactions should be already closed
222             // during Close() call.
223             return Err(Error::ErrAgentClosed);
224         }
225 
226         let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);
227 
228         // Adding all transactions with deadline before gc_time
229         // to toCall and to_remove slices.
230         // No allocs if there are less than AGENT_COLLECT_CAP
231         // timed out transactions.
232         for (id, t) in &self.transactions {
233             if t.deadline < deadline {
234                 to_remove.push(*id);
235             }
236         }
237         // Un-registering timed out transactions.
238         for id in &to_remove {
239             self.transactions.remove(id);
240         }
241 
242         for id in to_remove {
243             let event = Event {
244                 event_type: EventType::Callback(id),
245                 event_body: Err(Error::ErrTransactionTimeOut),
246             };
247             if let Some(handler) = &self.handler {
248                 handler.send(event)?;
249             }
250         }
251 
252         Ok(())
253     }
254 
255     /// set_handler sets agent handler to h.
set_handler(&mut self, h: Handler) -> Result<()>256     pub fn set_handler(&mut self, h: Handler) -> Result<()> {
257         if self.closed {
258             return Err(Error::ErrAgentClosed);
259         }
260         self.handler = h;
261 
262         Ok(())
263     }
264 
run(mut agent: Agent, mut rx: mpsc::Receiver<ClientAgent>)265     pub(crate) async fn run(mut agent: Agent, mut rx: mpsc::Receiver<ClientAgent>) {
266         while let Some(client_agent) = rx.recv().await {
267             let result = match client_agent {
268                 ClientAgent::Process(message) => agent.process(message),
269                 ClientAgent::Collect(deadline) => agent.collect(deadline),
270                 ClientAgent::Start(tid, deadline) => agent.start(tid, deadline),
271                 ClientAgent::Stop(tid) => agent.stop(tid),
272                 ClientAgent::Close => agent.close(),
273             };
274 
275             if let Err(err) = result {
276                 if Error::ErrAgentClosed == err {
277                     break;
278                 }
279             }
280         }
281     }
282 }
283