Skip to content

Commit

Permalink
the server can now handle client timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonymousBit0111 committed Nov 7, 2024
1 parent f16adf0 commit 6399270
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 87 deletions.
93 changes: 61 additions & 32 deletions src/bin/src/packet_handlers/login_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use ferrumc_net::packets::incoming::login_acknowledged::LoginAcknowledgedEvent;
use ferrumc_net::packets::incoming::login_start::LoginStartEvent;
use ferrumc_net::packets::incoming::server_bound_known_packs::ServerBoundKnownPacksEvent;
use ferrumc_net::packets::outgoing::client_bound_known_packs::ClientBoundKnownPacksPacket;
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;
use ferrumc_net::packets::outgoing::game_event::GameEventPacket;
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::packets::outgoing::login_play::LoginPlayPacket;
use ferrumc_net::packets::outgoing::login_success::LoginSuccessPacket;
use ferrumc_net::packets::outgoing::registry_data::get_registry_packets;
Expand All @@ -18,7 +19,6 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay
use ferrumc_net::GlobalState;
use ferrumc_net_codec::encode::NetEncodeOpts;
use tracing::{debug, trace};
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;

#[event_handler]
async fn handle_login_start(
Expand All @@ -31,19 +31,23 @@ async fn handle_login_start(
let username = login_start_event.login_start_packet.username.as_str();
debug!("Received login start from user with username {}", username);


// Add the player identity component to the ECS for the entity.
state.universe.add_component::<PlayerIdentity>(
login_start_event.conn_id,
PlayerIdentity::new(username.to_string(), uuid),
)?;

//Send a Login Success Response to further the login sequence
let mut writer = state
.universe
.get_mut::<StreamWriter>(login_start_event.conn_id)?;

writer.send_packet(&LoginSuccessPacket::new(uuid, username), &NetEncodeOpts::WithLength).await?;
writer
.send_packet(
&LoginSuccessPacket::new(uuid, username),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(login_start_event)
}
Expand All @@ -62,15 +66,16 @@ async fn handle_login_acknowledged(

*connection_state = ConnectionState::Configuration;


// Send packets packet
let client_bound_known_packs = ClientBoundKnownPacksPacket::new();

let mut writer = state
.universe
.get_mut::<StreamWriter>(login_acknowledged_event.conn_id)?;

writer.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength).await?;
writer
.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength)
.await?;

Ok(login_acknowledged_event)
}
Expand All @@ -87,10 +92,17 @@ async fn handle_server_bound_known_packs(
.get_mut::<StreamWriter>(server_bound_known_packs_event.conn_id)?;

let registry_packets = get_registry_packets();
writer.send_packet(&registry_packets, &NetEncodeOpts::None).await?;

writer.send_packet(&FinishConfigurationPacket::new(), &NetEncodeOpts::WithLength).await?;

writer
.send_packet(&registry_packets, &NetEncodeOpts::None)
.await?;

writer
.send_packet(
&FinishConfigurationPacket::new(),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(server_bound_known_packs_event)
}

Expand All @@ -103,34 +115,51 @@ async fn handle_ack_finish_configuration(

let conn_id = ack_finish_configuration_event.conn_id;

let mut conn_state = state
.universe
.get_mut::<ConnectionState>(conn_id)?;
let mut conn_state = state.universe.get_mut::<ConnectionState>(conn_id)?;

*conn_state = ConnectionState::Play;

let mut writer = state
.universe
.get_mut::<StreamWriter>(conn_id)?;

writer.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SetDefaultSpawnPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SynchronizePlayerPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&GameEventPacket::start_waiting_for_level_chunks(), &NetEncodeOpts::WithLength).await?;
let mut writer = state.universe.get_mut::<StreamWriter>(conn_id)?;

writer
.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength)
.await?;
writer
.send_packet(
&SetDefaultSpawnPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&SynchronizePlayerPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&GameEventPacket::start_waiting_for_level_chunks(),
&NetEncodeOpts::WithLength,
)
.await?;

send_keep_alive(conn_id, state, &mut writer).await?;


Ok(ack_finish_configuration_event)
}
async fn send_keep_alive(conn_id: usize, state: GlobalState, writer: &mut ComponentRefMut<'_, StreamWriter>) -> Result<(), NetError> {
let keep_alive_packet = KeepAlivePacket::default();
writer.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength).await?;

let id = keep_alive_packet.id;

state.universe.add_component::<KeepAlive>(conn_id, id)?;

async fn send_keep_alive(
conn_id: usize,
state: GlobalState,
writer: &mut ComponentRefMut<'_, StreamWriter>,
) -> Result<(), NetError> {
let keep_alive_packet = OutgoingKeepAlivePacket::default();
writer
.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength)
.await?;

state
.universe
.add_component::<OutgoingKeepAlivePacket>(conn_id, keep_alive_packet)?;

Ok(())
}
}
58 changes: 33 additions & 25 deletions src/bin/src/systems/keep_alive_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::systems::definition::System;
use async_trait::async_trait;
use ferrumc_core::identity::player_identity::PlayerIdentity;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll};
use ferrumc_net::GlobalState;
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -20,6 +21,7 @@ impl KeepAliveSystem {
}
}
}
const FIFTEEN_SECONDS_MS: i64 = 15000; // 15 seconds in milliseconds

