1ffe74184SMartin Algesten use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
2ffe74184SMartin Algesten use crate::util::*;
3ffe74184SMartin Algesten
4ffe74184SMartin Algesten use crate::error::{Error, Result};
5ffe74184SMartin Algesten
6ffe74184SMartin Algesten use std::cmp::Ordering;
7ffe74184SMartin Algesten
sort_chunks_by_tsn(c: &mut [ChunkPayloadData])8ffe74184SMartin Algesten fn sort_chunks_by_tsn(c: &mut [ChunkPayloadData]) {
9ffe74184SMartin Algesten c.sort_by(|a, b| {
10ffe74184SMartin Algesten if sna32lt(a.tsn, b.tsn) {
11ffe74184SMartin Algesten Ordering::Less
12ffe74184SMartin Algesten } else {
13ffe74184SMartin Algesten Ordering::Greater
14ffe74184SMartin Algesten }
15ffe74184SMartin Algesten });
16ffe74184SMartin Algesten }
17ffe74184SMartin Algesten
sort_chunks_by_ssn(c: &mut [ChunkSet])18ffe74184SMartin Algesten fn sort_chunks_by_ssn(c: &mut [ChunkSet]) {
19ffe74184SMartin Algesten c.sort_by(|a, b| {
20ffe74184SMartin Algesten if sna16lt(a.ssn, b.ssn) {
21ffe74184SMartin Algesten Ordering::Less
22ffe74184SMartin Algesten } else {
23ffe74184SMartin Algesten Ordering::Greater
24ffe74184SMartin Algesten }
25ffe74184SMartin Algesten });
26ffe74184SMartin Algesten }
27ffe74184SMartin Algesten
28ffe74184SMartin Algesten /// chunkSet is a set of chunks that share the same SSN
29ffe74184SMartin Algesten #[derive(Debug, Clone)]
30ffe74184SMartin Algesten pub(crate) struct ChunkSet {
31ffe74184SMartin Algesten /// used only with the ordered chunks
32ffe74184SMartin Algesten pub(crate) ssn: u16,
33ffe74184SMartin Algesten pub(crate) ppi: PayloadProtocolIdentifier,
34ffe74184SMartin Algesten pub(crate) chunks: Vec<ChunkPayloadData>,
35ffe74184SMartin Algesten }
36ffe74184SMartin Algesten
37ffe74184SMartin Algesten impl ChunkSet {
new(ssn: u16, ppi: PayloadProtocolIdentifier) -> Self38ffe74184SMartin Algesten pub(crate) fn new(ssn: u16, ppi: PayloadProtocolIdentifier) -> Self {
39ffe74184SMartin Algesten ChunkSet {
40ffe74184SMartin Algesten ssn,
41ffe74184SMartin Algesten ppi,
42ffe74184SMartin Algesten chunks: vec![],
43ffe74184SMartin Algesten }
44ffe74184SMartin Algesten }
45ffe74184SMartin Algesten
push(&mut self, chunk: ChunkPayloadData) -> bool46ffe74184SMartin Algesten pub(crate) fn push(&mut self, chunk: ChunkPayloadData) -> bool {
47ffe74184SMartin Algesten // check if dup
48ffe74184SMartin Algesten for c in &self.chunks {
49ffe74184SMartin Algesten if c.tsn == chunk.tsn {
50ffe74184SMartin Algesten return false;
51ffe74184SMartin Algesten }
52ffe74184SMartin Algesten }
53ffe74184SMartin Algesten
54ffe74184SMartin Algesten // append and sort
55ffe74184SMartin Algesten self.chunks.push(chunk);
56ffe74184SMartin Algesten sort_chunks_by_tsn(&mut self.chunks);
57ffe74184SMartin Algesten
58ffe74184SMartin Algesten // Check if we now have a complete set
59ffe74184SMartin Algesten self.is_complete()
60ffe74184SMartin Algesten }
61ffe74184SMartin Algesten
is_complete(&self) -> bool62ffe74184SMartin Algesten pub(crate) fn is_complete(&self) -> bool {
63ffe74184SMartin Algesten // Condition for complete set
64ffe74184SMartin Algesten // 0. Has at least one chunk.
65ffe74184SMartin Algesten // 1. Begins with beginningFragment set to true
66ffe74184SMartin Algesten // 2. Ends with endingFragment set to true
67ffe74184SMartin Algesten // 3. TSN monotinically increase by 1 from beginning to end
68ffe74184SMartin Algesten
69ffe74184SMartin Algesten // 0.
70ffe74184SMartin Algesten let n_chunks = self.chunks.len();
71ffe74184SMartin Algesten if n_chunks == 0 {
72ffe74184SMartin Algesten return false;
73ffe74184SMartin Algesten }
74ffe74184SMartin Algesten
75ffe74184SMartin Algesten // 1.
76ffe74184SMartin Algesten if !self.chunks[0].beginning_fragment {
77ffe74184SMartin Algesten return false;
78ffe74184SMartin Algesten }
79ffe74184SMartin Algesten
80ffe74184SMartin Algesten // 2.
81ffe74184SMartin Algesten if !self.chunks[n_chunks - 1].ending_fragment {
82ffe74184SMartin Algesten return false;
83ffe74184SMartin Algesten }
84ffe74184SMartin Algesten
85ffe74184SMartin Algesten // 3.
86ffe74184SMartin Algesten let mut last_tsn = 0u32;
87ffe74184SMartin Algesten for (i, c) in self.chunks.iter().enumerate() {
88ffe74184SMartin Algesten if i > 0 {
89ffe74184SMartin Algesten // Fragments must have contiguous TSN
90ffe74184SMartin Algesten // From RFC 4960 Section 3.3.1:
91ffe74184SMartin Algesten // When a user message is fragmented into multiple chunks, the TSNs are
92ffe74184SMartin Algesten // used by the receiver to reassemble the message. This means that the
93ffe74184SMartin Algesten // TSNs for each fragment of a fragmented user message MUST be strictly
94ffe74184SMartin Algesten // sequential.
95ffe74184SMartin Algesten if c.tsn != last_tsn + 1 {
96ffe74184SMartin Algesten // mid or end fragment is missing
97ffe74184SMartin Algesten return false;
98ffe74184SMartin Algesten }
99ffe74184SMartin Algesten }
100ffe74184SMartin Algesten
101ffe74184SMartin Algesten last_tsn = c.tsn;
102ffe74184SMartin Algesten }
103ffe74184SMartin Algesten
104ffe74184SMartin Algesten true
105ffe74184SMartin Algesten }
106ffe74184SMartin Algesten }
107ffe74184SMartin Algesten
108ffe74184SMartin Algesten #[derive(Default, Debug)]
109ffe74184SMartin Algesten pub(crate) struct ReassemblyQueue {
110ffe74184SMartin Algesten pub(crate) si: u16,
111ffe74184SMartin Algesten pub(crate) next_ssn: u16,
112ffe74184SMartin Algesten /// expected SSN for next ordered chunk
113ffe74184SMartin Algesten pub(crate) ordered: Vec<ChunkSet>,
114ffe74184SMartin Algesten pub(crate) unordered: Vec<ChunkSet>,
115ffe74184SMartin Algesten pub(crate) unordered_chunks: Vec<ChunkPayloadData>,
116ffe74184SMartin Algesten pub(crate) n_bytes: usize,
117ffe74184SMartin Algesten }
118ffe74184SMartin Algesten
119ffe74184SMartin Algesten impl ReassemblyQueue {
120ffe74184SMartin Algesten /// From RFC 4960 Sec 6.5:
121ffe74184SMartin Algesten /// The Stream Sequence Number in all the streams MUST start from 0 when
122ffe74184SMartin Algesten /// the association is Established. Also, when the Stream Sequence
123ffe74184SMartin Algesten /// Number reaches the value 65535 the next Stream Sequence Number MUST
124ffe74184SMartin Algesten /// be set to 0.
new(si: u16) -> Self125ffe74184SMartin Algesten pub(crate) fn new(si: u16) -> Self {
126ffe74184SMartin Algesten ReassemblyQueue {
127ffe74184SMartin Algesten si,
128ffe74184SMartin Algesten next_ssn: 0, // From RFC 4960 Sec 6.5:
129ffe74184SMartin Algesten ordered: vec![],
130ffe74184SMartin Algesten unordered: vec![],
131ffe74184SMartin Algesten unordered_chunks: vec![],
132ffe74184SMartin Algesten n_bytes: 0,
133ffe74184SMartin Algesten }
134ffe74184SMartin Algesten }
135ffe74184SMartin Algesten
push(&mut self, chunk: ChunkPayloadData) -> bool136ffe74184SMartin Algesten pub(crate) fn push(&mut self, chunk: ChunkPayloadData) -> bool {
137ffe74184SMartin Algesten if chunk.stream_identifier != self.si {
138ffe74184SMartin Algesten return false;
139ffe74184SMartin Algesten }
140ffe74184SMartin Algesten
141ffe74184SMartin Algesten if chunk.unordered {
142ffe74184SMartin Algesten // First, insert into unordered_chunks array
143ffe74184SMartin Algesten //atomic.AddUint64(&r.n_bytes, uint64(len(chunk.userData)))
144ffe74184SMartin Algesten self.n_bytes += chunk.user_data.len();
145ffe74184SMartin Algesten self.unordered_chunks.push(chunk);
146ffe74184SMartin Algesten sort_chunks_by_tsn(&mut self.unordered_chunks);
147ffe74184SMartin Algesten
148ffe74184SMartin Algesten // Scan unordered_chunks that are contiguous (in TSN)
149ffe74184SMartin Algesten // If found, append the complete set to the unordered array
150ffe74184SMartin Algesten if let Some(cset) = self.find_complete_unordered_chunk_set() {
151ffe74184SMartin Algesten self.unordered.push(cset);
152ffe74184SMartin Algesten return true;
153ffe74184SMartin Algesten }
154ffe74184SMartin Algesten
155ffe74184SMartin Algesten false
156ffe74184SMartin Algesten } else {
157ffe74184SMartin Algesten // This is an ordered chunk
158ffe74184SMartin Algesten if sna16lt(chunk.stream_sequence_number, self.next_ssn) {
159ffe74184SMartin Algesten return false;
160ffe74184SMartin Algesten }
161ffe74184SMartin Algesten
162ffe74184SMartin Algesten self.n_bytes += chunk.user_data.len();
163ffe74184SMartin Algesten
164ffe74184SMartin Algesten // Check if a chunkSet with the SSN already exists
165ffe74184SMartin Algesten for s in &mut self.ordered {
166ffe74184SMartin Algesten if s.ssn == chunk.stream_sequence_number {
167ffe74184SMartin Algesten return s.push(chunk);
168ffe74184SMartin Algesten }
169ffe74184SMartin Algesten }
170ffe74184SMartin Algesten
171ffe74184SMartin Algesten // If not found, create a new chunkSet
172ffe74184SMartin Algesten let mut cset = ChunkSet::new(chunk.stream_sequence_number, chunk.payload_type);
173ffe74184SMartin Algesten let unordered = chunk.unordered;
174ffe74184SMartin Algesten let ok = cset.push(chunk);
175ffe74184SMartin Algesten self.ordered.push(cset);
176ffe74184SMartin Algesten if !unordered {
177ffe74184SMartin Algesten sort_chunks_by_ssn(&mut self.ordered);
178ffe74184SMartin Algesten }
179ffe74184SMartin Algesten
180ffe74184SMartin Algesten ok
181ffe74184SMartin Algesten }
182ffe74184SMartin Algesten }
183ffe74184SMartin Algesten
find_complete_unordered_chunk_set(&mut self) -> Option<ChunkSet>184ffe74184SMartin Algesten pub(crate) fn find_complete_unordered_chunk_set(&mut self) -> Option<ChunkSet> {
185ffe74184SMartin Algesten let mut start_idx = -1isize;
186ffe74184SMartin Algesten let mut n_chunks = 0usize;
187ffe74184SMartin Algesten let mut last_tsn = 0u32;
188ffe74184SMartin Algesten let mut found = false;
189ffe74184SMartin Algesten
190ffe74184SMartin Algesten for (i, c) in self.unordered_chunks.iter().enumerate() {
191ffe74184SMartin Algesten // seek beigining
192ffe74184SMartin Algesten if c.beginning_fragment {
193ffe74184SMartin Algesten start_idx = i as isize;
194ffe74184SMartin Algesten n_chunks = 1;
195ffe74184SMartin Algesten last_tsn = c.tsn;
196ffe74184SMartin Algesten
197ffe74184SMartin Algesten if c.ending_fragment {
198ffe74184SMartin Algesten found = true;
199ffe74184SMartin Algesten break;
200ffe74184SMartin Algesten }
201ffe74184SMartin Algesten continue;
202ffe74184SMartin Algesten }
203ffe74184SMartin Algesten
204ffe74184SMartin Algesten if start_idx < 0 {
205ffe74184SMartin Algesten continue;
206ffe74184SMartin Algesten }
207ffe74184SMartin Algesten
208ffe74184SMartin Algesten // Check if contiguous in TSN
209ffe74184SMartin Algesten if c.tsn != last_tsn + 1 {
210ffe74184SMartin Algesten start_idx = -1;
211ffe74184SMartin Algesten continue;
212ffe74184SMartin Algesten }
213ffe74184SMartin Algesten
214ffe74184SMartin Algesten last_tsn = c.tsn;
215ffe74184SMartin Algesten n_chunks += 1;
216ffe74184SMartin Algesten
217ffe74184SMartin Algesten if c.ending_fragment {
218ffe74184SMartin Algesten found = true;
219ffe74184SMartin Algesten break;
220ffe74184SMartin Algesten }
221ffe74184SMartin Algesten }
222ffe74184SMartin Algesten
223ffe74184SMartin Algesten if !found {
224ffe74184SMartin Algesten return None;
225ffe74184SMartin Algesten }
226ffe74184SMartin Algesten
227ffe74184SMartin Algesten // Extract the range of chunks
228ffe74184SMartin Algesten let chunks: Vec<ChunkPayloadData> = self
229ffe74184SMartin Algesten .unordered_chunks
230ffe74184SMartin Algesten .drain(start_idx as usize..(start_idx as usize) + n_chunks)
231ffe74184SMartin Algesten .collect();
232ffe74184SMartin Algesten
233ffe74184SMartin Algesten let mut chunk_set = ChunkSet::new(0, chunks[0].payload_type);
234ffe74184SMartin Algesten chunk_set.chunks = chunks;
235ffe74184SMartin Algesten
236ffe74184SMartin Algesten Some(chunk_set)
237ffe74184SMartin Algesten }
238ffe74184SMartin Algesten
is_readable(&self) -> bool239ffe74184SMartin Algesten pub(crate) fn is_readable(&self) -> bool {
240ffe74184SMartin Algesten // Check unordered first
241ffe74184SMartin Algesten if !self.unordered.is_empty() {
242ffe74184SMartin Algesten // The chunk sets in r.unordered should all be complete.
243ffe74184SMartin Algesten return true;
244ffe74184SMartin Algesten }
245ffe74184SMartin Algesten
246ffe74184SMartin Algesten // Check ordered sets
247ffe74184SMartin Algesten if !self.ordered.is_empty() {
248ffe74184SMartin Algesten let cset = &self.ordered[0];
249ffe74184SMartin Algesten if cset.is_complete() && sna16lte(cset.ssn, self.next_ssn) {
250ffe74184SMartin Algesten return true;
251ffe74184SMartin Algesten }
252ffe74184SMartin Algesten }
253ffe74184SMartin Algesten false
254ffe74184SMartin Algesten }
255ffe74184SMartin Algesten
read(&mut self, buf: &mut [u8]) -> Result<(usize, PayloadProtocolIdentifier)>256ffe74184SMartin Algesten pub(crate) fn read(&mut self, buf: &mut [u8]) -> Result<(usize, PayloadProtocolIdentifier)> {
257ffe74184SMartin Algesten // Check unordered first
258ffe74184SMartin Algesten let cset = if !self.unordered.is_empty() {
259ffe74184SMartin Algesten self.unordered.remove(0)
260ffe74184SMartin Algesten } else if !self.ordered.is_empty() {
261ffe74184SMartin Algesten // Now, check ordered
262ffe74184SMartin Algesten let cset = &self.ordered[0];
263ffe74184SMartin Algesten if !cset.is_complete() {
264ffe74184SMartin Algesten return Err(Error::ErrTryAgain);
265ffe74184SMartin Algesten }
266ffe74184SMartin Algesten if sna16gt(cset.ssn, self.next_ssn) {
267ffe74184SMartin Algesten return Err(Error::ErrTryAgain);
268ffe74184SMartin Algesten }
269ffe74184SMartin Algesten if cset.ssn == self.next_ssn {
270*95f8c260SChristopher Corley // From RFC 4960 Sec 6.5:
271*95f8c260SChristopher Corley self.next_ssn = self.next_ssn.wrapping_add(1);
272ffe74184SMartin Algesten }
273ffe74184SMartin Algesten self.ordered.remove(0)
274ffe74184SMartin Algesten } else {
275ffe74184SMartin Algesten return Err(Error::ErrTryAgain);
276ffe74184SMartin Algesten };
277ffe74184SMartin Algesten
278ffe74184SMartin Algesten // Concat all fragments into the buffer
279ffe74184SMartin Algesten let mut n_written = 0;
280ffe74184SMartin Algesten let mut err = None;
281ffe74184SMartin Algesten for c in &cset.chunks {
282ffe74184SMartin Algesten let to_copy = c.user_data.len();
283ffe74184SMartin Algesten self.subtract_num_bytes(to_copy);
284ffe74184SMartin Algesten if err.is_none() {
285ffe74184SMartin Algesten let n = std::cmp::min(to_copy, buf.len() - n_written);
286ffe74184SMartin Algesten buf[n_written..n_written + n].copy_from_slice(&c.user_data[..n]);
287ffe74184SMartin Algesten n_written += n;
288ffe74184SMartin Algesten if n < to_copy {
289ffe74184SMartin Algesten err = Some(Error::ErrShortBuffer);
290ffe74184SMartin Algesten }
291ffe74184SMartin Algesten }
292ffe74184SMartin Algesten }
293ffe74184SMartin Algesten
294ffe74184SMartin Algesten if let Some(err) = err {
295ffe74184SMartin Algesten Err(err)
296ffe74184SMartin Algesten } else {
297ffe74184SMartin Algesten Ok((n_written, cset.ppi))
298ffe74184SMartin Algesten }
299ffe74184SMartin Algesten }
300ffe74184SMartin Algesten
301ffe74184SMartin Algesten /// Use last_ssn to locate a chunkSet then remove it if the set has
302ffe74184SMartin Algesten /// not been complete
forward_tsn_for_ordered(&mut self, last_ssn: u16)303ffe74184SMartin Algesten pub(crate) fn forward_tsn_for_ordered(&mut self, last_ssn: u16) {
304ffe74184SMartin Algesten let num_bytes = self
305ffe74184SMartin Algesten .ordered
306ffe74184SMartin Algesten .iter()
307ffe74184SMartin Algesten .filter(|s| sna16lte(s.ssn, last_ssn) && !s.is_complete())
308ffe74184SMartin Algesten .fold(0, |n, s| {
309ffe74184SMartin Algesten n + s.chunks.iter().fold(0, |acc, c| acc + c.user_data.len())
310ffe74184SMartin Algesten });
311ffe74184SMartin Algesten self.subtract_num_bytes(num_bytes);
312ffe74184SMartin Algesten
313ffe74184SMartin Algesten self.ordered
314ffe74184SMartin Algesten .retain(|s| !sna16lte(s.ssn, last_ssn) || s.is_complete());
315ffe74184SMartin Algesten
316ffe74184SMartin Algesten // Finally, forward next_ssn
317ffe74184SMartin Algesten if sna16lte(self.next_ssn, last_ssn) {
318*95f8c260SChristopher Corley self.next_ssn = last_ssn.wrapping_add(1);
319ffe74184SMartin Algesten }
320ffe74184SMartin Algesten }
321ffe74184SMartin Algesten
322ffe74184SMartin Algesten /// Remove all fragments in the unordered sets that contains chunks
323ffe74184SMartin Algesten /// equal to or older than `new_cumulative_tsn`.
324ffe74184SMartin Algesten /// We know all sets in the r.unordered are complete ones.
325ffe74184SMartin Algesten /// Just remove chunks that are equal to or older than new_cumulative_tsn
326ffe74184SMartin Algesten /// from the unordered_chunks
forward_tsn_for_unordered(&mut self, new_cumulative_tsn: u32)327ffe74184SMartin Algesten pub(crate) fn forward_tsn_for_unordered(&mut self, new_cumulative_tsn: u32) {
328ffe74184SMartin Algesten let mut last_idx: isize = -1;
329ffe74184SMartin Algesten for (i, c) in self.unordered_chunks.iter().enumerate() {
330ffe74184SMartin Algesten if sna32gt(c.tsn, new_cumulative_tsn) {
331ffe74184SMartin Algesten break;
332ffe74184SMartin Algesten }
333ffe74184SMartin Algesten last_idx = i as isize;
334ffe74184SMartin Algesten }
335ffe74184SMartin Algesten if last_idx >= 0 {
336ffe74184SMartin Algesten for i in 0..(last_idx + 1) as usize {
337ffe74184SMartin Algesten self.subtract_num_bytes(self.unordered_chunks[i].user_data.len());
338ffe74184SMartin Algesten }
339ffe74184SMartin Algesten self.unordered_chunks.drain(..(last_idx + 1) as usize);
340ffe74184SMartin Algesten }
341ffe74184SMartin Algesten }
342ffe74184SMartin Algesten
subtract_num_bytes(&mut self, n_bytes: usize)343ffe74184SMartin Algesten pub(crate) fn subtract_num_bytes(&mut self, n_bytes: usize) {
344ffe74184SMartin Algesten if self.n_bytes >= n_bytes {
345ffe74184SMartin Algesten self.n_bytes -= n_bytes;
346ffe74184SMartin Algesten } else {
347ffe74184SMartin Algesten self.n_bytes = 0;
348ffe74184SMartin Algesten }
349ffe74184SMartin Algesten }
350ffe74184SMartin Algesten
get_num_bytes(&self) -> usize351ffe74184SMartin Algesten pub(crate) fn get_num_bytes(&self) -> usize {
352ffe74184SMartin Algesten self.n_bytes
353ffe74184SMartin Algesten }
354ffe74184SMartin Algesten }
355