Skip to content

Commit

Permalink
feat(torii-core): optimistically broadcast entity update (#2466)
Browse files Browse the repository at this point in the history
* refactor: update to use executor

* clippy

* make sure order

* fmt

* c

* fmt
  • Loading branch information
Larkooo authored Oct 8, 2024
1 parent 771639c commit d039c6d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 9 deletions.
39 changes: 38 additions & 1 deletion crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use tracing::{debug, error};
use crate::simple_broker::SimpleBroker;
use crate::types::{
Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted,
EventMessage as EventMessageUpdated, Model as ModelRegistered,
EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity,
OptimisticEventMessage,
};

pub(crate) const LOG_TARGET: &str = "torii_core::executor";
Expand Down Expand Up @@ -224,6 +225,19 @@ impl<'c> Executor<'c> {
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at,
created_at: entity_updated.created_at,
updated_at: entity_updated.updated_at,
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
};
SimpleBroker::publish(optimistic_entity);

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
}
Expand Down Expand Up @@ -264,6 +278,17 @@ impl<'c> Executor<'c> {
entity_updated.deleted = true;
}

let optimistic_entity = OptimisticEntity {
id: entity_updated.id.clone(),
keys: entity_updated.keys.clone(),
event_id: entity_updated.event_id.clone(),
executed_at: entity_updated.executed_at,
created_at: entity_updated.created_at,
updated_at: entity_updated.updated_at,
updated_model: entity_updated.updated_model.clone(),
deleted: entity_updated.deleted,
};
SimpleBroker::publish(optimistic_entity);
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.publish_queue.push(broker_message);
}
Expand All @@ -280,6 +305,18 @@ impl<'c> Executor<'c> {
})?;
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);

let optimistic_event_message = OptimisticEventMessage {
id: event_message.id.clone(),
keys: event_message.keys.clone(),
event_id: event_message.event_id.clone(),
executed_at: event_message.executed_at,
created_at: event_message.created_at,
updated_at: event_message.updated_at,
updated_model: event_message.updated_model.clone(),
};
SimpleBroker::publish(optimistic_event_message);

let broker_message = BrokerMessage::EventMessageUpdated(event_message);
self.publish_queue.push(broker_message);
}
Expand Down
32 changes: 32 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ pub struct Entity {
pub deleted: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticEntity {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
#[sqlx(skip)]
pub deleted: bool,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EventMessage {
Expand All @@ -61,6 +78,21 @@ pub struct EventMessage {
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OptimisticEventMessage {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Model {
Expand Down
14 changes: 10 additions & 4 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Entity;
use torii_core::types::OptimisticEntity;
use tracing::{error, trace};

use crate::proto;
Expand Down Expand Up @@ -78,15 +78,21 @@ impl EntityManager {
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EntityManager>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
}

impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()) }
Self {
subs_manager,
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
}
}

async fn publish_updates(subs: Arc<EntityManager>, entity: &Entity) -> Result<(), Error> {
async fn publish_updates(
subs: Arc<EntityManager>,
entity: &OptimisticEntity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
let keys = entity
Expand Down
11 changes: 7 additions & 4 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::EventMessage;
use torii_core::types::OptimisticEventMessage;
use tracing::{error, trace};

use super::entity::EntitiesSubscriber;
Expand Down Expand Up @@ -72,17 +72,20 @@ impl EventMessageManager {
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventMessageManager>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>,
}

impl Service {
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()) }
Self {
subs_manager,
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()),
}
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
entity: &EventMessage,
entity: &OptimisticEventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
Expand Down

0 comments on commit d039c6d

Please sign in to comment.