From efd2e461291f1ad529f97cdf5763f87b3d2c63ae Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 9 Dec 2024 17:44:09 -0500 Subject: [PATCH] feat: send websocket events for operation updates --- firefly-cardanoconnect/src/main.rs | 8 +- .../src/operations/manager.rs | 24 +++- firefly-cardanoconnect/src/routes/ws.rs | 124 ++++++++++++------ firefly-cardanoconnect/src/streams.rs | 1 + firefly-cardanoconnect/src/streams/manager.rs | 11 +- firefly-cardanoconnect/src/streams/mux.rs | 54 +++++++- 6 files changed, 161 insertions(+), 61 deletions(-) diff --git a/firefly-cardanoconnect/src/main.rs b/firefly-cardanoconnect/src/main.rs index 51b47a9..4dcb5fc 100644 --- a/firefly-cardanoconnect/src/main.rs +++ b/firefly-cardanoconnect/src/main.rs @@ -25,6 +25,7 @@ use routes::{ }; use signer::CardanoSigner; use streams::StreamManager; +use tokio::sync::broadcast; use tracing::instrument; mod blockchain; @@ -75,18 +76,23 @@ async fn init_state(config: &CardanoConnectConfig, mock_data: bool) -> Result, persistence: Arc, signer: Arc, + operation_sink: broadcast::Sender, } impl OperationsManager { @@ -25,12 +27,14 @@ impl OperationsManager { contracts: Arc, persistence: Arc, signer: Arc, + operation_sink: broadcast::Sender, ) -> Self { Self { blockchain, contracts, persistence, signer, + operation_sink, } } @@ -40,16 +44,16 @@ impl OperationsManager { status: OperationStatus::Pending, tx_id: None, }; - self.persistence.write_operation(&op).await?; + self.update_operation(&op).await?; match self.contracts.deploy(name, contract).await { Ok(()) => { op.status = OperationStatus::Succeeded; - self.persistence.write_operation(&op).await?; + self.update_operation(&op).await?; Ok(()) } Err(err) => { op.status = OperationStatus::Failed(err.to_string()); - self.persistence.write_operation(&op).await?; + self.update_operation(&op).await?; Err(err.into()) } } @@ -68,13 +72,13 @@ impl OperationsManager { status: OperationStatus::Pending, tx_id: None, }; - self.persistence.write_operation(&op).await?; + self.update_operation(&op).await?; let result = self.contracts.invoke(contract, method, params).await; let value = match result { Ok(v) => v, Err(err) => { op.status = OperationStatus::Failed(err.to_string()); - self.persistence.write_operation(&op).await?; + self.update_operation(&op).await?; return Err(err.into()); } }; @@ -83,7 +87,7 @@ impl OperationsManager { } op.status = OperationStatus::Succeeded; - self.persistence.write_operation(&op).await?; + self.update_operation(&op).await?; Ok(()) } @@ -95,6 +99,14 @@ impl OperationsManager { Ok(op) } + async fn update_operation(&self, op: &Operation) -> ApiResult<()> { + // Notify consumers about this status update. + // Errors are fine, just means nobody is listening + let _ = self.operation_sink.send(op.clone()); + self.persistence.write_operation(op).await?; + Ok(()) + } + async fn submit_transaction(&self, address: &str, tx: Vec) -> ApiResult { let mut transaction: Tx = minicbor::decode(&tx)?; self.signer diff --git a/firefly-cardanoconnect/src/routes/ws.rs b/firefly-cardanoconnect/src/routes/ws.rs index 4beacd9..1e16c52 100644 --- a/firefly-cardanoconnect/src/routes/ws.rs +++ b/firefly-cardanoconnect/src/routes/ws.rs @@ -12,7 +12,11 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tracing::{error, instrument, warn, Level}; -use crate::AppState; +use crate::{ + operations::Operation, + streams::{Batch, StreamMessage}, + AppState, +}; async fn handle_socket( AppState { stream_manager, .. }: AppState, @@ -28,56 +32,76 @@ async fn handle_socket( }; let mut subscription = stream_manager.subscribe(&topic).await?; - while let Some(batch) = subscription.recv().await { - let outgoing_batch = OutgoingBatch { - batch_number: batch.batch_number, - events: batch - .events - .iter() - .map(|e| Event { - listener_id: Some(e.id.listener_id.clone().into()), - signature: e.id.signature.clone(), - block_number: e.id.block_number, - block_hash: e.id.block_hash.clone(), - transaction_hash: e.id.transaction_hash.clone(), - transaction_index: e.id.transaction_index, - log_index: e.id.log_index, - timestamp: e.id.timestamp.map(systemtime_to_rfc3339), - data: e.data.clone(), - }) - .collect(), - }; - let outgoing_json = serde_json::to_string(&outgoing_batch)?; - socket.send(Message::Text(outgoing_json)).await?; + while let Some(message) = subscription.recv().await { + match message { + StreamMessage::Batch(batch) => send_batch(&mut socket, &topic, batch).await?, + StreamMessage::Operation(op) => send_operation(&mut socket, op).await?, + } + } + Ok(()) +} - let Some(response) = read_message(&mut socket).await? else { - bail!("socket was closed after sending a batch"); - }; - match response { - IncomingMessage::Ack(ack) => { - if ack.topic != topic { - bail!("client acked messages from the wrong topic"); - } - if ack.batch_number != batch.batch_number { - bail!("client acked the wrong batch"); - } - batch.ack(); +async fn send_batch(socket: &mut WebSocket, topic: &str, batch: Batch) -> Result<()> { + let outgoing_batch = OutgoingBatch { + batch_number: batch.batch_number, + events: batch + .events + .iter() + .map(|e| Event { + listener_id: Some(e.id.listener_id.clone().into()), + signature: e.id.signature.clone(), + block_number: e.id.block_number, + block_hash: e.id.block_hash.clone(), + transaction_hash: e.id.transaction_hash.clone(), + transaction_index: e.id.transaction_index, + log_index: e.id.log_index, + timestamp: e.id.timestamp.map(systemtime_to_rfc3339), + data: e.data.clone(), + }) + .collect(), + }; + let outgoing_json = serde_json::to_string(&outgoing_batch)?; + socket.send(Message::Text(outgoing_json)).await?; + + let Some(response) = read_message(socket).await? else { + bail!("socket was closed after sending a batch"); + }; + match response { + IncomingMessage::Ack(ack) => { + if ack.topic != topic { + bail!("client acked messages from the wrong topic"); } - IncomingMessage::Error(err) => { - if err.topic != topic { - bail!("client acked messages from the wrong topic"); - } - error!("client couldn't process batch: {}", err.message); - continue; + if ack.batch_number != batch.batch_number { + bail!("client acked the wrong batch"); } - other => { - bail!("unexpected response to batch! {:?}", other); + batch.ack(); + } + IncomingMessage::Error(err) => { + if err.topic != topic { + bail!("client acked messages from the wrong topic"); } + error!("client couldn't process batch: {}", err.message); + } + other => { + bail!("unexpected response to batch! {:?}", other); } } Ok(()) } +async fn send_operation(socket: &mut WebSocket, op: Operation) -> Result<()> { + let operation = OutgoingOperation { + headers: OperationHeaders { + request_id: op.id.to_string(), + type_: op.status.name().to_string(), + }, + transaction_hash: op.tx_id.clone(), + error_message: op.status.error_message().map(|m| m.to_string()), + }; + let outgoing_json = serde_json::to_string(&operation)?; + socket.send(Message::Text(outgoing_json)).await?; + Ok(()) +} fn systemtime_to_rfc3339(value: SystemTime) -> String { let date: DateTime = value.into(); date.to_rfc3339() @@ -158,6 +182,22 @@ struct OutgoingBatch { events: Vec, } +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct OutgoingOperation { + headers: OperationHeaders, + transaction_hash: Option, + error_message: Option, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct OperationHeaders { + request_id: String, + #[serde(rename = "type")] + type_: String, +} + pub async fn handle_socket_upgrade( State(app_state): State, ws: WebSocketUpgrade, diff --git a/firefly-cardanoconnect/src/streams.rs b/firefly-cardanoconnect/src/streams.rs index af02ec9..89609d1 100644 --- a/firefly-cardanoconnect/src/streams.rs +++ b/firefly-cardanoconnect/src/streams.rs @@ -4,4 +4,5 @@ mod mux; mod types; pub use manager::StreamManager; +pub use mux::{Batch, StreamMessage}; pub use types::*; diff --git a/firefly-cardanoconnect/src/streams/manager.rs b/firefly-cardanoconnect/src/streams/manager.rs index 3ff6c8e..d6e03ec 100644 --- a/firefly-cardanoconnect/src/streams/manager.rs +++ b/firefly-cardanoconnect/src/streams/manager.rs @@ -2,13 +2,13 @@ use std::{sync::Arc, time::Duration}; use anyhow::Result; use firefly_server::apitypes::{ApiError, ApiResult}; -use tokio::sync::mpsc; +use tokio::sync::broadcast; use ulid::Ulid; -use crate::{blockchain::BlockchainClient, persistence::Persistence}; +use crate::{blockchain::BlockchainClient, operations::Operation, persistence::Persistence}; use super::{ - mux::{Batch, Multiplexer}, + mux::{Multiplexer, StreamSubscription}, BlockReference, Listener, ListenerFilter, ListenerId, ListenerType, Stream, StreamId, }; @@ -21,10 +21,11 @@ impl StreamManager { pub async fn new( persistence: Arc, blockchain: Arc, + operation_sink: broadcast::Sender, ) -> Result { Ok(Self { persistence: persistence.clone(), - mux: Multiplexer::new(persistence, blockchain).await?, + mux: Multiplexer::new(persistence, blockchain, operation_sink).await?, }) } @@ -160,7 +161,7 @@ impl StreamManager { Ok(()) } - pub async fn subscribe(&self, topic: &str) -> Result> { + pub async fn subscribe(&self, topic: &str) -> Result { self.mux.subscribe(topic).await } } diff --git a/firefly-cardanoconnect/src/streams/mux.rs b/firefly-cardanoconnect/src/streams/mux.rs index 47be1d3..4b8915c 100644 --- a/firefly-cardanoconnect/src/streams/mux.rs +++ b/firefly-cardanoconnect/src/streams/mux.rs @@ -11,13 +11,14 @@ use firefly_server::apitypes::ToAnyhow; use serde_json::json; use tokio::{ select, - sync::{mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot}, time, }; use tracing::warn; use crate::{ blockchain::BlockchainClient, + operations::Operation, persistence::Persistence, streams::{blockchain::ListenerEvent, EventId}, }; @@ -32,6 +33,7 @@ use super::{ pub struct Multiplexer { dispatchers: Arc>, stream_ids_by_topic: Arc>, + operation_sink: broadcast::Sender, persistence: Arc, data_source: Arc, } @@ -40,6 +42,7 @@ impl Multiplexer { pub async fn new( persistence: Arc, blockchain: Arc, + operation_sink: broadcast::Sender, ) -> Result { let data_source = Arc::new(DataSource::new(blockchain, persistence.clone())); @@ -49,13 +52,19 @@ impl Multiplexer { let topic = stream.name.clone(); stream_ids_by_topic.insert(topic.clone(), stream.id.clone()); - let dispatcher = - StreamDispatcher::new(&stream, persistence.clone(), data_source.clone()).await?; + let dispatcher = StreamDispatcher::new( + &stream, + persistence.clone(), + data_source.clone(), + operation_sink.clone(), + ) + .await?; dispatchers.insert(stream.id, dispatcher); } Ok(Self { dispatchers: Arc::new(dispatchers), stream_ids_by_topic: Arc::new(stream_ids_by_topic), + operation_sink, persistence, data_source, }) @@ -75,6 +84,7 @@ impl Multiplexer { stream, self.persistence.clone(), self.data_source.clone(), + self.operation_sink.clone(), ) .await?, ); @@ -121,7 +131,7 @@ impl Multiplexer { dispatcher.remove_listener(listener_id).await } - pub async fn subscribe(&self, topic: &str) -> Result> { + pub async fn subscribe(&self, topic: &str) -> Result { let Some(stream_id) = self.stream_ids_by_topic.get(topic) else { bail!("no stream found for topic {topic}"); }; @@ -134,6 +144,7 @@ impl Multiplexer { struct StreamDispatcher { state_change_sink: mpsc::Sender, + operation_sink: broadcast::Sender, } impl StreamDispatcher { @@ -141,6 +152,7 @@ impl StreamDispatcher { stream: &Stream, persistence: Arc, data_source: Arc, + operation_sink: broadcast::Sender, ) -> Result { let (state_change_sink, state_change_source) = mpsc::channel(16); @@ -181,7 +193,10 @@ impl StreamDispatcher { }; worker.run(state_change_source).await; }); - Ok(Self { state_change_sink }) + Ok(Self { + state_change_sink, + operation_sink, + }) } pub async fn update_settings(&self, stream: &Stream) -> Result<()> { @@ -224,13 +239,17 @@ impl StreamDispatcher { Ok(()) } - pub async fn subscribe(&self) -> Result> { + pub async fn subscribe(&self) -> Result { let (batch_sink, batch_source) = mpsc::channel(1); self.state_change_sink .send(StreamDispatcherStateChange::NewBatchSink(batch_sink)) .await .context("could not subscribe to stream")?; - Ok(batch_source) + let operation_source = self.operation_sink.subscribe(); + Ok(StreamSubscription { + batch_receiver: batch_source, + operation_receiver: operation_source, + }) } } @@ -607,3 +626,24 @@ impl Batch { let _ = self.ack_tx.send(()); } } + +pub struct StreamSubscription { + batch_receiver: mpsc::Receiver, + operation_receiver: broadcast::Receiver, +} + +impl StreamSubscription { + pub async fn recv(&mut self) -> Option { + select! { + biased; + Ok(op) = self.operation_receiver.recv() => Some(StreamMessage::Operation(op)), + Some(batch) = self.batch_receiver.recv() => Some(StreamMessage::Batch(batch)), + else => None + } + } +} + +pub enum StreamMessage { + Batch(Batch), + Operation(Operation), +}