Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/keep alive stuff #128

Merged
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"src/lib/derive_macros",
"src/lib/adapters/nbt", "src/lib/adapters/mca",
"src/tests", "src/lib/adapters/anvil",
"src/lib/text",
]

#================== Lints ==================#
Expand Down Expand Up @@ -74,6 +75,7 @@ ferrumc-core = { path = "src/lib/core" }
ferrumc-ecs = { path = "src/lib/ecs" }
ferrumc-events = { path = "src/lib/events" }
ferrumc-net = { path = "src/lib/net" }
ferrumc-text = { path = "src/lib/text" }
ferrumc-net-encryption = { path = "src/lib/net/crates/encryption" }
ferrumc-net-codec = { path = "src/lib/net/crates/codec" }
ferrumc-plugins = { path = "src/lib/plugins" }
Expand Down Expand Up @@ -117,25 +119,27 @@ rand = "0.9.0-alpha.2"
fnv = "1.0.7"

# Encoding/Serialization
serde = "1.0.210"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
serde_derive = "1.0.210"
base64 = "0.22.1"
bitcode = "0.6.3"
bitcode_derive = "0.6.3"


# Data types
hashbrown = "0.15.0"
tinyvec = "1.8.0"
dashmap = "6.1.0"
uuid = "1.1"
uuid = { version = "1.1", features = ["v4", "v3", "serde"] }

# Macros
lazy_static = "1.5.0"
quote = "1.0.37"
syn = "2.0.77"
proc-macro2 = "1.0.86"
proc-macro-crate = "3.2.0"
paste = "1.0.15"
maplit = "1.0.2"
macro_rules_attribute = "0.2.0"

Expand All @@ -159,6 +163,7 @@ colored = "2.1.0"
# Misc
deepsize = "0.2.0"


# I/O
tempfile = "3.12.0"
memmap2 = "0.9.5"
Expand All @@ -174,4 +179,4 @@ debug = false
debug-assertions = false
overflow-checks = false
panic = "abort"
codegen-units = 1
codegen-units = 1
95 changes: 65 additions & 30 deletions src/bin/src/packet_handlers/login_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use ferrumc_macros::event_handler;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::errors::NetError;
use ferrumc_net::packets::incoming::ack_finish_configuration::AckFinishConfigurationEvent;
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::incoming::login_acknowledged::LoginAcknowledgedEvent;
use ferrumc_net::packets::incoming::login_start::LoginStartEvent;
use ferrumc_net::packets::incoming::server_bound_known_packs::ServerBoundKnownPacksEvent;
use ferrumc_net::packets::outgoing::client_bound_known_packs::ClientBoundKnownPacksPacket;
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;
use ferrumc_net::packets::outgoing::game_event::GameEventPacket;
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::packets::outgoing::login_play::LoginPlayPacket;
use ferrumc_net::packets::outgoing::login_success::LoginSuccessPacket;
use ferrumc_net::packets::outgoing::registry_data::get_registry_packets;
Expand All @@ -18,7 +20,6 @@ use ferrumc_net::packets::outgoing::synchronize_player_position::SynchronizePlay
use ferrumc_net::GlobalState;
use ferrumc_net_codec::encode::NetEncodeOpts;
use tracing::{debug, trace};
use ferrumc_net::packets::outgoing::finish_configuration::FinishConfigurationPacket;

#[event_handler]
async fn handle_login_start(
Expand All @@ -31,19 +32,23 @@ async fn handle_login_start(
let username = login_start_event.login_start_packet.username.as_str();
debug!("Received login start from user with username {}", username);


// Add the player identity component to the ECS for the entity.
state.universe.add_component::<PlayerIdentity>(
login_start_event.conn_id,
PlayerIdentity::new(username.to_string(), uuid),
)?;

//Send a Login Success Response to further the login sequence
let mut writer = state
.universe
.get_mut::<StreamWriter>(login_start_event.conn_id)?;

writer.send_packet(&LoginSuccessPacket::new(uuid, username), &NetEncodeOpts::WithLength).await?;
writer
.send_packet(
&LoginSuccessPacket::new(uuid, username),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(login_start_event)
}
Expand All @@ -62,15 +67,16 @@ async fn handle_login_acknowledged(

*connection_state = ConnectionState::Configuration;


// Send packets packet
let client_bound_known_packs = ClientBoundKnownPacksPacket::new();

let mut writer = state
.universe
.get_mut::<StreamWriter>(login_acknowledged_event.conn_id)?;

writer.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength).await?;
writer
.send_packet(&client_bound_known_packs, &NetEncodeOpts::WithLength)
.await?;

Ok(login_acknowledged_event)
}
Expand All @@ -87,10 +93,17 @@ async fn handle_server_bound_known_packs(
.get_mut::<StreamWriter>(server_bound_known_packs_event.conn_id)?;

let registry_packets = get_registry_packets();
writer.send_packet(&registry_packets, &NetEncodeOpts::None).await?;

writer.send_packet(&FinishConfigurationPacket::new(), &NetEncodeOpts::WithLength).await?;

writer
.send_packet(&registry_packets, &NetEncodeOpts::None)
.await?;

writer
.send_packet(
&FinishConfigurationPacket::new(),
&NetEncodeOpts::WithLength,
)
.await?;

Ok(server_bound_known_packs_event)
}

Expand All @@ -103,34 +116,56 @@ async fn handle_ack_finish_configuration(

let conn_id = ack_finish_configuration_event.conn_id;

let mut conn_state = state
.universe
.get_mut::<ConnectionState>(conn_id)?;
let mut conn_state = state.universe.get_mut::<ConnectionState>(conn_id)?;

*conn_state = ConnectionState::Play;

let mut writer = state
.universe
.get_mut::<StreamWriter>(conn_id)?;

writer.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SetDefaultSpawnPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&SynchronizePlayerPositionPacket::default(), &NetEncodeOpts::WithLength).await?;
writer.send_packet(&GameEventPacket::start_waiting_for_level_chunks(), &NetEncodeOpts::WithLength).await?;
let mut writer = state.universe.get_mut::<StreamWriter>(conn_id)?;

writer
.send_packet(&LoginPlayPacket::new(conn_id), &NetEncodeOpts::WithLength)
.await?;
writer
.send_packet(
&SetDefaultSpawnPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&SynchronizePlayerPositionPacket::default(),
&NetEncodeOpts::WithLength,
)
.await?;
writer
.send_packet(
&GameEventPacket::start_waiting_for_level_chunks(),
&NetEncodeOpts::WithLength,
)
.await?;

send_keep_alive(conn_id, state, &mut writer).await?;


Ok(ack_finish_configuration_event)
}
async fn send_keep_alive(conn_id: usize, state: GlobalState, writer: &mut ComponentRefMut<'_, StreamWriter>) -> Result<(), NetError> {
let keep_alive_packet = KeepAlivePacket::default();
writer.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength).await?;

