Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/better_systems #92

Merged
merged 7 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ ctor = { workspace = true }
parking_lot = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
rayon = { workspace = true }
futures = { workspace = true }
serde_json = { workspace = true }
async-trait = "0.1.83"


[[bin]]
name = "ferrumc"
path = "src/main.rs"
8 changes: 4 additions & 4 deletions src/bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Security or something like that
#![forbid(unsafe_code)]

#![feature(slice_as_chunks)]

use ferrumc_ecs::Universe;
use ferrumc_net::ServerState;
use std::sync::{Arc};
use tracing::{error, info};
use ferrumc_net::server::create_server_listener;
use systems::definition;

pub(crate) mod errors;
Expand Down Expand Up @@ -44,10 +47,7 @@ async fn entry() -> Result<()> {


async fn create_state() -> Result<ServerState> {
let config = ferrumc_config::statics::get_global_config();
let addy = format!("{}:{}", config.host, config.port);

let listener = tokio::net::TcpListener::bind(addy).await?;
let listener = create_server_listener().await?;

Ok(ServerState {
universe: Universe::new(),
Expand Down
12 changes: 8 additions & 4 deletions src/bin/src/packet_handlers/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use ferrumc_macros::event_handler;
use ferrumc_net::connection::ConnectionState;
use ferrumc_net::errors::NetError::Packet;
use ferrumc_net::errors::NetError::{Packet};
use ferrumc_net::errors::{NetError, PacketError};
use ferrumc_net::packets::incoming::handshake::HandshakeEvent;
use ferrumc_net::GlobalState;
use tracing::trace;
use tracing::{error, trace};
use ferrumc_ecs::errors::ECSError;
use ferrumc_net::utils::ecs_helpers::EntityExt;

#[event_handler]
Expand All @@ -17,8 +18,11 @@ async fn handle_handshake(

// set connection state to handshake
let entity = handshake_event.conn_id;
let mut connection_state = entity
.get_mut::<ConnectionState>(state)?;
let Ok(mut connection_state) = entity
.get_mut::<ConnectionState>(state) else {
error!("Failed to get connection state");
return Err(NetError::ECSError(ECSError::ComponentNotFound));
};

trace!(
"conn state: {} -> {}",
Expand Down
51 changes: 36 additions & 15 deletions src/bin/src/packet_handlers/tick_handler.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::sync::Arc;

use ferrumc_macros::event_handler;
use ferrumc_net::connection::ConnectionState;
use ferrumc_net::connection::StreamWriter;
use ferrumc_net::errors::NetError;
use ferrumc_net::packets::outgoing::update_time::TickEvent;
use ferrumc_net::packets::outgoing::update_time::UpdateTimePacket;
use ferrumc_net::GlobalState;
use ferrumc_net_codec::encode::NetEncodeOpts;
use ferrumc_net_codec::encode::{NetEncode, NetEncodeOpts};
use futures::StreamExt;
use tracing::error;

#[event_handler]
Expand All @@ -18,22 +17,44 @@ async fn handle_tick(event: TickEvent, state: GlobalState) -> Result<TickEvent,

///////

let packet = Arc::new(UpdateTimePacket::new(event.tick, event.tick % 24000));
let packet = UpdateTimePacket::new(event.tick, event.tick % 24000);
let packet = {
let mut buffer = Vec::new();
packet.encode(&mut buffer, &NetEncodeOpts::WithLength)?;
buffer
};

let query = state
.universe
.query::<(&mut StreamWriter, &ConnectionState)>();

for (mut writer, connection_state) in query {
if let ConnectionState::Play = *connection_state {
if let Err(e) = writer
.send_packet(packet.as_ref(), &NetEncodeOpts::WithLength)
.await
{
error!("Error sending update_time packet: {}", e);
.query::<(&mut StreamWriter, &ConnectionState)>()
.into_entities()
.into_iter()
.filter_map(|entity| {
let conn_state = state.universe.get::<ConnectionState>(entity).ok()?;
if matches!(*conn_state, ConnectionState::Play) {
Some(entity)
} else {
None
}
}
}
})
.collect::<Vec<_>>();

tokio::spawn(
futures::stream::iter(query.into_iter())
.fold((state, packet), move |(state, packet), entity| {
async move {
if let Ok(mut writer) = state.universe.get_mut::<StreamWriter>(entity) {
if let Err(e) = writer
.send_packet(&packet.as_slice(), &NetEncodeOpts::None)
.await
{
error!("Error sending update_time packet: {}", e);
}
}

(state, packet)
}
})
);
Ok(event)
}
26 changes: 15 additions & 11 deletions src/bin/src/systems/definition.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
use std::sync::Arc;
use ferrumc_net::{GlobalState, NetResult};
use futures::stream::FuturesUnordered;
use tracing::{debug, debug_span, info, Instrument};
use async_trait::async_trait;
use crate::systems::keep_alive_system::KeepAliveSystem;
use crate::systems::tcp_listener_system::TcpListenerSystem;

use super::ticking_system::TickingSystem;
use crate::systems::ticking_system::TickingSystem;

#[async_trait]
pub trait System: Send + Sync {
async fn start(&self, state: GlobalState);
async fn stop(&self, state: GlobalState);
async fn start(self: Arc<Self>, state: GlobalState);
async fn stop(self: Arc<Self>, state: GlobalState);

fn name(&self) -> &'static str;
}

pub static ALL_SYSTEMS: &[&dyn System] = &[
&TcpListenerSystem,
&KeepAliveSystem,
&TickingSystem
];

pub fn create_systems() -> Vec<Arc<dyn System>> {
vec![
Arc::new(TcpListenerSystem),
Arc::new(KeepAliveSystem::new()),
Arc::new(TickingSystem),
]
}
pub async fn start_all_systems(state: GlobalState) -> NetResult<()> {
let systems = create_systems();
let handles = FuturesUnordered::new();

for system in ALL_SYSTEMS {
for system in systems {
let name = system.name();

let handle = tokio::spawn(
Expand All @@ -41,9 +44,10 @@ pub async fn start_all_systems(state: GlobalState) -> NetResult<()> {
}

pub async fn stop_all_systems(state: GlobalState) -> NetResult<()> {
let systems = create_systems();
info!("Stopping all systems...");

for system in ALL_SYSTEMS {
for system in systems {
debug!("Stopping system: {}", system.name());
system.stop(state.clone()).await;
}
Expand Down
28 changes: 19 additions & 9 deletions src/bin/src/systems/keep_alive_system.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,46 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use tracing::info;
use ferrumc_core::identity::player_identity::PlayerIdentity;
use ferrumc_net::GlobalState;
use crate::systems::definition::System;

pub struct KeepAliveSystem;
pub struct KeepAliveSystem {
shutdown: AtomicBool
}

static KILLED: AtomicBool = AtomicBool::new(false);
impl KeepAliveSystem {
pub const fn new() -> Self {
Self {
shutdown: AtomicBool::new(false)
}
}
}

#[async_trait]
impl System for KeepAliveSystem {
async fn start(&self, state: GlobalState) {
async fn start(self: Arc<Self>, state: GlobalState) {
loop {
if KILLED.load(Ordering::Relaxed) {
if self.shutdown.load(Ordering::Relaxed) {
break;
}

let online_players = state.universe.query::<&PlayerIdentity>();
drop(online_players);
// tracing::debug!("Total of {} online players", online_players.count());
info!("Online players: {}", online_players.count());

tokio::time::sleep(Duration::from_secs(5)).await;
}
}

async fn stop(&self, _state: GlobalState) {
async fn stop(self: Arc<Self>, _state: GlobalState) {
tracing::debug!("Stopping keep alive system...");
KILLED.store(true, Ordering::Relaxed);
self.shutdown.store(true, Ordering::Relaxed);
}

fn name(&self) -> &'static str {
"keep_alive"
}
}

7 changes: 5 additions & 2 deletions src/bin/src/systems/tcp_listener_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ pub struct TcpListenerSystem;

#[async_trait]
impl System for TcpListenerSystem {
async fn start(&self, state: GlobalState) {
async fn start(self: Arc<Self>, state: GlobalState) {
if let Err(e) = TcpListenerSystem::initiate_loop(state).await {
error!("TCP listener system failed with error: {:?}", e);
}
}

async fn stop(&self, _state: GlobalState) {
async fn stop(self: Arc<Self>, _state: GlobalState) {
debug!("Stopping TCP listener system...");
}

Expand All @@ -40,5 +40,8 @@ impl TcpListenerSystem {
.instrument(info_span!("conn", %addy).or_current())
);
}

#[allow(unreachable_code)]
Ok(())
}
}
13 changes: 7 additions & 6 deletions src/bin/src/systems/ticking_system.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use crate::systems::definition::System;
use async_trait::async_trait;
use ferrumc_events::infrastructure::Event;
Expand All @@ -14,7 +15,7 @@ static KILLED: AtomicBool = AtomicBool::new(false);

#[async_trait]
impl System for TickingSystem {
async fn start(&self, state: GlobalState) {
async fn start(self: Arc<Self>, state: GlobalState) {
// TODO game time must be loaded from a file
let mut tick = 0;
while !KILLED.load(Ordering::Relaxed) {
Expand All @@ -23,22 +24,22 @@ impl System for TickingSystem {
let res = TickEvent::trigger(TickEvent::new(tick), state.clone()).await;

if res.is_err() {
debug!("error : {:?}", res);
debug!("error: {:?}", res);
}
let now = Instant::now();
if required_end > now {
tokio::time::sleep(required_end - now).await;
} else {
let time_debt = now - required_end;
info!("running behind! by : {}ms", time_debt.as_millis());
info!("Running behind by {:?}", time_debt);
}

tick += 1;
tick += 200;
}
}

async fn stop(&self, _state: GlobalState) {
tracing::debug!("Stopping ticking system...");
async fn stop(self: Arc<Self>, _state: GlobalState) {
debug!("Stopping ticking system...");
KILLED.store(true, Ordering::Relaxed);
}

Expand Down
12 changes: 6 additions & 6 deletions src/lib/ecs/src/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,18 +298,18 @@ impl ComponentManager {

Ok(())
}
pub fn get<'a, T: Component>(&self, entity_id: usize) -> Option<ComponentRef<'a, T>> {
pub fn get<'a, T: Component>(&self, entity_id: usize) -> ECSResult<ComponentRef<'a, T>> {
let type_id = TypeId::of::<T>();
let ptr = *self.components.get(&type_id)?;
let ptr = *self.components.get(&type_id).ok_or(ECSError::ComponentTypeNotFound)?;
let component_set = unsafe { &*(ptr as *const ComponentSparseSet<T>) };
component_set.get(entity_id).ok()
component_set.get(entity_id)
}

pub fn get_mut<'a, T: Component>(&self, entity_id: usize) -> Option<ComponentRefMut<'a, T>> {
pub fn get_mut<'a, T: Component>(&self, entity_id: usize) -> ECSResult<ComponentRefMut<'a, T>> {
let type_id = TypeId::of::<T>();
let ptr = *self.components.get(&type_id)?;
let ptr = *self.components.get(&type_id).ok_or(ECSError::ComponentTypeNotFound)?;
let component_set = unsafe { &*(ptr as *const ComponentSparseSet<T>) };
component_set.get_mut(entity_id).ok()
component_set.get_mut(entity_id)
}

pub fn remove<T: Component>(&self, entity_id: usize) -> ECSResult<()> {
Expand Down
Loading