Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-core): optimistically broadcast entity update #2466

Merged
merged 7 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, error};
use crate::simple_broker::SimpleBroker;
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Model as ModelRegistered, OptimisticEntity, OptimisticEventMessage,
};

pub(crate) const LOG_TARGET: &str = "torii_core::executor";
Expand Down Expand Up @@ -185,6 +185,19 @@ impl<'c> Executor<'c> {
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at,
created_at: entity_updated.created_at,
updated_at: entity_updated.updated_at,
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
};
SimpleBroker::publish(optimistic_entity);

Comment on lines +229 to +240
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Consider implementing From trait for OptimisticEntity

The creation of OptimisticEntity duplicates field assignments from entity_updated. To reduce code duplication and improve maintainability, consider implementing the From trait for OptimisticEntity as suggested in a previous review comment.

Example implementation:

impl From<&EntityUpdated> for OptimisticEntity {
    fn from(entity: &EntityUpdated) -> Self {
        Self {
            id: entity.id.clone(),
            keys: entity.keys.clone(),
            event_id: entity.event_id.clone(),
            executed_at: entity.executed_at,
            created_at: entity.created_at,
            updated_at: entity.updated_at,
            updated_model: entity.updated_model.clone(),
            deleted: entity.deleted,
        }
    }
}

Then, you can simplify the code to:

let optimistic_entity = OptimisticEntity::from(&entity_updated);
SimpleBroker::publish(optimistic_entity);

This change will make the code more concise and easier to maintain, sensei!

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
}
Expand Down Expand Up @@ -225,6 +238,17 @@ impl<'c> Executor<'c> {
entity_updated.deleted = true;
}

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at,
created_at: entity_updated.created_at,
updated_at: entity_updated.updated_at,
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
};
SimpleBroker::publish(optimistic_entity);
Comment on lines +281 to +291
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo again, sensei! Let's use that From trait here too!

This block is almost identical to the one in the QueryType::SetEntity branch. Once you've implemented the From trait for OptimisticEntity as suggested earlier, you can simplify this code in the same way:

let optimistic_entity = OptimisticEntity::from(&entity_updated);
SimpleBroker::publish(optimistic_entity);

This change will maintain consistency across the codebase and reduce duplication, making it easier to maintain and understand. Keep up the great work, sensei!

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
}
Expand All @@ -241,6 +265,18 @@ impl<'c> Executor<'c> {
})?;
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);

let optimistic_event_message = OptimisticEventMessage {
id: event_message.id.clone(),
keys: event_message.keys.clone(),
event_id: event_message.event_id.clone(),
executed_at: event_message.executed_at,
created_at: event_message.created_at,
updated_at: event_message.updated_at,
updated_model: event_message.updated_model.clone(),
};
SimpleBroker::publish(optimistic_event_message);

Comment on lines +309 to +319
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo once more, sensei! Let's complete the trifecta with From for OptimisticEventMessage!

To maintain consistency and reduce code duplication, let's apply the same pattern we used for OptimisticEntity to OptimisticEventMessage. Consider implementing the From trait for OptimisticEventMessage:

impl From<&EventMessageUpdated> for OptimisticEventMessage {
    fn from(event: &EventMessageUpdated) -> Self {
        Self {
            id: event.id.clone(),
            keys: event.keys.clone(),
            event_id: event.event_id.clone(),
            executed_at: event.executed_at,
            created_at: event.created_at,
            updated_at: event.updated_at,
            updated_model: event.updated_model.clone(),
        }
    }
}

Then, simplify the code to:

let optimistic_event_message = OptimisticEventMessage::from(&event_message);
SimpleBroker::publish(optimistic_event_message);

This change will complete the refactoring trifecta, making your code more consistent, maintainable, and elegant. You're doing great work, sensei!

let broker_message = BrokerMessage::EventMessageUpdated(event_message);
self.publish_queue.push(broker_message);
}
Expand Down
32 changes: 32 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@
pub deleted: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]

Check warning on line 49 in crates/torii/core/src/types.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/types.rs#L49

Added line #L49 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct OptimisticEntity {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
#[sqlx(skip)]
pub deleted: bool,
}
Comment on lines +49 to +64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Consider refactoring to avoid duplication.

The OptimisticEntity structure appears to be identical to the existing Entity structure. To maintain DRY (Don't Repeat Yourself) principles, consider using a type alias instead:

pub type OptimisticEntity = Entity;

This approach would reduce code duplication and make maintenance easier. If there are specific reasons for keeping them separate, please add a comment explaining the rationale.

Also, the updated_model field is still using Option<Ty>. If it should never be None as per the comment, consider changing it to:

pub updated_model: Ty,

If Option is necessary due to SQLx requirements, please add a comment explaining this constraint.


#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EventMessage {
Expand All @@ -61,6 +78,21 @@
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]

Check warning on line 81 in crates/torii/core/src/types.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/types.rs#L81

Added line #L81 was not covered by tests
#[serde(rename_all = "camelCase")]
pub struct OptimisticEventMessage {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}
Comment on lines +81 to +94
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo again, sensei! Let's discuss the OptimisticEventMessage structure.

The OptimisticEventMessage structure is nearly identical to the EventMessage structure, with one key difference: it's missing the deleted field. Here are some suggestions:

  1. If OptimisticEventMessage doesn't need the deleted field, consider using a type alias with a custom type:
pub struct OptimisticEventMessageData {
    pub event_message: EventMessage,
    // Add any additional fields specific to OptimisticEventMessage
}

pub type OptimisticEventMessage = OptimisticEventMessageData;
  1. If the deleted field should be present, add it to maintain consistency:
#[sqlx(skip)]
pub deleted: bool,
  1. If there's a specific reason for the current implementation, please add a comment explaining the rationale behind the differences between EventMessage and OptimisticEventMessage.

These changes would improve code clarity and maintainability. What do you think, sensei?


#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Model {
Expand Down
14 changes: 10 additions & 4 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Entity;
use torii_core::types::OptimisticEntity;
use tracing::{error, trace};

use crate::proto;
Expand Down Expand Up @@ -78,15 +78,21 @@
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EntityManager>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
}

impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()) }
Self {
subs_manager,
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
}
}

async fn publish_updates(subs: Arc<EntityManager>, entity: &Entity) -> Result<(), Error> {
async fn publish_updates(
subs: Arc<EntityManager>,
entity: &OptimisticEntity,
) -> Result<(), Error> {

Check warning on line 95 in crates/torii/grpc/src/server/subscriptions/entity.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/entity.rs#L92-L95

Added lines #L92 - L95 were not covered by tests
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
let keys = entity
Expand Down
11 changes: 7 additions & 4 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::EventMessage;
use torii_core::types::OptimisticEventMessage;
use tracing::{error, trace};

use super::entity::EntitiesSubscriber;
Expand Down Expand Up @@ -72,17 +72,20 @@
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventMessageManager>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>,
}

impl Service {
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()) }
Self {
subs_manager,
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()),
}
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
entity: &EventMessage,
entity: &OptimisticEventMessage,

Check warning on line 88 in crates/torii/grpc/src/server/subscriptions/event_message.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/event_message.rs#L88

Added line #L88 was not covered by tests
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
Expand Down
Loading