History log of /webrtc/data/src/data_channel/mod.rs (Results 1 – 7 of 7)
Revision  Date  Author  Comments
# 84b8594c 27-Feb-2023 Thomas Eizinger <[email protected]>

Remove `derive_builder` dependency


Revision tags: constraints-v0.1.0
# daaf05d1 02-Jan-2023 Moritz Borcherding <[email protected]>

sctp: limit the bytes in the PendingQueue by using a semaphore (#367)

As discussed in #360 the pending queue can grow indefinitely if the sender writes packets faster than the association is able to transmit them.

This PR solves this by enforcing a limit on the pending queue. This blocks the sender until enough space is free.
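The limit-plus-blocking idea can be sketched with std primitives. This is a minimal sketch only: the actual PR uses an async semaphore inside the sctp crate, and `BoundedPendingQueue` and its methods are illustrative names, not the crate's API. It assumes no single packet exceeds the limit.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// Byte budget guarding a pending queue: senders block once the limit
/// is reached, and the transmit side frees capacity again.
struct BoundedPendingQueue {
    limit: usize,
    state: Mutex<(usize, VecDeque<Vec<u8>>)>, // (bytes queued, packets)
    space_freed: Condvar,
}

impl BoundedPendingQueue {
    fn new(limit: usize) -> Self {
        Self {
            limit,
            state: Mutex::new((0, VecDeque::new())),
            space_freed: Condvar::new(),
        }
    }

    /// Blocks the sender until the packet fits under the byte limit.
    /// Assumes packet.len() <= limit, or this would block forever.
    fn push(&self, packet: Vec<u8>) {
        let mut state = self.state.lock().unwrap();
        while state.0 + packet.len() > self.limit {
            state = self.space_freed.wait(state).unwrap();
        }
        state.0 += packet.len();
        state.1.push_back(packet);
    }

    /// Called by the transmit side; frees capacity and wakes senders.
    fn pop(&self) -> Option<Vec<u8>> {
        let mut state = self.state.lock().unwrap();
        let packet = state.1.pop_front()?;
        state.0 -= packet.len();
        self.space_freed.notify_all();
        Some(packet)
    }

    fn queued_bytes(&self) -> usize {
        self.state.lock().unwrap().0
    }
}

fn main() {
    let q = BoundedPendingQueue::new(8);
    q.push(vec![0u8; 4]);
    q.push(vec![0u8; 4]); // exactly at the limit; a third push would block
    assert_eq!(q.queued_bytes(), 8);
    q.pop();
    assert_eq!(q.queued_bytes(), 4);
}
```

The key property is that backpressure is expressed in bytes, not packet count, so a fast sender stalls instead of growing the queue without bound.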



Revision tags: interceptor-v0.8.2, rtcp-v0.7.2, mdns-v0.5.2, v0.6.0, interceptor-v0.8.1, media-v0.5.0, data-v0.6.0, sctp-v0.7.0, srtp-v0.9.1, rtcp-v0.7.1, rtp-v0.6.8, dtls-v0.7.0, ice-v0.9.0, turn-v0.6.1, stun-v0.4.4, mdns-v0.5.1, sdp-v0.5.3, util-v0.7.0
# 0acb5a49 15-Nov-2022 Anton Kaliaev <[email protected]>

[sctp] make `write` sync (#344)

There's no reason for it to be async, because it just buffers the data in memory (the actual I/O happens in a separate thread).
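A minimal model of why the synchronous `write` is sound: the call only moves bytes into an in-memory buffer, while a separate thread performs the real I/O. All names here are illustrative, not the sctp crate's API.

```rust
use std::sync::mpsc;
use std::thread;

/// A writer whose `write` is synchronous: it only hands the bytes to an
/// in-memory channel; the actual I/O happens on a separate thread.
struct BufferedWriter {
    tx: mpsc::Sender<Vec<u8>>,
}

impl BufferedWriter {
    fn new() -> (Self, thread::JoinHandle<usize>) {
        let (tx, rx) = mpsc::channel::<Vec<u8>>();
        // The "association" side: drains the buffer and does the real work
        // (here it just totals the bytes it was handed).
        let handle = thread::spawn(move || rx.iter().map(|pkt| pkt.len()).sum());
        (Self { tx }, handle)
    }

    /// No `.await` needed: this never touches the network, it only buffers.
    fn write(&self, data: &[u8]) -> usize {
        self.tx.send(data.to_vec()).expect("io thread alive");
        data.len()
    }
}

fn main() {
    let (writer, io_thread) = BufferedWriter::new();
    assert_eq!(writer.write(b"hello"), 5);
    assert_eq!(writer.write(b"world"), 5);
    drop(writer); // closes the channel so the io thread finishes
    assert_eq!(io_thread.join().unwrap(), 10);
}
```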


# 5b79f08a 14-Nov-2022 alexlapa <[email protected]>

remove unnecessary async in public API (#338)

> Mind if I create a PR that makes all callback setters (e.g. RTCPeerConnection::on_ice_candidate()) non-async? Currently callbacks are stored behind tokio::Mutexes; I believe ArcSwap can be used here, so we can use atomic load/store. It won't affect performance in any way, just some quality-of-life improvements.

> All good with me. I'd like to get rid of tokio::Mutex, replacing it with sync variants, where that's possible

So I've refactored all public callback setters here.

Regarding removing the `tokio::Mutex`es that you mentioned: I've replaced some of them, but I don't want this PR to become even bigger, and it would be nice if you provided some feedback first. I've used both `ArcSwap` and `std::Mutex`; I think it's fine to use a blocking mutex in simple cases where you don't need to hold it, e.g. copy, store, swap, take one-liners. I will prepare follow-up PRs on this issue if this PR looks good to you.

And one more thing regarding the callbacks: maybe it makes sense to make them just dumb `Box<dyn Fn(_)>`? So `Box<dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>` => `Box<dyn (Fn(ConnectionState)) + Send + Sync>`. That might hurt users in some rare cases, but it would make everything simpler.
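A sketch of the resulting pattern: the callback box lives behind a blocking `std::sync::Mutex`, so the setter is an ordinary non-async one-liner and the lock is never held across an `.await`. `ArcSwap` would serve the same role with atomic load/store. The type and method names below are made up for illustration, not webrtc-rs API.

```rust
use std::sync::{Arc, Mutex};

type OnStateChange = Box<dyn Fn(&str) + Send + Sync>;

/// Callback stored behind a blocking std::Mutex instead of tokio::Mutex:
/// the setter only swaps the box, so it needn't be async.
#[derive(Default)]
struct PeerConnection {
    on_state_change: Mutex<Option<OnStateChange>>,
}

impl PeerConnection {
    /// Non-async setter: just store; the lock is released immediately.
    fn on_state_change(&self, f: OnStateChange) {
        *self.on_state_change.lock().unwrap() = Some(f);
    }

    /// Invokes the callback if one is set.
    fn fire(&self, state: &str) {
        if let Some(cb) = self.on_state_change.lock().unwrap().as_ref() {
            cb(state);
        }
    }
}

fn main() {
    let seen = Arc::new(Mutex::new(Vec::<String>::new()));
    let pc = PeerConnection::default();
    let seen2 = Arc::clone(&seen);
    pc.on_state_change(Box::new(move |s| seen2.lock().unwrap().push(s.to_string())));
    pc.fire("connected");
    assert_eq!(*seen.lock().unwrap(), vec!["connected".to_string()]);
}
```

The trade-off mentioned above applies: a plain `Fn` callback is simpler than one returning a boxed future, at the cost of callbacks that cannot themselves await.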

I've been using this to play around with detecting deadlocks, though I didn't find anything:

```diff
diff --git a/util/src/sync/mod.rs b/util/src/sync/mod.rs
index 6a101299..d5eb835d 100644
--- a/util/src/sync/mod.rs
+++ b/util/src/sync/mod.rs
@@ -1,4 +1,150 @@
-use std::{ops, sync};
+use std::backtrace::Backtrace;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use std::{fmt, ops, sync, thread};
+
+const MAX_AGE: Duration = Duration::from_millis(5000);
+const CHECK_INTERVAL: Duration = Duration::from_millis(20);
+static DEADLOCK_THREAD_STARTED: AtomicBool = AtomicBool::new(false);
+lazy_static! {
+ static ref TRACKED_LOCKS: Arc<sync::RwLock<Vec<TrackedLock>>> =
+ Arc::new(sync::RwLock::new(Vec::new()));
+}
+
+struct TrackedLock {
+ backtrace: Backtrace,
+ tracker: AliveTracker,
+ typ: Type,
+ reported: AtomicBool,
+}
+
+impl TrackedLock {
+ fn mark_reported(&self) -> bool {
+ self.reported
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
+ .is_ok()
+ }
+}
+
+#[derive(Debug)]
+enum Type {
+ MutexLock,
+ RwReadLock,
+ RwWriteLock,
+}
+
+impl fmt::Display for Type {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Type::MutexLock => write!(f, "MutexLock"),
+ Type::RwReadLock => write!(f, "RwReadLock"),
+ Type::RwWriteLock => write!(f, "RwWriteLock"),
+ }
+ }
+}
+
+struct Alive {
+ live: Arc<AtomicBool>,
+ born_at: Instant,
+}
+
+impl Alive {
+ fn new() -> Self {
+ Self {
+ live: Arc::new(AtomicBool::new(true)),
+ born_at: Instant::now(),
+ }
+ }
+
+ fn track(&self) -> AliveTracker {
+ AliveTracker {
+ live: self.live.clone(),
+ born_at: self.born_at,
+ }
+ }
+
+ fn mark_dead(&self) {
+ self.live.store(false, Ordering::Release);
+ }
+}
+
+impl Drop for Alive {
+ fn drop(&mut self) {
+ self.mark_dead();
+ }
+}
+
+struct AliveTracker {
+ live: Arc<AtomicBool>,
+ born_at: Instant,
+}
+
+impl AliveTracker {
+ fn is_dead(&self) -> bool {
+ !self.live.load(Ordering::Acquire)
+ }
+
+ fn age(&self) -> Option<Duration> {
+ if self.is_dead() {
+ None
+ } else {
+ Some(Instant::now() - self.born_at)
+ }
+ }
+}
+
+fn start_deadlock_thread() {
+ if DEADLOCK_THREAD_STARTED
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
+ .is_err()
+ {
+ return;
+ }
+
+ thread::spawn(|| loop {
+ let lock = TRACKED_LOCKS.read().unwrap();
+ let mut needs_cleanup = false;
+
+ for tracked in lock.iter() {
+ let age = tracked.tracker.age();
+ match age {
+ Some(age) if age > MAX_AGE => {
+ if tracked.mark_reported() {
+ println!(
+ "Potential deadlock on {}. Backtrace:\n{}Held for: {}s",
+ tracked.typ,
+ tracked.backtrace,
+ age.as_secs_f64()
+ );
+ }
+ }
+ None => {
+ // Lock is dead
+ needs_cleanup = true;
+ }
+ _ => {}
+ }
+ }
+ drop(lock);
+
+ if needs_cleanup {
+ let mut write = TRACKED_LOCKS.write().unwrap();
+ write.retain(|tracked| !tracked.tracker.is_dead());
+ }
+ });
+}
+
+fn track(backtrace: Backtrace, alive: &Alive, typ: Type) {
+ start_deadlock_thread();
+ let mut write = TRACKED_LOCKS.write().unwrap();
+ write.push(TrackedLock {
+ backtrace,
+ tracker: alive.track(),
+ typ,
+ reported: AtomicBool::new(false),
+ });
+}

/// A synchronous mutual exclusion primitive useful for protecting shared data.
#[derive(Default, Debug)]
@@ -13,8 +159,10 @@ impl<T> Mutex<T> {
/// Acquires a mutex, blocking the current thread until it is able to do so.
pub fn lock(&self) -> MutexGuard<'_, T> {
let guard = self.0.lock().unwrap();
+ let alive = Alive::new();
+ track(Backtrace::capture(), &alive, Type::MutexLock);

- MutexGuard(guard)
+ MutexGuard(guard, alive)
}

/// Consumes this mutex, returning the underlying data.
@@ -25,7 +173,7 @@ impl<T> Mutex<T> {

/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
-pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>);
+pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>, Alive);

impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;
@@ -55,22 +203,26 @@ impl<T> RwLock<T> {
/// until it can be acquired.
pub fn read(&self) -> RwLockReadGuard<'_, T> {
let guard = self.0.read().unwrap();
+ let alive = Alive::new();
+ track(Backtrace::capture(), &alive, Type::RwReadLock);

- RwLockReadGuard(guard)
+ RwLockReadGuard(guard, alive)
}

/// Locks this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
pub fn write(&self) -> RwLockWriteGuard<'_, T> {
let guard = self.0.write().unwrap();
+ let alive = Alive::new();
+ track(Backtrace::capture(), &alive, Type::RwWriteLock);

- RwLockWriteGuard(guard)
+ RwLockWriteGuard(guard, alive)
}
}

/// RAII structure used to release the shared read access of a lock when
/// dropped.
-pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>);
+pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>, Alive);

impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> {
type Target = T;
@@ -82,7 +234,7 @@ impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> {

/// RAII structure used to release the shared read access of a lock when
/// dropped.
-pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>);
+pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>, Alive);

impl<'a, T> ops::Deref for RwLockWriteGuard<'a, T> {
type Target = T;
```



# 137f11f2 09-Nov-2022 Anton Kaliaev <[email protected]>

[PollStream/PollDataChannel] do not lose bytes (#341)

Returning `Poll::Ready(Ok(n))` when `write_fut` is `Some` is incorrect, since `buf` already contains new data; we should instead always return `Poll::Ready(Ok(buf.len()))` and update `write_fut`.

In the future, if/when we make `write` synchronous, this whole block could be simplified to a single call (one line) to `write`. See #344
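A synchronous toy model of the fix: the completion value of the previous in-flight write is discarded, and the count reported to the caller always refers to the new `buf`. Names are illustrative, not the real `PollDataChannel`.

```rust
/// Toy model of the byte-loss fix: `poll_write` must report the length
/// of the *new* `buf`, never the completion value of the previous
/// in-flight write.
struct PollModel {
    write_fut: Option<Vec<u8>>, // previously submitted, still in flight
    submitted: Vec<Vec<u8>>,    // everything actually handed on
}

impl PollModel {
    fn new() -> Self {
        Self { write_fut: None, submitted: Vec::new() }
    }

    /// Returns how many bytes the caller may consider written.
    fn poll_write(&mut self, buf: &[u8]) -> usize {
        // Complete any previous future; its result is *not* returned,
        // because `buf` already contains new data.
        if let Some(prev) = self.write_fut.take() {
            self.submitted.push(prev);
        }
        // Always start a future for the new buffer and report its length.
        self.write_fut = Some(buf.to_vec());
        buf.len()
    }

    fn flush(&mut self) {
        if let Some(prev) = self.write_fut.take() {
            self.submitted.push(prev);
        }
    }
}

fn main() {
    let mut w = PollModel::new();
    assert_eq!(w.poll_write(b"abc"), 3);
    assert_eq!(w.poll_write(b"defgh"), 5); // old future completed, new one started
    w.flush();
    assert_eq!(w.submitted.len(), 2); // no bytes lost
}
```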



# d50269a6 07-Nov-2022 Anton Kaliaev <[email protected]>

PollStream/PollDataChannel: flush before shutting down (#340)

* PollStream/PollDataChannel: flush before shutting down

"Invocation of a shutdown implies an invocation of flush. Once this
method returns Ready it implies that a flush successfully happened
before the shutdown happened. That is, callers don’t need to call flush
before calling shutdown. They can rely that by calling shutdown any
pending buffered data will be written out."

https://docs.rs/tokio/1.21.2/tokio/io/trait.AsyncWrite.html#tymethod.poll_shutdown

* format code

* fix clippy warning

* fix another clippy warning

* add changelog entries
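The quoted contract can be illustrated with a toy writer whose `shutdown` flushes first, so callers need no explicit `flush` before closing. The types below are illustrative, not the crate's.

```rust
/// Toy writer honoring the AsyncWrite contract quoted above:
/// shutdown implies flush.
struct Channel {
    buffer: Vec<u8>, // pending, not yet written out
    sent: Vec<u8>,   // actually written out
    closed: bool,
}

impl Channel {
    fn new() -> Self {
        Self { buffer: Vec::new(), sent: Vec::new(), closed: false }
    }

    fn write(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    fn flush(&mut self) {
        self.sent.append(&mut self.buffer);
    }

    /// Shutdown implies flush: pending buffered data is written out
    /// before the close completes.
    fn shutdown(&mut self) {
        self.flush();
        self.closed = true;
    }
}

fn main() {
    let mut ch = Channel::new();
    ch.write(b"pending");
    ch.shutdown(); // no explicit flush needed
    assert_eq!(ch.sent, b"pending".to_vec());
    assert!(ch.buffer.is_empty());
    assert!(ch.closed);
}
```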



Revision tags: ice-v0.8.2, v0.5.1, ice-v0.8.1, v0.5.0, data-v0.5.0, dtls-v0.6.0, ice-v0.8.0, interceptor-v0.8.0, mdns-v0.5.0, media-v0.4.7, rtcp-v0.7.0, rtp-v0.6.7, sctp-v0.6.1, sdp-v0.5.2, srtp-v0.9.0, stun-v0.4.3, turn-v0.6.0, util-v0.6.0, test-tag
# ffe74184 23-Aug-2022 Martin Algesten <[email protected]>

Move all to top level