From 18be65ec6d741c9c421f078fe08dc5abd5feeb77 Mon Sep 17 00:00:00 2001 From: Yufeng Zhou Date: Fri, 27 Dec 2024 11:37:51 +0000 Subject: [PATCH] flamenco: add support for entry batches to live replay --- src/app/fdctl/run/tiles/fd_store_int.c | 14 ++- src/flamenco/runtime/fd_blockstore.h | 2 +- src/flamenco/runtime/fd_runtime.c | 145 +++++++++++++++++-------- src/flamenco/runtime/fd_runtime.h | 12 +- 4 files changed, 119 insertions(+), 54 deletions(-) diff --git a/src/app/fdctl/run/tiles/fd_store_int.c b/src/app/fdctl/run/tiles/fd_store_int.c index 929b8da8d5..eaa273adbb 100644 --- a/src/app/fdctl/run/tiles/fd_store_int.c +++ b/src/app/fdctl/run/tiles/fd_store_int.c @@ -406,8 +406,6 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx, memcpy( out_buf, block_hash->uc, sizeof(fd_hash_t) ); out_buf += sizeof(fd_hash_t); - uchar * block_data = fd_blockstore_block_data_laddr( ctx->blockstore, block ); - FD_SCRATCH_SCOPE_BEGIN { ulong caught_up = slot > ctx->store->first_turbine_slot; ulong behind = ctx->store->curr_turbine_slot - slot; @@ -432,18 +430,24 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx, slot, caught_up )); + /* calls fd_txn_parse_core on every txn in the block and copies the result into the mcache/dcache + sent to the replay tile, sending a maximum of 4096 transactions to the replay tile at a time */ fd_raw_block_txn_iter_t iter; fd_txn_iter_t * query = fd_txn_iter_map_query( ctx->txn_iter_map, slot, NULL); if( FD_LIKELY( query ) ) { iter = query->iter; } else { - iter = fd_raw_block_txn_iter_init( block_data, block->data_sz ); + iter = fd_raw_block_txn_iter_init( + fd_blockstore_block_data_laddr( ctx->blockstore, block ), + fd_blockstore_block_batch_laddr( ctx->blockstore, block ), + block->batch_cnt + ); } - for( ; !fd_raw_block_txn_iter_done( iter ); iter = fd_raw_block_txn_iter_next( block_data, iter ) ) { + for( ; !fd_raw_block_txn_iter_done( iter ); iter = fd_raw_block_txn_iter_next( iter ) ) { /* TODO: remove magic number for txns per send */ if( txn_cnt == 4096 ) break; - fd_raw_block_txn_iter_ele( block_data, iter, txns + txn_cnt ); + fd_raw_block_txn_iter_ele( iter, txns + txn_cnt ); txn_cnt++; } diff --git a/src/flamenco/runtime/fd_blockstore.h b/src/flamenco/runtime/fd_blockstore.h index dd21114ae7..1a60335552 100644 --- a/src/flamenco/runtime/fd_blockstore.h +++ b/src/flamenco/runtime/fd_blockstore.h @@ -234,7 +234,7 @@ struct fd_block { ulong batch_cnt; ulong micros_gaddr; /* ptr to the list of fd_blockstore_micro_t */ ulong micros_cnt; - ulong txns_gaddr; /* ptr to the list of fd_blockstore_txn_ref_t */ + ulong txns_gaddr; /* ptr to the list of fd_block_txn_t */ ulong txns_cnt; ulong txns_meta_gaddr; /* ptr to the allocation for txn meta data */ ulong txns_meta_sz; diff --git a/src/flamenco/runtime/fd_runtime.c b/src/flamenco/runtime/fd_runtime.c index 81d185f631..5be3d0c7f3 100644 --- a/src/flamenco/runtime/fd_runtime.c +++ b/src/flamenco/runtime/fd_runtime.c @@ -477,77 +477,133 @@ int fd_runtime_microblock_batch_prepare(void const *buf, // FD_LOG_WARNING(( "Curr iter data sz %lu offset %lu num txns %lu num mblks %lu curr txn sz %lu", iter.data_sz, iter.curr_offset, iter.remaining_txns, iter.remaining_microblocks, iter.curr_txn_sz )); // } +/* if we are currently in the middle of a batch, batch_cnt will include the current batch. + if we are at the start of a batch, batch_cnt will include the current batch. */ static fd_raw_block_txn_iter_t -find_next_txn_in_raw_block( uchar const * data, ulong data_sz, ulong existing_offset, ulong num_microblocks ) { - uchar const * base = data; - ulong num_txns = 0UL; - ulong sz = (ulong)data - (ulong)base; - while( !num_txns && (sz < data_sz) ) { - while( num_microblocks == 0 && (sz < data_sz) ) { - num_microblocks = FD_LOAD( ulong, data ); - data += sizeof( ulong ); - sz = (ulong)data - (ulong)base; +find_next_txn_in_raw_block( uchar const * orig_data, + fd_block_entry_batch_t const * batches, /* The batch we are currently consuming. */ + ulong batch_cnt, /* Includes batch we are currently consuming. */ + ulong curr_offset, + ulong num_microblocks ) { + + /* At this point, all the transactions in the current microblock have been consumed + by fd_raw_block_txn_iter_next */ + + /* Case 1: there are microblocks remaining in the current batch */ + for ( ulong i=0UL; iend_off - curr_offset, µblock_info.microblock_hdr, µblock_hdr_size ) != 0 ) { + // TODO improve error handling + FD_LOG_ERR(( "premature end of batch" )); } + curr_offset += microblock_hdr_size; + + /* If we have found a microblock with transactions in the current batch, return that */ + if ( FD_LIKELY( microblock_info.microblock_hdr.txn_cnt ) ) { + return (fd_raw_block_txn_iter_t){ + .curr_batch = batches, + .orig_data = orig_data, + .remaining_batches = batch_cnt, + .remaining_microblocks = fd_ulong_sat_sub( fd_ulong_sat_sub(num_microblocks, i), 1UL), + .remaining_txns = microblock_info.microblock_hdr.txn_cnt, + .curr_offset = curr_offset, + .curr_txn_sz = ULONG_MAX + }; + } + } - fd_microblock_info_t microblock_info = { - .raw_microblock = data, - .signature_cnt = 0, - }; + /* If we have consumed the current batch, but did not find any txns, we need to move on to the next one */ + curr_offset = batches->end_off; + batches++; + batch_cnt = fd_ulong_sat_sub(batch_cnt, 1UL); + + /* Case 2: need to find the next batch with a microblock in that has a non-zero number of txns */ + for( ulong i=0UL; i batch_end_off ) { + FD_LOG_ERR(( "premature end of batch" )); + } - while( microblock_info.microblock_hdr.txn_cnt == 0 && num_microblocks && sz < data_sz ) { - ulong hdr_sz = 0; - memset( µblock_info, 0UL, sizeof(fd_microblock_info_t) ); - microblock_info.raw_microblock = data; - if (fd_runtime_parse_microblock_hdr(data, data_sz - sz, µblock_info.microblock_hdr, &hdr_sz) != 0) { + /* Consume the ulong describing how many microblocks there are */ + num_microblocks = FD_LOAD( ulong, orig_data + curr_offset ); + curr_offset += sizeof( ulong ); + + /* Iterate over each microblock until we find one with a non-zero txn cnt */ + for( ulong j=0UL; jend_off; fd_txn_p_t out_txn; if( iter.curr_txn_sz == ULONG_MAX ) { ulong payload_sz = 0; - ulong txn_sz = fd_txn_parse_core( data + iter.curr_offset, fd_ulong_min( iter.data_sz, FD_TXN_MTU), TXN(&out_txn), NULL, &payload_sz ); + ulong txn_sz = fd_txn_parse_core( iter.orig_data + iter.curr_offset, fd_ulong_min( batch_end_off - iter.curr_offset, FD_TXN_MTU), TXN(&out_txn), NULL, &payload_sz ); if (txn_sz == 0 || txn_sz > FD_TXN_MTU) { FD_LOG_ERR(("Invalid txn parse")); } - iter.data_sz -= payload_sz; iter.curr_offset += payload_sz; } else { - iter.data_sz -= iter.curr_txn_sz; iter.curr_offset += iter.curr_txn_sz; iter.curr_txn_sz = ULONG_MAX; } @@ -556,17 +612,18 @@ fd_raw_block_txn_iter_next( uchar const * data, fd_raw_block_txn_iter_t iter ) { return iter; } - return find_next_txn_in_raw_block( data + iter.curr_offset, iter.data_sz, iter.curr_offset, iter.remaining_microblocks ); + return find_next_txn_in_raw_block( iter.orig_data, iter.curr_batch, iter.remaining_batches, iter.curr_offset, iter.remaining_microblocks ); } void -fd_raw_block_txn_iter_ele( uchar const * data, fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn ) { +fd_raw_block_txn_iter_ele( fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn ) { + ulong const batch_end_off = iter.curr_batch->end_off; ulong payload_sz = 0; - ulong txn_sz = fd_txn_parse_core( data + iter.curr_offset, fd_ulong_min( iter.data_sz, FD_TXN_MTU), TXN(out_txn), NULL, &payload_sz ); + ulong txn_sz = fd_txn_parse_core( iter.orig_data + iter.curr_offset, fd_ulong_min( batch_end_off - iter.curr_offset, FD_TXN_MTU), TXN(out_txn), NULL, &payload_sz ); if (txn_sz == 0 || txn_sz > FD_TXN_MTU) { FD_LOG_ERR(("Invalid txn parse %lu", txn_sz)); } - fd_memcpy( out_txn->payload, data + iter.curr_offset, payload_sz ); + fd_memcpy( out_txn->payload, iter.orig_data + iter.curr_offset, payload_sz ); out_txn->payload_sz = (ushort)payload_sz; iter.curr_txn_sz = payload_sz; } diff --git a/src/flamenco/runtime/fd_runtime.h b/src/flamenco/runtime/fd_runtime.h index 3cf0592cd2..9971b8d78b 100644 --- a/src/flamenco/runtime/fd_runtime.h +++ b/src/flamenco/runtime/fd_runtime.h @@ -70,10 +70,12 @@ struct fd_block_txn_iter { typedef struct fd_block_txn_iter fd_block_txn_iter_t; struct fd_raw_block_txn_iter { + fd_block_entry_batch_t const * curr_batch; + uchar const * orig_data; + ulong remaining_batches; ulong remaining_microblocks; ulong remaining_txns; ulong curr_offset; - ulong data_sz; ulong curr_txn_sz; }; @@ -488,16 +490,18 @@ fd_txn_p_t * fd_block_txn_iter_ele( fd_block_info_t const * block_info, fd_block_txn_iter_t iter ); fd_raw_block_txn_iter_t -fd_raw_block_txn_iter_init( uchar const * data, ulong data_sz ); +fd_raw_block_txn_iter_init( uchar const * orig_data, + fd_block_entry_batch_t const * batches, + ulong batch_cnt ); ulong fd_raw_block_txn_iter_done( fd_raw_block_txn_iter_t iter ); fd_raw_block_txn_iter_t -fd_raw_block_txn_iter_next( uchar const * data, fd_raw_block_txn_iter_t iter ); +fd_raw_block_txn_iter_next( fd_raw_block_txn_iter_t iter ); void -fd_raw_block_txn_iter_ele( uchar const * data, fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn ); +fd_raw_block_txn_iter_ele( fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn ); FD_PROTOTYPES_END