let id = keep_alive_packet.id;
async fn send_keep_alive(
conn_id: usize,
state: GlobalState,
writer: &mut ComponentRefMut<'_, StreamWriter>,
) -> Result<(), NetError> {
let keep_alive_packet = OutgoingKeepAlivePacket::default();
writer
.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength)
.await?;

state.universe.add_component::<KeepAlive>(conn_id, id)?;
let timestamp = keep_alive_packet.timestamp;

state
.universe
.add_component::<OutgoingKeepAlivePacket>(conn_id, keep_alive_packet)?;
state
.universe
.add_component::<IncomingKeepAlivePacket>(conn_id, IncomingKeepAlivePacket { timestamp })?;

Ok(())
}
}
76 changes: 54 additions & 22 deletions src/bin/src/systems/keep_alive_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::systems::definition::System;
use async_trait::async_trait;
use ferrumc_core::identity::player_identity::PlayerIdentity;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll};
use ferrumc_net::utils::state::terminate_connection;
use ferrumc_net::GlobalState;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -46,19 +48,17 @@ impl System for KeepAliveSystem {
last_time = current_time;
}

let fifteen_seconds_ms = 15000; // 15 seconds in milliseconds

let entities = state
.universe
.query::<(&mut StreamWriter, &ConnectionState, &KeepAlive)>()
.query::<(&mut StreamWriter, &ConnectionState)>()
.into_entities()
.into_iter()
.filter_map(|entity| {
let conn_state = state.universe.get::<ConnectionState>(entity).ok()?;
let keep_alive = state.universe.get_mut::<KeepAlive>(entity).ok()?;
let keep_alive = state.universe.get_mut::<IncomingKeepAlivePacket>(entity).ok()?;

if matches!(*conn_state, ConnectionState::Play)
&& (current_time - keep_alive.id) >= fifteen_seconds_ms
&& (current_time - keep_alive.timestamp) >= 15000
{
Some(entity)
} else {
Expand All @@ -68,26 +68,58 @@ impl System for KeepAliveSystem {
.collect::<Vec<_>>();
if !entities.is_empty() {
trace!("there are {:?} players to keep alive", entities.len());
}

let packet = KeepAlivePacket::default();
// I know this is the second iteration of the entities vector, but it has to be done since terminate_connection is async
for entity in entities.iter() {
let keep_alive = state
.universe
.get_mut::<IncomingKeepAlivePacket>(*entity)
.ok()
.unwrap();

if (current_time - keep_alive.timestamp) >= 30000 {
// two iterations missed
if let Err(e) = terminate_connection(
state.clone(),
*entity,
"Keep alive timeout".to_string(),
)
.await
{
warn!(
"Failed to terminate connection for entity {:?} , Err : {:?}",
entity, e
);
}
}
}
let packet = OutgoingKeepAlivePacket { timestamp: current_time };

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) =
state.universe.get_mut::<OutgoingKeepAlivePacket>(entity)
else {
warn!(
"Failed to get <OutgoingKeepAlive> component for entity {}",
entity
);
return;
};

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) = state.universe.get_mut::<KeepAlive>(entity) else {
warn!("Failed to get <KeepAlive> component for entity {}", entity);
return;
};
*keep_alive = packet.clone();
});

*keep_alive = KeepAlive::from(current_time);
});
if let Err(e) = state
.broadcast(&OutgoingKeepAlivePacket { timestamp: current_time }, broadcast_opts)
.await
{
error!("Error sending keep alive packet: {}", e);
};
}

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};
// TODO, this should be configurable as some people may have bad network so the clients may end up disconnecting from the server moments before the keep alive is sent
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/adapters/nbt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ferrumc-net-codec = { workspace = true }
tracing = { workspace = true}
tokio = { workspace = true }
ferrumc-general-purpose = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/lib/adapters/nbt/src/de/borrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl NbtTapeElement<'_> {
writer.write_all(&[self.nbt_id()])?;
name.serialize(writer, &NBTSerializeOptions::None);
}
NBTSerializeOptions::Network => {
NBTSerializeOptions::Network | NBTSerializeOptions::Flatten => {
writer.write_all(&[self.nbt_id()])?;
}
}
Expand Down Expand Up @@ -754,4 +754,4 @@ impl NbtTapeElement<'_> {
}
}
}
}
}
Loading