Skip to content

Commit

Permalink
flamenco: add support for entry batches to live replay
Browse files Browse the repository at this point in the history
  • Loading branch information
yufeng-jump authored and topointon-jump committed Dec 30, 2024
1 parent bfbddba commit 18be65e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 54 deletions.
14 changes: 9 additions & 5 deletions src/app/fdctl/run/tiles/fd_store_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,6 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
memcpy( out_buf, block_hash->uc, sizeof(fd_hash_t) );
out_buf += sizeof(fd_hash_t);

uchar * block_data = fd_blockstore_block_data_laddr( ctx->blockstore, block );

FD_SCRATCH_SCOPE_BEGIN {
ulong caught_up = slot > ctx->store->first_turbine_slot;
ulong behind = ctx->store->curr_turbine_slot - slot;
Expand All @@ -432,18 +430,24 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
slot,
caught_up ));

/* calls fd_txn_parse_core on every txn in the block and copies the result into the mcache/dcache
sent to the replay tile, sending a maximum of 4096 transactions to the replay tile at a time */
fd_raw_block_txn_iter_t iter;
fd_txn_iter_t * query = fd_txn_iter_map_query( ctx->txn_iter_map, slot, NULL);
if( FD_LIKELY( query ) ) {
iter = query->iter;
} else {
iter = fd_raw_block_txn_iter_init( block_data, block->data_sz );
iter = fd_raw_block_txn_iter_init(
fd_blockstore_block_data_laddr( ctx->blockstore, block ),
fd_blockstore_block_batch_laddr( ctx->blockstore, block ),
block->batch_cnt
);
}

