xref: /xiu/protocol/rtmp/src/chunk/packetizer.rs (revision 2f8005f4)
1 use {
2     super::{
3         define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo,
4         ChunkMessageHeader,
5     },
6     byteorder::{BigEndian, LittleEndian},
7     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO},
8     std::{collections::HashMap, sync::Arc},
9     tokio::sync::Mutex,
10 };
11 
12 #[derive(Eq, PartialEq, Debug)]
13 pub enum PackResult {
14     Success,
15     NotEnoughBytes,
16 }
17 
18 pub struct ChunkPacketizer {
19     csid_2_chunk_header: HashMap<u32, ChunkHeader>,
20     //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html
21     //https://zhuanlan.zhihu.com/p/165976086
22     //chunk_info: ChunkInfo,
23     max_chunk_size: usize,
24     //bytes: Cursor<Vec<u8>>,
25     writer: AsyncBytesWriter,
26 }
27 
28 impl ChunkPacketizer {
29     pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self {
30         Self {
31             csid_2_chunk_header: HashMap::new(),
32             //chunk_info: ChunkInfo::new(),
33             writer: AsyncBytesWriter::new(io),
34             max_chunk_size: CHUNK_SIZE as usize,
35         }
36     }
37     fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError> {
38         chunk_info.basic_header.format = 0;
39 
40         let pre_header = self
41             .csid_2_chunk_header
42             .get_mut(&chunk_info.basic_header.chunk_stream_id);
43 
44         if let Some(val) = pre_header {
45             let cur_msg_header = &mut chunk_info.message_header;
46             let pre_msg_header = &val.message_header;
47 
48             if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id {
49                 chunk_info.basic_header.format = 1;
50                 cur_msg_header.timestamp -= pre_msg_header.timestamp;
51 
52                 if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id
53                     && cur_msg_header.msg_length == pre_msg_header.msg_length
54                 {
55                     chunk_info.basic_header.format = 2;
56                     if chunk_info.message_header.timestamp == pre_msg_header.timestamp {
57                         chunk_info.basic_header.format = 3;
58                     }
59                 }
60             }
61         }
62         Ok(PackResult::Success)
63     }
64 
65     fn write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError> {
66         if csid >= 64 + 255 {
67             self.writer.write_u8(fmt << 6 | 1)?;
68             self.writer.write_u16::<BigEndian>((csid - 64) as u16)?;
69         } else if csid >= 64 {
70             self.writer.write_u8(fmt << 6)?;
71             self.writer.write_u8((csid - 64) as u8)?;
72         } else {
73             self.writer.write_u8(fmt << 6 | csid as u8)?;
74         }
75 
76         Ok(())
77     }
78 
79     fn write_message_header(
80         &mut self,
81         basic_header: &ChunkBasicHeader,
82         message_header: &mut ChunkMessageHeader,
83     ) -> Result<(), PackError> {
84         let timestamp = if message_header.timestamp >= 0xFFFFFF {
85             message_header.is_extended_timestamp = true;
86             0xFFFFFF
87         } else {
88             message_header.timestamp
89         };
90 
91         match basic_header.format {
92             0 => {
93                 self.writer.write_u24::<BigEndian>(timestamp)?;
94                 self.writer
95                     .write_u24::<BigEndian>(message_header.msg_length)?;
96                 self.writer.write_u8(message_header.msg_type_id)?;
97                 self.writer
98                     .write_u32::<LittleEndian>(message_header.msg_streamd_id)?;
99             }
100             1 => {
101                 self.writer.write_u24::<BigEndian>(timestamp)?;
102                 self.writer
103                     .write_u24::<BigEndian>(message_header.msg_length)?;
104             }
105             2 => {
106                 self.writer.write_u24::<BigEndian>(timestamp)?;
107             }
108             _ => {}
109         }
110 
111         Ok(())
112     }
113 
114     fn write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError> {
115         self.writer.write_u32::<BigEndian>(timestamp)?;
116 
117         Ok(())
118     }
119 
120     pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> {
121         self.zip_chunk_header(chunk_info)?;
122 
123         let mut whole_payload_size = chunk_info.payload.len();
124 
125         self.write_basic_header(
126             chunk_info.basic_header.format,
127             chunk_info.basic_header.chunk_stream_id,
128         )?;
129         self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?;
130 
131         if chunk_info.message_header.is_extended_timestamp {
132             self.write_extened_timestamp(chunk_info.message_header.timestamp)?;
133         }
134 
135         let mut cur_payload_size: usize;
136 
137         while whole_payload_size > 0 {
138             cur_payload_size = if whole_payload_size > self.max_chunk_size {
139                 self.max_chunk_size
140             } else {
141                 whole_payload_size
142             };
143 
144             let payload_bytes = chunk_info.payload.split_to(cur_payload_size);
145             self.writer.write(&payload_bytes[0..])?;
146 
147             whole_payload_size -= cur_payload_size;
148 
149             if whole_payload_size > 0 {
150                 self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?;
151                 if chunk_info.message_header.is_extended_timestamp {
152                     self.write_extened_timestamp(chunk_info.message_header.timestamp)?;
153                 }
154             }
155         }
156         self.writer.flush().await?;
157 
158         Ok(())
159     }
160 }
161