Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-core): optimistically broadcast entity update #2466

Merged
merged 7 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, error};
use crate::simple_broker::SimpleBroker;
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Model as ModelRegistered, OptimisticEntity, OptimisticEventMessage,
};

pub(crate) const LOG_TARGET: &str = "torii_core::executor";
Expand Down Expand Up @@ -185,6 +185,19 @@ impl<'c> Executor<'c> {
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at.clone(),
created_at: entity_updated.created_at.clone(),
updated_at: entity_updated.updated_at.clone(),
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
};
SimpleBroker::publish(optimistic_entity);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to reduce code duplication when creating OptimisticEntity

This block duplicates field cloning from entity_updated to create OptimisticEntity. To improve maintainability and reduce code duplication, consider implementing the From trait to convert EntityUpdated into OptimisticEntity.

Implement the From trait for OptimisticEntity:

impl From<&EntityUpdated> for OptimisticEntity {
    fn from(entity_updated: &EntityUpdated) -> Self {
        Self {
            id: entity_updated.id.clone(),
            keys: entity_updated.keys.clone(),
            event_id: entity_updated.event_id.clone(),
            executed_at: entity_updated.executed_at.clone(),
            created_at: entity_updated.created_at.clone(),
            updated_at: entity_updated.updated_at.clone(),
            updated_model: entity_updated.updated_model.clone(),
            deleted: entity_updated.deleted,
        }
    }
}

Then, you can simplify the code to:

let optimistic_entity = OptimisticEntity::from(&entity_updated);


let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
}
Expand Down Expand Up @@ -225,6 +238,17 @@ impl<'c> Executor<'c> {
entity_updated.deleted = true;
}

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at.clone(),
created_at: entity_updated.created_at.clone(),
updated_at: entity_updated.updated_at.clone(),
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
};
SimpleBroker::publish(optimistic_entity);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to reduce code duplication when creating OptimisticEntity

This block repeats the same cloning logic as before. Using the From trait implementation suggested earlier will reduce code duplication and improve maintainability.

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
}
Expand All @@ -241,6 +265,18 @@ impl<'c> Executor<'c> {
})?;
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);

let optimistic_event_message = OptimisticEventMessage {
id: event_message.id.clone(),
keys: event_message.keys.clone(),
event_id: event_message.event_id.clone(),
executed_at: event_message.executed_at.clone(),
created_at: event_message.created_at.clone(),
updated_at: event_message.updated_at.clone(),
updated_model: event_message.updated_model.clone(),
};
SimpleBroker::publish(optimistic_event_message);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to reduce code duplication when creating OptimisticEventMessage

This block duplicates field cloning from event_message to create OptimisticEventMessage. To improve maintainability and reduce code duplication, consider implementing the From trait to convert EventMessageUpdated into OptimisticEventMessage.

Implement the From trait for OptimisticEventMessage:

impl From<&EventMessageUpdated> for OptimisticEventMessage {
    fn from(event_message: &EventMessageUpdated) -> Self {
        Self {
            id: event_message.id.clone(),
            keys: event_message.keys.clone(),
            event_id: event_message.event_id.clone(),
            executed_at: event_message.executed_at.clone(),
            created_at: event_message.created_at.clone(),
            updated_at: event_message.updated_at.clone(),
            updated_model: event_message.updated_model.clone(),
        }
    }
}

Then, you can simplify the code to:

let optimistic_event_message = OptimisticEventMessage::from(&event_message);

let broker_message = BrokerMessage::EventMessageUpdated(event_message);
self.publish_queue.push(broker_message);
}
Expand Down
32 changes: 32 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ pub struct Entity {
pub deleted: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticEntity {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
#[sqlx(skip)]
pub deleted: bool,
}
Comment on lines +49 to +64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Consider refactoring to avoid duplication.

The OptimisticEntity structure appears to be identical to the existing Entity structure. To maintain DRY (Don't Repeat Yourself) principles, consider using a type alias instead:

pub type OptimisticEntity = Entity;

This approach would reduce code duplication and make maintenance easier. If there are specific reasons for keeping them separate, please add a comment explaining the rationale.

Also, the updated_model field is still using Option<Ty>. If it should never be None as per the comment, consider changing it to:

pub updated_model: Ty,

If Option is necessary due to SQLx requirements, please add a comment explaining this constraint.


#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EventMessage {
Expand All @@ -61,6 +78,21 @@ pub struct EventMessage {
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticEventMessage {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}
Comment on lines +81 to +94
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo again, sensei! Let's discuss the OptimisticEventMessage structure.

The OptimisticEventMessage structure is nearly identical to the EventMessage structure, with one key difference: it's missing the deleted field. Here are some suggestions:

  1. If OptimisticEventMessage doesn't need the deleted field, consider using a type alias with a custom type:
pub struct OptimisticEventMessageData {
    pub event_message: EventMessage,
    // Add any additional fields specific to OptimisticEventMessage
}

pub type OptimisticEventMessage = OptimisticEventMessageData;
  1. If the deleted field should be present, add it to maintain consistency:
#[sqlx(skip)]
pub deleted: bool,
  1. If there's a specific reason for the current implementation, please add a comment explaining the rationale behind the differences between EventMessage and OptimisticEventMessage.

These changes would improve code clarity and maintainability. What do you think, sensei?


#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Model {
Expand Down
14 changes: 10 additions & 4 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Entity;
use torii_core::types::OptimisticEntity;
use tracing::{error, trace};

use crate::proto;
Expand Down Expand Up @@ -78,15 +78,21 @@ impl EntityManager {
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EntityManager>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
}

impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()) }
Self {
subs_manager,
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
}
}

async fn publish_updates(subs: Arc<EntityManager>, entity: &Entity) -> Result<(), Error> {
async fn publish_updates(
subs: Arc<EntityManager>,
entity: &OptimisticEntity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
let keys = entity
Expand Down
11 changes: 7 additions & 4 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::EventMessage;
use torii_core::types::OptimisticEventMessage;
use tracing::{error, trace};

use super::entity::EntitiesSubscriber;
Expand Down Expand Up @@ -72,17 +72,20 @@ impl EventMessageManager {
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventMessageManager>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>,
}

impl Service {
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()) }
Self {
subs_manager,
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()),
}
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
entity: &EventMessage,
entity: &OptimisticEventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
Expand Down
Loading