fix: optimize memory for stacks tsv import into rocksdb (#634)
This PR changes the way chainhook imports a Stacks node TSV into
rocksdb.

Before, it loaded the entire canonical chainstate (including the full
block JSON messages) into a `VecDeque` in memory and then drained that
data into rocksdb. This was a very memory-intensive process that
crashed our dev pods every time it ran.

Now, the `VecDeque` only keeps the TSV line numbers at which each
block's data lives, so blocks can later be read from the file one by
one and inserted into rocksdb.
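
To make the change concrete, here is a minimal, self-contained sketch of the two-pass strategy. It is an illustration, not this PR's code: the file name, the `/new_block` tag, and the assumption that the block JSON sits in the last tab-separated column are all simplifications (the real implementation deserializes typed `Record`s, resolves the canonical fork via parent links, and writes each block into rocksdb).

```rust
use std::collections::VecDeque;
use std::fs::File;
use std::io::{BufRead, BufReader};

/// Pass 1: scan the TSV once, remembering only the 1-indexed line number of
/// each block record instead of buffering the serialized block itself.
fn index_block_lines(path: &str) -> std::io::Result<VecDeque<u64>> {
    let reader = BufReader::new(File::open(path)?);
    let mut block_lines = VecDeque::new();
    for (i, line) in reader.lines().enumerate() {
        let line = line?;
        // Assumed TSV layout: one column carries a "/new_block" tag.
        if line.split('\t').any(|field| field == "/new_block") {
            block_lines.push_back((i + 1) as u64);
        }
    }
    Ok(block_lines)
}

/// Pass 2: re-open the file and stream forward to each remembered line,
/// materializing a single block payload at a time.
fn process_blocks(path: &str, mut lines: VecDeque<u64>) -> Result<(), String> {
    let mut reader = BufReader::new(File::open(path).map_err(|e| e.to_string())?);
    let mut current = 0u64;
    let mut buf = String::new();
    while let Some(target) = lines.pop_front() {
        while current < target {
            buf.clear();
            if reader.read_line(&mut buf).map_err(|e| e.to_string())? == 0 {
                return Err("Unexpected EOF when reading TSV".to_string());
            }
            current += 1;
        }
        // Assumed layout: the serialized block JSON is the last column.
        let serialized_block = buf.split('\t').last().unwrap_or_default();
        // ...insert `serialized_block` into rocksdb here...
        println!("line {target}: block payload is {} bytes", serialized_block.len());
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let path = "stacks-node-events.tsv"; // hypothetical file name
    let lines = index_block_lines(path).map_err(|e| e.to_string())?;
    process_blocks(path, lines)
}
```

A single forward-only `BufReader` suffices in pass 2 because the canonical fork is ordered oldest-to-newest, so the remembered line numbers only ever increase.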
rafaelcr authored Aug 12, 2024
1 parent a3aeecf commit dcf545c
Showing 2 changed files with 79 additions and 24 deletions.
14 changes: 10 additions & 4 deletions components/chainhook-cli/src/cli/mod.rs
@@ -11,8 +11,8 @@ use crate::storage::{
     delete_confirmed_entry_from_stacks_blocks, delete_unconfirmed_entry_from_stacks_blocks,
     get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
     get_stacks_block_at_block_height, insert_unconfirmed_entry_in_stacks_blocks,
-    is_stacks_block_present, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
-    set_last_confirmed_insert_key,
+    is_stacks_block_present, open_readonly_stacks_db_conn, open_readonly_stacks_db_conn_with_retry,
+    open_readwrite_stacks_db_conn, set_last_confirmed_insert_key,
 };
 use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecification;
 use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap;
@@ -543,16 +543,22 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
                 }
             };
             match open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx) {
-                Ok(db_conn) => {
+                Ok(_) => {
                     let _ = consolidate_local_stacks_chainstate_using_csv(
                         &mut config,
                         &ctx,
                     )
                     .await;
+                    // Refresh DB connection so it picks up recent changes made by TSV consolidation.
+                    let new_conn = open_readonly_stacks_db_conn_with_retry(
+                        &config.expected_cache_path(),
+                        5,
+                        &ctx,
+                    )?;
                     scan_stacks_chainstate_via_rocksdb_using_predicate(
                         &predicate_spec,
                         None,
-                        &db_conn,
+                        &new_conn,
                         &config,
                         None,
                         &ctx,
89 changes: 69 additions & 20 deletions components/chainhook-cli/src/scan/stacks.rs
@@ -1,5 +1,7 @@
 use std::{
     collections::{HashMap, VecDeque},
+    fs::File,
+    io::{BufRead, BufReader},
     sync::{Arc, RwLock},
 };

@@ -66,11 +68,13 @@ pub enum RecordKind {
     AttachmentReceived,
 }
 
+/// Calculates the canonical chain of Stacks blocks based on a Stacks node events TSV file. Returns a `VecDeque` structure of
+/// block hashes along with the line number where we can find the entire block message within the TSV.
 pub async fn get_canonical_fork_from_tsv(
     config: &mut Config,
     start_block: Option<u64>,
     ctx: &Context,
-) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
+) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, u64)>, String> {
     let seed_tsv_path = config.expected_local_stacks_tsv_file()?.clone();
 
     let (record_tx, record_rx) = std::sync::mpsc::channel();
@@ -89,10 +93,12 @@ pub async fn get_canonical_fork_from_tsv(
             .from_path(&seed_tsv_path)
             .expect("unable to create csv reader");
 
+        let mut line: u64 = 0;
         for result in reader_builder.deserialize() {
+            line += 1;
             let record: Record = result.unwrap();
             match &record.kind {
-                RecordKind::StacksBlockReceived => match record_tx.send(Some(record)) {
+                RecordKind::StacksBlockReceived => match record_tx.send(Some((record, line))) {
                     Err(_e) => {
                         break;
                     }
@@ -108,9 +114,9 @@ pub async fn get_canonical_fork_from_tsv(
     let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
     let canonical_fork = {
         let mut cursor = BlockIdentifier::default();
-        let mut dump = HashMap::new();
+        let mut tsv_new_blocks = HashMap::new();
 
-        while let Ok(Some(mut record)) = record_rx.recv() {
+        while let Ok(Some((record, line))) = record_rx.recv() {
             let (block_identifier, parent_block_identifier) = match (&record.kind, &record.blob) {
                 (RecordKind::StacksBlockReceived, Some(blob)) => {
                     match standardize_stacks_serialized_block_header(&blob) {
@@ -141,23 +147,28 @@ pub async fn get_canonical_fork_from_tsv(
             }
 
             if block_identifier.index > cursor.index {
-                cursor = block_identifier.clone(); // todo(lgalabru)
+                cursor = block_identifier.clone();
             }
-            dump.insert(
-                block_identifier,
-                (parent_block_identifier, record.blob.take().unwrap()),
-            );
+            tsv_new_blocks.insert(block_identifier, (parent_block_identifier, line));
         }
 
         let mut canonical_fork = VecDeque::new();
         while cursor.index > 0 {
-            let (block_identifer, (parent_block_identifier, blob)) =
-                match dump.remove_entry(&cursor) {
+            let (block_identifer, (parent_block_identifier, line)) =
+                match tsv_new_blocks.remove_entry(&cursor) {
                     Some(entry) => entry,
-                    None => break,
+                    None => {
+                        warn!(
+                            ctx.expect_logger(),
+                            "Unable to find block {} with index block hash {} in TSV",
+                            cursor.index,
+                            cursor.hash
+                        );
+                        break;
+                    }
                 };
-            cursor = parent_block_identifier.clone(); // todo(lgalabru)
-            canonical_fork.push_front((block_identifer, parent_block_identifier, blob));
+            cursor = parent_block_identifier.clone();
+            canonical_fork.push_front((block_identifer, parent_block_identifier, line));
         }
         canonical_fork
     };
@@ -474,7 +485,10 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
     );
     let mut last_block_scanned = BlockIdentifier::default();
     let mut err_count = 0;
-    for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
+    let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
+    let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
+    let mut tsv_current_line = 0;
+    for (block_identifier, _parent_block_identifier, tsv_line_number) in canonical_fork.drain(..) {
         if block_identifier.index < start_block {
             continue;
         }
@@ -484,11 +498,25 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
             }
         }
 
+        // Seek to required line from TSV and retrieve its block payload.
+        let mut tsv_line = String::new();
+        while tsv_current_line < tsv_line_number {
+            tsv_line.clear();
+            let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
+            if bytes_read == 0 {
+                return Err("Unexpected EOF when reading TSV".to_string());
+            }
+            tsv_current_line += 1;
+        }
+        let Some(serialized_block) = tsv_line.split('\t').last() else {
+            return Err("Unable to retrieve serialized block from TSV line".to_string());
+        };
+
         last_block_scanned = block_identifier;
         blocks_scanned += 1;
         let block_data = match indexer::stacks::standardize_stacks_serialized_block(
             &indexer.config,
-            &blob,
+            serialized_block,
             &mut indexer.stacks_context,
             ctx,
         ) {
@@ -555,12 +583,12 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
     );
 
     let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?;
-
     if downloaded_new_dataset {
         let stacks_db =
             open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
         let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx);
-        let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;
+        let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> =
+            get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;
 
         let mut indexer = Indexer::new(config.network.clone());
         let mut blocks_inserted = 0;
@@ -571,7 +599,14 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
             ctx.expect_logger(),
             "Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
         );
-        for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
+        // TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block
+        // retrieval code into a reusable function.
+        let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
+        let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
+        let mut tsv_current_line = 0;
+        for (block_identifier, _parent_block_identifier, tsv_line_number) in
+            canonical_fork.drain(..)
+        {
             blocks_read += 1;
 
             // If blocks already stored, move on
@@ -580,9 +615,23 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
             }
             blocks_inserted += 1;
 
+            // Seek to required line from TSV and retrieve its block payload.
+            let mut tsv_line = String::new();
+            while tsv_current_line < tsv_line_number {
+                tsv_line.clear();
+                let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
+                if bytes_read == 0 {
+                    return Err("Unexpected EOF when reading TSV".to_string());
+                }
+                tsv_current_line += 1;
+            }
+            let Some(serialized_block) = tsv_line.split('\t').last() else {
+                return Err("Unable to retrieve serialized block from TSV line".to_string());
+            };
+
             let block_data = match indexer::stacks::standardize_stacks_serialized_block(
                 &indexer.config,
-                &blob,
+                serialized_block,
                 &mut indexer.stacks_context,
                 ctx,
             ) {
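
The seek-and-read loop now appears verbatim in both `scan_stacks_chainstate_via_csv_using_predicate` and `consolidate_local_stacks_chainstate_using_csv`; the TODO in the second hunk suggests extracting it. One possible shape for that shared helper, sketched here as a suggestion rather than code from this commit:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};

/// Hypothetical shared helper for the duplicated seek-and-read loop.
/// Advances `reader` to `target_line` (1-indexed) and returns the last
/// tab-separated field of that line, i.e. the serialized block payload.
/// Callers must request strictly increasing line numbers, since the
/// reader only moves forward.
fn read_block_at_line(
    reader: &mut BufReader<File>,
    current_line: &mut u64,
    target_line: u64,
) -> Result<String, String> {
    let mut tsv_line = String::new();
    while *current_line < target_line {
        tsv_line.clear();
        let bytes_read = reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
        if bytes_read == 0 {
            return Err("Unexpected EOF when reading TSV".to_string());
        }
        *current_line += 1;
    }
    tsv_line
        .split('\t')
        .last()
        // `read_line` keeps the trailing newline; trim it off the payload.
        .map(|field| field.trim_end().to_string())
        .ok_or_else(|| "Unable to retrieve serialized block from TSV line".to_string())
}
```

Each call site would then reduce to `let serialized_block = read_block_at_line(&mut tsv_reader, &mut tsv_current_line, tsv_line_number)?;`.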
