Skip to content

Commit

Permalink
feat: add sender to node subscription event
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Nov 25, 2024
1 parent 1896a33 commit 97fc9cc
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
7 changes: 5 additions & 2 deletions atoma-state/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ pub async fn handle_atoma_event(
info!("Published event: {:?}", event);
Ok(())
}
AtomaEvent::NodeRegisteredEvent(event) => {
info!("Node registered event: {:?}", event);
AtomaEvent::NodeRegisteredEvent((event, sender)) => {
info!(
"Node registered event: {:?} from sui address {:?}",
event, sender
);
Ok(())
}
AtomaEvent::NodeSubscribedToModelEvent(event) => {
Expand Down
3 changes: 2 additions & 1 deletion atoma-sui/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use sui_sdk::types::base_types::SuiAddress;
use thiserror::Error;

pub type Result<T> = std::result::Result<T, SuiEventParseError>;
Expand Down Expand Up @@ -106,7 +107,7 @@ pub enum AtomaEvent {
PublishedEvent(PublishedEvent),

/// An event emitted when a new node is registered in the Atoma network.
NodeRegisteredEvent(NodeRegisteredEvent),
NodeRegisteredEvent((NodeRegisteredEvent, SuiAddress)),

/// An event emitted when a node subscribes to a specific AI model.
NodeSubscribedToModelEvent(NodeSubscribedToModelEvent),
Expand Down
16 changes: 11 additions & 5 deletions atoma-sui/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde_json::Value;
use std::{path::Path, str::FromStr, time::Duration};
use sui_sdk::{
rpc_types::{EventFilter, EventPage},
types::{event::EventID, Identifier},
types::{base_types::SuiAddress, event::EventID, Identifier},
SuiClient, SuiClientBuilder,
};
use thiserror::Error;
Expand Down Expand Up @@ -177,7 +177,8 @@ impl SuiEventSubscriber {
);
match AtomaEventIdentifier::from_str(event_name.as_str()) {
Ok(atoma_event_id) => {
let atoma_event = match parse_event(&atoma_event_id, sui_event.parsed_json).await {
let sender = sui_event.sender;
let atoma_event = match parse_event(&atoma_event_id, sui_event.parsed_json, sender).await {
Ok(atoma_event) => atoma_event,
Err(e) => {
error!(
Expand Down Expand Up @@ -380,7 +381,11 @@ fn write_cursor_to_toml_file(cursor: Option<EventID>, path: &str) -> Result<()>
/// }
/// ```
#[instrument(level = "trace", skip_all)]
async fn parse_event(event: &AtomaEventIdentifier, value: Value) -> Result<AtomaEvent> {
async fn parse_event(
event: &AtomaEventIdentifier,
value: Value,
sender: SuiAddress,
) -> Result<AtomaEvent> {
match event {
AtomaEventIdentifier::DisputeEvent => {
Ok(AtomaEvent::DisputeEvent(serde_json::from_value(value)?))
Expand All @@ -394,9 +399,10 @@ async fn parse_event(event: &AtomaEventIdentifier, value: Value) -> Result<Atoma
AtomaEventIdentifier::NewlySampledNodesEvent => Ok(AtomaEvent::NewlySampledNodesEvent(
serde_json::from_value(value)?,
)),
AtomaEventIdentifier::NodeRegisteredEvent => Ok(AtomaEvent::NodeRegisteredEvent(
AtomaEventIdentifier::NodeRegisteredEvent => Ok(AtomaEvent::NodeRegisteredEvent((
serde_json::from_value(value)?,
)),
sender,
))),
AtomaEventIdentifier::NodeSubscribedToModelEvent => Ok(
AtomaEvent::NodeSubscribedToModelEvent(serde_json::from_value(value)?),
),
Expand Down

0 comments on commit 97fc9cc

Please sign in to comment.