| 5b79f08a | 14-Nov-2022 | alexlapa <[email protected]> |
remove unnecessary async in public API (#338)
> Mind if I create a PR that makes all callback setters (e.g. RTCPeerConnection::on_ice_candidate()) non-async? Currently callbacks are stored behind tokio::Mutexes; I believe ArcSwap can be used here so we can use atomic load/store. It won't affect performance in any way, it's just a quality-of-life improvement.
> All good with me. I'd like to get rid of tokio::Mutex, replacing it with sync variants where possible.
So I've refactored all public callback setters here.
Regarding removing the `tokio::Mutex`es you mentioned: I've replaced some of them, but I don't want this PR to grow even bigger, and it would be nice to get some feedback first. I've used both `ArcSwap` and `std::sync::Mutex`; I think a blocking mutex is fine in simple cases where the guard is never held across an `.await`, e.g. copy, store, swap, and take one-liners. I'll prepare follow-up PRs on this issue if this PR looks good to you.
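To make that concrete, here's roughly the shape the refactored setters take; a minimal sketch assuming the `arc_swap` crate, with stand-in names (`PeerConnectionInternal`, `OnIceCandidateHdlrFn`, `trigger_ice_candidate`), not the actual webrtc-rs types:
```rust
use std::sync::Arc;

use arc_swap::ArcSwapOption;

// Simplified callback type for the sketch; the real handlers return a boxed
// future (see the next paragraph).
type OnIceCandidateHdlrFn = Box<dyn Fn(String) + Send + Sync>;

struct PeerConnectionInternal {
    on_ice_candidate_handler: ArcSwapOption<OnIceCandidateHdlrFn>,
}

impl PeerConnectionInternal {
    /// The setter is a plain `fn` now: an atomic store instead of awaiting a
    /// `tokio::Mutex` just to replace the stored callback.
    fn on_ice_candidate(&self, f: OnIceCandidateHdlrFn) {
        self.on_ice_candidate_handler.store(Some(Arc::new(f)));
    }

    /// The invocation side is an atomic load (plus one `Arc` clone).
    fn trigger_ice_candidate(&self, candidate: String) {
        if let Some(handler) = self.on_ice_candidate_handler.load_full() {
            handler(candidate);
        }
    }
}
```
The `std::sync::Mutex` replacements follow the same rule of thumb: lock, perform a one-line copy/store/swap/take, and release immediately, never holding the guard across an `.await`.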
And one more thing regarding the callbacks: maybe it makes sense to make them plain `Box<dyn Fn(_)>`? So `Box<dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>` would become `Box<dyn (Fn(ConnectionState)) + Send + Sync>`. That might hurt users in some rare cases, but it would make everything simpler.
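Side by side, the two shapes would look roughly like this (`ConnectionState` is declared inline as a stand-in so the aliases are self-contained); a user who still needs async work inside a callback could `tokio::spawn` it there:
```rust
use std::future::Future;
use std::pin::Pin;

// Stand-in type so the aliases below are self-contained.
pub enum ConnectionState {
    New,
    Connected,
    Closed,
}

// Current shape: every callback returns a boxed future that the crate awaits.
pub type OnStateChangeAsync = Box<
    dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;

// Proposed shape: a plain synchronous callback.
pub type OnStateChangeSync = Box<dyn Fn(ConnectionState) + Send + Sync>;
```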
I've been using the diff below to play around with deadlock detection; it didn't find anything, though.
```diff
diff --git a/util/src/sync/mod.rs b/util/src/sync/mod.rs
index 6a101299..d5eb835d 100644
--- a/util/src/sync/mod.rs
+++ b/util/src/sync/mod.rs
@@ -1,4 +1,150 @@
-use std::{ops, sync};
+use std::backtrace::Backtrace;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use std::{fmt, ops, sync, thread};
+
+use lazy_static::lazy_static;
+
+const MAX_AGE: Duration = Duration::from_millis(5000);
+const CHECK_INTERVAL: Duration = Duration::from_millis(20);
+static DEADLOCK_THREAD_STARTED: AtomicBool = AtomicBool::new(false);
+lazy_static! {
+    static ref TRACKED_LOCKS: Arc<sync::RwLock<Vec<TrackedLock>>> =
+        Arc::new(sync::RwLock::new(Vec::new()));
+}
+
+struct TrackedLock {
+    backtrace: Backtrace,
+    tracker: AliveTracker,
+    typ: Type,
+    reported: AtomicBool,
+}
+
+impl TrackedLock {
+    fn mark_reported(&self) -> bool {
+        self.reported
+            .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
+            .is_ok()
+    }
+}
+
+#[derive(Debug)]
+enum Type {
+    MutexLock,
+    RwReadLock,
+    RwWriteLock,
+}
+
+impl fmt::Display for Type {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Type::MutexLock => write!(f, "MutexLock"),
+            Type::RwReadLock => write!(f, "RwReadLock"),
+            Type::RwWriteLock => write!(f, "RwWriteLock"),
+        }
+    }
+}
+
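+/// Liveness flag owned by a lock guard: dropping the guard marks it dead,
+/// which tells the watchdog that the lock was released.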
+struct Alive {
+    live: Arc<AtomicBool>,
+    born_at: Instant,
+}
+
+impl Alive {
+    fn new() -> Self {
+        Self {
+            live: Arc::new(AtomicBool::new(true)),
+            born_at: Instant::now(),
+        }
+    }
+
+    fn track(&self) -> AliveTracker {
+        AliveTracker {
+            live: self.live.clone(),
+            born_at: self.born_at,
+        }
+    }
+
+    fn mark_dead(&self) {
+        self.live.store(false, Ordering::Release);
+    }
+}
+
+impl Drop for Alive {
+    fn drop(&mut self) {
+        self.mark_dead();
+    }
+}
+
+struct AliveTracker {
+    live: Arc<AtomicBool>,
+    born_at: Instant,
+}
+
+impl AliveTracker {
+    fn is_dead(&self) -> bool {
+        !self.live.load(Ordering::Acquire)
+    }
+
+    fn age(&self) -> Option<Duration> {
+        if self.is_dead() {
+            None
+        } else {
+            Some(Instant::now() - self.born_at)
+        }
+    }
+}
+
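+/// Spawns the watchdog thread (at most once) that periodically scans
+/// `TRACKED_LOCKS` and reports guards held longer than `MAX_AGE`.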
+fn start_deadlock_thread() {
+    if DEADLOCK_THREAD_STARTED
+        .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
+        .is_err()
+    {
+        return;
+    }
+
+    thread::spawn(|| loop {
+        let lock = TRACKED_LOCKS.read().unwrap();
+        let mut needs_cleanup = false;
+
+        for tracked in lock.iter() {
+            let age = tracked.tracker.age();
+            match age {
+                Some(age) if age > MAX_AGE => {
+                    if tracked.mark_reported() {
+                        println!(
+                            "Potential deadlock on {}. Backtrace:\n{}Held for: {}s",
+                            tracked.typ,
+                            tracked.backtrace,
+                            age.as_secs_f64(),
+                        );
+                    }
+                }
+                None => {
+                    // Guard has been dropped; schedule this entry for removal.
+                    needs_cleanup = true;
+                }
+                _ => {}
+            }
+        }
+        drop(lock);
+
+        if needs_cleanup {
+            let mut write = TRACKED_LOCKS.write().unwrap();
+            write.retain(|tracked| !tracked.tracker.is_dead());
+        }
+
+        // Poll at a modest interval instead of busy-looping.
+        thread::sleep(CHECK_INTERVAL);
+    });
+}
+
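+/// Registers a freshly acquired guard, together with its capture-site
+/// backtrace, for observation by the watchdog thread.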
+fn track(backtrace: Backtrace, alive: &Alive, typ: Type) {
+    start_deadlock_thread();
+    let mut write = TRACKED_LOCKS.write().unwrap();
+    write.push(TrackedLock {
+        backtrace,
+        tracker: alive.track(),
+        typ,
+        reported: AtomicBool::new(false),
+    });
+}
+
 /// A synchronous mutual exclusion primitive useful for protecting shared data.
 #[derive(Default, Debug)]
@@ -13,8 +159,10 @@ impl<T> Mutex<T> {
     /// Acquires a mutex, blocking the current thread until it is able to do so.
     pub fn lock(&self) -> MutexGuard<'_, T> {
         let guard = self.0.lock().unwrap();
-        MutexGuard(guard)
+        let alive = Alive::new();
+        track(Backtrace::capture(), &alive, Type::MutexLock);
+        MutexGuard(guard, alive)
     }
 
     /// Consumes this mutex, returning the underlying data.
@@ -25,7 +173,7 @@ impl<T> Mutex<T> {
 
 /// An RAII implementation of a "scoped lock" of a mutex. When this structure is
 /// dropped (falls out of scope), the lock will be unlocked.
-pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>);
+pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>, Alive);
 
 impl<'a, T> ops::Deref for MutexGuard<'a, T> {
     type Target = T;
@@ -55,22 +203,26 @@ impl<T> RwLock<T> {
     /// until it can be acquired.
     pub fn read(&self) -> RwLockReadGuard<'_, T> {
         let guard = self.0.read().unwrap();
-        RwLockReadGuard(guard)
+        let alive = Alive::new();
+        track(Backtrace::capture(), &alive, Type::RwReadLock);
+        RwLockReadGuard(guard, alive)
     }
 
     /// Locks this rwlock with exclusive write access, blocking the current
     /// thread until it can be acquired.
     pub fn write(&self) -> RwLockWriteGuard<'_, T> {
         let guard = self.0.write().unwrap();
-        RwLockWriteGuard(guard)
+        let alive = Alive::new();
+        track(Backtrace::capture(), &alive, Type::RwWriteLock);
+        RwLockWriteGuard(guard, alive)
     }
 }
 
 /// RAII structure used to release the shared read access of a lock when
 /// dropped.
-pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>);
+pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>, Alive);
 
 impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> {
     type Target = T;
@@ -82,7 +234,7 @@ impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> {
 
 /// RAII structure used to release the shared read access of a lock when
 /// dropped.
-pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>);
+pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>, Alive);
 
 impl<'a, T> ops::Deref for RwLockWriteGuard<'a, T> {
     type Target = T;
```
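For reference, a quick way to exercise the detector, assuming the instrumented `util::sync::Mutex` above exposes the usual `new` constructor, and running with `RUST_BACKTRACE=1` so `Backtrace::capture()` records frames:
```rust
use std::thread;
use std::time::Duration;

use util::sync::Mutex; // the instrumented wrapper from the diff above

fn main() {
    let m = Mutex::new(0u32);

    // Acquiring the lock registers the guard with the watchdog thread.
    let _guard = m.lock();

    // Holding the guard past MAX_AGE (5s) should print a single
    // "Potential deadlock on MutexLock" report with the capture-site backtrace.
    thread::sleep(Duration::from_secs(6));
}
```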