From 82c53ae54e69d1f267e2a79c78a10538ea28d922 Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Thu, 15 Aug 2024 19:34:48 +0100 Subject: [PATCH 1/8] get oldest scanned epoch --- .../tari_indexer/src/event_scanner.rs | 14 +++++++++++++ .../sqlite_substate_store_factory.rs | 20 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index 67928b3d4..044ed786f 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -127,8 +127,15 @@ impl EventScanner { let mut event_count = 0; + let oldest_scanned_epoch = self.get_oldest_scanned_epoch().await?; + info!( + target: LOG_TARGET, + "get_oldest_scanned_epoch: {:?}", oldest_scanned_epoch + ); + let current_epoch = self.epoch_manager.current_epoch().await?; let current_committees = self.epoch_manager.get_committees(current_epoch).await?; + for (shard_group, mut committee) in current_committees { info!( target: LOG_TARGET, @@ -416,6 +423,13 @@ impl EventScanner { *start_block.id() } + async fn get_oldest_scanned_epoch(&self) -> Result, anyhow::Error>{ + self + .substate_store + .with_read_tx(|tx| tx.get_oldest_scanned_epoch()) + .map_err(|e| e.into()) + } + #[allow(unused_assignments)] async fn get_new_blocks_from_committee( &self, diff --git a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs index f8cb3d1bb..f34149d2e 100644 --- a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs +++ b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs @@ -230,6 +230,7 @@ pub trait SubstateStoreReadTransaction { limit: u32, ) -> Result, StorageError>; fn event_exists(&mut self, event: NewEvent) -> Result; + fn get_oldest_scanned_epoch(&mut self) -> Result, StorageError>; fn get_last_scanned_block_id( &mut self, epoch: Epoch, @@ -601,6 +602,25 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> { Ok(exists) } + fn get_oldest_scanned_epoch(&mut self) -> Result, StorageError> { + use crate::substate_storage_sqlite::schema::scanned_block_ids; + + let res: Option = scanned_block_ids::table + .select(diesel::dsl::min(scanned_block_ids::epoch)) + .first(self.connection()) + .map_err(|e| StorageError::QueryError { + reason: format!("get_oldest_scanned_epoch: {}", e), + })?; + + let oldest_epoch = res.map(|r| { + let epoch_as_u64 = r.try_into() + .map_err(|_| StorageError::InvalidIntegerCast)?; + Ok::(Epoch(epoch_as_u64)) + }).transpose()?; + + Ok(oldest_epoch) + } + fn get_last_scanned_block_id( &mut self, epoch: Epoch, From 5626769e6c4e4b76c02f05deca9792161bf055e1 Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Fri, 16 Aug 2024 16:57:33 +0100 Subject: [PATCH 2/8] refactor scanning logic --- .../tari_indexer/src/event_scanner.rs | 65 ++++++++++++++----- .../sqlite_substate_store_factory.rs | 14 ++++ 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index 044ed786f..613ded467 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -127,35 +127,64 @@ impl EventScanner { let mut event_count = 0; + let newest_epoch = self.epoch_manager.current_epoch().await?; let oldest_scanned_epoch = self.get_oldest_scanned_epoch().await?; + + match oldest_scanned_epoch { + Some(oldest_epoch) => { + // we could span multiple cuncurrent epoch scans + // but we want to avoid gaps of the latest scanned value if any of the intermediate epoch scans fail + for epoch_idx in oldest_epoch.0..=newest_epoch.0 { + let epoch = Epoch(epoch_idx); + event_count += self.scan_events_of_epoch(epoch).await?; + + // at this point we can assume the previous epochs have been fully scanned + self.delete_scanned_epochs_older_than(epoch).await?; + } + }, + None => { + // by default we start scanning since the current epoch + // TODO: it would be nice a new parameter in the indexer to spcify a custom starting epoch + event_count += self.scan_events_of_epoch(newest_epoch).await?; + }, + } + info!( target: LOG_TARGET, - "get_oldest_scanned_epoch: {:?}", oldest_scanned_epoch + "Scanned {} events", + event_count ); - let current_epoch = self.epoch_manager.current_epoch().await?; - let current_committees = self.epoch_manager.get_committees(current_epoch).await?; + Ok(event_count) + } + + async fn scan_events_of_epoch(&self, epoch: Epoch) -> Result { + let committees = self.epoch_manager.get_committees(epoch).await?; - for (shard_group, mut committee) in current_committees { + let mut event_count = 0; + + for (shard_group, mut committee) in committees { info!( target: LOG_TARGET, "Scanning committee epoch={}, sg={}", - current_epoch, + epoch, shard_group ); let new_blocks = self - .get_new_blocks_from_committee(shard_group, &mut committee, current_epoch) + .get_new_blocks_from_committee(shard_group, &mut committee, epoch) .await?; info!( target: LOG_TARGET, - "Scanned {} blocks", - new_blocks.len() + "Scanned {} blocks in epoch={}", + new_blocks.len(), + epoch, ); let transactions = self.extract_transactions_from_blocks(new_blocks); info!( target: LOG_TARGET, - "Scanned {} transactions", - transactions.len() + "Scanned {} transactions in epoch={}", + transactions.len(), + epoch, ); for transaction in transactions { @@ -168,22 +197,24 @@ impl EventScanner { events.into_iter().filter(|ev| self.should_persist_event(ev)).collect(); info!( target: LOG_TARGET, - "Filtered events: {}", + "Filtered events in epoch {}: {}", + epoch, filtered_events.len() ); self.store_events_in_db(&filtered_events, transaction).await?; } } - info!( - target: LOG_TARGET, - "Scanned {} events", - event_count - ); - Ok(event_count) } + async fn delete_scanned_epochs_older_than(&self, epoch: Epoch) -> Result<(), anyhow::Error> { + self + .substate_store + .with_write_tx(|tx| tx.delete_scanned_epochs_older_than(epoch)) + .map_err(|e| e.into()) + } + fn should_persist_event(&self, event_data: &EventData) -> bool { for filter in &self.event_filters { if Self::event_matches_filter(filter, &event_data.event) { diff --git a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs index f34149d2e..784d167da 100644 --- a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs +++ b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs @@ -676,6 +676,7 @@ pub trait SubstateStoreWriteTransaction { fn add_non_fungible_index(&mut self, new_nft_index: NewNonFungibleIndex) -> Result<(), StorageError>; fn save_event(&mut self, new_event: NewEvent) -> Result<(), StorageError>; fn save_scanned_block_id(&mut self, new_scanned_block_id: NewScannedBlockId) -> Result<(), StorageError>; + fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError>; } impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> { @@ -838,6 +839,19 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> { Ok(()) } + + fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError> { + use crate::substate_storage_sqlite::schema::scanned_block_ids; + + diesel::delete(scanned_block_ids::table) + .filter(scanned_block_ids::epoch.lt(epoch.0 as i64)) + .execute(&mut *self.connection()) + .map_err(|e| StorageError::QueryError { + reason: format!("delete_scanned_epochs_older_than: {}", e), + })?; + + Ok(()) + } } impl<'a> Deref for SqliteSubstateStoreWriteTransaction<'a> { From 9fb07b3a531fcac2f8d7e07e4df03443885d46a5 Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Fri, 16 Aug 2024 19:25:43 +0100 Subject: [PATCH 3/8] only save scanned block id if there is a new one --- applications/tari_indexer/src/event_scanner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index 613ded467..d61b69c2a 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -511,9 +511,9 @@ impl EventScanner { ); if let Some(block) = blocks.last() { last_block_id = *block.id(); - } - // Store the latest scanned block id in the database for future scans - self.save_scanned_block_id(epoch, shard_group, last_block_id)?; + // Store the latest scanned block id in the database for future scans + self.save_scanned_block_id(epoch, shard_group, last_block_id)?; + } return Ok(blocks); }, Err(e) => { From 642318a3b34c5e37143a5afc1102e771b4f42b7b Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:02:54 +0100 Subject: [PATCH 4/8] query blocks by epoch --- applications/tari_indexer/src/event_scanner.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index d61b69c2a..ea48e11d2 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -496,7 +496,7 @@ impl EventScanner { epoch, shard_group ); - let resp = self.get_blocks_from_vn(member, start_block_id).await; + let resp = self.get_blocks_from_vn(member, start_block_id, Some(epoch)).await; match resp { Ok(blocks) => { @@ -509,7 +509,11 @@ impl EventScanner { epoch, shard_group, ); - if let Some(block) = blocks.last() { + + // get the most recent block among all scanned blocks in the epoch + let last_block = blocks.iter().max_by_key(|b| b.height()); + + if let Some(block) = last_block { last_block_id = *block.id(); // Store the latest scanned block id in the database for future scans self.save_scanned_block_id(epoch, shard_group, last_block_id)?; @@ -569,6 +573,7 @@ impl EventScanner { &self, vn_addr: &PeerAddress, start_block_id: BlockId, + up_to_epoch: Option, ) -> Result, anyhow::Error> { let mut blocks = vec![]; @@ -578,7 +583,7 @@ impl EventScanner { let mut stream = client .sync_blocks(SyncBlocksRequest { start_block_id: start_block_id.as_bytes().to_vec(), - up_to_epoch: None, + up_to_epoch: up_to_epoch.map(|epoch| epoch.into()), }) .await?; while let Some(resp) = stream.next().await { From c7832839f1596fc9ac32fe7fcc8e6134ca75c913 Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Wed, 21 Aug 2024 15:59:47 +0100 Subject: [PATCH 5/8] event scanning working --- .../tari_indexer/src/event_scanner.rs | 6 +++- .../src/p2p/rpc/block_sync_task.rs | 29 +++++++++++++++--- .../src/p2p/rpc/service_impl.rs | 9 ++++-- dan_layer/state_store_sqlite/src/reader.rs | 30 +++++++++++++++++++ dan_layer/storage/src/state_store/mod.rs | 1 + 5 files changed, 68 insertions(+), 7 deletions(-) diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index ea48e11d2..43bbabe42 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -511,7 +511,11 @@ impl EventScanner { ); // get the most recent block among all scanned blocks in the epoch - let last_block = blocks.iter().max_by_key(|b| b.height()); + let last_block = blocks + .iter() + .max_by_key(|b| + (b.epoch(), b.height()) + ); if let Some(block) = last_block { last_block_id = *block.id(); diff --git a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs index b20ecb6d1..bcb8f9a01 100644 --- a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs +++ b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs @@ -30,7 +30,7 @@ type BlockBuffer = Vec; pub struct BlockSyncTask { store: TStateStore, start_block: Block, - _up_to_epoch: Option, + up_to_epoch: Option, sender: mpsc::Sender>, } @@ -44,7 +44,7 @@ impl BlockSyncTask { Self { store, start_block, - _up_to_epoch: up_to_epoch, + up_to_epoch, sender, } } @@ -110,11 +110,32 @@ impl BlockSyncTask { let mut current_block_id = *current_block_id; let mut last_block_id = current_block_id; loop { - let children = tx.blocks_get_all_by_parent(¤t_block_id)?; - let Some(child) = children.into_iter().find(|b| b.is_committed()) else { + let current_block = tx.blocks_get(¤t_block_id)?; + + // Find the next block in the database + let child = if current_block.is_epoch_end() { + // The current block is the last one in the epoch, + // so we need to find the first block in the next expoch + tx.blocks_get_first_in_epoch(current_block.epoch() + Epoch(1))? + } else { + // The current block is NOT the last one in the epoch, + // so we need to find a child block + let children = tx.blocks_get_all_by_parent(¤t_block_id)?; + children.into_iter().find(|b| b.is_committed()) + }; + + // If there is not a new block then we stop streaming + let Some(child) = child else { break; }; + // If we hit the max allowed epoch then we stop streaming + if let Some(epoch) = self.up_to_epoch { + if child.epoch() > epoch { + break; + } + } + current_block_id = *child.id(); if child.is_dummy() { continue; diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 7ac5befc7..ffb3b2e46 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -279,12 +279,17 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { .map_err(RpcStatus::log_internal_error(LOG_TARGET))? .ok_or_else(|| RpcStatus::not_found(format!("start_block_id {start_block_id} not found")))?; - // Check that the start block + // Check that the start block is not after the locked block let locked_block = store .with_read_tx(|tx| LockedBlock::get(tx).optional()) .map_err(RpcStatus::log_internal_error(LOG_TARGET))? .ok_or_else(|| RpcStatus::not_found("No locked block"))?; - if start_block.height() > locked_block.height() { + let epoch_is_after = start_block.epoch() > locked_block.epoch(); + let height_is_after = + (start_block.epoch() == locked_block.epoch) && + (start_block.height() > locked_block.height()); + + if epoch_is_after || height_is_after { return Err(RpcStatus::not_found(format!( "start_block_id {} is after locked block {}", start_block_id, locked_block diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index d6d56b23d..8f6986785 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -717,6 +717,36 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor block.try_convert(qc) } + fn blocks_get_first_in_epoch(&self, epoch: Epoch) -> Result, StorageError> { + use crate::schema::{blocks, quorum_certificates}; + + let row = blocks::table + .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id))) + .select((blocks::all_columns, quorum_certificates::all_columns.nullable())) + .filter(blocks::epoch.eq(epoch.as_u64() as i64)) + .filter(blocks::height.eq(0)) + .first::<(sql_models::Block, Option)>(self.connection()) + .optional() + .map_err(|e| SqliteStorageError::DieselError { + operation: "blocks_get_first_in_epoch", + source: e, + })?; + + if let Some((block, qc)) = row { + let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency { + operation: "blocks_get_first_in_epoch", + details: format!( + "block {} references non-existent quorum certificate {}", + block.id, block.qc_id + ), + })?; + let res = block.try_convert(qc)?; + Ok(Some(res)) + } else { + Ok(None) + } + } + fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result, StorageError> { use crate::schema::{blocks, quorum_certificates}; diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 6bf381218..69a8bb0b4 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -137,6 +137,7 @@ pub trait StateStoreReadTransaction: Sized { from_block_id: &BlockId, ) -> Result; fn blocks_get(&self, block_id: &BlockId) -> Result; + fn blocks_get_first_in_epoch(&self, epoch: Epoch) -> Result, StorageError>; fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result, StorageError>; /// Returns all blocks from and excluding the start block (lower height) to the end block (inclusive) fn blocks_get_all_between( From 2eb23303fee55708301c8bd27926dc920edb1d6f Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:07:27 +0100 Subject: [PATCH 6/8] fix fmt --- .../tari_indexer/src/event_scanner.rs | 20 +++++++------------ .../sqlite_substate_store_factory.rs | 13 ++++++------ .../src/p2p/rpc/service_impl.rs | 3 +-- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index 43bbabe42..a44254fca 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -129,7 +129,7 @@ impl EventScanner { let newest_epoch = self.epoch_manager.current_epoch().await?; let oldest_scanned_epoch = self.get_oldest_scanned_epoch().await?; - + match oldest_scanned_epoch { Some(oldest_epoch) => { // we could span multiple cuncurrent epoch scans @@ -137,7 +137,7 @@ impl EventScanner { for epoch_idx in oldest_epoch.0..=newest_epoch.0 { let epoch = Epoch(epoch_idx); event_count += self.scan_events_of_epoch(epoch).await?; - + // at this point we can assume the previous epochs have been fully scanned self.delete_scanned_epochs_older_than(epoch).await?; } @@ -209,8 +209,7 @@ impl EventScanner { } async fn delete_scanned_epochs_older_than(&self, epoch: Epoch) -> Result<(), anyhow::Error> { - self - .substate_store + self.substate_store .with_write_tx(|tx| tx.delete_scanned_epochs_older_than(epoch)) .map_err(|e| e.into()) } @@ -454,9 +453,8 @@ impl EventScanner { *start_block.id() } - async fn get_oldest_scanned_epoch(&self) -> Result, anyhow::Error>{ - self - .substate_store + async fn get_oldest_scanned_epoch(&self) -> Result, anyhow::Error> { + self.substate_store .with_read_tx(|tx| tx.get_oldest_scanned_epoch()) .map_err(|e| e.into()) } @@ -511,17 +509,13 @@ impl EventScanner { ); // get the most recent block among all scanned blocks in the epoch - let last_block = blocks - .iter() - .max_by_key(|b| - (b.epoch(), b.height()) - ); + let last_block = blocks.iter().max_by_key(|b| (b.epoch(), b.height())); if let Some(block) = last_block { last_block_id = *block.id(); // Store the latest scanned block id in the database for future scans self.save_scanned_block_id(epoch, shard_group, last_block_id)?; - } + } return Ok(blocks); }, Err(e) => { diff --git a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs index 784d167da..972a58840 100644 --- a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs +++ b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs @@ -611,12 +611,13 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> { .map_err(|e| StorageError::QueryError { reason: format!("get_oldest_scanned_epoch: {}", e), })?; - - let oldest_epoch = res.map(|r| { - let epoch_as_u64 = r.try_into() - .map_err(|_| StorageError::InvalidIntegerCast)?; - Ok::(Epoch(epoch_as_u64)) - }).transpose()?; + + let oldest_epoch = res + .map(|r| { + let epoch_as_u64 = r.try_into().map_err(|_| StorageError::InvalidIntegerCast)?; + Ok::(Epoch(epoch_as_u64)) + }) + .transpose()?; Ok(oldest_epoch) } diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index ffb3b2e46..5b52f8dd7 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -286,8 +286,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { .ok_or_else(|| RpcStatus::not_found("No locked block"))?; let epoch_is_after = start_block.epoch() > locked_block.epoch(); let height_is_after = - (start_block.epoch() == locked_block.epoch) && - (start_block.height() > locked_block.height()); + (start_block.epoch() == locked_block.epoch) && (start_block.height() > locked_block.height()); if epoch_is_after || height_is_after { return Err(RpcStatus::not_found(format!( From ee19751a60cd93aaa96c69f3947d89cdfadc674e Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Thu, 22 Aug 2024 10:22:13 +0100 Subject: [PATCH 7/8] use optional combinator --- .../src/p2p/rpc/block_sync_task.rs | 4 +-- dan_layer/state_store_sqlite/src/reader.rs | 29 ++++++++----------- dan_layer/storage/src/state_store/mod.rs | 2 +- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs index bcb8f9a01..d025277fa 100644 --- a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs +++ b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use log::*; -use tari_dan_common_types::Epoch; +use tari_dan_common_types::{optional::Optional, Epoch}; use tari_dan_p2p::proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse, Transactions}; use tari_dan_storage::{ consensus_models::{Block, BlockId, QuorumCertificate, SubstateUpdate, TransactionRecord}, @@ -116,7 +116,7 @@ impl BlockSyncTask { let child = if current_block.is_epoch_end() { // The current block is the last one in the epoch, // so we need to find the first block in the next expoch - tx.blocks_get_first_in_epoch(current_block.epoch() + Epoch(1))? + tx.blocks_get_genesis_for_epoch(current_block.epoch() + Epoch(1)).optional()? } else { // The current block is NOT the last one in the epoch, // so we need to find a child block diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 1af497aa4..89d8baec3 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -720,34 +720,29 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor block.try_convert(qc) } - fn blocks_get_first_in_epoch(&self, epoch: Epoch) -> Result, StorageError> { + fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result { use crate::schema::{blocks, quorum_certificates}; - let row = blocks::table + let (block, qc) = blocks::table .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id))) .select((blocks::all_columns, quorum_certificates::all_columns.nullable())) .filter(blocks::epoch.eq(epoch.as_u64() as i64)) .filter(blocks::height.eq(0)) .first::<(sql_models::Block, Option)>(self.connection()) - .optional() .map_err(|e| SqliteStorageError::DieselError { - operation: "blocks_get_first_in_epoch", + operation: "blocks_get_genesis_for_epoch", source: e, })?; - if let Some((block, qc)) = row { - let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency { - operation: "blocks_get_first_in_epoch", - details: format!( - "block {} references non-existent quorum certificate {}", - block.id, block.qc_id - ), - })?; - let res = block.try_convert(qc)?; - Ok(Some(res)) - } else { - Ok(None) - } + let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency { + operation: "blocks_get_genesis_for_epoch", + details: format!( + "block {} references non-existent quorum certificate {}", + block.id, block.qc_id + ), + })?; + + block.try_convert(qc) } fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result, StorageError> { diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index f8c38b71b..215f3d964 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -140,7 +140,7 @@ pub trait StateStoreReadTransaction: Sized { from_block_id: &BlockId, ) -> Result; fn blocks_get(&self, block_id: &BlockId) -> Result; - fn blocks_get_first_in_epoch(&self, epoch: Epoch) -> Result, StorageError>; + fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result; fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result, StorageError>; /// Returns all blocks from and excluding the start block (lower height) to the end block (inclusive) fn blocks_get_all_between( From 37519612f441365c88e774ce31214762e68bae1c Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:21:33 +0100 Subject: [PATCH 8/8] fix fmt --- .../tari_validator_node/src/p2p/rpc/block_sync_task.rs | 3 ++- dan_layer/state_store_sqlite/src/reader.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs index d025277fa..18171f008 100644 --- a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs +++ b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs @@ -116,7 +116,8 @@ impl BlockSyncTask { let child = if current_block.is_epoch_end() { // The current block is the last one in the epoch, // so we need to find the first block in the next expoch - tx.blocks_get_genesis_for_epoch(current_block.epoch() + Epoch(1)).optional()? + tx.blocks_get_genesis_for_epoch(current_block.epoch() + Epoch(1)) + .optional()? } else { // The current block is NOT the last one in the epoch, // so we need to find a child block diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 89d8baec3..8e6a8ec5f 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -741,7 +741,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor block.id, block.qc_id ), })?; - + block.try_convert(qc) }