Skip to content

Commit

Permalink
Add packets from UDP to jittrbuffer in order to handle unordered packets
Browse files Browse the repository at this point in the history
  • Loading branch information
nemosupremo committed Oct 28, 2024
1 parent c4c3806 commit 2512089
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 18 deletions.
89 changes: 89 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ futures = "0.3.14"
h264-reader = "0.7.0"
hex = "0.4.3"
http-auth = "0.1.2"
jittr = { git = "https://github.com/nemosupremo/jittr", rev = "e0953a5" }
log = "0.4.8"
once_cell = "1.7.2"
pin-project = "1.0.7"
Expand All @@ -46,7 +47,12 @@ url = "2.2.1"
[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
mylog = { git = "https://github.com/scottlamb/mylog" }
tokio = { version = "1.5.0", features = ["io-util", "macros", "rt-multi-thread", "test-util"] }
tokio = { version = "1.5.0", features = [
"io-util",
"macros",
"rt-multi-thread",
"test-util",
] }

[profile.bench]
debug = true
Expand Down
87 changes: 70 additions & 17 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,9 @@ struct SessionInner {
/// Round-robining between them rather than always starting at 0 should
/// prevent one stream from starving the others.
udp_next_poll_i: usize,

jitter: jittr::JitterBuffer<crate::rtp::RawPacketCtx, 16>,
jitter_sleep: Pin<Box<tokio::time::Sleep>>,
}

#[derive(Copy, Clone)]
Expand Down Expand Up @@ -1536,6 +1539,8 @@ impl Session<Described> {
keepalive_timer: None,
flags: 0,
udp_next_poll_i: 0,
jitter: Default::default(),
jitter_sleep: Box::pin(tokio::time::sleep(std::time::Duration::from_nanos(1))),
}),
Described { sdp },
))
Expand Down Expand Up @@ -2391,23 +2396,12 @@ impl Session<Playing> {
match r {
Ok(()) => {
let msg = Bytes::copy_from_slice(buf.filled());
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
received_wall: when,
});
match rtp_handler.rtp(
inner.options,
stream_ctx,
inner.presentation.tool.as_ref(),
conn_ctx,
&pkt_ctx,
timeline,
i,
msg,
) {
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
Ok(None) => buf.clear(),
Err(e) => return Poll::Ready(Some(Err(e))),
}
let (raw, _payload_range) = match crate::rtp::RawPacket::new(msg) {
Ok(x) => x,
Err(_err) => todo!("Corrupt packet"),
};
inner.jitter.push((i, raw).into());
buf.clear();
}
Err(source) if source.kind() == io::ErrorKind::ConnectionRefused => {
// See comment above
Expand Down Expand Up @@ -2581,6 +2575,65 @@ impl futures::Stream for Session<Playing> {
}
}

loop {
let inner = self.0.as_mut().project();
let packet = if inner.jitter.lossless_packets_buffered() > 0 {
inner.jitter.pop().unwrap()
} else {
if matches!(inner.jitter_sleep.as_mut().poll(cx), Poll::Ready(())) {
match inner.jitter.pop_skip(true) {
Some(p) => p,
None => break,
}
} else {
break;
}
};
let sid = packet.0;
let s = &mut inner.presentation.streams[sid];
let (timeline, rtp_handler, stream_ctx, _udp_sockets) = match &mut s.state {
StreamState::Playing {
timeline,
rtp_handler,
ctx,
udp_sockets: Some(udp_sockets),
..
} => (timeline, rtp_handler, ctx, udp_sockets),
_ => break,
};

let when = crate::WallTime::now();
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
received_wall: when,
});
let conn_ctx = inner
.conn
.as_ref()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?
.inner
.ctx();
let pkt = rtp_handler.rtp(
inner.options,
&stream_ctx,
inner.presentation.tool.as_ref(),
conn_ctx,
&pkt_ctx,
timeline,
sid,
packet.into_data(),
);

inner
.jitter_sleep
.as_mut()
.reset(tokio::time::Instant::now() + std::time::Duration::from_millis(200));
match pkt {
Err(e) => return Poll::Ready(Some(Err(e))),
Ok(Some(pkt)) => return Poll::Ready(Some(Ok(pkt))),
Ok(None) => continue,
}
}

// Then check if it's time for a new keepalive.
// Note: in production keepalive_timer is always Some. Tests may disable it.
if let Some(t) = self.0.keepalive_timer.as_mut() {
Expand Down
26 changes: 26 additions & 0 deletions src/rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ impl RawPacket {
}
}

pub struct RawPacketCtx(pub usize, RawPacket);

impl RawPacketCtx {
pub fn into_data(self) -> Bytes {
(self.1).0
}
}

impl jittr::Packet for RawPacketCtx {
fn sequence_number(&self) -> u16 {
self.1.sequence_number()
}
}

impl Clone for RawPacketCtx {
fn clone(&self) -> Self {
Self(self.0, RawPacket((self.1).0.clone()))
}
}

impl From<(usize, RawPacket)> for RawPacketCtx {
fn from(value: (usize, RawPacket)) -> Self {
Self(value.0, value.1)
}
}

#[derive(Debug)]
#[doc(hidden)]
pub struct RawPacketError {
Expand Down

0 comments on commit 2512089

Please sign in to comment.