#[async_trait]
impl System for KeepAliveSystem {
Expand All @@ -29,11 +31,7 @@ impl System for KeepAliveSystem {
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64;
loop {
if self.shutdown.load(Ordering::Relaxed) {
break;
}

while !self.shutdown.load(Ordering::Relaxed) {
let online_players = state.universe.query::<&PlayerIdentity>();

let current_time = std::time::SystemTime::now()
Expand All @@ -46,19 +44,24 @@ impl System for KeepAliveSystem {
last_time = current_time;
}

let fifteen_seconds_ms = 15000; // 15 seconds in milliseconds

let entities = state
.universe
.query::<(&mut StreamWriter, &ConnectionState, &KeepAlive)>()
.query::<(
&mut StreamWriter,
&ConnectionState,
&IncomingKeepAlivePacket,
)>()
.into_entities()
.into_iter()
.filter_map(|entity| {
let conn_state = state.universe.get::<ConnectionState>(entity).ok()?;
let keep_alive = state.universe.get_mut::<KeepAlive>(entity).ok()?;
let keep_alive = state
.universe
.get_mut::<IncomingKeepAlivePacket>(entity)
.ok()?;

if matches!(*conn_state, ConnectionState::Play)
&& (current_time - keep_alive.id) >= fifteen_seconds_ms
&& (current_time - keep_alive.id) >= FIFTEEN_SECONDS_MS
{
Some(entity)
} else {
Expand All @@ -68,24 +71,29 @@ impl System for KeepAliveSystem {
.collect::<Vec<_>>();
if !entities.is_empty() {
trace!("there are {:?} players to keep alive", entities.len());
}

let packet = KeepAlivePacket::default();
let packet = OutgoingKeepAlivePacket { id: current_time };

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) = state.universe.get_mut::<KeepAlive>(entity) else {
warn!("Failed to get <KeepAlive> component for entity {}", entity);
return;
};
let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut outgoing_keep_alive) =
state.universe.get_mut::<OutgoingKeepAlivePacket>(entity)
else {
warn!(
"Failed to get <OutgoingKeepAlive> component for entity {}",
entity
);
return;
};

*keep_alive = KeepAlive::from(current_time);
});
*outgoing_keep_alive = OutgoingKeepAlivePacket { id: current_time };
});

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};
if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};
}
// TODO, this should be configurable as some people may have bad network so the clients may end up disconnecting from the server moments before the keep alive is sent
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
}
Expand Down
39 changes: 34 additions & 5 deletions src/lib/net/src/packets/incoming/keep_alive.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::packets::outgoing::keep_alive::KeepAlive;
use crate::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use crate::packets::IncomingPacket;
use crate::{NetResult, ServerState};
use ferrumc_ecs::components::storage::ComponentRefMut;
use ferrumc_ecs::errors::ECSError;
use ferrumc_macros::{packet, NetDecode};
use std::sync::Arc;
use tracing::debug;
use tracing::{debug, warn};

#[derive(NetDecode)]
#[packet(packet_id = 0x18, state = "play")]
Expand All @@ -13,15 +15,42 @@ pub struct IncomingKeepAlivePacket {

impl IncomingPacket for IncomingKeepAlivePacket {
async fn handle(self, conn_id: usize, state: Arc<ServerState>) -> NetResult<()> {
let mut last_keep_alive = state.universe.get_mut::<KeepAlive>(conn_id)?;
// TODO handle errors.
let last_keep_alive = state.universe.get_mut::<OutgoingKeepAlivePacket>(conn_id)?;

if self.id != last_keep_alive.id {
debug!(
"Invalid keep alive packet received from {:?} with id {:?} (expected {:?})",
"Invalid keep alive packet received from entity {:?} with id {:?} (expected {:?})",
conn_id, self.id, last_keep_alive.id
);
return NetResult::Err(crate::errors::NetError::Packet(
crate::errors::PacketError::InvalidState(0x18),
));
// TODO Kick player
}

let result = state.universe.get_mut::<IncomingKeepAlivePacket>(conn_id);

if result.is_err() {
let err = result.as_ref().err().unwrap();
if matches!(err, ECSError::ComponentTypeNotFound) {
state
.universe
.add_component(conn_id, IncomingKeepAlivePacket { id: self.id })?;
let mut last_received_keep_alive = state.universe.get_mut(conn_id)?;
*last_received_keep_alive = self;
} else {
warn!(
"Failed to get or create <IncomingKeepAlive> component: {:?}",
err
);
return Err(crate::errors::NetError::ECSError(result.err().unwrap()));
}
} else {
*last_keep_alive = KeepAlive::from(self.id);
let mut last_received_keep_alive: ComponentRefMut<'_, IncomingKeepAlivePacket> =
result.unwrap();

*last_received_keep_alive = self;
}

Ok(())
Expand Down
29 changes: 4 additions & 25 deletions src/lib/net/src/packets/outgoing/keep_alive.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,18 @@
use ferrumc_macros::{packet, NetEncode};
use std::io::Write;

#[derive(Debug, NetEncode)]
pub struct KeepAlive {
pub id: i64,
}

mod adapters {
impl From<i64> for super::KeepAlive {
fn from(id: i64) -> Self {
Self { id }
}
}
}

#[derive(NetEncode)]
#[packet(packet_id = 0x26)]
pub struct KeepAlivePacket {
pub id: KeepAlive,
pub struct OutgoingKeepAlivePacket {
pub id: i64,
}

impl Default for KeepAlivePacket {
impl Default for OutgoingKeepAlivePacket {
fn default() -> Self {
let current_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards?? LMAO")
.as_millis() as i64;
Self::new(current_ms)
}
}

impl KeepAlivePacket {
pub fn new(id: i64) -> Self {
Self {
id: KeepAlive::from(id),
}
Self { id: current_ms }
}
}

0 comments on commit 6399270

Please sign in to comment.