1 use { 2 super::{ 3 define, define::ClientHandshakeState, errors::HandshakeError, 4 handshake_trait::THandshakeClient, utils, 5 }, 6 byteorder::BigEndian, 7 bytes::BytesMut, 8 bytesio::{bytes_reader::BytesReader, bytes_writer::AsyncBytesWriter, bytesio::TNetIO}, 9 std::sync::Arc, 10 tokio::sync::Mutex, 11 }; 12 13 // use super::define; 14 // use super::utils; 15 // use super::{define::ClientHandshakeState, handshake_trait::THandshakeClient}; 16 17 pub struct SimpleHandshakeClient { 18 reader: BytesReader, 19 writer: AsyncBytesWriter, 20 s1_bytes: BytesMut, 21 pub state: ClientHandshakeState, 22 } 23 24 impl SimpleHandshakeClient { new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self25 pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self { 26 Self { 27 reader: BytesReader::new(BytesMut::new()), 28 writer: AsyncBytesWriter::new(io), 29 s1_bytes: BytesMut::new(), 30 state: ClientHandshakeState::WriteC0C1, 31 } 32 } 33 extend_data(&mut self, data: &[u8])34 pub fn extend_data(&mut self, data: &[u8]) { 35 self.reader.extend_from_slice(data); 36 } flush(&mut self) -> Result<(), HandshakeError>37 pub async fn flush(&mut self) -> Result<(), HandshakeError> { 38 self.writer.flush().await?; 39 Ok(()) 40 } 41 handshake(&mut self) -> Result<(), HandshakeError>42 pub async fn handshake(&mut self) -> Result<(), HandshakeError> { 43 loop { 44 match self.state { 45 ClientHandshakeState::WriteC0C1 => { 46 self.write_c0()?; 47 self.write_c1()?; 48 self.flush().await?; 49 self.state = ClientHandshakeState::ReadS0S1S2; 50 break; 51 } 52 53 ClientHandshakeState::ReadS0S1S2 => { 54 self.read_s0()?; 55 self.read_s1()?; 56 self.read_s2()?; 57 self.state = ClientHandshakeState::WriteC2; 58 } 59 60 ClientHandshakeState::WriteC2 => { 61 self.write_c2()?; 62 self.flush().await?; 63 self.state = ClientHandshakeState::Finish; 64 } 65 66 ClientHandshakeState::Finish => { 67 break; 68 } 69 } 70 } 71 72 Ok(()) 73 } 74 } 75 76 impl THandshakeClient for SimpleHandshakeClient { write_c0(&mut self) -> Result<(), HandshakeError>77 fn write_c0(&mut self) -> Result<(), HandshakeError> { 78 self.writer.write_u8(define::RTMP_VERSION as u8)?; 79 Ok(()) 80 } write_c1(&mut self) -> Result<(), HandshakeError>81 fn write_c1(&mut self) -> Result<(), HandshakeError> { 82 self.writer.write_u32::<BigEndian>(utils::current_time())?; 83 self.writer.write_u32::<BigEndian>(0)?; 84 85 self.writer 86 .write_random_bytes((define::RTMP_HANDSHAKE_SIZE - 8) as u32)?; 87 Ok(()) 88 } write_c2(&mut self) -> Result<(), HandshakeError>89 fn write_c2(&mut self) -> Result<(), HandshakeError> { 90 self.writer.write(&self.s1_bytes[0..])?; 91 Ok(()) 92 } 93 read_s0(&mut self) -> Result<(), HandshakeError>94 fn read_s0(&mut self) -> Result<(), HandshakeError> { 95 self.reader.read_u8()?; 96 Ok(()) 97 } read_s1(&mut self) -> Result<(), HandshakeError>98 fn read_s1(&mut self) -> Result<(), HandshakeError> { 99 self.s1_bytes = self.reader.read_bytes(define::RTMP_HANDSHAKE_SIZE)?; 100 Ok(()) 101 } read_s2(&mut self) -> Result<(), HandshakeError>102 fn read_s2(&mut self) -> Result<(), HandshakeError> { 103 let _ = self.reader.read_bytes(define::RTMP_HANDSHAKE_SIZE)?; 104 Ok(()) 105 } 106 } 107