Skip to content

Commit

Permalink
feat: tunnel keep-alive messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtread committed Jul 2, 2024
1 parent 75329f7 commit a65fb08
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion src/services/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use self::codec::{TunnelCodec, TunnelMessage};
use crate::utils::{hashing::IntHashMap, types::GameID};
use bytes::Bytes;
use futures_util::{Sink, Stream};
use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
Expand All @@ -17,8 +18,12 @@ use std::{
Arc,
},
task::{ready, Context, Poll},
time::Duration,
};
use tokio::{
sync::mpsc,
time::{interval_at, Instant, Interval, MissedTickBehavior},
};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;

use super::sessions::AssociationId;
Expand Down Expand Up @@ -240,6 +245,8 @@ pub struct Tunnel {
write_state: TunnelWriteState,
/// The service access
service: Arc<TunnelService>,
/// Interval for sending keep alive messages
keep_alive_interval: Interval,
}

impl Drop for Tunnel {
Expand Down Expand Up @@ -273,6 +280,9 @@ enum TunnelReadState {
}

impl Tunnel {
// Send keep-alive pings every 10s
const KEEP_ALIVE_DELAY: Duration = Duration::from_secs(10);

/// Starts a new tunnel on `io` using the tunnel `service`
///
/// ## Arguments
Expand All @@ -294,13 +304,20 @@ impl Tunnel {
.write()
.insert_tunnel(id, TunnelHandle { tx });

// Create the interval to track keep alive pings
let keep_alive_start = Instant::now() + Self::KEEP_ALIVE_DELAY;
let mut keep_alive_interval = interval_at(keep_alive_start, Self::KEEP_ALIVE_DELAY);

keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

// Spawn the tunnel task
tokio::spawn(Tunnel {
service,
id,
io,
rx,
write_state: Default::default(),
keep_alive_interval,
});

id
Expand Down Expand Up @@ -372,6 +389,11 @@ impl Tunnel {
return Poll::Ready(TunnelReadState::Stop);
};

// Ping messages can be ignored
if message.index == 255 {
return Poll::Ready(TunnelReadState::Continue);
}

// Get the path through the tunnel
let (target_handle, index) = match self.service.get_tunnel_route(self.id, message.index) {
Some(value) => value,
Expand Down Expand Up @@ -413,6 +435,27 @@ impl Future for Tunnel {
}
}

// Write a ping message at the interval if we aren't already sending a message
if this.keep_alive_interval.poll_tick(cx).is_ready() {
if let TunnelWriteState::Recv = this.write_state {
// Move to a writing state
this.write_state = TunnelWriteState::Write(Some(TunnelMessage {
index: 255,
message: Bytes::new(),
}));

// Poll the writer with the new message
if let Poll::Ready(next_state) = this.poll_write_state(cx) {
this.write_state = next_state;

// Tunnel has stopped
if let TunnelWriteState::Stop = this.write_state {
return Poll::Ready(());
}
}
}
}

Poll::Pending
}
}
Expand Down Expand Up @@ -443,6 +486,13 @@ mod codec {
//! Length: 16-bits. Determines the size in bytes of the payload that follows
//!
//! Payload: Variable length. The message bytes payload of `Length`
//!
//!
//! ## Keep alive
//!
//! The server will send keep-alive messages, these are in the same
//! format as the packet above. However, the index will always be 255
//! and the payload will be empty.
use bytes::{Buf, BufMut, Bytes};
use tokio_util::codec::{Decoder, Encoder};
Expand Down

0 comments on commit a65fb08

Please sign in to comment.