for( ; !fd_raw_block_txn_iter_done( iter ); iter = fd_raw_block_txn_iter_next( block_data, iter ) ) {
for( ; !fd_raw_block_txn_iter_done( iter ); iter = fd_raw_block_txn_iter_next( iter ) ) {
/* TODO: remove magic number for txns per send */
if( txn_cnt == 4096 ) break;
fd_raw_block_txn_iter_ele( block_data, iter, txns + txn_cnt );
fd_raw_block_txn_iter_ele( iter, txns + txn_cnt );
txn_cnt++;
}

Expand Down
2 changes: 1 addition & 1 deletion src/flamenco/runtime/fd_blockstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ struct fd_block {
ulong batch_cnt;
ulong micros_gaddr; /* ptr to the list of fd_blockstore_micro_t */
ulong micros_cnt;
ulong txns_gaddr; /* ptr to the list of fd_blockstore_txn_ref_t */
ulong txns_gaddr; /* ptr to the list of fd_block_txn_t */
ulong txns_cnt;
ulong txns_meta_gaddr; /* ptr to the allocation for txn meta data */
ulong txns_meta_sz;
Expand Down
145 changes: 101 additions & 44 deletions src/flamenco/runtime/fd_runtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,77 +477,133 @@ int fd_runtime_microblock_batch_prepare(void const *buf,
// FD_LOG_WARNING(( "Curr iter data sz %lu offset %lu num txns %lu num mblks %lu curr txn sz %lu", iter.data_sz, iter.curr_offset, iter.remaining_txns, iter.remaining_microblocks, iter.curr_txn_sz ));
// }

/* if we are currently in the middle of a batch, batch_cnt will include the current batch.
if we are at the start of a batch, batch_cnt will include the current batch. */
static fd_raw_block_txn_iter_t
find_next_txn_in_raw_block( uchar const * data, ulong data_sz, ulong existing_offset, ulong num_microblocks ) {
uchar const * base = data;
ulong num_txns = 0UL;
ulong sz = (ulong)data - (ulong)base;
while( !num_txns && (sz < data_sz) ) {
while( num_microblocks == 0 && (sz < data_sz) ) {
num_microblocks = FD_LOAD( ulong, data );
data += sizeof( ulong );
sz = (ulong)data - (ulong)base;
find_next_txn_in_raw_block( uchar const * orig_data,
fd_block_entry_batch_t const * batches, /* The batch we are currently consuming. */
ulong batch_cnt, /* Includes batch we are currently consuming. */
ulong curr_offset,
ulong num_microblocks ) {

/* At this point, all the transactions in the current microblock have been consumed
by fd_raw_block_txn_iter_next */

/* Case 1: there are microblocks remaining in the current batch */
for ( ulong i=0UL; i<num_microblocks; i++ ) {
ulong microblock_hdr_size = 0UL;
fd_microblock_info_t microblock_info = { 0 };
if ( fd_runtime_parse_microblock_hdr( orig_data + curr_offset, batches->end_off - curr_offset, &microblock_info.microblock_hdr, &microblock_hdr_size ) != 0 ) {
// TODO improve error handling
FD_LOG_ERR(( "premature end of batch" ));
}
curr_offset += microblock_hdr_size;

/* If we have found a microblock with transactions in the current batch, return that */
if ( FD_LIKELY( microblock_info.microblock_hdr.txn_cnt ) ) {
return (fd_raw_block_txn_iter_t){
.curr_batch = batches,
.orig_data = orig_data,
.remaining_batches = batch_cnt,
.remaining_microblocks = fd_ulong_sat_sub( fd_ulong_sat_sub(num_microblocks, i), 1UL),
.remaining_txns = microblock_info.microblock_hdr.txn_cnt,
.curr_offset = curr_offset,
.curr_txn_sz = ULONG_MAX
};
}
}

fd_microblock_info_t microblock_info = {
.raw_microblock = data,
.signature_cnt = 0,
};
/* If we have consumed the current batch, but did not find any txns, we need to move on to the next one */
curr_offset = batches->end_off;
batches++;
batch_cnt = fd_ulong_sat_sub(batch_cnt, 1UL);

/* Case 2: need to find the next batch with a microblock in that has a non-zero number of txns */
for( ulong i=0UL; i<batch_cnt; i++ ) {
/* Sanity-check that we have not over-shot the end of the batch */
ulong const batch_end_off = batches[i].end_off;
if( curr_offset + sizeof( ulong ) > batch_end_off ) {
FD_LOG_ERR(( "premature end of batch" ));
}

while( microblock_info.microblock_hdr.txn_cnt == 0 && num_microblocks && sz < data_sz ) {
ulong hdr_sz = 0;
memset( &microblock_info, 0UL, sizeof(fd_microblock_info_t) );
microblock_info.raw_microblock = data;
if (fd_runtime_parse_microblock_hdr(data, data_sz - sz, &microblock_info.microblock_hdr, &hdr_sz) != 0) {
/* Consume the ulong describing how many microblocks there are */
num_microblocks = FD_LOAD( ulong, orig_data + curr_offset );
curr_offset += sizeof( ulong );

/* Iterate over each microblock until we find one with a non-zero txn cnt */
for( ulong j=0UL; j<num_microblocks; j++ ) {
ulong microblock_hdr_size = 0UL;
fd_microblock_info_t microblock_info = { 0 };
if ( fd_runtime_parse_microblock_hdr( orig_data + curr_offset, batch_end_off - curr_offset, &microblock_info.microblock_hdr, &microblock_hdr_size ) != 0 ) {
// TODO improve error handling
FD_LOG_ERR(( "premature end of batch" ));
}
curr_offset += microblock_hdr_size;

/* If we have found a microblock with a non-zero number of transactions in, return that */
if( FD_LIKELY( microblock_info.microblock_hdr.txn_cnt ) ) {
return (fd_raw_block_txn_iter_t){
.data_sz = 0,
.curr_offset = data_sz,
.remaining_microblocks = 0,
.remaining_txns = 0,
.curr_txn_sz = ULONG_MAX
.curr_batch = &batches[i],
.orig_data = orig_data,
.remaining_batches = fd_ulong_sat_sub(batch_cnt, i),
.remaining_microblocks = fd_ulong_sat_sub( fd_ulong_sat_sub(num_microblocks, j), 1UL),
.remaining_txns = microblock_info.microblock_hdr.txn_cnt,
.curr_offset = curr_offset,
.curr_txn_sz = ULONG_MAX
};
}
data += hdr_sz;
sz = (ulong)data - (ulong)base;
num_microblocks--;
}

num_txns = microblock_info.microblock_hdr.txn_cnt;
/* skip to the start of the next batch */
curr_offset = batch_end_off;
}

ulong curr_off = sz;
/* Case 3: we didn't manage to find any microblocks with non-zero transaction counts in */
return (fd_raw_block_txn_iter_t){
.data_sz = fd_ulong_sat_sub(data_sz, curr_off),
.curr_offset = existing_offset + curr_off,
.remaining_microblocks = num_microblocks,
.remaining_txns = num_txns,
.curr_txn_sz = ULONG_MAX
.curr_batch = batches,
.orig_data = orig_data,
.remaining_batches = 0UL,
.remaining_microblocks = 0UL,
.remaining_txns = 0UL,
.curr_offset = curr_offset,
.curr_txn_sz = ULONG_MAX
};
}

fd_raw_block_txn_iter_t
fd_raw_block_txn_iter_init( uchar const * data, ulong data_sz ) {
return find_next_txn_in_raw_block( data, data_sz, 0, 0 );
fd_raw_block_txn_iter_init( uchar const * orig_data,
fd_block_entry_batch_t const * batches,
ulong batch_cnt ) {
/*
* In general, every read of a lower level count should lead to a
* decrement of a higher level count. For example, reading a count
* of microblocks should lead to a decrement of the number of
* remaining batches. In some sense, the batch count is drained into
* the microblock count.
*
*/

ulong num_microblocks = FD_LOAD( ulong, orig_data );
return find_next_txn_in_raw_block( orig_data, batches, batch_cnt, sizeof( ulong ), num_microblocks );
}

ulong
fd_raw_block_txn_iter_done( fd_raw_block_txn_iter_t iter ) {
return iter.data_sz == 0;
return iter.remaining_batches == 0UL && iter.remaining_microblocks == 0UL && iter.remaining_txns == 0UL;
}

fd_raw_block_txn_iter_t
fd_raw_block_txn_iter_next( uchar const * data, fd_raw_block_txn_iter_t iter ) {
fd_raw_block_txn_iter_next( fd_raw_block_txn_iter_t iter ) {
ulong const batch_end_off = iter.curr_batch->end_off;
fd_txn_p_t out_txn;
if( iter.curr_txn_sz == ULONG_MAX ) {
ulong payload_sz = 0;
ulong txn_sz = fd_txn_parse_core( data + iter.curr_offset, fd_ulong_min( iter.data_sz, FD_TXN_MTU), TXN(&out_txn), NULL, &payload_sz );
ulong txn_sz = fd_txn_parse_core( iter.orig_data + iter.curr_offset, fd_ulong_min( batch_end_off - iter.curr_offset, FD_TXN_MTU), TXN(&out_txn), NULL, &payload_sz );
if (txn_sz == 0 || txn_sz > FD_TXN_MTU) {
FD_LOG_ERR(("Invalid txn parse"));
}
iter.data_sz -= payload_sz;
iter.curr_offset += payload_sz;
} else {
iter.data_sz -= iter.curr_txn_sz;
iter.curr_offset += iter.curr_txn_sz;
iter.curr_txn_sz = ULONG_MAX;
}
Expand All @@ -556,17 +612,18 @@ fd_raw_block_txn_iter_next( uchar const * data, fd_raw_block_txn_iter_t iter ) {
return iter;
}

return find_next_txn_in_raw_block( data + iter.curr_offset, iter.data_sz, iter.curr_offset, iter.remaining_microblocks );
return find_next_txn_in_raw_block( iter.orig_data, iter.curr_batch, iter.remaining_batches, iter.curr_offset, iter.remaining_microblocks );
}

void
fd_raw_block_txn_iter_ele( uchar const * data, fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn ) {
fd_raw_block_txn_iter_ele( fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn ) {
ulong const batch_end_off = iter.curr_batch->end_off;
ulong payload_sz = 0;
ulong txn_sz = fd_txn_parse_core( data + iter.curr_offset, fd_ulong_min( iter.data_sz, FD_TXN_MTU), TXN(out_txn), NULL, &payload_sz );
ulong txn_sz = fd_txn_parse_core( iter.orig_data + iter.curr_offset, fd_ulong_min( batch_end_off - iter.curr_offset, FD_TXN_MTU), TXN(out_txn), NULL, &payload_sz );
if (txn_sz == 0 || txn_sz > FD_TXN_MTU) {
FD_LOG_ERR(("Invalid txn parse %lu", txn_sz));
}
fd_memcpy( out_txn->payload, data + iter.curr_offset, payload_sz );
fd_memcpy( out_txn->payload, iter.orig_data + iter.curr_offset, payload_sz );
out_txn->payload_sz = (ushort)payload_sz;
iter.curr_txn_sz = payload_sz;
}
Expand Down
12 changes: 8 additions & 4 deletions src/flamenco/runtime/fd_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ struct fd_block_txn_iter {
typedef struct fd_block_txn_iter fd_block_txn_iter_t;

struct fd_raw_block_txn_iter {
fd_block_entry_batch_t const * curr_batch;
uchar const * orig_data;
ulong remaining_batches;
ulong remaining_microblocks;
ulong remaining_txns;
ulong curr_offset;
ulong data_sz;

ulong curr_txn_sz;
};
Expand Down Expand Up @@ -488,16 +490,18 @@ fd_txn_p_t *
fd_block_txn_iter_ele( fd_block_info_t const * block_info, fd_block_txn_iter_t iter );

fd_raw_block_txn_iter_t
fd_raw_block_txn_iter_init( uchar const * data, ulong data_sz );
fd_raw_block_txn_iter_init( uchar const * orig_data,
fd_block_entry_batch_t const * batches,
ulong batch_cnt );

ulong
fd_raw_block_txn_iter_done( fd_raw_block_txn_iter_t iter );

fd_raw_block_txn_iter_t
fd_raw_block_txn_iter_next( uchar const * data, fd_raw_block_txn_iter_t iter );
fd_raw_block_txn_iter_next( fd_raw_block_txn_iter_t iter );

void
fd_raw_block_txn_iter_ele( uchar const * data, fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn );
fd_raw_block_txn_iter_ele( fd_raw_block_txn_iter_t iter, fd_txn_p_t * out_txn );

FD_PROTOTYPES_END

Expand Down

0 comments on commit 18be65e

Please sign in to comment.