From 6399270e47c7e64ac5f73acdf15cea1576601cb1 Mon Sep 17 00:00:00 2001 From: AnonymousBit <68566858+AnonymousBit0111@users.noreply.github.com> Date: Thu, 7 Nov 2024 18:49:54 +0300 Subject: [PATCH] the server can now handle client timeouts --- src/bin/src/packet_handlers/login_process.rs | 93 ++++++++++++------- src/bin/src/systems/keep_alive_system.rs | 58 +++++++----- .../net/src/packets/incoming/keep_alive.rs | 39 +++++++- .../net/src/packets/outgoing/keep_alive.rs | 29 +----- 4 files changed, 132 insertions(+), 87 deletions(-) diff --git a/src/bin/src/packet_handlers/login_process.rs b/src/bin/src/packet_handlers/login_process.rs index d44638fa..437c3fcb 100644 --- a/src/bin/src/packet_handlers/login_process.rs +++ b/src/bin/src/packet_handlers/login_process.rs @@ -8,8 +8,9 @@ use ferrumc_net::packets::incoming::login_acknowledged::LoginAcknowledgedEvent; use ferrumc_net::packets::incoming::login_start::LoginStartEvent; use ferrumc_net::packets::incoming::server_bound_known_packs::ServerBoundKnownPacksEvent; use ferrumc_net::packets::outgoing::client_bound_known_packs::ClientBoundKnownPacksPacket; +use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket; use ferrumc_net::packets::outgoing::game_event::GameEventPacket; -use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket}; +use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket; use ferrumc_net::packets::outgoing::login_play::LoginPlayPacket; use ferrumc_net::packets::outgoing::login_success::LoginSuccessPacket; use ferrumc_net::packets::outgoing::registry_data::get_registry_packets; @@ -18,7 +19,6 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay use ferrumc_net::GlobalState; use ferrumc_net_codec::encode::NetEncodeOpts; use tracing::{debug, trace}; -use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket; #[event_handler] async fn handle_login_start( @@ -31,19 +31,23 @@ async fn handle_login_start( let username = login_start_event.login_start_packet.username.as_str(); debug!("Received login start from user with username {}", username); - // Add the player identity component to the ECS for the entity. state.universe.add_component::( login_start_event.conn_id, PlayerIdentity::new(username.to_string(), uuid), )?; - + //Send a Login Success Response to further the login sequence let mut writer = state .universe .get_mut::(login_start_event.conn_id)?; - writer.send_packet(&LoginSuccessPacket::new(uuid, username), &NetEncodeOpts::WithLength).await?; + writer + .send_packet( + &LoginSuccessPacket::new(uuid, username), + &NetEncodeOpts::WithLength, + ) + .await?; Ok(login_start_event) } @@ -62,7 +66,6 @@ async fn handle_login_acknowledged( *connection_state = ConnectionState::Configuration; - // Send packets packet let client_bound_known_packs = ClientBoundKnownPacksPacket::new(); @@ -70,7 +73,9 @@ async fn handle_login_acknowledged( .universe .get_mut::(login_acknowledged_event.conn_id)?; - writer.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength).await?; + writer + .send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength) + .await?; Ok(login_acknowledged_event) } @@ -87,10 +92,17 @@ async fn handle_server_bound_known_packs( .get_mut::(server_bound_known_packs_event.conn_id)?; let registry_packets = get_registry_packets(); - writer.send_packet(®istry_packets, &NetEncodeOpts::None).await?; - - writer.send_packet(&FinishConfigurationPacket::new(), &NetEncodeOpts::WithLength).await?; - + writer + .send_packet(®istry_packets, &NetEncodeOpts::None) + .await?; + + writer + .send_packet( + &FinishConfigurationPacket::new(), + &NetEncodeOpts::WithLength, + ) + .await?; + Ok(server_bound_known_packs_event) } @@ -103,34 +115,51 @@ async fn handle_ack_finish_configuration( let conn_id = ack_finish_configuration_event.conn_id; - let mut conn_state = state - .universe - .get_mut::(conn_id)?; + let mut conn_state = state.universe.get_mut::(conn_id)?; *conn_state = ConnectionState::Play; - let mut writer = state - .universe - .get_mut::(conn_id)?; - - writer.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength).await?; - writer.send_packet(&SetDefaultSpawnPositionPacket::default(), &NetEncodeOpts::WithLength).await?; - writer.send_packet(&SynchronizePlayerPositionPacket::default(), &NetEncodeOpts::WithLength).await?; - writer.send_packet(&GameEventPacket::start_waiting_for_level_chunks(), &NetEncodeOpts::WithLength).await?; + let mut writer = state.universe.get_mut::(conn_id)?; + + writer + .send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength) + .await?; + writer + .send_packet( + &SetDefaultSpawnPositionPacket::default(), + &NetEncodeOpts::WithLength, + ) + .await?; + writer + .send_packet( + &SynchronizePlayerPositionPacket::default(), + &NetEncodeOpts::WithLength, + ) + .await?; + writer + .send_packet( + &GameEventPacket::start_waiting_for_level_chunks(), + &NetEncodeOpts::WithLength, + ) + .await?; send_keep_alive(conn_id, state, &mut writer).await?; - Ok(ack_finish_configuration_event) } -async fn send_keep_alive(conn_id: usize, state: GlobalState, writer: &mut ComponentRefMut<'_, StreamWriter>) -> Result<(), NetError> { - let keep_alive_packet = KeepAlivePacket::default(); - writer.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength).await?; - - let id = keep_alive_packet.id; - - state.universe.add_component::(conn_id, id)?; - +async fn send_keep_alive( + conn_id: usize, + state: GlobalState, + writer: &mut ComponentRefMut<'_, StreamWriter>, +) -> Result<(), NetError> { + let keep_alive_packet = OutgoingKeepAlivePacket::default(); + writer + .send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength) + .await?; + + state + .universe + .add_component::(conn_id, keep_alive_packet)?; Ok(()) -} \ No newline at end of file +} diff --git a/src/bin/src/systems/keep_alive_system.rs b/src/bin/src/systems/keep_alive_system.rs index 18eafdf9..4183a1b8 100644 --- a/src/bin/src/systems/keep_alive_system.rs +++ b/src/bin/src/systems/keep_alive_system.rs @@ -2,7 +2,8 @@ use crate::systems::definition::System; use async_trait::async_trait; use ferrumc_core::identity::player_identity::PlayerIdentity; use ferrumc_net::connection::{ConnectionState, StreamWriter}; -use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket}; +use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket; +use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket; use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll}; use ferrumc_net::GlobalState; use std::sync::atomic::{AtomicBool, Ordering}; @@ -20,6 +21,7 @@ impl KeepAliveSystem { } } } +const FIFTEEN_SECONDS_MS: i64 = 15000; // 15 seconds in milliseconds #[async_trait] impl System for KeepAliveSystem { @@ -29,11 +31,7 @@ impl System for KeepAliveSystem { .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_millis() as i64; - loop { - if self.shutdown.load(Ordering::Relaxed) { - break; - } - + while !self.shutdown.load(Ordering::Relaxed) { let online_players = state.universe.query::<&PlayerIdentity>(); let current_time = std::time::SystemTime::now() @@ -46,19 +44,24 @@ impl System for KeepAliveSystem { last_time = current_time; } - let fifteen_seconds_ms = 15000; // 15 seconds in milliseconds - let entities = state .universe - .query::<(&mut StreamWriter, &ConnectionState, &KeepAlive)>() + .query::<( + &mut StreamWriter, + &ConnectionState, + &IncomingKeepAlivePacket, + )>() .into_entities() .into_iter() .filter_map(|entity| { let conn_state = state.universe.get::(entity).ok()?; - let keep_alive = state.universe.get_mut::(entity).ok()?; + let keep_alive = state + .universe + .get_mut::(entity) + .ok()?; if matches!(*conn_state, ConnectionState::Play) - && (current_time - keep_alive.id) >= fifteen_seconds_ms + && (current_time - keep_alive.id) >= FIFTEEN_SECONDS_MS { Some(entity) } else { @@ -68,24 +71,29 @@ impl System for KeepAliveSystem { .collect::>(); if !entities.is_empty() { trace!("there are {:?} players to keep alive", entities.len()); - } - let packet = KeepAlivePacket::default(); + let packet = OutgoingKeepAlivePacket { id: current_time }; - let broadcast_opts = BroadcastOptions::default() - .only(entities) - .with_sync_callback(move |entity, state| { - let Ok(mut keep_alive) = state.universe.get_mut::(entity) else { - warn!("Failed to get component for entity {}", entity); - return; - }; + let broadcast_opts = BroadcastOptions::default() + .only(entities) + .with_sync_callback(move |entity, state| { + let Ok(mut outgoing_keep_alive) = + state.universe.get_mut::(entity) + else { + warn!( + "Failed to get component for entity {}", + entity + ); + return; + }; - *keep_alive = KeepAlive::from(current_time); - }); + *outgoing_keep_alive = OutgoingKeepAlivePacket { id: current_time }; + }); - if let Err(e) = state.broadcast(&packet, broadcast_opts).await { - error!("Error sending keep alive packet: {}", e); - }; + if let Err(e) = state.broadcast(&packet, broadcast_opts).await { + error!("Error sending keep alive packet: {}", e); + }; + } // TODO, this should be configurable as some people may have bad network so the clients may end up disconnecting from the server moments before the keep alive is sent tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; } diff --git a/src/lib/net/src/packets/incoming/keep_alive.rs b/src/lib/net/src/packets/incoming/keep_alive.rs index a68c3e56..fb541b2b 100644 --- a/src/lib/net/src/packets/incoming/keep_alive.rs +++ b/src/lib/net/src/packets/incoming/keep_alive.rs @@ -1,9 +1,11 @@ -use crate::packets::outgoing::keep_alive::KeepAlive; +use crate::packets::outgoing::keep_alive::OutgoingKeepAlivePacket; use crate::packets::IncomingPacket; use crate::{NetResult, ServerState}; +use ferrumc_ecs::components::storage::ComponentRefMut; +use ferrumc_ecs::errors::ECSError; use ferrumc_macros::{packet, NetDecode}; use std::sync::Arc; -use tracing::debug; +use tracing::{debug, warn}; #[derive(NetDecode)] #[packet(packet_id = 0x18, state = "play")] @@ -13,15 +15,42 @@ pub struct IncomingKeepAlivePacket { impl IncomingPacket for IncomingKeepAlivePacket { async fn handle(self, conn_id: usize, state: Arc) -> NetResult<()> { - let mut last_keep_alive = state.universe.get_mut::(conn_id)?; + // TODO handle errors. + let last_keep_alive = state.universe.get_mut::(conn_id)?; + if self.id != last_keep_alive.id { debug!( - "Invalid keep alive packet received from {:?} with id {:?} (expected {:?})", + "Invalid keep alive packet received from entity {:?} with id {:?} (expected {:?})", conn_id, self.id, last_keep_alive.id ); + return NetResult::Err(crate::errors::NetError::Packet( + crate::errors::PacketError::InvalidState(0x18), + )); // TODO Kick player + } + + let result = state.universe.get_mut::(conn_id); + + if result.is_err() { + let err = result.as_ref().err().unwrap(); + if matches!(err, ECSError::ComponentTypeNotFound) { + state + .universe + .add_component(conn_id, IncomingKeepAlivePacket { id: self.id })?; + let mut last_received_keep_alive = state.universe.get_mut(conn_id)?; + *last_received_keep_alive = self; + } else { + warn!( + "Failed to get or create component: {:?}", + err + ); + return Err(crate::errors::NetError::ECSError(result.err().unwrap())); + } } else { - *last_keep_alive = KeepAlive::from(self.id); + let mut last_received_keep_alive: ComponentRefMut<'_, IncomingKeepAlivePacket> = + result.unwrap(); + + *last_received_keep_alive = self; } Ok(()) diff --git a/src/lib/net/src/packets/outgoing/keep_alive.rs b/src/lib/net/src/packets/outgoing/keep_alive.rs index 23a36b77..41b0aa81 100644 --- a/src/lib/net/src/packets/outgoing/keep_alive.rs +++ b/src/lib/net/src/packets/outgoing/keep_alive.rs @@ -1,39 +1,18 @@ use ferrumc_macros::{packet, NetEncode}; use std::io::Write; -#[derive(Debug, NetEncode)] -pub struct KeepAlive { - pub id: i64, -} - -mod adapters { - impl From for super::KeepAlive { - fn from(id: i64) -> Self { - Self { id } - } - } -} - #[derive(NetEncode)] #[packet(packet_id = 0x26)] -pub struct KeepAlivePacket { - pub id: KeepAlive, +pub struct OutgoingKeepAlivePacket { + pub id: i64, } -impl Default for KeepAlivePacket { +impl Default for OutgoingKeepAlivePacket { fn default() -> Self { let current_ms = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards?? LMAO") .as_millis() as i64; - Self::new(current_ms) - } -} - -impl KeepAlivePacket { - pub fn new(id: i64) -> Self { - Self { - id: KeepAlive::from(id), - } + Self { id: current_ms } } }