History log of /webrtc/rtp/src/sequence.rs (Results 1 – 6 of 6)
Revision (<<< Hide revision tags) (Show revision tags >>>) Date Author Comments
# bc5c52e6 27-May-2023 yngrtc <[email protected]>

[RTCP] remove unnecessary async function/trait


Revision tags: constraints-v0.1.0, interceptor-v0.8.2, rtcp-v0.7.2, mdns-v0.5.2, v0.6.0, interceptor-v0.8.1, media-v0.5.0, data-v0.6.0, sctp-v0.7.0, srtp-v0.9.1, rtcp-v0.7.1, rtp-v0.6.8, dtls-v0.7.0, ice-v0.9.0, turn-v0.6.1, stun-v0.4.4, mdns-v0.5.1, sdp-v0.5.3, util-v0.7.0
# 5b79f08a 14-Nov-2022 alexlapa <[email protected]>

remove unnecessary async in public API (#338)

> mind if i create a PR that makes all callback setters (e.g. RTCPeerConnection::on_ice_candidate()) non async? Currently callbacks are stored behind to

remove unnecessary async in public API (#338)

> mind if i create a PR that makes all callback setters (e.g. RTCPeerConnection::on_ice_candidate()) non async? Currently callbacks are stored behind tokio::Mutex'es, i believe that ArcSwap can be used here, so we can use atomic load/store. It wont affect performance in any way, just some quality of life improvements.

> All good with me. I'd like to get rid of tokio::Mutex, replacing it with sync variants, where that's possible

So i've refactored all public callback setters here.

Regarding removing `tokio::Mutex`es that you've mentioned, well, i've replaced some of those, but i don't want this PR to become even bigger, and it would be nice if you provide some feedback first. I've used both `ArcSwap` and `std::Mutex`, i guess its ok to use blocking mutex in simple cases, when you don't need to hold it, e.g. copy, store, swap, take one-liners. I will prepare follow-up PRs on this issue if this PR looks good to you.

And one more thing regarding the callbacks, maybe it makes sense to make them just dumb `Box<dyn Fn(_)>`? So `Box<dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>` => `Box<dyn (Fn(ConnectionState)) + Send + Sync>`. Yeah, that might hurt users in some rare cases, but it would make everything simpler.

Been using this to play around with detecting deadlocks, didn't find anything though.

```diff
diff --git a/util/src/sync/mod.rs b/util/src/sync/mod.rs
index 6a101299..d5eb835d 100644
--- a/util/src/sync/mod.rs
+++ b/util/src/sync/mod.rs
@@ -1,4 +1,150 @@
-use std::{ops, sync};
+use std::backtrace::Backtrace;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use std::{fmt, ops, sync, thread};
+
+const MAX_AGE: Duration = Duration::from_millis(5000);
+const CHECK_INTERVAL: Duration = Duration::from_millis(20);
+static DEADLOCK_THREAD_STARTED: AtomicBool = AtomicBool::new(false);
+lazy_static! {
+ static ref TRACKED_LOCKS: Arc<sync::RwLock<Vec<TrackedLock>>> =
+ Arc::new(sync::RwLock::new(Vec::new()));
+}
+
+struct TrackedLock {
+ backtrace: Backtrace,
+ tracker: AliveTracker,
+ typ: Type,
+ reported: AtomicBool,
+}
+
+impl TrackedLock {
+ fn mark_reported(&self) -> bool {
+ self.reported
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
+ .is_ok()
+ }
+}
+
+#[derive(Debug)]
+enum Type {
+ MutexLock,
+ RwReadLock,
+ RwWriteLock,
+}
+
+impl fmt::Display for Type {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Type::MutexLock => write!(f, "MutexLock"),
+ Type::RwReadLock => write!(f, "RwReadLock"),
+ Type::RwWriteLock => write!(f, "RwWriteLock"),
+ }
+ }
+}
+
+struct Alive {
+ live: Arc<AtomicBool>,
+ born_at: Instant,
+}
+
+impl Alive {
+ fn new() -> Self {
+ Self {
+ live: Arc::new(AtomicBool::new(true)),
+ born_at: Instant::now(),
+ }
+ }
+
+ fn track(&self) -> AliveTracker {
+ AliveTracker {
+ live: self.live.clone(),
+ born_at: self.born_at.clone(),
+ }
+ }
+
+ fn mark_dead(&self) {
+ self.live.store(false, Ordering::Release);
+ }
+}
+
+impl Drop for Alive {
+ fn drop(&mut self) {
+ self.mark_dead();
+ }
+}
+
+struct AliveTracker {
+ live: Arc<AtomicBool>,
+ born_at: Instant,
+}
+
+impl AliveTracker {
+ fn is_dead(&self) -> bool {
+ !self.live.load(Ordering::Acquire)
+ }
+
+ fn age(&self) -> Option<Duration> {
+ if self.is_dead() {
+ None
+ } else {
+ Some(Instant::now() - self.born_at)
+ }
+ }
+}
+
+fn start_deadlock_thread() {
+ if DEADLOCK_THREAD_STARTED
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
+ .is_err()
+ {
+ return;
+ }
+
+ thread::spawn(|| loop {
+ let lock = TRACKED_LOCKS.read().unwrap();
+ let mut needs_cleanup = false;
+
+ for tracked in lock.iter() {
+ let age = tracked.tracker.age();
+ match age {
+ Some(age) if age > MAX_AGE => {
+ if tracked.mark_reported() {
+ println!(
+ "Potential deadlock on {}. Backtrace:\n{}Held for: {}s",
+ tracked.typ,
+ tracked.backtrace,
+ age.as_secs_f64()
+ );
+ }
+ }
+ None => {
+ // Lock is dead
+ needs_cleanup = true;
+ }
+ _ => {}
+ }
+ }
+ drop(lock);
+
+ if needs_cleanup {
+ let mut write = TRACKED_LOCKS.write().unwrap();
+ write.retain(|tracked| !tracked.tracker.is_dead());
+ }
+ });
+}
+
+fn track(backtrace: Backtrace, alive: &Alive, typ: Type) {
+ start_deadlock_thread();
+ let mut write = TRACKED_LOCKS.write().unwrap();
+ write.push(TrackedLock {
+ backtrace,
+ tracker: alive.track(),
+ typ,
+ reported: AtomicBool::new(false),
+ });
+}

/// A synchronous mutual exclusion primitive useful for protecting shared data.
#[derive(Default, Debug)]
@@ -13,8 +159,10 @@ impl<T> Mutex<T> {
/// Acquires a mutex, blocking the current thread until it is able to do so.
pub fn lock(&self) -> MutexGuard<'_, T> {
let guard = self.0.lock().unwrap();
+ let alive = Alive::new();
+ track(Backtrace::capture(), &alive, Type::MutexLock);

- MutexGuard(guard)
+ MutexGuard(guard, alive)
}

/// Consumes this mutex, returning the underlying data.
@@ -25,7 +173,7 @@ impl<T> Mutex<T> {

/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
-pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>);
+pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>, Alive);

impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;
@@ -55,22 +203,26 @@ impl<T> RwLock<T> {
/// until it can be acquired.
pub fn read(&self) -> RwLockReadGuard<'_, T> {
let guard = self.0.read().unwrap();
+ let alive = Alive::new();
+ track(Backtrace::capture(), &alive, Type::MutexLock);

- RwLockReadGuard(guard)
+ RwLockReadGuard(guard, alive)
}

/// Locks this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
pub fn write(&self) -> RwLockWriteGuard<'_, T> {
let guard = self.0.write().unwrap();
+ let alive = Alive::new();
+ track(Backtrace::capture(), &alive, Type::MutexLock);

- RwLockWriteGuard(guard)
+ RwLockWriteGuard(guard, alive)
}
}

/// RAII structure used to release the shared read access of a lock when
/// dropped.
-pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>);
+pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>, Alive);

impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> {
type Target = T;
@@ -82,7 +234,7 @@ impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> {

/// RAII structure used to release the shared read access of a lock when
/// dropped.
-pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>);
+pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>, Alive);

impl<'a, T> ops::Deref for RwLockWriteGuard<'a, T> {
type Target = T;
```

show more ...


Revision tags: ice-v0.8.2, v0.5.1, ice-v0.8.1, v0.5.0, data-v0.5.0, dtls-v0.6.0, ice-v0.8.0, interceptor-v0.8.0, mdns-v0.5.0, media-v0.4.7, rtcp-v0.7.0, rtp-v0.6.7, sctp-v0.6.1, sdp-v0.5.2, srtp-v0.9.0, stun-v0.4.3, turn-v0.6.0, util-v0.6.0, test-tag
# ffe74184 23-Aug-2022 Martin Algesten <[email protected]>

Move all to top level


Revision tags: v0.4.0, v0.3.4, v0.3.3, v0.3.2, v0.3.1, v0.3.0, v0.2.1, v0.2.0, v0.1.3, v0.1.2, v0.1.1, v0.1.0, v0.1.0-beta.0.0.15, v0.1.0-beta.0.0.14
# d9e146c5 01-Nov-2020 Takayuki Maeda <[email protected]>

fix clippy warnings


# fcf38440 18-Sep-2020 Rain Liu <[email protected]>

add test_packetizer_abs_send_time


# 42d08b30 23-Aug-2019 rainliu <[email protected]>

add sequencer