fix: indexer event scanning is broken after epoch change #1121

Merged 9 commits on Aug 23, 2024
90 changes: 69 additions & 21 deletions applications/tari_indexer/src/event_scanner.rs
@@ -127,28 +127,64 @@ impl EventScanner {

let mut event_count = 0;

let current_epoch = self.epoch_manager.current_epoch().await?;
let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
for (shard_group, mut committee) in current_committees {
let newest_epoch = self.epoch_manager.current_epoch().await?;
let oldest_scanned_epoch = self.get_oldest_scanned_epoch().await?;

match oldest_scanned_epoch {
Some(oldest_epoch) => {
// we could spawn multiple concurrent epoch scans
// but we want to avoid gaps in the latest scanned value if any of the intermediate epoch scans fail
for epoch_idx in oldest_epoch.0..=newest_epoch.0 {
let epoch = Epoch(epoch_idx);
event_count += self.scan_events_of_epoch(epoch).await?;

// at this point we can assume the previous epochs have been fully scanned
self.delete_scanned_epochs_older_than(epoch).await?;
}
},
None => {
// by default we start scanning from the current epoch
// TODO: it would be nice to have a new indexer parameter to specify a custom starting epoch
event_count += self.scan_events_of_epoch(newest_epoch).await?;
},
}

info!(
target: LOG_TARGET,
"Scanned {} events",
event_count
);

Ok(event_count)
}

async fn scan_events_of_epoch(&self, epoch: Epoch) -> Result<usize, anyhow::Error> {
let committees = self.epoch_manager.get_committees(epoch).await?;

let mut event_count = 0;

for (shard_group, mut committee) in committees {
info!(
target: LOG_TARGET,
"Scanning committee epoch={}, sg={}",
current_epoch,
epoch,
shard_group
);
let new_blocks = self
.get_new_blocks_from_committee(shard_group, &mut committee, current_epoch)
.get_new_blocks_from_committee(shard_group, &mut committee, epoch)
.await?;
info!(
target: LOG_TARGET,
"Scanned {} blocks",
new_blocks.len()
"Scanned {} blocks in epoch={}",
new_blocks.len(),
epoch,
);
let transactions = self.extract_transactions_from_blocks(new_blocks);
info!(
target: LOG_TARGET,
"Scanned {} transactions",
transactions.len()
"Scanned {} transactions in epoch={}",
transactions.len(),
epoch,
);

for transaction in transactions {
@@ -161,22 +197,23 @@ impl EventScanner {
events.into_iter().filter(|ev| self.should_persist_event(ev)).collect();
info!(
target: LOG_TARGET,
"Filtered events: {}",
"Filtered events in epoch {}: {}",
epoch,
filtered_events.len()
);
self.store_events_in_db(&filtered_events, transaction).await?;
}
}

info!(
target: LOG_TARGET,
"Scanned {} events",
event_count
);

Ok(event_count)
}

async fn delete_scanned_epochs_older_than(&self, epoch: Epoch) -> Result<(), anyhow::Error> {
self.substate_store
.with_write_tx(|tx| tx.delete_scanned_epochs_older_than(epoch))
.map_err(|e| e.into())
}

fn should_persist_event(&self, event_data: &EventData) -> bool {
for filter in &self.event_filters {
if Self::event_matches_filter(filter, &event_data.event) {
@@ -416,6 +453,12 @@ impl EventScanner {
*start_block.id()
}

async fn get_oldest_scanned_epoch(&self) -> Result<Option<Epoch>, anyhow::Error> {
self.substate_store
.with_read_tx(|tx| tx.get_oldest_scanned_epoch())
.map_err(|e| e.into())
}

#[allow(unused_assignments)]
async fn get_new_blocks_from_committee(
&self,
@@ -451,7 +494,7 @@ impl EventScanner {
epoch,
shard_group
);
let resp = self.get_blocks_from_vn(member, start_block_id).await;
let resp = self.get_blocks_from_vn(member, start_block_id, Some(epoch)).await;

match resp {
Ok(blocks) => {
@@ -464,11 +507,15 @@ impl EventScanner {
epoch,
shard_group,
);
if let Some(block) = blocks.last() {

// get the most recent block among all scanned blocks in the epoch
let last_block = blocks.iter().max_by_key(|b| (b.epoch(), b.height()));

if let Some(block) = last_block {
last_block_id = *block.id();
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
}
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
return Ok(blocks);
},
Err(e) => {
@@ -524,6 +571,7 @@ impl EventScanner {
&self,
vn_addr: &PeerAddress,
start_block_id: BlockId,
up_to_epoch: Option<Epoch>,
) -> Result<Vec<Block>, anyhow::Error> {
let mut blocks = vec![];

@@ -533,7 +581,7 @@ impl EventScanner {
let mut stream = client
.sync_blocks(SyncBlocksRequest {
start_block_id: start_block_id.as_bytes().to_vec(),
up_to_epoch: None,
up_to_epoch: up_to_epoch.map(|epoch| epoch.into()),
})
.await?;
while let Some(resp) = stream.next().await {
@@ -230,6 +230,7 @@ pub trait SubstateStoreReadTransaction {
limit: u32,
) -> Result<Vec<Event>, StorageError>;
fn event_exists(&mut self, event: NewEvent) -> Result<bool, StorageError>;
fn get_oldest_scanned_epoch(&mut self) -> Result<Option<Epoch>, StorageError>;
fn get_last_scanned_block_id(
&mut self,
epoch: Epoch,
@@ -601,6 +602,26 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> {
Ok(exists)
}

fn get_oldest_scanned_epoch(&mut self) -> Result<Option<Epoch>, StorageError> {
use crate::substate_storage_sqlite::schema::scanned_block_ids;

let res: Option<i64> = scanned_block_ids::table
.select(diesel::dsl::min(scanned_block_ids::epoch))
.first(self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("get_oldest_scanned_epoch: {}", e),
})?;

let oldest_epoch = res
.map(|r| {
let epoch_as_u64 = r.try_into().map_err(|_| StorageError::InvalidIntegerCast)?;
Ok::<Epoch, StorageError>(Epoch(epoch_as_u64))
})
.transpose()?;

Ok(oldest_epoch)
}

fn get_last_scanned_block_id(
&mut self,
epoch: Epoch,
@@ -656,6 +677,7 @@ pub trait SubstateStoreWriteTransaction {
fn add_non_fungible_index(&mut self, new_nft_index: NewNonFungibleIndex) -> Result<(), StorageError>;
fn save_event(&mut self, new_event: NewEvent) -> Result<(), StorageError>;
fn save_scanned_block_id(&mut self, new_scanned_block_id: NewScannedBlockId) -> Result<(), StorageError>;
fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError>;
}

impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
@@ -818,6 +840,19 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {

Ok(())
}

fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError> {
use crate::substate_storage_sqlite::schema::scanned_block_ids;

diesel::delete(scanned_block_ids::table)
.filter(scanned_block_ids::epoch.lt(epoch.0 as i64))
.execute(&mut *self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("delete_scanned_epochs_older_than: {}", e),
})?;

Ok(())
}
}

impl<'a> Deref for SqliteSubstateStoreWriteTransaction<'a> {
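For reference, both of the new queries above operate on the indexer's scanned_block_ids table. Below is a hypothetical Diesel schema sketch that is merely consistent with the min()/lt() usage shown here; the actual definition lives in crate::substate_storage_sqlite::schema and may differ.

// Hypothetical sketch only: column names and types are inferred from the queries in this PR.
diesel::table! {
    scanned_block_ids (id) {
        id -> Integer,
        epoch -> BigInt,          // scanned epoch; aggregated with min() and pruned with lt()
        shard_group -> Integer,   // committee shard group the scan marker belongs to
        last_block_id -> Binary,  // latest scanned block id for this (epoch, shard group) pair
    }
}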
32 changes: 27 additions & 5 deletions applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs
@@ -4,7 +4,7 @@
use std::collections::HashSet;

use log::*;
use tari_dan_common_types::Epoch;
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_p2p::proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse, Transactions};
use tari_dan_storage::{
consensus_models::{Block, BlockId, QuorumCertificate, SubstateUpdate, TransactionRecord},
@@ -30,7 +30,7 @@ type BlockBuffer = Vec<BlockData>;
pub struct BlockSyncTask<TStateStore: StateStore> {
store: TStateStore,
start_block: Block,
_up_to_epoch: Option<Epoch>,
up_to_epoch: Option<Epoch>,
sender: mpsc::Sender<Result<SyncBlocksResponse, RpcStatus>>,
}

@@ -44,7 +44,7 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
Self {
store,
start_block,
_up_to_epoch: up_to_epoch,
up_to_epoch,
sender,
}
}
@@ -110,11 +110,33 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
let mut current_block_id = *current_block_id;
let mut last_block_id = current_block_id;
loop {
let children = tx.blocks_get_all_by_parent(&current_block_id)?;
let Some(child) = children.into_iter().find(|b| b.is_committed()) else {
let current_block = tx.blocks_get(&current_block_id)?;

// Find the next block in the database
let child = if current_block.is_epoch_end() {
// The current block is the last one in the epoch,
// so we need to find the first block in the next epoch
tx.blocks_get_genesis_for_epoch(current_block.epoch() + Epoch(1))
.optional()?
} else {
// The current block is NOT the last one in the epoch,
// so we need to find a child block
let children = tx.blocks_get_all_by_parent(&current_block_id)?;
children.into_iter().find(|b| b.is_committed())
};

// If there is no new block then we stop streaming
let Some(child) = child else {
break;
};

// If we go past the max allowed epoch then we stop streaming
if let Some(epoch) = self.up_to_epoch {
if child.epoch() > epoch {
break;
}
}

current_block_id = *child.id();
if child.is_dummy() {
continue;
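Taken together, the new loop body either follows a committed child within the current epoch or, at an epoch-end block, jumps to the next epoch's genesis block, and it stops once the requested up_to_epoch is exceeded. A hedged standalone sketch of that single step, reusing the trait methods seen in the diff (import paths are assumed; this is not part of the PR):

// Sketch only: condenses the loop body above into one "next block" lookup.
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_storage::{consensus_models::Block, StateStoreReadTransaction, StorageError};

fn next_block<TTx: StateStoreReadTransaction>(
    tx: &TTx,
    current: &Block,
    up_to_epoch: Option<Epoch>,
) -> Result<Option<Block>, StorageError> {
    let child = if current.is_epoch_end() {
        // Last block of its epoch: continue from the next epoch's genesis block.
        tx.blocks_get_genesis_for_epoch(current.epoch() + Epoch(1)).optional()?
    } else {
        // Otherwise follow the committed child of the current block.
        tx.blocks_get_all_by_parent(current.id())?
            .into_iter()
            .find(|b| b.is_committed())
    };
    // Stop streaming once the next block is past the epoch limit requested by the client.
    Ok(child.filter(|b| up_to_epoch.map_or(true, |limit| b.epoch() <= limit)))
}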
8 changes: 6 additions & 2 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
@@ -279,12 +279,16 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("start_block_id {start_block_id} not found")))?;

// Check that the start block
// Check that the start block is not after the locked block
let locked_block = store
.with_read_tx(|tx| LockedBlock::get(tx).optional())
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("No locked block"))?;
if start_block.height() > locked_block.height() {
let epoch_is_after = start_block.epoch() > locked_block.epoch();
let height_is_after =
(start_block.epoch() == locked_block.epoch()) && (start_block.height() > locked_block.height());

if epoch_is_after || height_is_after {
return Err(RpcStatus::not_found(format!(
"start_block_id {} is after locked block {}",
start_block_id, locked_block
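The two flags above amount to a lexicographic (epoch, height) comparison. A minimal equivalent sketch, relying only on the epoch() and height() accessors already used in the diff (Rust tuples compare lexicographically):

// Equivalent check: the start block is "after" the locked block when its
// (epoch, height) pair orders strictly greater.
let start_is_after =
    (start_block.epoch(), start_block.height()) > (locked_block.epoch(), locked_block.height());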
25 changes: 25 additions & 0 deletions dan_layer/state_store_sqlite/src/reader.rs
@@ -720,6 +720,31 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor
block.try_convert(qc)
}

fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result<Block, StorageError> {
use crate::schema::{blocks, quorum_certificates};

let (block, qc) = blocks::table
.left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id)))
.select((blocks::all_columns, quorum_certificates::all_columns.nullable()))
.filter(blocks::epoch.eq(epoch.as_u64() as i64))
.filter(blocks::height.eq(0))
.first::<(sql_models::Block, Option<sql_models::QuorumCertificate>)>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "blocks_get_genesis_for_epoch",
source: e,
})?;

let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency {
operation: "blocks_get_genesis_for_epoch",
details: format!(
"block {} references non-existent quorum certificate {}",
block.id, block.qc_id
),
})?;

block.try_convert(qc)
}

fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result<Vec<Block>, StorageError> {
use crate::schema::{blocks, quorum_certificates};

1 change: 1 addition & 0 deletions dan_layer/storage/src/state_store/mod.rs
@@ -140,6 +140,7 @@ pub trait StateStoreReadTransaction: Sized {
from_block_id: &BlockId,
) -> Result<BlockTransactionExecution, StorageError>;
fn blocks_get(&self, block_id: &BlockId) -> Result<Block, StorageError>;
fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result<Block, StorageError>;
fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result<Vec<Block>, StorageError>;
/// Returns all blocks from and excluding the start block (lower height) to the end block (inclusive)
fn blocks_get_all_between(