Skip to content

Commit

Permalink
node: introduce query_as
Browse files Browse the repository at this point in the history
- Fix tests
- Fix queries
- Rename remove_block to remove_block_and_events
- Add hex serialization to the ArchivedEvent data field
- Remove TODO comment
  • Loading branch information
Neotamandua committed Nov 12, 2024
1 parent 389f4a0 commit 650fbd7
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 143 deletions.

This file was deleted.

2 changes: 1 addition & 1 deletion node/src/archive/archivist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
ArchivalData::DeletedBlock(blk_height, hex_blk_hash) => {
if let Err(e) = self
.archivist
.remove_block(blk_height, &hex_blk_hash)
.remove_block_and_events(blk_height, &hex_blk_hash)
.await
{
error!(
Expand Down
149 changes: 45 additions & 104 deletions node/src/archive/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use anyhow::Result;
use execution_core::signatures::bls::PublicKey as AccountPublicKey;
use node_data::events::contract::ContractTxEvent;
use node_data::ledger::Hash;
use serde_json::json;
use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePool},
Pool, Sqlite,
Expand Down Expand Up @@ -63,7 +62,7 @@ impl Archive {
) -> Result<String> {
let mut conn = self.sqlite_archive.acquire().await?;

let events = sqlx::query!(
let events = sqlx::query_as!(data::ArchivedEvent,
r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_height = ?
UNION ALL
SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_height = ?)
Expand All @@ -73,19 +72,7 @@ impl Archive {

// Convert the event related row fields from finalized_events table to
// json string
let json = json!(events
.into_iter()
.map(|record| {
json!({
"origin": record.origin,
"topic": record.topic,
"source": record.source,
"data": record.data,
})
})
.collect::<Vec<_>>());

Ok(json.to_string())
Ok(serde_json::to_string(&events)?)
}

/// Fetch all vm events from the last finalized block and return them as a
Expand All @@ -95,7 +82,7 @@ impl Archive {

// Get the last finalized block height by getting all the events from
// the largest block height
let events = sqlx::query!(
let events = sqlx::query_as!(data::ArchivedEvent,
r#"
SELECT origin, topic, source, data FROM finalized_events
WHERE block_height = (SELECT MAX(block_height) FROM finalized_events)
Expand All @@ -106,19 +93,7 @@ impl Archive {

// Convert the event related row fields from finalized_events table to
// json string
let json = json!(events
.into_iter()
.map(|record| {
json!({
"origin": record.origin,
"topic": record.topic,
"source": record.source,
"data": record.data,
})
})
.collect::<Vec<_>>());

Ok(json.to_string())
Ok(serde_json::to_string(&events)?)
}

/// Fetch all vm events from a given block hash and return them as a json
Expand All @@ -129,7 +104,7 @@ impl Archive {
) -> Result<String> {
let mut conn = self.sqlite_archive.acquire().await?;

let events = sqlx::query!(
let events = sqlx::query_as!(data::ArchivedEvent,
r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_hash = ?
UNION ALL
SELECT origin, topic, source, data FROM finalized_events WHERE block_hash = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_hash = ?)
Expand All @@ -139,19 +114,7 @@ impl Archive {

// Convert the event related row fields from finalized_events table to
// json string
let json = json!(events
.into_iter()
.map(|row| {
json!({
"origin": row.origin,
"topic": row.topic,
"source": row.source,
"data": row.data,
})
})
.collect::<Vec<_>>());

Ok(json.to_string())
Ok(serde_json::to_string(&events)?)
}

/// Fetch all vm events from a given block hash
Expand All @@ -161,7 +124,7 @@ impl Archive {
) -> Result<Vec<ContractTxEvent>> {
let mut conn = self.sqlite_archive.acquire().await?;

let records = sqlx::query!(
let events = sqlx::query_as!(data::ArchivedEvent,
r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_hash = ?
UNION ALL
SELECT origin, topic, source, data FROM finalized_events WHERE block_hash = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_hash = ?)
Expand All @@ -172,13 +135,7 @@ impl Archive {
// Convert the event related row fields from finalized_events table to
// data::Events and then to ContractTxEvent through into()
let mut contract_tx_events = Vec::new();
for row in records {
let event = data::ArchivedEvent {
origin: row.origin,
topic: row.topic,
source: row.source,
data: row.data,
};
for event in events {
let contract_tx_event: ContractTxEvent = event.try_into()?;
contract_tx_events.push(contract_tx_event);
}
Expand All @@ -196,20 +153,15 @@ impl Archive {
let mut conn = self.sqlite_archive.acquire().await?;

// query all events now that we have the block height
let records = sqlx::query!(
r#"SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ?"#,
block_height
let records = sqlx::query_as!(data::ArchivedEvent,
r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_height = ?
UNION ALL
SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_height = ?)
"#,
block_height, block_height, block_height
).fetch_all(&mut *conn).await?;

Ok(records
.into_iter()
.map(|record| data::ArchivedEvent {
origin: record.origin,
topic: record.topic,
source: record.source,
data: record.data,
})
.collect())
Ok(records)
}

/// Fetch the last finalized block height and block hash
Expand All @@ -232,22 +184,15 @@ impl Archive {
) -> Result<Vec<data::ArchivedEvent>> {
let mut conn = self.sqlite_archive.acquire().await?;

let records = sqlx::query!(
let records = sqlx::query_as!(
data::ArchivedEvent,
r#"SELECT origin, topic, source, data FROM finalized_events WHERE source = ?"#,
contract_id
)
.fetch_all(&mut *conn)
.await?;

Ok(records
.into_iter()
.map(|record| data::ArchivedEvent {
origin: record.origin,
topic: record.topic,
source: record.source,
data: record.data,
})
.collect())
Ok(records)
}

/// Fetch all unfinalized vm events from a given block hash
Expand All @@ -257,21 +202,15 @@ impl Archive {
) -> Result<Vec<ContractTxEvent>> {
let mut conn = self.sqlite_archive.acquire().await?;

let records = sqlx::query!(
let unfinalized_events = sqlx::query_as!(data::ArchivedEvent,
r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_hash = ?"#,
hex_block_hash
)
.fetch_all(&mut *conn)
.await?;

let mut contract_tx_events = Vec::new();
for row in records {
let event = data::ArchivedEvent {
origin: row.origin,
topic: row.topic,
source: row.source,
data: row.data,
};
for event in unfinalized_events {
let contract_tx_event: ContractTxEvent = event.try_into()?;
contract_tx_events.push(contract_tx_event);
}
Expand Down Expand Up @@ -336,13 +275,11 @@ impl Archive {
data: event.event.data,
};

let mut conn = self.sqlite_archive.acquire().await?;

sqlx::query!(
r#"INSERT INTO unfinalized_events (block_height, block_hash, origin, topic, source, data) VALUES (?, ?, ?, ?, ?, ?)"#,
block_height, hex_block_hash, event.origin, event.topic, event.source, event.data
)
.execute(&mut *conn)
.execute(&mut *tx)
.await?;
}

Expand All @@ -367,8 +304,6 @@ impl Archive {
current_block_height: u64,
hex_block_hash: &str,
) -> Result<()> {
let current_block_height: i64 = current_block_height as i64;

let mut tx = self.sqlite_archive.begin().await?;

// Get the row for the block with the given hash that got finalized
Expand All @@ -395,13 +330,21 @@ impl Archive {
finalized_block_height as u64,
);

// TODO Categorize events?
// TODO: We can categorize grouped_events at one point here too and add
// this data to another table

sqlx::query!(
r#"INSERT INTO finalized_blocks (block_height, block_hash) VALUES (?, ?)"#,
finalized_block_height, hex_block_hash
).execute(&mut *tx).await?;

sqlx::query!(
r#"DELETE FROM unfinalized_events WHERE block_hash = ?"#,
hex_block_hash
)
.execute(&mut *tx)
.await?;

sqlx::query!(
r#"DELETE FROM unfinalized_blocks WHERE block_hash = ?"#,
hex_block_hash
Expand All @@ -415,18 +358,11 @@ impl Archive {
let origin = hex::encode(ident.origin());

for event in events {
// TODO: make conversion easier or remove it and insert directly
// into query
let event = data::ArchivedEvent {
origin: origin.clone(),
topic: event.topic.clone(),
source: event.target.0.to_string(),
data: event.data.clone(),
};
let source = event.target.0.to_string();

sqlx::query!(
r#"INSERT INTO finalized_events (block_height, block_hash, origin, topic, source, data) VALUES (?, ?, ?, ?, ?, ?)"#,
finalized_block_height, hex_block_hash, event.origin, event.topic, event.source, event.data
finalized_block_height, hex_block_hash, origin, event.topic, source, event.data
)
.execute(&mut *tx)
.await?;
Expand All @@ -435,7 +371,7 @@ impl Archive {

// Commit the transaction
tx.commit().await?;

let current_block_height: i64 = current_block_height as i64;
info!(
"Marked block {} with height {} as finalized. After {} blocks at height {}",
util::truncate_string(hex_block_hash),
Expand All @@ -450,8 +386,9 @@ impl Archive {
Ok(())
}

/// Remove the block of the given hash from the archive.
pub(super) async fn remove_block(
/// Remove the unfinalized block together with the unfinalized events of the
/// given hash from the archive.
pub(super) async fn remove_block_and_events(
&self,
current_block_height: u64,
hex_block_hash: &str,
Expand Down Expand Up @@ -500,6 +437,8 @@ mod data {
use node_data::events::contract::{
ContractEvent, ContractTxEvent, ORIGIN_HASH_BYTES,
};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;

/// Archived ContractTxEvent
///
Expand All @@ -512,10 +451,13 @@ mod data {
/// - `source`: The source field is the hex encoded contract id of the
/// event.
/// - `data`: The data field is the data of the event.
#[serde_with::serde_as]
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct ArchivedEvent {
pub origin: String,
pub topic: String,
pub source: String,
#[serde_as(as = "serde_with::hex::Hex")]
pub data: Vec<u8>,
}

Expand Down Expand Up @@ -679,13 +621,12 @@ mod tests {
.unwrap();

assert!(archive
.remove_block(blk_height, &hex_blk_hash)
.remove_block_and_events(blk_height, &hex_blk_hash)
.await
.unwrap());

let fetched_events = archive.fetch_vm_events(blk_height).await;

assert!(fetched_events.is_err());
let fetched_events = archive.fetch_vm_events(blk_height).await.unwrap();
assert!(fetched_events.is_empty());

archive
.store_unfinalized_vm_events(blk_height, blk_hash, events.clone())
Expand All @@ -698,7 +639,7 @@ mod tests {
.unwrap();

assert!(!archive
.remove_block(blk_height, &hex_blk_hash)
.remove_block_and_events(blk_height, &hex_blk_hash)
.await
.unwrap());

Expand Down

0 comments on commit 650fbd7

Please sign in to comment.