Skip to content

Commit

Permalink
feat: send websocket events for operation updates
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Dec 9, 2024
1 parent 0faf5cc commit efd2e46
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 61 deletions.
8 changes: 7 additions & 1 deletion firefly-cardanoconnect/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use routes::{
};
use signer::CardanoSigner;
use streams::StreamManager;
use tokio::sync::broadcast;
use tracing::instrument;

mod blockchain;
Expand Down Expand Up @@ -75,18 +76,23 @@ async fn init_state(config: &CardanoConnectConfig, mock_data: bool) -> Result<Ap
} else {
Arc::new(ContractManager::none())
};

let operation_sink = broadcast::Sender::new(1024);
let operations = Arc::new(OperationsManager::new(
blockchain.clone(),
contracts,
persistence.clone(),
signer.clone(),
operation_sink.clone(),
));

let state = AppState {
blockchain: blockchain.clone(),
operations,
signer,
stream_manager: Arc::new(StreamManager::new(persistence, blockchain).await?),
stream_manager: Arc::new(
StreamManager::new(persistence, blockchain, operation_sink).await?,
),
};

Ok(state)
Expand Down
24 changes: 18 additions & 6 deletions firefly-cardanoconnect/src/operations/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Context;
use firefly_server::apitypes::{ApiError, ApiResult};
use pallas_primitives::conway::Tx;
use serde_json::Value;
use tokio::sync::broadcast;

