1 use {
2     super::{
3         define, define::ClientHandshakeState, errors::HandshakeError,
4         handshake_trait::THandshakeClient, utils,
5     },
6     byteorder::BigEndian,
7     bytes::BytesMut,
8     bytesio::{bytes_reader::BytesReader, bytes_writer::AsyncBytesWriter, bytesio::TNetIO},
9     std::sync::Arc,
10     tokio::sync::Mutex,
11 };
12 
13 // use super::define;
14 // use super::utils;
15 // use super::{define::ClientHandshakeState, handshake_trait::THandshakeClient};
16 
17 pub struct SimpleHandshakeClient {
18     reader: BytesReader,
19     writer: AsyncBytesWriter,
20     s1_bytes: BytesMut,
21     pub state: ClientHandshakeState,
22 }
23 
24 impl SimpleHandshakeClient {
new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self25     pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self {
26         Self {
27             reader: BytesReader::new(BytesMut::new()),
28             writer: AsyncBytesWriter::new(io),
29             s1_bytes: BytesMut::new(),
30             state: ClientHandshakeState::WriteC0C1,
31         }
32     }
33 
extend_data(&mut self, data: &[u8])34     pub fn extend_data(&mut self, data: &[u8]) {
35         self.reader.extend_from_slice(data);
36     }
flush(&mut self) -> Result<(), HandshakeError>37     pub async fn flush(&mut self) -> Result<(), HandshakeError> {
38         self.writer.flush().await?;
39         Ok(())
40     }
41 
handshake(&mut self) -> Result<(), HandshakeError>42     pub async fn handshake(&mut self) -> Result<(), HandshakeError> {
43         loop {
44             match self.state {
45                 ClientHandshakeState::WriteC0C1 => {
46                     self.write_c0()?;
47                     self.write_c1()?;
48                     self.flush().await?;
49                     self.state = ClientHandshakeState::ReadS0S1S2;
50                     break;
51                 }
52 
53                 ClientHandshakeState::ReadS0S1S2 => {
54                     self.read_s0()?;
55                     self.read_s1()?;
56                     self.read_s2()?;
57                     self.state = ClientHandshakeState::WriteC2;
58                 }
59 
60                 ClientHandshakeState::WriteC2 => {
61                     self.write_c2()?;
62                     self.flush().await?;
63                     self.state = ClientHandshakeState::Finish;
64                 }
65 
66                 ClientHandshakeState::Finish => {
67                     break;
68                 }
69             }
70         }
71 
72         Ok(())
73     }
74 }
75 
76 impl THandshakeClient for SimpleHandshakeClient {
write_c0(&mut self) -> Result<(), HandshakeError>77     fn write_c0(&mut self) -> Result<(), HandshakeError> {
78         self.writer.write_u8(define::RTMP_VERSION as u8)?;
79         Ok(())
80     }
write_c1(&mut self) -> Result<(), HandshakeError>81     fn write_c1(&mut self) -> Result<(), HandshakeError> {
82         self.writer.write_u32::<BigEndian>(utils::current_time())?;
83         self.writer.write_u32::<BigEndian>(0)?;
84 
85         self.writer
86             .write_random_bytes((define::RTMP_HANDSHAKE_SIZE - 8) as u32)?;
87         Ok(())
88     }
write_c2(&mut self) -> Result<(), HandshakeError>89     fn write_c2(&mut self) -> Result<(), HandshakeError> {
90         self.writer.write(&self.s1_bytes[0..])?;
91         Ok(())
92     }
93 
read_s0(&mut self) -> Result<(), HandshakeError>94     fn read_s0(&mut self) -> Result<(), HandshakeError> {
95         self.reader.read_u8()?;
96         Ok(())
97     }
read_s1(&mut self) -> Result<(), HandshakeError>98     fn read_s1(&mut self) -> Result<(), HandshakeError> {
99         self.s1_bytes = self.reader.read_bytes(define::RTMP_HANDSHAKE_SIZE)?;
100         Ok(())
101     }
read_s2(&mut self) -> Result<(), HandshakeError>102     fn read_s2(&mut self) -> Result<(), HandshakeError> {
103         let _ = self.reader.read_bytes(define::RTMP_HANDSHAKE_SIZE)?;
104         Ok(())
105     }
106 }
107