Skip to content

Commit

Permalink
fix: store TXs, not block
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Nov 18, 2024
1 parent b706578 commit 11caf9a
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 604 deletions.
1,152 changes: 562 additions & 590 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion firefly-cardanoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ aide = { version = "0.13", features = ["axum"] }
anyhow = "1"
async-trait = "0.1"
axum = { version = "0.7", features = ["macros", "ws"] }
balius-runtime = { git = "https://github.com/SupernaviX/balius.git", rev = "5f0c81a" }
balius-runtime = { git = "https://github.com/SupernaviX/balius.git", rev = "3a899f9" }
blockfrost = { git = "https://github.com/blockfrost/blockfrost-rust.git", rev = "14e22b5", default-features = false, features = ["rustls-tls"] }
blockfrost-openapi = "0.1.69"
clap = { version = "4", features = ["derive"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ CREATE TABLE "block_records" (
"block_slot" BIGINT NULL,
"block_hash" TEXT NOT NULL,
"parent_hash" TEXT NULL,
"cbor" BLOB NOT NULL,
"transaction_hashes" TEXT NOT NULL,
"rolled_back" TINYINT NOT NULL,
FOREIGN KEY ("listener_id") REFERENCES "listeners" ("id") ON DELETE CASCADE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE "block_records"
ADD COLUMN "transactions" BLOB NOT NULL
DEFAULT x'80'
16 changes: 15 additions & 1 deletion firefly-cardanoconnect/src/blockchain/blockfrost.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{collections::VecDeque, time::Duration};

use anyhow::{bail, Result};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use blockfrost::{
BlockFrostSettings, BlockfrostAPI, BlockfrostError, BlockfrostResult, Pagination,
};
use blockfrost_openapi::models::BlockContent;
use futures::future::try_join_all;
use pallas_primitives::conway::Tx;
use tokio::time;

Expand Down Expand Up @@ -272,12 +273,16 @@ async fn parse_block(api: &BlockfrostAPI, block: BlockContent) -> Result<BlockIn

let transaction_hashes = api.blocks_txs(&block_hash, Pagination::all()).await?;

let tx_body_requests = transaction_hashes.iter().map(|hash| fetch_tx(api, hash));
let transactions = try_join_all(tx_body_requests).await?;

let info = BlockInfo {
block_hash,
block_height,
block_slot,
parent_hash: block.previous_block,
transaction_hashes,
transactions,
};
Ok(info)
}
Expand Down Expand Up @@ -307,3 +312,12 @@ impl<T> BlockfrostResultExt for BlockfrostResult<T> {
}
}
}

async fn fetch_tx(blockfrost: &BlockfrostAPI, hash: &str) -> Result<Vec<u8>> {
let tx_body = blockfrost
.transactions_cbor(hash)
.await
.context("could not fetch tx body")?;
let bytes = hex::decode(&tx_body.cbor)?;
Ok(bytes)
}
2 changes: 1 addition & 1 deletion firefly-cardanoconnect/src/blockchain/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ impl MockChain {
block_slot: block_height,
block_hash: Self::generate_hash(rng),
parent_hash,
cbor: vec![],
transaction_hashes,
transactions: vec![],
};
indexes.insert(block.block_hash.clone(), chain.len());
chain.push(block);
Expand Down
8 changes: 7 additions & 1 deletion firefly-cardanoconnect/src/blockchain/n2c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ impl N2cChainSync {

let block_hash = hex::encode(block.hash());
let parent_hash = block.header().previous_hash().map(hex::encode);
let mut transaction_hashes = vec![];
let mut transactions = vec![];
for tx in block.txs() {
transaction_hashes.push(hex::encode(tx.hash()));
transactions.push(tx.encode());
}
let transaction_hashes = block
.txs()
.iter()
Expand All @@ -243,8 +249,8 @@ impl N2cChainSync {
block_slot: Some(block_slot),
block_hash,
parent_hash,
cbor: content.0,
transaction_hashes,
transactions,
})
}
}
Expand Down
26 changes: 18 additions & 8 deletions firefly-cardanoconnect/src/persistence/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl Persistence for SqlitePersistence {
self.conn
.call_unwrap(move |c| {
c.prepare_cached(
"SELECT block_height, block_slot, block_hash, parent_hash, cbor, transaction_hashes, rolled_back
"SELECT block_height, block_slot, block_hash, parent_hash, transaction_hashes, transactions, rolled_back
FROM block_records
WHERE listener_id = ?1
ORDER BY id",
Expand All @@ -286,7 +286,7 @@ impl Persistence for SqlitePersistence {
self.conn.call_unwrap(move |c| {
let tx = c.transaction()?;
let mut insert = tx.prepare_cached(
"INSERT INTO block_records (listener_id, block_height, block_slot, block_hash, parent_hash, cbor, transaction_hashes, rolled_back)
"INSERT INTO block_records (listener_id, block_height, block_slot, block_hash, parent_hash, transaction_hashes, transactions, rolled_back)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
)?;

Expand All @@ -295,11 +295,15 @@ impl Persistence for SqlitePersistence {
let block_slot = record.block.block_slot;
let block_hash = record.block.block_hash.clone();
let parent_hash = record.block.parent_hash.clone();
let cbor = record.block.cbor;
let transaction_hashes = serde_json::to_string(&record.block.transaction_hashes)?;
let transactions = {
let mut bytes = vec![];
minicbor::encode(&record.block.transactions, &mut bytes).expect("infallible");
bytes
};
let rolled_back = record.rolled_back;

insert.execute(params![listener_id, block_height, block_slot, block_hash, parent_hash, cbor, transaction_hashes, rolled_back])?;
insert.execute(params![listener_id, block_height, block_slot, block_hash, parent_hash, transaction_hashes, transactions, rolled_back])?;
}

drop(insert);
Expand Down Expand Up @@ -396,17 +400,23 @@ fn parse_block_record(row: &Row) -> Result<BlockRecord> {
let block_slot: Option<u64> = row.get("block_slot")?;
let block_hash: String = row.get("block_hash")?;
let parent_hash: Option<String> = row.get("parent_hash")?;
let cbor: Vec<u8> = row.get("cbor")?;
let transaction_hashes: String = row.get("transaction_hashes")?;
let transaction_hashes = {
let raw: String = row.get("transaction_hashes")?;
serde_json::from_str(&raw)?
};
let transactions = {
let raw: Vec<u8> = row.get("transactions")?;
minicbor::decode(&raw)?
};
let rolled_back: bool = row.get("rolled_back")?;
Ok(BlockRecord {
block: BlockInfo {
block_height,
block_slot,
block_hash,
parent_hash,
cbor,
transaction_hashes: serde_json::from_str(&transaction_hashes)?,
transaction_hashes,
transactions,
},
rolled_back,
})
Expand Down
2 changes: 1 addition & 1 deletion firefly-cardanoconnect/src/streams/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ pub struct BlockInfo {
pub block_slot: Option<u64>,
pub block_hash: String,
pub parent_hash: Option<String>,
pub cbor: Vec<u8>,
pub transaction_hashes: Vec<String>,
pub transactions: Vec<Vec<u8>>,
}
impl BlockInfo {
pub fn as_reference(&self) -> BlockReference {
Expand Down

0 comments on commit 11caf9a

Please sign in to comment.