Skip to content

Commit

Permalink
Add ping interval to prevent client disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
rudyfraser committed Sep 24, 2024
1 parent dd21792 commit 43f1057
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions rsky-pds/src/apis/com/atproto/sync/subscribe_repos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use rsky_lexicon::com::atproto::sync::{
use serde_json::json;
use std::time::SystemTime;
use ws::Message;
use tokio::time::{interval, Duration as TokioDuration};

fn get_backfill_limit(ms: u64) -> String {
let system_time = SystemTime::now();
Expand Down Expand Up @@ -117,6 +118,10 @@ pub async fn subscribe_repos<'a>(
let event_stream = outbox.events(outbox_cursor).await;
pin_mut!(ws);
pin_mut!(event_stream);

// Initialize the ping interval
let mut ping_interval = interval(TokioDuration::from_secs(30));

loop {
select! {
evt = event_stream.next() => {
Expand Down Expand Up @@ -286,6 +291,16 @@ pub async fn subscribe_repos<'a>(
};
break;
},
ws::Message::Ping(payload) => {
// Respond to Ping with Pong
println!("Received Ping message");
let pong_message = ws::Message::Pong(payload);
yield pong_message;
},
ws::Message::Pong(_) => {
// Received Pong, can log or ignore
println!("Received Pong message");
},
_ => {
println!("Received other message: {:?}", message);
}
Expand All @@ -301,6 +316,11 @@ pub async fn subscribe_repos<'a>(
}
}
},
// Add the ping interval tick arm
_ = ping_interval.tick() => {
// Send a Ping message to the client
yield ws::Message::Ping(vec![]);
},
_ = &mut shutdown => break
}
}
Expand Down

0 comments on commit 43f1057

Please sign in to comment.