From 650fbd7cc22d0294d906178d39eb4b91cc5f650a Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:44:28 +0200 Subject: [PATCH] node: introduce query_as - Fix tests - Fix queries - Rename remove_block to remove_block_and_events - Add hex serialization to the ArchivedEvent data field - Remove TODO comment --- ...6f88395b35949d9a213891f7eb1b0ccbc2fff.json | 38 ----- node/src/archive/archivist.rs | 2 +- node/src/archive/sqlite.rs | 149 ++++++------------ 3 files changed, 46 insertions(+), 143 deletions(-) delete mode 100644 node/.sqlx/query-4c90fe749d3dbd3f04844450d966f88395b35949d9a213891f7eb1b0ccbc2fff.json diff --git a/node/.sqlx/query-4c90fe749d3dbd3f04844450d966f88395b35949d9a213891f7eb1b0ccbc2fff.json b/node/.sqlx/query-4c90fe749d3dbd3f04844450d966f88395b35949d9a213891f7eb1b0ccbc2fff.json deleted file mode 100644 index 00753fe8e..000000000 --- a/node/.sqlx/query-4c90fe749d3dbd3f04844450d966f88395b35949d9a213891f7eb1b0ccbc2fff.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "db_name": "SQLite", - "query": "SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ?", - "describe": { - "columns": [ - { - "name": "origin", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "topic", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "source", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "data", - "ordinal": 3, - "type_info": "Blob" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false - ] - }, - "hash": "4c90fe749d3dbd3f04844450d966f88395b35949d9a213891f7eb1b0ccbc2fff" -} diff --git a/node/src/archive/archivist.rs b/node/src/archive/archivist.rs index bdb85ffc8..c2b02749a 100644 --- a/node/src/archive/archivist.rs +++ b/node/src/archive/archivist.rs @@ -54,7 +54,7 @@ impl ArchivalData::DeletedBlock(blk_height, hex_blk_hash) => { if let Err(e) = self .archivist - .remove_block(blk_height, &hex_blk_hash) + .remove_block_and_events(blk_height, &hex_blk_hash) .await { error!( diff --git a/node/src/archive/sqlite.rs b/node/src/archive/sqlite.rs index dbd93599e..9742e3e84 100644 --- a/node/src/archive/sqlite.rs +++ b/node/src/archive/sqlite.rs @@ -10,7 +10,6 @@ use anyhow::Result; use execution_core::signatures::bls::PublicKey as AccountPublicKey; use node_data::events::contract::ContractTxEvent; use node_data::ledger::Hash; -use serde_json::json; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePool}, Pool, Sqlite, @@ -63,7 +62,7 @@ impl Archive { ) -> Result { let mut conn = self.sqlite_archive.acquire().await?; - let events = sqlx::query!( + let events = sqlx::query_as!(data::ArchivedEvent, r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_height = ? UNION ALL SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_height = ?) @@ -73,19 +72,7 @@ impl Archive { // Convert the event related row fields from finalized_events table to // json string - let json = json!(events - .into_iter() - .map(|record| { - json!({ - "origin": record.origin, - "topic": record.topic, - "source": record.source, - "data": record.data, - }) - }) - .collect::>()); - - Ok(json.to_string()) + Ok(serde_json::to_string(&events)?) } /// Fetch all vm events from the last finalized block and return them as a @@ -95,7 +82,7 @@ impl Archive { // Get the last finalized block height by getting all the events from // the largest block height - let events = sqlx::query!( + let events = sqlx::query_as!(data::ArchivedEvent, r#" SELECT origin, topic, source, data FROM finalized_events WHERE block_height = (SELECT MAX(block_height) FROM finalized_events) @@ -106,19 +93,7 @@ impl Archive { // Convert the event related row fields from finalized_events table to // json string - let json = json!(events - .into_iter() - .map(|record| { - json!({ - "origin": record.origin, - "topic": record.topic, - "source": record.source, - "data": record.data, - }) - }) - .collect::>()); - - Ok(json.to_string()) + Ok(serde_json::to_string(&events)?) } /// Fetch all vm events from a given block hash and return them as a json @@ -129,7 +104,7 @@ impl Archive { ) -> Result { let mut conn = self.sqlite_archive.acquire().await?; - let events = sqlx::query!( + let events = sqlx::query_as!(data::ArchivedEvent, r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_hash = ? UNION ALL SELECT origin, topic, source, data FROM finalized_events WHERE block_hash = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_hash = ?) @@ -139,19 +114,7 @@ impl Archive { // Convert the event related row fields from finalized_events table to // json string - let json = json!(events - .into_iter() - .map(|row| { - json!({ - "origin": row.origin, - "topic": row.topic, - "source": row.source, - "data": row.data, - }) - }) - .collect::>()); - - Ok(json.to_string()) + Ok(serde_json::to_string(&events)?) } /// Fetch all vm events from a given block hash @@ -161,7 +124,7 @@ impl Archive { ) -> Result> { let mut conn = self.sqlite_archive.acquire().await?; - let records = sqlx::query!( + let events = sqlx::query_as!(data::ArchivedEvent, r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_hash = ? UNION ALL SELECT origin, topic, source, data FROM finalized_events WHERE block_hash = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_hash = ?) @@ -172,13 +135,7 @@ impl Archive { // Convert the event related row fields from finalized_events table to // data::Events and then to ContractTxEvent through into() let mut contract_tx_events = Vec::new(); - for row in records { - let event = data::ArchivedEvent { - origin: row.origin, - topic: row.topic, - source: row.source, - data: row.data, - }; + for event in events { let contract_tx_event: ContractTxEvent = event.try_into()?; contract_tx_events.push(contract_tx_event); } @@ -196,20 +153,15 @@ impl Archive { let mut conn = self.sqlite_archive.acquire().await?; // query all events now that we have the block height - let records = sqlx::query!( - r#"SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ?"#, - block_height + let records = sqlx::query_as!(data::ArchivedEvent, + r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_height = ? + UNION ALL + SELECT origin, topic, source, data FROM finalized_events WHERE block_height = ? AND NOT EXISTS (SELECT 1 FROM unfinalized_events WHERE block_height = ?) + "#, + block_height, block_height, block_height ).fetch_all(&mut *conn).await?; - Ok(records - .into_iter() - .map(|record| data::ArchivedEvent { - origin: record.origin, - topic: record.topic, - source: record.source, - data: record.data, - }) - .collect()) + Ok(records) } /// Fetch the last finalized block height and block hash @@ -232,22 +184,15 @@ impl Archive { ) -> Result> { let mut conn = self.sqlite_archive.acquire().await?; - let records = sqlx::query!( + let records = sqlx::query_as!( + data::ArchivedEvent, r#"SELECT origin, topic, source, data FROM finalized_events WHERE source = ?"#, contract_id ) .fetch_all(&mut *conn) .await?; - Ok(records - .into_iter() - .map(|record| data::ArchivedEvent { - origin: record.origin, - topic: record.topic, - source: record.source, - data: record.data, - }) - .collect()) + Ok(records) } /// Fetch all unfinalized vm events from a given block hash @@ -257,7 +202,7 @@ impl Archive { ) -> Result> { let mut conn = self.sqlite_archive.acquire().await?; - let records = sqlx::query!( + let unfinalized_events = sqlx::query_as!(data::ArchivedEvent, r#"SELECT origin, topic, source, data FROM unfinalized_events WHERE block_hash = ?"#, hex_block_hash ) @@ -265,13 +210,7 @@ impl Archive { .await?; let mut contract_tx_events = Vec::new(); - for row in records { - let event = data::ArchivedEvent { - origin: row.origin, - topic: row.topic, - source: row.source, - data: row.data, - }; + for event in unfinalized_events { let contract_tx_event: ContractTxEvent = event.try_into()?; contract_tx_events.push(contract_tx_event); } @@ -336,13 +275,11 @@ impl Archive { data: event.event.data, }; - let mut conn = self.sqlite_archive.acquire().await?; - sqlx::query!( r#"INSERT INTO unfinalized_events (block_height, block_hash, origin, topic, source, data) VALUES (?, ?, ?, ?, ?, ?)"#, block_height, hex_block_hash, event.origin, event.topic, event.source, event.data ) - .execute(&mut *conn) + .execute(&mut *tx) .await?; } @@ -367,8 +304,6 @@ impl Archive { current_block_height: u64, hex_block_hash: &str, ) -> Result<()> { - let current_block_height: i64 = current_block_height as i64; - let mut tx = self.sqlite_archive.begin().await?; // Get the row for the block with the given hash that got finalized @@ -395,13 +330,21 @@ impl Archive { finalized_block_height as u64, ); - // TODO Categorize events? + // TODO: We can categorize grouped_events at one point here too and add + // this data to another table sqlx::query!( r#"INSERT INTO finalized_blocks (block_height, block_hash) VALUES (?, ?)"#, finalized_block_height, hex_block_hash ).execute(&mut *tx).await?; + sqlx::query!( + r#"DELETE FROM unfinalized_events WHERE block_hash = ?"#, + hex_block_hash + ) + .execute(&mut *tx) + .await?; + sqlx::query!( r#"DELETE FROM unfinalized_blocks WHERE block_hash = ?"#, hex_block_hash @@ -415,18 +358,11 @@ impl Archive { let origin = hex::encode(ident.origin()); for event in events { - // TODO: make conversion easier or remove it and insert directly - // into query - let event = data::ArchivedEvent { - origin: origin.clone(), - topic: event.topic.clone(), - source: event.target.0.to_string(), - data: event.data.clone(), - }; + let source = event.target.0.to_string(); sqlx::query!( r#"INSERT INTO finalized_events (block_height, block_hash, origin, topic, source, data) VALUES (?, ?, ?, ?, ?, ?)"#, - finalized_block_height, hex_block_hash, event.origin, event.topic, event.source, event.data + finalized_block_height, hex_block_hash, origin, event.topic, source, event.data ) .execute(&mut *tx) .await?; @@ -435,7 +371,7 @@ impl Archive { // Commit the transaction tx.commit().await?; - + let current_block_height: i64 = current_block_height as i64; info!( "Marked block {} with height {} as finalized. After {} blocks at height {}", util::truncate_string(hex_block_hash), @@ -450,8 +386,9 @@ impl Archive { Ok(()) } - /// Remove the block of the given hash from the archive. - pub(super) async fn remove_block( + /// Remove the unfinalized block together with the unfinalized events of the + /// given hash from the archive. + pub(super) async fn remove_block_and_events( &self, current_block_height: u64, hex_block_hash: &str, @@ -500,6 +437,8 @@ mod data { use node_data::events::contract::{ ContractEvent, ContractTxEvent, ORIGIN_HASH_BYTES, }; + use serde::{Deserialize, Serialize}; + use sqlx::FromRow; /// Archived ContractTxEvent /// @@ -512,10 +451,13 @@ mod data { /// - `source`: The source field is the hex encoded contract id of the /// event. /// - `data`: The data field is the data of the event. + #[serde_with::serde_as] + #[derive(Debug, Clone, FromRow, Serialize, Deserialize)] pub struct ArchivedEvent { pub origin: String, pub topic: String, pub source: String, + #[serde_as(as = "serde_with::hex::Hex")] pub data: Vec, } @@ -679,13 +621,12 @@ mod tests { .unwrap(); assert!(archive - .remove_block(blk_height, &hex_blk_hash) + .remove_block_and_events(blk_height, &hex_blk_hash) .await .unwrap()); - let fetched_events = archive.fetch_vm_events(blk_height).await; - - assert!(fetched_events.is_err()); + let fetched_events = archive.fetch_vm_events(blk_height).await.unwrap(); + assert!(fetched_events.is_empty()); archive .store_unfinalized_vm_events(blk_height, blk_hash, events.clone()) @@ -698,7 +639,7 @@ mod tests { .unwrap(); assert!(!archive - .remove_block(blk_height, &hex_blk_hash) + .remove_block_and_events(blk_height, &hex_blk_hash) .await .unwrap());