Skip to content

Commit

Permalink
Merge pull request #128 from AnonymousBit0111/fix/keep_alive_stuff
Browse files Browse the repository at this point in the history
Fix/keep alive stuff
  • Loading branch information
Sweattypalms authored Nov 17, 2024
2 parents 00ec7bb + 2f4e6a3 commit ca7ce8c
Show file tree
Hide file tree
Showing 25 changed files with 1,390 additions and 119 deletions.
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"src/lib/derive_macros",
"src/lib/adapters/nbt", "src/lib/adapters/mca",
"src/tests", "src/lib/adapters/anvil",
"src/lib/text",
]

#================== Lints ==================#
Expand Down Expand Up @@ -74,6 +75,7 @@ ferrumc-core = { path = "src/lib/core" }
ferrumc-ecs = { path = "src/lib/ecs" }
ferrumc-events = { path = "src/lib/events" }
ferrumc-net = { path = "src/lib/net" }
ferrumc-text = { path = "src/lib/text" }
ferrumc-net-encryption = { path = "src/lib/net/crates/encryption" }
ferrumc-net-codec = { path = "src/lib/net/crates/codec" }
ferrumc-plugins = { path = "src/lib/plugins" }
Expand Down Expand Up @@ -117,25 +119,27 @@ rand = "0.9.0-alpha.2"
fnv = "1.0.7"

# Encoding/Serialization
serde = "1.0.210"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
serde_derive = "1.0.210"
base64 = "0.22.1"
bitcode = "0.6.3"
bitcode_derive = "0.6.3"


# Data types
hashbrown = "0.15.0"
tinyvec = "1.8.0"
dashmap = "6.1.0"
uuid = "1.1"
uuid = { version = "1.1", features = ["v4", "v3", "serde"] }

# Macros
lazy_static = "1.5.0"
quote = "1.0.37"
syn = "2.0.77"
proc-macro2 = "1.0.86"
proc-macro-crate = "3.2.0"
paste = "1.0.15"
maplit = "1.0.2"
macro_rules_attribute = "0.2.0"

Expand All @@ -159,6 +163,7 @@ colored = "2.1.0"
# Misc
deepsize = "0.2.0"


# I/O
tempfile = "3.12.0"
memmap2 = "0.9.5"
Expand All @@ -174,4 +179,4 @@ debug = false
debug-assertions = false
overflow-checks = false
panic = "abort"
codegen-units = 1
codegen-units = 1
95 changes: 65 additions & 30 deletions src/bin/src/packet_handlers/login_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use ferrumc_macros::event_handler;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::errors::NetError;
use ferrumc_net::packets::incoming::ack_finish_configuration::AckFinishConfigurationEvent;
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::incoming::login_acknowledged::LoginAcknowledgedEvent;
use ferrumc_net::packets::incoming::login_start::LoginStartEvent;
use ferrumc_net::packets::incoming::server_bound_known_packs::ServerBoundKnownPacksEvent;
use ferrumc_net::packets::outgoing::client_bound_known_packs::ClientBoundKnownPacksPacket;
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;
use ferrumc_net::packets::outgoing::game_event::GameEventPacket;
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::packets::outgoing::login_play::LoginPlayPacket;
use ferrumc_net::packets::outgoing::login_success::LoginSuccessPacket;
use ferrumc_net::packets::outgoing::registry_data::get_registry_packets;
Expand All @@ -18,7 +20,6 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay
use ferrumc_net::GlobalState;
use ferrumc_net_codec::encode::NetEncodeOpts;
use tracing::{debug, trace};
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;

#[event_handler]
async fn handle_login_start(
Expand All @@ -31,19 +32,23 @@ async fn handle_login_start(
let username = login_start_event.login_start_packet.username.as_str();
debug!("Received login start from user with username {}", username);


// Add the player identity component to the ECS for the entity.
state.universe.add_component::<PlayerIdentity>(
login_start_event.conn_id,
PlayerIdentity::new(username.to_string(), uuid),
)?;

//Send a Login Success Response to further the login sequence
let mut writer = state
.universe
.get_mut::<StreamWriter>(login_start_event.conn_id)?;

writer.send_packet(&LoginSuccessPacket::new(uuid, username), &NetEncodeOpts::WithLength).await?;
writer
.send_packet(
&LoginSuccessPacket::new(uuid, username),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(login_start_event)
}
Expand All @@ -62,15 +67,16 @@ async fn handle_login_acknowledged(

*connection_state = ConnectionState::Configuration;


// Send packets packet
let client_bound_known_packs = ClientBoundKnownPacksPacket::new();

let mut writer = state
.universe
.get_mut::<StreamWriter>(login_acknowledged_event.conn_id)?;

writer.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength).await?;
writer
.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength)
.await?;

Ok(login_acknowledged_event)
}
Expand All @@ -87,10 +93,17 @@ async fn handle_server_bound_known_packs(
.get_mut::<StreamWriter>(server_bound_known_packs_event.conn_id)?;

let registry_packets = get_registry_packets();
writer.send_packet(&registry_packets, &NetEncodeOpts::None).await?;

writer.send_packet(&FinishConfigurationPacket::new(), &NetEncodeOpts::WithLength).await?;

writer
.send_packet(&registry_packets, &NetEncodeOpts::None)
.await?;

writer
.send_packet(
&FinishConfigurationPacket::new(),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(server_bound_known_packs_event)
}

Expand All @@ -103,34 +116,56 @@ async fn handle_ack_finish_configuration(

let conn_id = ack_finish_configuration_event.conn_id;

let mut conn_state = state
.universe
.get_mut::<ConnectionState>(conn_id)?;
let mut conn_state = state.universe.get_mut::<ConnectionState>(conn_id)?;

*conn_state = ConnectionState::Play;

let mut writer = state
.universe
.get_mut::<StreamWriter>(conn_id)?;

writer.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SetDefaultSpawnPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SynchronizePlayerPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&GameEventPacket::start_waiting_for_level_chunks(), &NetEncodeOpts::WithLength).await?;
let mut writer = state.universe.get_mut::<StreamWriter>(conn_id)?;

writer
.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength)
.await?;
writer
.send_packet(
&SetDefaultSpawnPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&SynchronizePlayerPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&GameEventPacket::start_waiting_for_level_chunks(),
&NetEncodeOpts::WithLength,
)
.await?;

