Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: only import stacks tsv if chainstate is empty #684

Merged
merged 3 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub async fn download_stacks_dataset_if_required(
) -> Result<bool, String> {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() {
if config.contains_remote_stacks_tsv_url() && config.should_download_remote_stacks_tsv() {
let url = config.expected_remote_stacks_tsv_url()?;
let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
Expand Down
13 changes: 6 additions & 7 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::generator::generate_config;
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate,
import_stacks_chainstate_from_remote_tsv, scan_stacks_chainstate_via_csv_using_predicate,
scan_stacks_chainstate_via_rocksdb_using_predicate,
};
use crate::service::http_api::document_predicate_api_server;
Expand All @@ -24,6 +24,7 @@ use chainhook_sdk::chainhooks::stacks::StacksChainhookSpecificationNetworkMap;
use chainhook_sdk::chainhooks::stacks::StacksPredicate;
use chainhook_sdk::chainhooks::stacks::StacksPrintEventBasedPredicate;
use chainhook_sdk::chainhooks::types::{ChainhookSpecificationNetworkMap, FileHook, HookAction};
use chainhook_sdk::try_info;
use chainhook_sdk::types::{BitcoinNetwork, BlockIdentifier, StacksNetwork};
use chainhook_sdk::utils::{BlockHeights, Context};
use clap::{Parser, Subcommand};
Expand Down Expand Up @@ -342,19 +343,17 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
ServiceCommand::Start(cmd) => {
let mut config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;

if cmd.prometheus_monitoring_port.is_some() {
config.monitoring.prometheus_monitoring_port = cmd.prometheus_monitoring_port;
}

let predicates = cmd
.predicates_paths
.iter()
.map(|p| load_predicate_from_path(p))
.collect::<Result<Vec<ChainhookSpecificationNetworkMap>, _>>()?;

info!(ctx.expect_logger(), "Starting service...",);

try_info!(ctx, "Starting chainhook service");
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx).await?;
let mut service = Service::new(config, ctx);
return service.run(predicates, None).await;
}
Expand Down Expand Up @@ -541,7 +540,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
};
match open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx) {
Ok(_) => {
let _ = consolidate_local_stacks_chainstate_using_csv(
let _ = import_stacks_chainstate_from_remote_tsv(
&mut config,
&ctx,
)
Expand Down Expand Up @@ -812,7 +811,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
StacksCommand::Db(StacksDbCommand::Update(cmd)) => {
let mut config = Config::default(false, false, false, &cmd.config_path)?;
consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx).await?;
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx).await?;
}
StacksCommand::Db(StacksDbCommand::Check(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
Expand Down
13 changes: 12 additions & 1 deletion components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,17 @@ impl Config {
destination_path
}

/// Reports whether the directory returned by `expected_cache_path()` has no entries.
///
/// A directory that does not exist is reported as empty (`Ok(true)`).
///
/// # Errors
///
/// Returns a descriptive message when the directory cannot be opened for any reason
/// other than it not being found.
pub fn is_cache_path_empty(&self) -> Result<bool, String> {
    match std::fs::read_dir(self.expected_cache_path()) {
        // Any first item yielded by the iterator counts as "not empty".
        Ok(mut entries) => Ok(entries.next().is_none()),
        // A missing directory is treated the same as an empty one.
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(true),
        Err(error) => Err(format!("unable to read cache directory: {error}")),
    }
}

fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(config) = source {
Expand All @@ -323,7 +334,7 @@ impl Config {
.map(|url| format!("{}.gz", url))
}

pub fn rely_on_remote_stacks_tsv(&self) -> bool {
pub fn contains_remote_stacks_tsv_url(&self) -> bool {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(_config) = source {
return true;
Expand Down
172 changes: 85 additions & 87 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
try_info,
utils::Context,
};
use chainhook_sdk::{
Expand Down Expand Up @@ -338,11 +339,8 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
&mut db_conns.signers_db,
&block_data.block_identifier,
)?;
let (hits_per_events, _) = evaluate_stacks_predicate_on_non_consensus_events(
&events,
predicate_spec,
ctx,
);
let (hits_per_events, _) =
evaluate_stacks_predicate_on_non_consensus_events(&events, predicate_spec, ctx);

if hits_per_blocks.is_empty() && hits_per_events.is_empty() {
continue;
Expand Down Expand Up @@ -584,101 +582,101 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
Ok(last_block_scanned)
}

pub async fn consolidate_local_stacks_chainstate_using_csv(
/// Downloads a remote archive TSV that contains Stacks node events and imports it into chainhook in order to fill up the Stacks
/// blocks database. This import will only happen if chainhook is starting from a fresh install with an empty index.
pub async fn import_stacks_chainstate_from_remote_tsv(
config: &mut Config,
ctx: &Context,
) -> Result<(), String> {
#[cfg(not(test))]
{
if !config.is_cache_path_empty()? {
try_info!(ctx, "A Stacks chainstate already exists, skipping TSV chainstante import");
return Ok(());
}
if !config.contains_remote_stacks_tsv_url() {
try_info!(ctx, "No remote Stacks TSV location was specified in config file, skipping TSV chainstante import");
return Ok(());
}
}
try_info!(ctx, "Importing Stacks chainstate from TSV");

download_stacks_dataset_if_required(config, ctx).await?;
let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, ctx);
let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> =
get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Building local chainstate from Stacks archive file"
"Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
);
// TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block
// retrieval code into a reusable function.
let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
let mut tsv_current_line = 0;
for (block_identifier, _parent_block_identifier, tsv_line_number) in canonical_fork.drain(..) {
blocks_read += 1;

let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?;
if downloaded_new_dataset {
let stacks_db =
open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, ctx);
let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> =
get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
);
// TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block
// retrieval code into a reusable function.
let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
let mut tsv_current_line = 0;
for (block_identifier, _parent_block_identifier, tsv_line_number) in
canonical_fork.drain(..)
{
blocks_read += 1;
// If blocks already stored, move on
if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
continue;
}
blocks_inserted += 1;

// If blocks already stored, move on
if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
continue;
}
blocks_inserted += 1;

// Seek to required line from TSV and retrieve its block payload.
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
tsv_current_line += 1;
// Seek to required line from TSV and retrieve its block payload.
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
let Some(serialized_block) = tsv_line.split('\t').last() else {
return Err("Unable to retrieve serialized block from TSV line".to_string());
};

let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
serialized_block,
&mut indexer.stacks_context,
ctx,
) {
Ok(block) => block,
Err(e) => {
error!(
&ctx.expect_logger(),
"Failed to standardize stacks block: {e}"
);
continue;
}
};

// TODO(rafaelcr): Store signer messages
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?;
tsv_current_line += 1;
}
let Some(serialized_block) = tsv_line.split('\t').last() else {
return Err("Unable to retrieve serialized block from TSV line".to_string());
};

if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
serialized_block,
&mut indexer.stacks_context,
ctx,
) {
Ok(block) => block,
Err(e) => {
error!(
&ctx.expect_logger(),
"Failed to standardize stacks block: {e}"
);
let _ = stacks_db_rw.flush();
continue;
}
};

// TODO(rafaelcr): Store signer messages
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?;

if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
);
let _ = stacks_db_rw.flush();
}
let _ = stacks_db_rw.flush();
info!(
ctx.expect_logger(),
"{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
);
} else {
info!(
ctx.expect_logger(),
"Skipping database consolidation - no new archive found since last consolidation."
);
}
let _ = stacks_db_rw.flush();
info!(
ctx.expect_logger(),
"{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
);
Ok(())
}
27 changes: 0 additions & 27 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub(crate) mod http_api;
mod runloops;

