1 #[cfg(test)]
2 mod agent_test;
3
4 use crate::client::ClientTransaction;
5 use crate::error::*;
6 use crate::message::*;
7
8 use rand::Rng;
9 use std::collections::HashMap;
10 use std::sync::Arc;
11 use tokio::sync::mpsc;
12 use tokio::time::Instant;
13
/// Handler is the sink that receives transaction [`Event`]s from an [`Agent`].
///
/// `Some` wraps the sending half of an unbounded tokio mpsc channel; every
/// event is moved into that channel, so the receiver owns it afterwards.
/// `None` means no receiver is installed and the agent sends nothing
/// (see [`noop_handler`]).
pub type Handler = Option<Arc<mpsc::UnboundedSender<Event>>>;
19
20 /// noop_handler just discards any event.
noop_handler() -> Handler21 pub fn noop_handler() -> Handler {
22 None
23 }
24
/// Agent is a low-level abstraction over the list of in-progress
/// transactions. It reports transaction outcomes to its handler and
/// times out expired transactions when `collect` is called.
///
/// Mutating methods take `&mut self`; the type does no internal locking.
pub struct Agent {
    /// Transactions currently in progress, keyed by transaction id.
    transactions: HashMap<TransactionId, AgentTransaction>,
    /// Once true, every method that checks it returns `ErrAgentClosed`.
    closed: bool,
    /// Destination for transaction events; `None` discards them.
    handler: Handler,
}
40
/// EventType identifies what an [`Event`] refers to.
#[derive(Debug, Clone)]
pub enum EventType {
    /// Outcome of the transaction with this id. This is the only variant
    /// constructed by the agent code in this file.
    Callback(TransactionId),
    /// NOTE(review): presumably a request to register a client transaction;
    /// not produced in this file, confirm against the client code.
    Insert(ClientTransaction),
    /// NOTE(review): presumably a request to remove the client transaction
    /// with this id; not produced in this file, confirm against the client code.
    Remove(TransactionId),
    /// NOTE(review): presumably a shutdown signal; not produced in this
    /// file, confirm against the client code.
    Close,
}
48
49 impl Default for EventType {
default() -> Self50 fn default() -> Self {
51 EventType::Callback(TransactionId::default())
52 }
53 }
54
/// Event describes a transaction outcome and is what the agent sends to
/// its Handler.
// NOTE(review): `Clone` is deliberately not derived here; presumably the
// error type inside `Result` is not `Clone`. Confirm before adding it.
#[derive(Debug)]
pub struct Event {
    /// What the event refers to (for agent events, the transaction id).
    pub event_type: EventType,
    /// `Ok(message)` when a message was processed for the transaction;
    /// `Err(..)` when it was stopped, timed out, or the agent was closed.
    pub event_body: Result<Message>,
}
62
63 impl Default for Event {
default() -> Self64 fn default() -> Self {
65 Event {
66 event_type: EventType::default(),
67 event_body: Ok(Message::default()),
68 }
69 }
70 }
71
/// AgentTransaction is the agent's record of one in-progress transaction.
pub(crate) struct AgentTransaction {
    /// Id of the transaction; `Agent::start` stores the same value as the
    /// key of the entry in the agent's transaction map.
    id: TransactionId,
    /// `Agent::collect` times the transaction out when this instant is
    /// earlier than the instant passed to it.
    deadline: Instant,
}
78
/// AGENT_COLLECT_CAP is the initial capacity of the vector that
/// `Agent::collect` uses to gather timed-out transaction ids. The vector
/// does not need to grow while at most this many transactions expire in a
/// single call.
const AGENT_COLLECT_CAP: usize = 100;
82
/// TransactionId is the fixed-size (`TRANSACTION_ID_SIZE` bytes) identifier
/// of a transaction. It is `Copy` and hashable, and is used as the key of
/// the agent's transaction map.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Default, Debug)]
pub struct TransactionId(pub [u8; TRANSACTION_ID_SIZE]);
85
86 impl TransactionId {
87 /// new returns new random transaction ID using crypto/rand
88 /// as source.
new() -> Self89 pub fn new() -> Self {
90 let mut b = TransactionId([0u8; TRANSACTION_ID_SIZE]);
91 rand::thread_rng().fill(&mut b.0);
92 b
93 }
94 }
95
impl Setter for TransactionId {
    /// add_to stores this id in `m.transaction_id` and then calls
    /// `m.write_transaction_id()`. It always returns `Ok(())`.
    fn add_to(&self, m: &mut Message) -> Result<()> {
        m.transaction_id = *self;
        // NOTE(review): presumably re-encodes the id into the message's raw
        // bytes, which is why it must run after the assignment; confirm in `Message`.
        m.write_transaction_id();
        Ok(())
    }
}
103
/// ClientAgent is a command for the task running `Agent::run`, which
/// receives these values from a channel and calls the matching `Agent`
/// method for each one.
#[derive(Debug)]
pub enum ClientAgent {
    /// Calls `Agent::process` with the message.
    Process(Message),
    /// Calls `Agent::collect` with the instant.
    Collect(Instant),
    /// Calls `Agent::start` with the transaction id and its deadline.
    Start(TransactionId, Instant),
    /// Calls `Agent::stop` with the transaction id.
    Stop(TransactionId),
    /// Calls `Agent::close`.
    Close,
}
114
115 impl Agent {
116 /// new initializes and returns new Agent with provided handler.
new(handler: Handler) -> Self117 pub fn new(handler: Handler) -> Self {
118 Agent {
119 transactions: HashMap::new(),
120 closed: false,
121 handler,
122 }
123 }
124
125 /// stop_with_error removes transaction from list and calls handler with
126 /// provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()>127 pub fn stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()> {
128 if self.closed {
129 return Err(Error::ErrAgentClosed);
130 }
131
132 let v = self.transactions.remove(&id);
133 if let Some(t) = v {
134 if let Some(handler) = &self.handler {
135 handler.send(Event {
136 event_type: EventType::Callback(t.id),
137 event_body: Err(error),
138 })?;
139 }
140 Ok(())
141 } else {
142 Err(Error::ErrTransactionNotExists)
143 }
144 }
145
146 /// process incoming message, synchronously passing it to handler.
process(&mut self, message: Message) -> Result<()>147 pub fn process(&mut self, message: Message) -> Result<()> {
148 if self.closed {
149 return Err(Error::ErrAgentClosed);
150 }
151
152 self.transactions.remove(&message.transaction_id);
153
154 let e = Event {
155 event_type: EventType::Callback(message.transaction_id),
156 event_body: Ok(message),
157 };
158
159 if let Some(handler) = &self.handler {
160 handler.send(e)?;
161 }
162
163 Ok(())
164 }
165
166 /// close terminates all transactions with ErrAgentClosed and renders Agent to
167 /// closed state.
close(&mut self) -> Result<()>168 pub fn close(&mut self) -> Result<()> {
169 if self.closed {
170 return Err(Error::ErrAgentClosed);
171 }
172
173 for id in self.transactions.keys() {
174 let e = Event {
175 event_type: EventType::Callback(*id),
176 event_body: Err(Error::ErrAgentClosed),
177 };
178 if let Some(handler) = &self.handler {
179 handler.send(e)?;
180 }
181 }
182 self.transactions = HashMap::new();
183 self.closed = true;
184 self.handler = noop_handler();
185
186 Ok(())
187 }
188
189 /// start registers transaction with provided id and deadline.
190 /// Could return ErrAgentClosed, ErrTransactionExists.
191 ///
192 /// Agent handler is guaranteed to be eventually called.
start(&mut self, id: TransactionId, deadline: Instant) -> Result<()>193 pub fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
194 if self.closed {
195 return Err(Error::ErrAgentClosed);
196 }
197 if self.transactions.contains_key(&id) {
198 return Err(Error::ErrTransactionExists);
199 }
200
201 self.transactions
202 .insert(id, AgentTransaction { id, deadline });
203
204 Ok(())
205 }
206
207 /// stop stops transaction by id with ErrTransactionStopped, blocking
208 /// until handler returns.
stop(&mut self, id: TransactionId) -> Result<()>209 pub fn stop(&mut self, id: TransactionId) -> Result<()> {
210 self.stop_with_error(id, Error::ErrTransactionStopped)
211 }
212
213 /// collect terminates all transactions that have deadline before provided
214 /// time, blocking until all handlers will process ErrTransactionTimeOut.
215 /// Will return ErrAgentClosed if agent is already closed.
216 ///
217 /// It is safe to call Collect concurrently but makes no sense.
collect(&mut self, deadline: Instant) -> Result<()>218 pub fn collect(&mut self, deadline: Instant) -> Result<()> {
219 if self.closed {
220 // Doing nothing if agent is closed.
221 // All transactions should be already closed
222 // during Close() call.
223 return Err(Error::ErrAgentClosed);
224 }
225
226 let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);
227
228 // Adding all transactions with deadline before gc_time
229 // to toCall and to_remove slices.
230 // No allocs if there are less than AGENT_COLLECT_CAP
231 // timed out transactions.
232 for (id, t) in &self.transactions {
233 if t.deadline < deadline {
234 to_remove.push(*id);
235 }
236 }
237 // Un-registering timed out transactions.
238 for id in &to_remove {
239 self.transactions.remove(id);
240 }
241
242 for id in to_remove {
243 let event = Event {
244 event_type: EventType::Callback(id),
245 event_body: Err(Error::ErrTransactionTimeOut),
246 };
247 if let Some(handler) = &self.handler {
248 handler.send(event)?;
249 }
250 }
251
252 Ok(())
253 }
254
255 /// set_handler sets agent handler to h.
set_handler(&mut self, h: Handler) -> Result<()>256 pub fn set_handler(&mut self, h: Handler) -> Result<()> {
257 if self.closed {
258 return Err(Error::ErrAgentClosed);
259 }
260 self.handler = h;
261
262 Ok(())
263 }
264
run(mut agent: Agent, mut rx: mpsc::Receiver<ClientAgent>)265 pub(crate) async fn run(mut agent: Agent, mut rx: mpsc::Receiver<ClientAgent>) {
266 while let Some(client_agent) = rx.recv().await {
267 let result = match client_agent {
268 ClientAgent::Process(message) => agent.process(message),
269 ClientAgent::Collect(deadline) => agent.collect(deadline),
270 ClientAgent::Start(tid, deadline) => agent.start(tid, deadline),
271 ClientAgent::Stop(tid) => agent.stop(tid),
272 ClientAgent::Close => agent.close(),
273 };
274
275 if let Err(err) = result {
276 if Error::ErrAgentClosed == err {
277 break;
278 }
279 }
280 }
281 }
282 }
283