release to main #636

Merged
merged 14 commits on Aug 12, 2024
14 changes: 10 additions & 4 deletions components/chainhook-cli/src/cli/mod.rs
@@ -11,8 +11,8 @@ use crate::storage::{
delete_confirmed_entry_from_stacks_blocks, delete_unconfirmed_entry_from_stacks_blocks,
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_unconfirmed_entry_in_stacks_blocks,
is_stacks_block_present, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
set_last_confirmed_insert_key,
is_stacks_block_present, open_readonly_stacks_db_conn, open_readonly_stacks_db_conn_with_retry,
open_readwrite_stacks_db_conn, set_last_confirmed_insert_key,
};
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecification;
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap;
@@ -543,16 +543,22 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
};
match open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx) {
Ok(db_conn) => {
Ok(_) => {
let _ = consolidate_local_stacks_chainstate_using_csv(
&mut config,
&ctx,
)
.await;
// Refresh DB connection so it picks up recent changes made by TSV consolidation.
let new_conn = open_readonly_stacks_db_conn_with_retry(
&config.expected_cache_path(),
5,
&ctx,
)?;
scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
None,
&db_conn,
&new_conn,
&config,
None,
&ctx,
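The change above re-opens the read-only Stacks database with `open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 5, &ctx)` once `consolidate_local_stacks_chainstate_using_csv` has finished, so the predicate scan sees the blocks the TSV consolidation just wrote. As a rough sketch of how such a retry wrapper is typically structured (the types, sleep interval, and error handling below are illustrative assumptions, not the crate's actual implementation):

```rust
use std::{path::Path, thread::sleep, time::Duration};

// Illustrative stand-ins; the real crate has its own `Context` and DB handle types.
struct StacksDbConn;
struct Context;

fn open_readonly_stacks_db_conn(_path: &Path, _ctx: &Context) -> Result<StacksDbConn, String> {
    // Placeholder for the crate's function that opens a read-only RocksDB handle;
    // in practice this can fail transiently while a writer still holds the DB.
    Ok(StacksDbConn)
}

/// Retry the read-only open up to `retries` times, sleeping briefly between
/// attempts, before surfacing the last error.
fn open_readonly_stacks_db_conn_with_retry(
    path: &Path,
    retries: u8,
    ctx: &Context,
) -> Result<StacksDbConn, String> {
    let mut attempt = 0u8;
    loop {
        match open_readonly_stacks_db_conn(path, ctx) {
            Ok(db) => return Ok(db),
            Err(e) if attempt < retries => {
                attempt += 1;
                // A real implementation would log `e` through `ctx` here.
                let _ = e;
                sleep(Duration::from_secs(1));
            }
            Err(e) => return Err(e),
        }
    }
}
```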
89 changes: 69 additions & 20 deletions components/chainhook-cli/src/scan/stacks.rs
@@ -1,5 +1,7 @@
use std::{
collections::{HashMap, VecDeque},
fs::File,
io::{BufRead, BufReader},
sync::{Arc, RwLock},
};

@@ -66,11 +68,13 @@ pub enum RecordKind {
AttachmentReceived,
}

/// Calculates the canonical chain of Stacks blocks based on a Stacks node events TSV file. Returns a `VecDeque` structure of
/// block hashes along with the line number where we can find the entire block message within the TSV.
pub async fn get_canonical_fork_from_tsv(
config: &mut Config,
start_block: Option<u64>,
ctx: &Context,
) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, u64)>, String> {
let seed_tsv_path = config.expected_local_stacks_tsv_file()?.clone();

let (record_tx, record_rx) = std::sync::mpsc::channel();
@@ -89,10 +93,12 @@ pub async fn get_canonical_fork_from_tsv(
.from_path(&seed_tsv_path)
.expect("unable to create csv reader");

let mut line: u64 = 0;
for result in reader_builder.deserialize() {
line += 1;
let record: Record = result.unwrap();
match &record.kind {
RecordKind::StacksBlockReceived => match record_tx.send(Some(record)) {
RecordKind::StacksBlockReceived => match record_tx.send(Some((record, line))) {
Err(_e) => {
break;
}
@@ -108,9 +114,9 @@ pub async fn get_canonical_fork_from_tsv(
let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let canonical_fork = {
let mut cursor = BlockIdentifier::default();
let mut dump = HashMap::new();
let mut tsv_new_blocks = HashMap::new();

while let Ok(Some(mut record)) = record_rx.recv() {
while let Ok(Some((record, line))) = record_rx.recv() {
let (block_identifier, parent_block_identifier) = match (&record.kind, &record.blob) {
(RecordKind::StacksBlockReceived, Some(blob)) => {
match standardize_stacks_serialized_block_header(&blob) {
@@ -141,23 +147,28 @@ pub async fn get_canonical_fork_from_tsv(
}

if block_identifier.index > cursor.index {
cursor = block_identifier.clone(); // todo(lgalabru)
cursor = block_identifier.clone();
}
dump.insert(
block_identifier,
(parent_block_identifier, record.blob.take().unwrap()),
);
tsv_new_blocks.insert(block_identifier, (parent_block_identifier, line));
}

let mut canonical_fork = VecDeque::new();
while cursor.index > 0 {
let (block_identifer, (parent_block_identifier, blob)) =
match dump.remove_entry(&cursor) {
let (block_identifer, (parent_block_identifier, line)) =
match tsv_new_blocks.remove_entry(&cursor) {
Some(entry) => entry,
None => break,
None => {
warn!(
ctx.expect_logger(),
"Unable to find block {} with index block hash {} in TSV",
cursor.index,
cursor.hash
);
break;
}
};
cursor = parent_block_identifier.clone(); // todo(lgalabru)
canonical_fork.push_front((block_identifer, parent_block_identifier, blob));
cursor = parent_block_identifier.clone();
canonical_fork.push_front((block_identifer, parent_block_identifier, line));
}
canonical_fork
};
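For orientation, the refactor above replaces the old `dump` map (which stored each block's full blob) with `tsv_new_blocks`, which keeps only the parent identifier and the TSV line number, and then walks backwards from the highest block via parent links to rebuild the canonical chain. A stripped-down illustration of that walk, with `BlockIdentifier` simplified to a `(height, hash)` tuple (not the crate's code):

```rust
use std::collections::{HashMap, VecDeque};

/// Simplified stand-in for `BlockIdentifier`: (height, index block hash).
type BlockId = (u64, String);

/// Walk back from the highest block seen, following parent links, and return
/// the canonical chain ordered from lowest to highest block.
fn canonical_fork_from_index(
    mut blocks: HashMap<BlockId, (BlockId, u64)>, // block -> (parent, TSV line)
    mut cursor: BlockId,
) -> VecDeque<(BlockId, BlockId, u64)> {
    let mut fork = VecDeque::new();
    while cursor.0 > 0 {
        let Some((block, (parent, line))) = blocks.remove_entry(&cursor) else {
            // Parent chain broken: the block was not present in the TSV.
            break;
        };
        cursor = parent.clone();
        fork.push_front((block, parent, line));
    }
    fork
}
```

Keeping only line numbers instead of raw blobs is presumably the point of the change: the serialized payloads are re-read from disk later instead of being held in memory for the whole TSV.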
@@ -474,7 +485,10 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
);
let mut last_block_scanned = BlockIdentifier::default();
let mut err_count = 0;
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
let mut tsv_current_line = 0;
for (block_identifier, _parent_block_identifier, tsv_line_number) in canonical_fork.drain(..) {
if block_identifier.index < start_block {
continue;
}
Expand All @@ -484,11 +498,25 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
}
}

// Seek to required line from TSV and retrieve its block payload.
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
tsv_current_line += 1;
}
let Some(serialized_block) = tsv_line.split('\t').last() else {
return Err("Unable to retrieve serialized block from TSV line".to_string());
};

last_block_scanned = block_identifier;
blocks_scanned += 1;
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
&blob,
serialized_block,
&mut indexer.stacks_context,
ctx,
) {
@@ -555,12 +583,12 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
);

let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?;

if downloaded_new_dataset {
let stacks_db =
open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;
let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> =
get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
@@ -571,7 +599,14 @@
ctx.expect_logger(),
"Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
);
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
// TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block
// retrieval code into a reusable function.
let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
let mut tsv_current_line = 0;
for (block_identifier, _parent_block_identifier, tsv_line_number) in
canonical_fork.drain(..)
{
blocks_read += 1;

// If blocks already stored, move on
@@ -580,9 +615,23 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
}
blocks_inserted += 1;

// Seek to required line from TSV and retrieve its block payload.
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
tsv_current_line += 1;
}
let Some(serialized_block) = tsv_line.split('\t').last() else {
return Err("Unable to retrieve serialized block from TSV line".to_string());
};

let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
&blob,
serialized_block,
&mut indexer.stacks_context,
ctx,
) {
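Both loops above now perform the same seek-and-read against the TSV file, which is exactly what the TODO in `consolidate_local_stacks_chainstate_using_csv` flags. One possible shape for the shared helper it suggests (the name and signature are illustrative, not part of this PR):

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};

/// Advance `reader` to `target_line` (1-indexed) and return the last
/// tab-separated field of that line, which carries the serialized block payload.
fn read_block_payload_at_line(
    reader: &mut BufReader<File>,
    current_line: &mut u64,
    target_line: u64,
) -> Result<String, String> {
    let mut line = String::new();
    while *current_line < target_line {
        line.clear();
        let bytes_read = reader.read_line(&mut line).map_err(|e| e.to_string())?;
        if bytes_read == 0 {
            return Err("Unexpected EOF when reading TSV".to_string());
        }
        *current_line += 1;
    }
    line.split('\t')
        .last()
        .map(|payload| payload.to_string())
        .ok_or_else(|| "Unable to retrieve serialized block from TSV line".to_string())
}
```

The forward-only reader is enough here because the canonical fork is ordered parent-first and a block's TSV line always comes after its parent's, so the scan never has to seek backwards.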
3 changes: 2 additions & 1 deletion components/chainhook-sdk/Cargo.toml
@@ -18,7 +18,6 @@ clarity = { git = "https://github.com/stacks-network/stacks-core.git", branch =
"log",
] }
hiro-system-kit = { version = "0.3.4", optional = true }
chainhook-types = { version = "1.3.6", path = "../chainhook-types-rs" }
rocket = { version = "=0.5.0", features = ["json"] }
bitcoincore-rpc = "0.18.0"
bitcoincore-rpc-json = "0.18.0"
@@ -45,6 +44,8 @@ regex = "1.9.3"
miniscript = "11.0.0"
prometheus = "0.13.3"

chainhook-types = { path = "../chainhook-types-rs" }

[dev-dependencies]
test-case = "3.1.0"

2 changes: 1 addition & 1 deletion components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs
@@ -795,7 +795,7 @@ pub fn handle_bitcoin_hook_action<'a>(
}
}

struct OpReturn(String);
struct OpReturn(());
impl OpReturn {
fn from_string(hex: &String) -> Result<String, String> {
// Remove the `0x` prefix if present so that we can call from_hex without errors.
2 changes: 1 addition & 1 deletion components/chainhook-sdk/src/chainhooks/stacks/mod.rs
@@ -4,7 +4,7 @@ use super::types::{
append_error_context, BlockIdentifierIndexRule, ChainhookInstance, ExactMatchingRule,
HookAction,
};
use super::types::{validate_txid, PoxConfig};
use super::types::validate_txid;
use chainhook_types::{
BlockIdentifier, StacksChainEvent, StacksNetwork, StacksTransactionData,
StacksTransactionEvent, StacksTransactionEventPayload, StacksTransactionKind,
2 changes: 1 addition & 1 deletion components/chainhook-sdk/src/indexer/bitcoin/mod.rs
@@ -136,7 +136,7 @@ pub fn build_http_client() -> HttpClient {
HttpClient::builder()
.timeout(Duration::from_secs(15))
.http1_only()
.no_trust_dns()
.no_hickory_dns()
.connect_timeout(Duration::from_secs(15))
.tcp_keepalive(Some(Duration::from_secs(15)))
.no_proxy()