1 use super::*; 2 use crate::config::*; 3 use crate::protection_profile::*; 4 5 use std::io::{BufReader, BufWriter}; 6 7 use util::Error; 8 9 use tokio::net::UdpSocket; 10 11 async fn build_session_srtcp_pair() -> Result<(Session, Session), Error> { 12 let ua = UdpSocket::bind("127.0.0.1:0").await?; 13 let ub = UdpSocket::bind("127.0.0.1:0").await?; 14 15 ua.connect(ub.local_addr()?).await?; 16 ub.connect(ua.local_addr()?).await?; 17 18 let ca = Config { 19 profile: PROTECTION_PROFILE_AES128CM_HMAC_SHA1_80, 20 keys: SessionKeys { 21 local_master_key: vec![ 22 0xE1, 0xF9, 0x7A, 0x0D, 0x3E, 0x01, 0x8B, 0xE0, 0xD6, 0x4F, 0xA3, 0x2C, 0x06, 0xDE, 23 0x41, 0x39, 24 ], 25 local_master_salt: vec![ 26 0x0E, 0xC6, 0x75, 0xAD, 0x49, 0x8A, 0xFE, 0xEB, 0xB6, 0x96, 0x0B, 0x3A, 0xAB, 0xE6, 27 ], 28 remote_master_key: vec![ 29 0xE1, 0xF9, 0x7A, 0x0D, 0x3E, 0x01, 0x8B, 0xE0, 0xD6, 0x4F, 0xA3, 0x2C, 0x06, 0xDE, 30 0x41, 0x39, 31 ], 32 remote_master_salt: vec![ 33 0x0E, 0xC6, 0x75, 0xAD, 0x49, 0x8A, 0xFE, 0xEB, 0xB6, 0x96, 0x0B, 0x3A, 0xAB, 0xE6, 34 ], 35 }, 36 37 local_rtp_options: None, 38 remote_rtp_options: None, 39 40 local_rtcp_options: None, 41 remote_rtcp_options: None, 42 }; 43 let cb = Config { 44 profile: PROTECTION_PROFILE_AES128CM_HMAC_SHA1_80, 45 keys: SessionKeys { 46 local_master_key: vec![ 47 0xE1, 0xF9, 0x7A, 0x0D, 0x3E, 0x01, 0x8B, 0xE0, 0xD6, 0x4F, 0xA3, 0x2C, 0x06, 0xDE, 48 0x41, 0x39, 49 ], 50 local_master_salt: vec![ 51 0x0E, 0xC6, 0x75, 0xAD, 0x49, 0x8A, 0xFE, 0xEB, 0xB6, 0x96, 0x0B, 0x3A, 0xAB, 0xE6, 52 ], 53 remote_master_key: vec![ 54 0xE1, 0xF9, 0x7A, 0x0D, 0x3E, 0x01, 0x8B, 0xE0, 0xD6, 0x4F, 0xA3, 0x2C, 0x06, 0xDE, 55 0x41, 0x39, 56 ], 57 remote_master_salt: vec![ 58 0x0E, 0xC6, 0x75, 0xAD, 0x49, 0x8A, 0xFE, 0xEB, 0xB6, 0x96, 0x0B, 0x3A, 0xAB, 0xE6, 59 ], 60 }, 61 62 local_rtp_options: None, 63 remote_rtp_options: None, 64 65 local_rtcp_options: None, 66 remote_rtcp_options: None, 67 }; 68 69 let sa = Session::new(ua, ca, false).await?; 70 let sb = Session::new(ub, cb, false).await?; 71 72 Ok((sa, sb)) 73 } 74 75 const TEST_SSRC: u32 = 5000; 76 const RTCP_HEADER_SIZE: usize = 4; 77 78 #[tokio::test] 79 async fn test_session_srtcp_accept() -> Result<(), Error> { 80 let (mut sa, mut sb) = build_session_srtcp_pair().await?; 81 82 let rtcp_packet = rtcp::packet::Packet::PictureLossIndication( 83 rtcp::picture_loss_indication::PictureLossIndication { 84 media_ssrc: TEST_SSRC, 85 ..Default::default() 86 }, 87 ); 88 let mut test_payload = vec![]; 89 { 90 let mut writer = BufWriter::<&mut Vec<u8>>::new(test_payload.as_mut()); 91 rtcp_packet.marshal(&mut writer)?; 92 } 93 let mut read_buffer = vec![0; test_payload.len()]; 94 95 sa.write_rtcp(&rtcp_packet).await?; 96 97 let mut read_stream = sb.accept().await?; 98 let ssrc = read_stream.get_ssrc(); 99 assert_eq!( 100 ssrc, TEST_SSRC, 101 "SSRC mismatch during accept exp({}) actual({})", 102 TEST_SSRC, ssrc 103 ); 104 105 read_stream.read(&mut read_buffer).await?; 106 107 assert_eq!( 108 &test_payload[..], 109 &read_buffer[..], 110 "Sent buffer does not match the one received exp({:?}) actual({:?})", 111 &test_payload[..], 112 &read_buffer[..] 113 ); 114 115 sa.close().await?; 116 sb.close().await?; 117 118 Ok(()) 119 } 120 121 #[tokio::test] 122 async fn test_session_srtcp_listen() -> Result<(), Error> { 123 let (mut sa, mut sb) = build_session_srtcp_pair().await?; 124 125 let rtcp_packet = rtcp::packet::Packet::PictureLossIndication( 126 rtcp::picture_loss_indication::PictureLossIndication { 127 media_ssrc: TEST_SSRC, 128 ..Default::default() 129 }, 130 ); 131 let mut test_payload = vec![]; 132 { 133 let mut writer = BufWriter::<&mut Vec<u8>>::new(test_payload.as_mut()); 134 rtcp_packet.marshal(&mut writer)?; 135 } 136 let mut read_buffer = vec![0; test_payload.len()]; 137 138 let mut read_stream = sb.listen(TEST_SSRC).await?; 139 140 sa.write_rtcp(&rtcp_packet).await?; 141 142 read_stream.read(&mut read_buffer).await?; 143 144 assert_eq!( 145 &test_payload[..], 146 &read_buffer[..], 147 "Sent buffer does not match the one received exp({:?}) actual({:?})", 148 &test_payload[..], 149 &read_buffer[..] 150 ); 151 152 sa.close().await?; 153 sb.close().await?; 154 155 Ok(()) 156 } 157 158 fn encrypt_srtcp(context: &mut Context, pkt: &rtcp::packet::Packet) -> Result<Vec<u8>, Error> { 159 let mut decrypted = vec![]; 160 { 161 let mut writer = BufWriter::<&mut Vec<u8>>::new(decrypted.as_mut()); 162 pkt.marshal(&mut writer)?; 163 } 164 165 let encrypted = context.encrypt_rtcp(&decrypted)?; 166 167 Ok(encrypted) 168 } 169 170 const PLI_PACKET_SIZE: usize = 8; 171 172 async fn get_sender_ssrc(read_stream: &mut Stream) -> Result<u32, Error> { 173 let auth_tag_size = PROTECTION_PROFILE_AES128CM_HMAC_SHA1_80.auth_tag_len()?; 174 let mut read_buffer = vec![0; PLI_PACKET_SIZE + auth_tag_size]; 175 176 let (n, _) = read_stream.read_rtcp(&mut read_buffer).await?; 177 178 let mut reader = BufReader::new(&read_buffer[0..n]); 179 let pli = rtcp::picture_loss_indication::PictureLossIndication::unmarshal(&mut reader)?; 180 181 Ok(pli.sender_ssrc) 182 } 183 184 #[tokio::test] 185 async fn test_session_srtcp_replay_protection() -> Result<(), Error> { 186 let (mut sa, mut sb) = build_session_srtcp_pair().await?; 187 188 let mut read_stream = sb.listen(TEST_SSRC).await?; 189 190 // Generate test packets 191 let mut packets = vec![]; 192 let mut expected_ssrc = vec![]; 193 { 194 let mut local_context = sa.local_context.lock().await; 195 for i in 0..0x10u32 { 196 expected_ssrc.push(i); 197 198 let packet = rtcp::packet::Packet::PictureLossIndication( 199 rtcp::picture_loss_indication::PictureLossIndication { 200 media_ssrc: TEST_SSRC, 201 sender_ssrc: i, 202 }, 203 ); 204 205 let encrypted = encrypt_srtcp(&mut local_context, &packet)?; 206 207 packets.push(encrypted); 208 } 209 } 210 211 let (done_tx, mut done_rx) = mpsc::channel::<()>(1); 212 213 let received_ssrc = Arc::new(Mutex::new(vec![])); 214 let cloned_received_ssrc = Arc::clone(&received_ssrc); 215 let count = expected_ssrc.len(); 216 217 tokio::spawn(async move { 218 let mut i = 0; 219 while i < count { 220 match get_sender_ssrc(&mut read_stream).await { 221 Ok(ssrc) => { 222 let mut r = cloned_received_ssrc.lock().await; 223 r.push(ssrc); 224 225 i += 1; 226 } 227 Err(_) => break, 228 } 229 } 230 231 drop(done_tx); 232 }); 233 234 // Write with replay attack 235 for packet in &packets { 236 sa.udp_tx.send(packet).await?; 237 238 // Immediately replay 239 sa.udp_tx.send(packet).await?; 240 } 241 for packet in &packets { 242 // Delayed replay 243 sa.udp_tx.send(packet).await?; 244 } 245 246 done_rx.recv().await; 247 248 sa.close().await?; 249 sb.close().await?; 250 251 { 252 let received_ssrc = received_ssrc.lock().await; 253 assert_eq!(&expected_ssrc[..], &received_ssrc[..]); 254 } 255 256 Ok(()) 257 } 258