use crate::{
blockchain::BlockchainClient, contracts::ContractManager, persistence::Persistence,
Expand All @@ -17,6 +18,7 @@ pub struct OperationsManager {
contracts: Arc<ContractManager>,
persistence: Arc<dyn Persistence>,
signer: Arc<CardanoSigner>,
operation_sink: broadcast::Sender<Operation>,
}

impl OperationsManager {
Expand All @@ -25,12 +27,14 @@ impl OperationsManager {
contracts: Arc<ContractManager>,
persistence: Arc<dyn Persistence>,
signer: Arc<CardanoSigner>,
operation_sink: broadcast::Sender<Operation>,
) -> Self {
Self {
blockchain,
contracts,
persistence,
signer,
operation_sink,
}
}

Expand All @@ -40,16 +44,16 @@ impl OperationsManager {
status: OperationStatus::Pending,
tx_id: None,
};
self.persistence.write_operation(&op).await?;
self.update_operation(&op).await?;
match self.contracts.deploy(name, contract).await {
Ok(()) => {
op.status = OperationStatus::Succeeded;
self.persistence.write_operation(&op).await?;
self.update_operation(&op).await?;
Ok(())
}
Err(err) => {
op.status = OperationStatus::Failed(err.to_string());
self.persistence.write_operation(&op).await?;
self.update_operation(&op).await?;
Err(err.into())
}
}
Expand All @@ -68,13 +72,13 @@ impl OperationsManager {
status: OperationStatus::Pending,
tx_id: None,
};
self.persistence.write_operation(&op).await?;
self.update_operation(&op).await?;
let result = self.contracts.invoke(contract, method, params).await;
let value = match result {
Ok(v) => v,
Err(err) => {
op.status = OperationStatus::Failed(err.to_string());
self.persistence.write_operation(&op).await?;
self.update_operation(&op).await?;
return Err(err.into());
}
};
Expand All @@ -83,7 +87,7 @@ impl OperationsManager {
}

op.status = OperationStatus::Succeeded;
self.persistence.write_operation(&op).await?;
self.update_operation(&op).await?;

Ok(())
}
Expand All @@ -95,6 +99,14 @@ impl OperationsManager {
Ok(op)
}

async fn update_operation(&self, op: &Operation) -> ApiResult<()> {
// Notify consumers about this status update.
// Errors are fine, just means nobody is listening
let _ = self.operation_sink.send(op.clone());
self.persistence.write_operation(op).await?;
Ok(())
}

async fn submit_transaction(&self, address: &str, tx: Vec<u8>) -> ApiResult<String> {
let mut transaction: Tx = minicbor::decode(&tx)?;
self.signer
Expand Down
124 changes: 82 additions & 42 deletions firefly-cardanoconnect/src/routes/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing::{error, instrument, warn, Level};

use crate::AppState;
use crate::{
operations::Operation,
streams::{Batch, StreamMessage},
AppState,
};

async fn handle_socket(
AppState { stream_manager, .. }: AppState,
Expand All @@ -28,56 +32,76 @@ async fn handle_socket(
};
let mut subscription = stream_manager.subscribe(&topic).await?;

while let Some(batch) = subscription.recv().await {
let outgoing_batch = OutgoingBatch {
batch_number: batch.batch_number,
events: batch
.events
.iter()
.map(|e| Event {
listener_id: Some(e.id.listener_id.clone().into()),
signature: e.id.signature.clone(),
block_number: e.id.block_number,
block_hash: e.id.block_hash.clone(),
transaction_hash: e.id.transaction_hash.clone(),
transaction_index: e.id.transaction_index,
log_index: e.id.log_index,
timestamp: e.id.timestamp.map(systemtime_to_rfc3339),
data: e.data.clone(),
})
.collect(),
};
let outgoing_json = serde_json::to_string(&outgoing_batch)?;
socket.send(Message::Text(outgoing_json)).await?;
while let Some(message) = subscription.recv().await {
match message {
StreamMessage::Batch(batch) => send_batch(&mut socket, &topic, batch).await?,
StreamMessage::Operation(op) => send_operation(&mut socket, op).await?,
}
}
Ok(())
}

let Some(response) = read_message(&mut socket).await? else {
bail!("socket was closed after sending a batch");
};
match response {
IncomingMessage::Ack(ack) => {
if ack.topic != topic {
bail!("client acked messages from the wrong topic");
}
if ack.batch_number != batch.batch_number {
bail!("client acked the wrong batch");
}
batch.ack();
async fn send_batch(socket: &mut WebSocket, topic: &str, batch: Batch) -> Result<()> {
let outgoing_batch = OutgoingBatch {
batch_number: batch.batch_number,
events: batch
.events
.iter()
.map(|e| Event {
listener_id: Some(e.id.listener_id.clone().into()),
signature: e.id.signature.clone(),
block_number: e.id.block_number,
block_hash: e.id.block_hash.clone(),
transaction_hash: e.id.transaction_hash.clone(),
transaction_index: e.id.transaction_index,
log_index: e.id.log_index,
timestamp: e.id.timestamp.map(systemtime_to_rfc3339),
data: e.data.clone(),
})
.collect(),
};
let outgoing_json = serde_json::to_string(&outgoing_batch)?;
socket.send(Message::Text(outgoing_json)).await?;

let Some(response) = read_message(socket).await? else {
bail!("socket was closed after sending a batch");
};
match response {
IncomingMessage::Ack(ack) => {
if ack.topic != topic {
bail!("client acked messages from the wrong topic");
}
IncomingMessage::Error(err) => {
if err.topic != topic {
bail!("client acked messages from the wrong topic");
}
error!("client couldn't process batch: {}", err.message);
continue;
if ack.batch_number != batch.batch_number {
bail!("client acked the wrong batch");
}
other => {
bail!("unexpected response to batch! {:?}", other);
batch.ack();
}
IncomingMessage::Error(err) => {
if err.topic != topic {
bail!("client acked messages from the wrong topic");
}
error!("client couldn't process batch: {}", err.message);
}
other => {
bail!("unexpected response to batch! {:?}", other);
}
}
Ok(())
}

async fn send_operation(socket: &mut WebSocket, op: Operation) -> Result<()> {
let operation = OutgoingOperation {
headers: OperationHeaders {
request_id: op.id.to_string(),
type_: op.status.name().to_string(),
},
transaction_hash: op.tx_id.clone(),
error_message: op.status.error_message().map(|m| m.to_string()),
};
let outgoing_json = serde_json::to_string(&operation)?;
socket.send(Message::Text(outgoing_json)).await?;
Ok(())
}
fn systemtime_to_rfc3339(value: SystemTime) -> String {
let date: DateTime<Utc> = value.into();
date.to_rfc3339()
Expand Down Expand Up @@ -158,6 +182,22 @@ struct OutgoingBatch {
events: Vec<Event>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct OutgoingOperation {
headers: OperationHeaders,
transaction_hash: Option<String>,
error_message: Option<String>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct OperationHeaders {
request_id: String,
#[serde(rename = "type")]
type_: String,
}

pub async fn handle_socket_upgrade(
State(app_state): State<AppState>,
ws: WebSocketUpgrade,
Expand Down
1 change: 1 addition & 0 deletions firefly-cardanoconnect/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ mod mux;
mod types;

pub use manager::StreamManager;
pub use mux::{Batch, StreamMessage};
pub use types::*;
11 changes: 6 additions & 5 deletions firefly-cardanoconnect/src/streams/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::{sync::Arc, time::Duration};

use anyhow::Result;
use firefly_server::apitypes::{ApiError, ApiResult};
use tokio::sync::mpsc;
use tokio::sync::broadcast;
use ulid::Ulid;

use crate::{blockchain::BlockchainClient, persistence::Persistence};
use crate::{blockchain::BlockchainClient, operations::Operation, persistence::Persistence};

use super::{
mux::{Batch, Multiplexer},
mux::{Multiplexer, StreamSubscription},
BlockReference, Listener, ListenerFilter, ListenerId, ListenerType, Stream, StreamId,
};

Expand All @@ -21,10 +21,11 @@ impl StreamManager {
pub async fn new(
persistence: Arc<dyn Persistence>,
blockchain: Arc<BlockchainClient>,
operation_sink: broadcast::Sender<Operation>,
) -> Result<Self> {
Ok(Self {
persistence: persistence.clone(),
mux: Multiplexer::new(persistence, blockchain).await?,
mux: Multiplexer::new(persistence, blockchain, operation_sink).await?,
})
}

Expand Down Expand Up @@ -160,7 +161,7 @@ impl StreamManager {
Ok(())
}

pub async fn subscribe(&self, topic: &str) -> Result<mpsc::Receiver<Batch>> {
pub async fn subscribe(&self, topic: &str) -> Result<StreamSubscription> {
self.mux.subscribe(topic).await
}
}
Loading

0 comments on commit efd2e46

Please sign in to comment.