use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::signers::{initialize_signers_db, store_signer_db_messages};
Expand Down Expand Up @@ -164,11 +163,6 @@ impl Service {
let mut event_observer_config = self.config.get_event_observer_config();
event_observer_config.registered_chainhooks = chainhook_store;

// Download and ingest a Stacks dump
if self.config.rely_on_remote_stacks_tsv() {
consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await?;
}

// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let ctx = self.ctx.clone();
Expand Down Expand Up @@ -292,8 +286,6 @@ impl Service {
self.ctx.clone(),
);

let mut stacks_event = 0;

let ctx = self.ctx.clone();
match self.config.http_api {
PredicatesApi::On(ref api_config) => {
Expand Down Expand Up @@ -586,7 +578,6 @@ impl Service {

match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
Expand Down Expand Up @@ -649,24 +640,6 @@ impl Service {
&ctx,
);
};

// Every 32 blocks, we will check if there's a new Stacks file archive to ingest
if stacks_event > 32 {
stacks_event = 0;
if self.config.rely_on_remote_stacks_tsv() {
if let Err(e) = consolidate_local_stacks_chainstate_using_csv(
&mut self.config,
&self.ctx,
)
.await
{
error!(
self.ctx.expect_logger(),
"Failed to update database from archive: {e}"
)
};
}
}
}
ObserverEvent::PredicateInterrupted(PredicateInterruptedData {
predicate_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::{
Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi,
PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI,
};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::scan::stacks::import_stacks_chainstate_from_remote_tsv;
use crate::service::{
http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status,
PredicateStatus, Service,
Expand Down Expand Up @@ -442,7 +442,7 @@ pub async fn setup_stacks_chainhook_test(
Some(prometheus_port),
);

consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx)
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions components/chainhook-cli/src/service/tests/runloop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use chainhook_sdk::{

use crate::{
config::{Config, EventSourceConfig, PathConfig},
scan::stacks::consolidate_local_stacks_chainstate_using_csv,
scan::stacks::import_stacks_chainstate_from_remote_tsv,
service::{
runloops::{
start_bitcoin_scan_runloop, start_stacks_scan_runloop, BitcoinScanOp, StacksScanOp,
Expand Down Expand Up @@ -49,7 +49,7 @@ async fn test_stacks_runloop_kill_scan() {
tracer: false,
};

consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx)
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
Expand Down
Loading