diff --git a/Cargo.lock b/Cargo.lock index a0593ea..2c71c76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -813,12 +813,28 @@ dependencies = [ "futures-sink", ] +[[package]] +name = "futures-channel-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5e5f4df964fa9c1c2f8bddeb5c3611631cacd93baf810fc8bb2fb4b495c263a" +dependencies = [ + "futures-core-preview", + "futures-sink-preview", +] + [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-core-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a" + [[package]] name = "futures-executor" version = "0.3.21" @@ -830,12 +846,29 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-executor-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75236e88bd9fe88e5e8bfcd175b665d0528fe03ca4c5207fabc028c8f9d93e98" +dependencies = [ + "futures-core-preview", + "futures-util-preview", + "num_cpus", +] + [[package]] name = "futures-io" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-io-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4914ae450db1921a56c91bde97a27846287d062087d4a652efc09bb3a01ebda" + [[package]] name = "futures-macro" version = "0.3.21" @@ -847,18 +880,48 @@ dependencies = [ "syn 1.0.98", ] +[[package]] +name = "futures-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b1dce2a0267ada5c6ff75a8ba864b4e679a9e2aa44262af7a3b5516d530d76e" +dependencies = [ + "futures-channel-preview", + "futures-core-preview", + "futures-executor-preview", + "futures-io-preview", + "futures-sink-preview", + "futures-util-preview", +] + [[package]] name = "futures-sink" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +[[package]] +name = "futures-sink-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86f148ef6b69f75bb610d4f9a2336d4fc88c4b5b67129d1a340dd0fd362efeec" + [[package]] name = "futures-task" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +[[package]] +name = "futures-timer" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f9eb554aa23143abc64ec4d0016f038caf53bb7cbc3d91490835c54edc96550" +dependencies = [ + "futures-preview", + "pin-utils", +] + [[package]] name = "futures-util" version = "0.3.21" @@ -877,6 +940,21 @@ dependencies = [ "slab", ] +[[package]] +name = "futures-util-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d" +dependencies = [ + "futures-channel-preview", + "futures-core-preview", + "futures-io-preview", + "futures-sink-preview", + "memchr", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -1089,6 +1167,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" +[[package]] +name = "jittr" +version = "0.2.0" +source = "git+https://github.com/nemosupremo/jittr?rev=e0953a5#e0953a5f73ac07308ef54929a9db9da423e6aa01" +dependencies = [ + "futures", + "futures-timer", + "log", +] + [[package]] name = "js-sys" version = "0.3.58" @@ -1726,6 +1814,7 @@ dependencies = [ "h264-reader", "hex", "http-auth", + "jittr", "log", "mylog", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 626b93b..0ec9cbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ futures = "0.3.14" h264-reader = "0.7.0" hex = "0.4.3" http-auth = "0.1.2" +jittr = { git = "https://github.com/nemosupremo/jittr", rev = "e0953a5" } log = "0.4.8" once_cell = "1.7.2" pin-project = "1.0.7" @@ -46,7 +47,12 @@ url = "2.2.1" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } mylog = { git = "https://github.com/scottlamb/mylog" } -tokio = { version = "1.5.0", features = ["io-util", "macros", "rt-multi-thread", "test-util"] } +tokio = { version = "1.5.0", features = [ + "io-util", + "macros", + "rt-multi-thread", + "test-util", +] } [profile.bench] debug = true diff --git a/src/client/mod.rs b/src/client/mod.rs index 397c495..7092f33 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1173,6 +1173,9 @@ struct SessionInner { /// Round-robining between them rather than always starting at 0 should /// prevent one stream from starving the others. udp_next_poll_i: usize, + + jitter: jittr::JitterBuffer, + jitter_sleep: Pin>, } #[derive(Copy, Clone)] @@ -1536,6 +1539,8 @@ impl Session { keepalive_timer: None, flags: 0, udp_next_poll_i: 0, + jitter: Default::default(), + jitter_sleep: Box::pin(tokio::time::sleep(std::time::Duration::from_nanos(1))), }), Described { sdp }, )) @@ -2391,23 +2396,12 @@ impl Session { match r { Ok(()) => { let msg = Bytes::copy_from_slice(buf.filled()); - let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { - received_wall: when, - }); - match rtp_handler.rtp( - inner.options, - stream_ctx, - inner.presentation.tool.as_ref(), - conn_ctx, - &pkt_ctx, - timeline, - i, - msg, - ) { - Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), - Ok(None) => buf.clear(), - Err(e) => return Poll::Ready(Some(Err(e))), - } + let (raw, _payload_range) = match crate::rtp::RawPacket::new(msg) { + Ok(x) => x, + Err(_err) => todo!("Corrupt packet"), + }; + inner.jitter.push((i, raw).into()); + buf.clear(); } Err(source) if source.kind() == io::ErrorKind::ConnectionRefused => { // See comment above @@ -2581,6 +2575,65 @@ impl futures::Stream for Session { } } + loop { + let inner = self.0.as_mut().project(); + let packet = if inner.jitter.lossless_packets_buffered() > 0 { + inner.jitter.pop().unwrap() + } else { + if matches!(inner.jitter_sleep.as_mut().poll(cx), Poll::Ready(())) { + match inner.jitter.pop_skip(true) { + Some(p) => p, + None => break, + } + } else { + break; + } + }; + let sid = packet.0; + let s = &mut inner.presentation.streams[sid]; + let (timeline, rtp_handler, stream_ctx, _udp_sockets) = match &mut s.state { + StreamState::Playing { + timeline, + rtp_handler, + ctx, + udp_sockets: Some(udp_sockets), + .. + } => (timeline, rtp_handler, ctx, udp_sockets), + _ => break, + }; + + let when = crate::WallTime::now(); + let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp { + received_wall: when, + }); + let conn_ctx = inner + .conn + .as_ref() + .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))? + .inner + .ctx(); + let pkt = rtp_handler.rtp( + inner.options, + &stream_ctx, + inner.presentation.tool.as_ref(), + conn_ctx, + &pkt_ctx, + timeline, + sid, + packet.into_data(), + ); + + inner + .jitter_sleep + .as_mut() + .reset(tokio::time::Instant::now() + std::time::Duration::from_millis(200)); + match pkt { + Err(e) => return Poll::Ready(Some(Err(e))), + Ok(Some(pkt)) => return Poll::Ready(Some(Ok(pkt))), + Ok(None) => continue, + } + } + // Then check if it's time for a new keepalive. // Note: in production keepalive_timer is always Some. Tests may disable it. if let Some(t) = self.0.keepalive_timer.as_mut() { diff --git a/src/rtp.rs b/src/rtp.rs index 6ddefe7..8e44ad0 100644 --- a/src/rtp.rs +++ b/src/rtp.rs @@ -191,6 +191,32 @@ impl RawPacket { } } +pub struct RawPacketCtx(pub usize, RawPacket); + +impl RawPacketCtx { + pub fn into_data(self) -> Bytes { + (self.1).0 + } +} + +impl jittr::Packet for RawPacketCtx { + fn sequence_number(&self) -> u16 { + self.1.sequence_number() + } +} + +impl Clone for RawPacketCtx { + fn clone(&self) -> Self { + Self(self.0, RawPacket((self.1).0.clone())) + } +} + +impl From<(usize, RawPacket)> for RawPacketCtx { + fn from(value: (usize, RawPacket)) -> Self { + Self(value.0, value.1) + } +} + #[derive(Debug)] #[doc(hidden)] pub struct RawPacketError {