1 use super::{chunk_header::*, chunk_type::*, *}; 2 3 use bytes::{Buf, BufMut, Bytes, BytesMut}; 4 use std::fmt; 5 use std::sync::atomic::{AtomicBool, Ordering}; 6 use std::sync::Arc; 7 use std::time::SystemTime; 8 9 pub(crate) const PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK: u8 = 1; 10 pub(crate) const PAYLOAD_DATA_BEGINING_FRAGMENT_BITMASK: u8 = 2; 11 pub(crate) const PAYLOAD_DATA_UNORDERED_BITMASK: u8 = 4; 12 pub(crate) const PAYLOAD_DATA_IMMEDIATE_SACK: u8 = 8; 13 pub(crate) const PAYLOAD_DATA_HEADER_SIZE: usize = 12; 14 15 /// PayloadProtocolIdentifier is an enum for DataChannel payload types 16 /// PayloadProtocolIdentifier enums 17 /// https://www.iana.org/assignments/sctp-parameters/sctp-parameters.xhtml#sctp-parameters-25 18 #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] 19 #[repr(C)] 20 pub enum PayloadProtocolIdentifier { 21 Dcep = 50, 22 String = 51, 23 Binary = 53, 24 StringEmpty = 56, 25 BinaryEmpty = 57, 26 #[default] 27 Unknown, 28 } 29 30 impl fmt::Display for PayloadProtocolIdentifier { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 32 let s = match *self { 33 PayloadProtocolIdentifier::Dcep => "WebRTC DCEP", 34 PayloadProtocolIdentifier::String => "WebRTC String", 35 PayloadProtocolIdentifier::Binary => "WebRTC Binary", 36 PayloadProtocolIdentifier::StringEmpty => "WebRTC String (Empty)", 37 PayloadProtocolIdentifier::BinaryEmpty => "WebRTC Binary (Empty)", 38 _ => "Unknown Payload Protocol Identifier", 39 }; 40 write!(f, "{s}") 41 } 42 } 43 44 impl From<u32> for PayloadProtocolIdentifier { from(v: u32) -> PayloadProtocolIdentifier45 fn from(v: u32) -> PayloadProtocolIdentifier { 46 match v { 47 50 => PayloadProtocolIdentifier::Dcep, 48 51 => PayloadProtocolIdentifier::String, 49 53 => PayloadProtocolIdentifier::Binary, 50 56 => PayloadProtocolIdentifier::StringEmpty, 51 57 => PayloadProtocolIdentifier::BinaryEmpty, 52 _ => PayloadProtocolIdentifier::Unknown, 53 } 54 } 55 } 56 57 ///chunkPayloadData represents an SCTP Chunk of type DATA 58 /// 59 /// 0 1 2 3 60 /// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 61 ///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 62 ///| Type = 0 | Reserved|U|B|E| Length | 63 ///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 64 ///| TSN | 65 ///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 66 ///| Stream Identifier S | Stream Sequence Number n | 67 ///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 68 ///| Payload Protocol Identifier | 69 ///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 70 ///| | 71 ///| User Data (seq n of Stream S) | 72 ///| | 73 ///+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 74 /// 75 /// 76 ///An unfragmented user message shall have both the B and E bits set to 77 ///'1'. Setting both B and E bits to '0' indicates a middle fragment of 78 ///a multi-fragment user message, as summarized in the following table: 79 /// B E Description 80 ///============================================================ 81 ///| 1 0 | First piece of a fragmented user message | 82 ///+----------------------------------------------------------+ 83 ///| 0 0 | Middle piece of a fragmented user message | 84 ///+----------------------------------------------------------+ 85 ///| 0 1 | Last piece of a fragmented user message | 86 ///+----------------------------------------------------------+ 87 ///| 1 1 | Unfragmented message | 88 ///============================================================ 89 ///| Table 1: Fragment Description Flags | 90 ///============================================================ 91 #[derive(Debug, Clone)] 92 pub struct ChunkPayloadData { 93 pub(crate) unordered: bool, 94 pub(crate) beginning_fragment: bool, 95 pub(crate) ending_fragment: bool, 96 pub(crate) immediate_sack: bool, 97 98 pub(crate) tsn: u32, 99 pub(crate) stream_identifier: u16, 100 pub(crate) stream_sequence_number: u16, 101 pub(crate) payload_type: PayloadProtocolIdentifier, 102 pub(crate) user_data: Bytes, 103 104 /// Whether this data chunk was acknowledged (received by peer) 105 pub(crate) acked: bool, 106 pub(crate) miss_indicator: u32, 107 108 /// Partial-reliability parameters used only by sender 109 pub(crate) since: SystemTime, 110 /// number of transmission made for this chunk 111 pub(crate) nsent: u32, 112 113 /// valid only with the first fragment 114 pub(crate) abandoned: Arc<AtomicBool>, 115 /// valid only with the first fragment 116 pub(crate) all_inflight: Arc<AtomicBool>, 117 118 /// Retransmission flag set when T1-RTX timeout occurred and this 119 /// chunk is still in the inflight queue 120 pub(crate) retransmit: bool, 121 } 122 123 impl Default for ChunkPayloadData { default() -> Self124 fn default() -> Self { 125 ChunkPayloadData { 126 unordered: false, 127 beginning_fragment: false, 128 ending_fragment: false, 129 immediate_sack: false, 130 tsn: 0, 131 stream_identifier: 0, 132 stream_sequence_number: 0, 133 payload_type: PayloadProtocolIdentifier::default(), 134 user_data: Bytes::new(), 135 acked: false, 136 miss_indicator: 0, 137 since: SystemTime::now(), 138 nsent: 0, 139 abandoned: Arc::new(AtomicBool::new(false)), 140 all_inflight: Arc::new(AtomicBool::new(false)), 141 retransmit: false, 142 } 143 } 144 } 145 146 /// makes chunkPayloadData printable 147 impl fmt::Display for ChunkPayloadData { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 149 write!(f, "{}\n{}", self.header(), self.tsn) 150 } 151 } 152 153 impl Chunk for ChunkPayloadData { header(&self) -> ChunkHeader154 fn header(&self) -> ChunkHeader { 155 let mut flags: u8 = 0; 156 if self.ending_fragment { 157 flags = 1; 158 } 159 if self.beginning_fragment { 160 flags |= 1 << 1; 161 } 162 if self.unordered { 163 flags |= 1 << 2; 164 } 165 if self.immediate_sack { 166 flags |= 1 << 3; 167 } 168 169 ChunkHeader { 170 typ: CT_PAYLOAD_DATA, 171 flags, 172 value_length: self.value_length() as u16, 173 } 174 } 175 unmarshal(raw: &Bytes) -> Result<Self>176 fn unmarshal(raw: &Bytes) -> Result<Self> { 177 let header = ChunkHeader::unmarshal(raw)?; 178 179 if header.typ != CT_PAYLOAD_DATA { 180 return Err(Error::ErrChunkTypeNotPayloadData); 181 } 182 183 let immediate_sack = (header.flags & PAYLOAD_DATA_IMMEDIATE_SACK) != 0; 184 let unordered = (header.flags & PAYLOAD_DATA_UNORDERED_BITMASK) != 0; 185 let beginning_fragment = (header.flags & PAYLOAD_DATA_BEGINING_FRAGMENT_BITMASK) != 0; 186 let ending_fragment = (header.flags & PAYLOAD_DATA_ENDING_FRAGMENT_BITMASK) != 0; 187 188 // validity of value_length is checked in ChunkHeader::unmarshal 189 if header.value_length() < PAYLOAD_DATA_HEADER_SIZE { 190 return Err(Error::ErrChunkPayloadSmall); 191 } 192 193 let reader = &mut raw.slice(CHUNK_HEADER_SIZE..CHUNK_HEADER_SIZE + header.value_length()); 194 195 let tsn = reader.get_u32(); 196 let stream_identifier = reader.get_u16(); 197 let stream_sequence_number = reader.get_u16(); 198 let payload_type: PayloadProtocolIdentifier = reader.get_u32().into(); 199 let user_data = raw.slice( 200 CHUNK_HEADER_SIZE + PAYLOAD_DATA_HEADER_SIZE..CHUNK_HEADER_SIZE + header.value_length(), 201 ); 202 203 Ok(ChunkPayloadData { 204 unordered, 205 beginning_fragment, 206 ending_fragment, 207 immediate_sack, 208 209 tsn, 210 stream_identifier, 211 stream_sequence_number, 212 payload_type, 213 user_data, 214 acked: false, 215 miss_indicator: 0, 216 since: SystemTime::now(), 217 nsent: 0, 218 abandoned: Arc::new(AtomicBool::new(false)), 219 all_inflight: Arc::new(AtomicBool::new(false)), 220 retransmit: false, 221 }) 222 } 223 marshal_to(&self, writer: &mut BytesMut) -> Result<usize>224 fn marshal_to(&self, writer: &mut BytesMut) -> Result<usize> { 225 self.header().marshal_to(writer)?; 226 227 writer.put_u32(self.tsn); 228 writer.put_u16(self.stream_identifier); 229 writer.put_u16(self.stream_sequence_number); 230 writer.put_u32(self.payload_type as u32); 231 writer.extend_from_slice(&self.user_data); 232 233 Ok(writer.len()) 234 } 235 check(&self) -> Result<()>236 fn check(&self) -> Result<()> { 237 Ok(()) 238 } 239 value_length(&self) -> usize240 fn value_length(&self) -> usize { 241 PAYLOAD_DATA_HEADER_SIZE + self.user_data.len() 242 } 243 as_any(&self) -> &(dyn Any + Send + Sync)244 fn as_any(&self) -> &(dyn Any + Send + Sync) { 245 self 246 } 247 } 248 249 impl ChunkPayloadData { abandoned(&self) -> bool250 pub(crate) fn abandoned(&self) -> bool { 251 let (abandoned, all_inflight) = ( 252 self.abandoned.load(Ordering::SeqCst), 253 self.all_inflight.load(Ordering::SeqCst), 254 ); 255 256 abandoned && all_inflight 257 } 258 set_abandoned(&self, abandoned: bool)259 pub(crate) fn set_abandoned(&self, abandoned: bool) { 260 self.abandoned.store(abandoned, Ordering::SeqCst); 261 } 262 set_all_inflight(&mut self)263 pub(crate) fn set_all_inflight(&mut self) { 264 if self.ending_fragment { 265 self.all_inflight.store(true, Ordering::SeqCst); 266 } 267 } 268 } 269