send_keep_alive(conn_id, state, &mut writer).await?;


Ok(ack_finish_configuration_event)
}
async fn send_keep_alive(conn_id: usize, state: GlobalState, writer: &mut ComponentRefMut<'_, StreamWriter>) -> Result<(), NetError> {
let keep_alive_packet = KeepAlivePacket::default();
writer.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength).await?;

let id = keep_alive_packet.id;
async fn send_keep_alive(
conn_id: usize,
state: GlobalState,
writer: &mut ComponentRefMut<'_, StreamWriter>,
) -> Result<(), NetError> {
let keep_alive_packet = OutgoingKeepAlivePacket::default();
writer
.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength)
.await?;

state.universe.add_component::<KeepAlive>(conn_id, id)?;
let timestamp = keep_alive_packet.timestamp;

state
.universe
.add_component::<OutgoingKeepAlivePacket>(conn_id, keep_alive_packet)?;
state
.universe
.add_component::<IncomingKeepAlivePacket>(conn_id, IncomingKeepAlivePacket { timestamp })?;

Ok(())
}
}
76 changes: 54 additions & 22 deletions src/bin/src/systems/keep_alive_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::systems::definition::System;
use async_trait::async_trait;
use ferrumc_core::identity::player_identity::PlayerIdentity;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll};
use ferrumc_net::utils::state::terminate_connection;
use ferrumc_net::GlobalState;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -46,19 +48,17 @@ impl System for KeepAliveSystem {
last_time = current_time;
}

let fifteen_seconds_ms = 15000; // 15 seconds in milliseconds

let entities = state
.universe
.query::<(&mut StreamWriter, &ConnectionState, &KeepAlive)>()
.query::<(&mut StreamWriter, &ConnectionState)>()
.into_entities()
.into_iter()
.filter_map(|entity| {
let conn_state = state.universe.get::<ConnectionState>(entity).ok()?;
let keep_alive = state.universe.get_mut::<KeepAlive>(entity).ok()?;
let keep_alive = state.universe.get_mut::<IncomingKeepAlivePacket>(entity).ok()?;

if matches!(*conn_state, ConnectionState::Play)
&& (current_time - keep_alive.id) >= fifteen_seconds_ms
&& (current_time - keep_alive.timestamp) >= 15000
{
Some(entity)
} else {
Expand All @@ -68,26 +68,58 @@ impl System for KeepAliveSystem {
.collect::<Vec<_>>();
if !entities.is_empty() {
trace!("there are {:?} players to keep alive", entities.len());
}

let packet = KeepAlivePacket::default();
// I know this is the second iteration of the entities vector, but it has to be done since terminate_connection is async
for entity in entities.iter() {
let keep_alive = state
.universe
.get_mut::<IncomingKeepAlivePacket>(*entity)
.ok()
.unwrap();

if (current_time - keep_alive.timestamp) >= 30000 {
// two iterations missed
if let Err(e) = terminate_connection(
state.clone(),
*entity,
"Keep alive timeout".to_string(),
)
.await
{
warn!(
"Failed to terminate connection for entity {:?} , Err : {:?}",
entity, e
);
}
}
}
let packet = OutgoingKeepAlivePacket { timestamp: current_time };

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) =
state.universe.get_mut::<OutgoingKeepAlivePacket>(entity)
else {
warn!(
"Failed to get <OutgoingKeepAlive> component for entity {}",
entity
);
return;
};

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) = state.universe.get_mut::<KeepAlive>(entity) else {
warn!("Failed to get <KeepAlive> component for entity {}", entity);
return;
};
*keep_alive = packet.clone();
});

*keep_alive = KeepAlive::from(current_time);
});
if let Err(e) = state
.broadcast(&OutgoingKeepAlivePacket { timestamp: current_time }, broadcast_opts)
.await
{
error!("Error sending keep alive packet: {}", e);
};
}

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};
// TODO, this should be configurable as some people may have bad network so the clients may end up disconnecting from the server moments before the keep alive is sent
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/adapters/nbt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ferrumc-net-codec = { workspace = true }
tracing = { workspace = true}
tokio = { workspace = true }
ferrumc-general-purpose = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/lib/adapters/nbt/src/de/borrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl NbtTapeElement<'_> {
writer.write_all(&[self.nbt_id()])?;
name.serialize(writer, &NBTSerializeOptions::None);
}
NBTSerializeOptions::Network => {
NBTSerializeOptions::Network | NBTSerializeOptions::Flatten => {
writer.write_all(&[self.nbt_id()])?;
}
}
Expand Down Expand Up @@ -754,4 +754,4 @@ impl NbtTapeElement<'_> {
}
}
}
}
}
Loading

0 comments on commit ca7ce8c

Please sign in to comment.