diff --git a/src/app/fdctl/Local.mk b/src/app/fdctl/Local.mk index bb7952bd5b..2bb4a419a8 100644 --- a/src/app/fdctl/Local.mk +++ b/src/app/fdctl/Local.mk @@ -50,8 +50,8 @@ $(call add-objs,run/tiles/fd_poh_int,fd_fdctl) $(call add-objs,run/tiles/fd_sender,fd_fdctl) $(call add-objs,run/tiles/fd_eqvoc,fd_fdctl) $(call add-objs,run/tiles/fd_rpcserv,fd_fdctl) -$(call add-objs,run/tiles/fd_snapshot,fd_fdctl) -$(call add-objs,run/tiles/fd_snapshot_thread,fd_fdctl) +$(call add-objs,run/tiles/fd_batch,fd_fdctl) +$(call add-objs,run/tiles/fd_batch_thread,fd_fdctl) endif # fdctl topologies diff --git a/src/app/fdctl/config.h b/src/app/fdctl/config.h index a73fccb5ea..f627aff32b 100644 --- a/src/app/fdctl/config.h +++ b/src/app/fdctl/config.h @@ -310,7 +310,7 @@ typedef struct { ulong incremental_interval; char out_dir[ PATH_MAX ]; ulong hash_tpool_thread_count; - } snaps; + } batch; } tiles; } config_t; diff --git a/src/app/fdctl/config/default-firedancer.toml b/src/app/fdctl/config/default-firedancer.toml index b042cb8086..f9f43dd424 100644 --- a/src/app/fdctl/config/default-firedancer.toml +++ b/src/app/fdctl/config/default-firedancer.toml @@ -22,8 +22,8 @@ cluster_version = "1.18.0" [tiles.pack] use_consumed_cus = false - [tiles.snaps] - hash_tpool_thread_count = 2 + [tiles.batch] + hash_tpool_thread_count = 3 [consensus] vote = true diff --git a/src/app/fdctl/config_parse.c b/src/app/fdctl/config_parse.c index b283ad1ab8..2441e52189 100644 --- a/src/app/fdctl/config_parse.c +++ b/src/app/fdctl/config_parse.c @@ -389,10 +389,10 @@ fdctl_pod_to_cfg( config_t * config, CFG_POP ( cstr, tiles.store_int.shred_cap_archive ); CFG_POP ( cstr, tiles.store_int.shred_cap_replay ); - CFG_POP ( ulong, tiles.snaps.full_interval ); - CFG_POP ( ulong, tiles.snaps.incremental_interval ); - CFG_POP ( cstr, tiles.snaps.out_dir ); - CFG_POP ( ulong, tiles.snaps.hash_tpool_thread_count ); + CFG_POP ( ulong, tiles.batch.full_interval ); + CFG_POP ( ulong, tiles.batch.incremental_interval ); + CFG_POP ( cstr, tiles.batch.out_dir ); + CFG_POP ( ulong, tiles.batch.hash_tpool_thread_count ); # undef CFG_POP # undef CFG_ARRAY diff --git a/src/app/fdctl/main.c b/src/app/fdctl/main.c index 97438782fe..ab5d0a6b38 100644 --- a/src/app/fdctl/main.c +++ b/src/app/fdctl/main.c @@ -37,7 +37,8 @@ extern fd_topo_run_tile_t fd_tile_repair; extern fd_topo_run_tile_t fd_tile_store_int; extern fd_topo_run_tile_t fd_tile_replay; extern fd_topo_run_tile_t fd_tile_replay_thread; -extern fd_topo_run_tile_t fd_tile_snaps_thread; +extern fd_topo_run_tile_t fd_tile_batch; +extern fd_topo_run_tile_t fd_tile_batch_thread; extern fd_topo_run_tile_t fd_tile_poh_int; extern fd_topo_run_tile_t fd_tile_sender; extern fd_topo_run_tile_t fd_tile_eqvoc; @@ -67,6 +68,8 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_store_int, &fd_tile_replay, &fd_tile_replay_thread, + &fd_tile_batch, + &fd_tile_batch_thread, &fd_tile_poh_int, &fd_tile_sender, &fd_tile_eqvoc, diff --git a/src/app/fdctl/ready.c b/src/app/fdctl/ready.c index bbf9035019..388be35745 100644 --- a/src/app/fdctl/ready.c +++ b/src/app/fdctl/ready.c @@ -23,9 +23,9 @@ ready_cmd_fn( args_t * args, anyway. */ if( FD_UNLIKELY( tile->is_agave ) ) continue; - /* Don't wait for rtpool/stpool tiles, they will not report ready. */ + /* Don't wait for rtpool/btpool tiles, they will not report ready. 
*/ if( strncmp( tile->name, "rtpool", 7 )==0 ) continue; - if( strncmp( tile->name, "stpool", 7 )==0 ) continue; + if( strncmp( tile->name, "btpool", 7 )==0 ) continue; long start = fd_log_wallclock(); int printed = 0; diff --git a/src/app/fdctl/run/tiles/snapshot.seccomppolicy b/src/app/fdctl/run/tiles/batch.seccomppolicy similarity index 100% rename from src/app/fdctl/run/tiles/snapshot.seccomppolicy rename to src/app/fdctl/run/tiles/batch.seccomppolicy diff --git a/src/app/fdctl/run/tiles/fd_snapshot.c b/src/app/fdctl/run/tiles/fd_batch.c similarity index 62% rename from src/app/fdctl/run/tiles/fd_snapshot.c rename to src/app/fdctl/run/tiles/fd_batch.c index 18c91bd40a..97d2001f9a 100644 --- a/src/app/fdctl/run/tiles/fd_snapshot.c +++ b/src/app/fdctl/run/tiles/fd_batch.c @@ -2,20 +2,22 @@ #include "../../../../disco/topo/fd_pod_format.h" #include "../../../../funk/fd_funk.h" +#include "../../../../funk/fd_funk_filemap.h" + +#include "../../../../flamenco/runtime/fd_hashes.h" #include "../../../../flamenco/runtime/fd_txncache.h" #include "../../../../flamenco/runtime/fd_runtime.h" #include "../../../../flamenco/snapshot/fd_snapshot_create.h" -#include "../../../../funk/fd_funk_filemap.h" -#include <errno.h> -#include <stdio.h> /* SEEK_SET */ -#include <unistd.h> /* readlink, lseek, ftruncate */ +#include "generated/batch_seccomp.h" -#include "generated/snapshot_seccomp.h" +#include <errno.h> +#include <unistd.h> -#define SCRATCH_MAX (1024UL << 24 ) /* 24 MiB */ -#define SCRATCH_DEPTH (256UL) /* 256 scratch frames */ -#define TPOOL_WORKER_MEM_SZ (1UL<<30UL) /* 256MB */ +#define SCRATCH_MAX (1024UL << 24) /* 24 MiB */ +#define SCRATCH_DEPTH (256UL) /* 256 scratch frames */ +#define TPOOL_WORKER_MEM_SZ (1UL<<30UL) /* 256MB */ +#define REPLAY_OUT_IDX (0UL) struct fd_snapshot_tile_ctx { /* User defined parameters. */ @@ -46,27 +48,39 @@ struct fd_snapshot_tile_ctx { ulong last_full_snap_slot; fd_hash_t last_hash; ulong last_capitalization; + + /* Replay out link fields for epoch account hash.
*/ + fd_wksp_t * replay_out_mem; + ulong replay_out_chunk; }; typedef struct fd_snapshot_tile_ctx fd_snapshot_tile_ctx_t; -void FD_FN_UNUSED -tpool_snap_boot( fd_topo_t * topo, ulong total_thread_count ) { - ushort tile_to_cpu[ FD_TILE_MAX ] = { 0 }; +void +tpool_batch_boot( fd_topo_t * topo, ulong total_thread_count ) { + ushort tile_to_cpu[ FD_TILE_MAX ] = {0}; ulong thread_count = 0UL; ulong main_thread_seen = 0UL; for( ulong i=0UL; i<topo->tile_cnt; i++ ) { - if( !strcmp( topo->tiles[i].name, "stpool" ) ) { - tile_to_cpu[ thread_count++ ] = (ushort)topo->tiles[ i ].cpu_idx; + if( strcmp( topo->tiles[i].name, "btpool" ) == 0 ) { + tile_to_cpu[ 1+thread_count ] = (ushort)topo->tiles[i].cpu_idx; + thread_count++; } + if( strcmp( topo->tiles[i].name, "batch" ) == 0 ) { + tile_to_cpu[ 0 ] = (ushort)topo->tiles[i].cpu_idx; + main_thread_seen = 1; + } + } + + if( main_thread_seen ) { + thread_count++; } - if( FD_UNLIKELY( thread_count!=total_thread_count ) ) { + if( thread_count != total_thread_count ) FD_LOG_ERR(( "thread count mismatch thread_count=%lu total_thread_count=%lu main_thread_seen=%lu", - thread_count, - total_thread_count, + thread_count, + total_thread_count, main_thread_seen )); - } fd_tile_private_map_boot( tile_to_cpu, thread_count ); } @@ -80,7 +94,7 @@ FD_FN_PURE static inline ulong scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_tile_ctx_t), sizeof(fd_snapshot_tile_ctx_t) ); - l = FD_LAYOUT_APPEND( l, FD_SCRATCH_ALIGN_DEFAULT, tile->snaps.hash_tpool_thread_count * TPOOL_WORKER_MEM_SZ ); + l = FD_LAYOUT_APPEND( l, FD_SCRATCH_ALIGN_DEFAULT, tile->batch.hash_tpool_thread_count * TPOOL_WORKER_MEM_SZ ); l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_MAX ) ); l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_DEPTH ) ); return FD_LAYOUT_FINI( l, scratch_align() ); } @@ -94,48 +108,48 @@ privileged_init( fd_topo_t * topo FD_PARAM_UNUSED, this to support multiple files.
*/ char tmp_dir_buf[ FD_SNAPSHOT_DIR_MAX ]; - int err = snprintf( tmp_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->snaps.out_dir, FD_SNAPSHOT_TMP_ARCHIVE ); + int err = snprintf( tmp_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_ARCHIVE ); if( FD_UNLIKELY( err<0 ) ) { FD_LOG_ERR(( "Failed to format directory string" )); } char tmp_inc_dir_buf[ FD_SNAPSHOT_DIR_MAX ]; - err = snprintf( tmp_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->snaps.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE ); + err = snprintf( tmp_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE ); if( FD_UNLIKELY( err<0 ) ) { FD_LOG_ERR(( "Failed to format directory string" )); } char zstd_dir_buf[ FD_SNAPSHOT_DIR_MAX ]; - err = snprintf( zstd_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->snaps.out_dir, FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD ); + err = snprintf( zstd_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD ); if( FD_UNLIKELY( err<0 ) ) { FD_LOG_ERR(( "Failed to format directory string" )); } char zstd_inc_dir_buf[ FD_SNAPSHOT_DIR_MAX ]; - err = snprintf( zstd_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->snaps.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD ); + err = snprintf( zstd_inc_dir_buf, FD_SNAPSHOT_DIR_MAX, "%s/%s", tile->batch.out_dir, FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD ); if( FD_UNLIKELY( err<0 ) ) { FD_LOG_ERR(( "Failed to format directory string" )); } /* Create and open the relevant files for snapshots. */ - tile->snaps.tmp_fd = open( tmp_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 ); - if( FD_UNLIKELY( tile->snaps.tmp_fd==-1 ) ) { + tile->batch.tmp_fd = open( tmp_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 ); + if( FD_UNLIKELY( tile->batch.tmp_fd==-1 ) ) { FD_LOG_ERR(( "Failed to open and create tarball for file=%s (%i-%s)", tmp_dir_buf, errno, fd_io_strerror( errno ) )); } - tile->snaps.tmp_inc_fd = open( tmp_inc_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 ); - if( FD_UNLIKELY( tile->snaps.tmp_inc_fd==-1 ) ) { + tile->batch.tmp_inc_fd = open( tmp_inc_dir_buf, O_CREAT | O_RDWR | O_TRUNC, 0644 ); + if( FD_UNLIKELY( tile->batch.tmp_inc_fd==-1 ) ) { FD_LOG_ERR(( "Failed to open and create tarball for file=%s (%i-%s)", tmp_dir_buf, errno, fd_io_strerror( errno ) )); } - tile->snaps.full_snapshot_fd = open( zstd_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 ); - if( FD_UNLIKELY( tile->snaps.full_snapshot_fd==-1 ) ) { + tile->batch.full_snapshot_fd = open( zstd_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 ); + if( FD_UNLIKELY( tile->batch.full_snapshot_fd==-1 ) ) { FD_LOG_WARNING(( "Failed to open the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) )); } - tile->snaps.incremental_snapshot_fd = open( zstd_inc_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 ); - if( FD_UNLIKELY( tile->snaps.incremental_snapshot_fd==-1 ) ) { + tile->batch.incremental_snapshot_fd = open( zstd_inc_dir_buf, O_RDWR | O_CREAT | O_TRUNC, 0644 ); + if( FD_UNLIKELY( tile->batch.incremental_snapshot_fd==-1 ) ) { FD_LOG_WARNING(( "Failed to open the snapshot file (%i-%s)", errno, fd_io_strerror( errno ) )); } } @@ -146,6 +160,11 @@ unprivileged_init( fd_topo_t * topo FD_PARAM_UNUSED, void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + if( FD_UNLIKELY( tile->out_cnt!=1UL || strcmp( topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name, "batch_replay" ) ) ) { + FD_LOG_ERR(( "batch tile has none or unexpected output links %lu %s", + tile->out_cnt, topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name )); + } + 
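(Editorial aside: the reason all four snapshot files are opened here in privileged_init, rather than lazily when a snapshot is requested, is that Firedancer tiles are sandboxed after boot; the generated seccomp policy only admits I/O on the descriptors explicitly passed to it, and the assumption is that open(2) itself is no longer available to the tile afterwards. A sketch of the lifecycle, under that assumption:)

  /* Sketch, not part of the patch:

       privileged_init:          tile->batch.tmp_fd = open( path, O_CREAT|O_RDWR|O_TRUNC, 0644 );
       populate_allowed_fds:     out_fds[ out_cnt++ ] = tile->batch.tmp_fd;
       populate_allowed_seccomp: populate_sock_filter_policy_batch( ..., (uint)tile->batch.tmp_fd, ... );

     After the sandbox is installed, the tile can still lseek/ftruncate/write
     these descriptors, but it cannot open() new files, which is why all four
     snapshot files exist up front even before any snapshot work is requested. */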
/**********************************************************************/ /* scratch (bump)-allocate memory owned by the replay tile */ /**********************************************************************/ @@ -155,35 +174,35 @@ unprivileged_init( fd_topo_t * topo FD_PARAM_UNUSED, FD_SCRATCH_ALLOC_INIT( l, scratch ); fd_snapshot_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapshot_tile_ctx_t), sizeof(fd_snapshot_tile_ctx_t) ); memset( ctx, 0, sizeof(fd_snapshot_tile_ctx_t) ); - void * tpool_worker_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_SCRATCH_ALIGN_DEFAULT, tile->snaps.hash_tpool_thread_count * TPOOL_WORKER_MEM_SZ ); + void * tpool_worker_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_SCRATCH_ALIGN_DEFAULT, tile->batch.hash_tpool_thread_count * TPOOL_WORKER_MEM_SZ ); void * scratch_smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_MAX ) ); void * scratch_fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_DEPTH ) ); ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() ); - ctx->full_interval = tile->snaps.full_interval; - ctx->incremental_interval = tile->snaps.incremental_interval; - ctx->out_dir = tile->snaps.out_dir; - ctx->tmp_fd = tile->snaps.tmp_fd; - ctx->tmp_inc_fd = tile->snaps.tmp_inc_fd; - ctx->full_snapshot_fd = tile->snaps.full_snapshot_fd; - ctx->incremental_snapshot_fd = tile->snaps.incremental_snapshot_fd; + ctx->full_interval = tile->batch.full_interval; + ctx->incremental_interval = tile->batch.incremental_interval; + ctx->out_dir = tile->batch.out_dir; + ctx->tmp_fd = tile->batch.tmp_fd; + ctx->tmp_inc_fd = tile->batch.tmp_inc_fd; + ctx->full_snapshot_fd = tile->batch.full_snapshot_fd; + ctx->incremental_snapshot_fd = tile->batch.incremental_snapshot_fd; /**********************************************************************/ /* tpool */ /**********************************************************************/ - FD_LOG_NOTICE(( "Number of threads in hash tpool: %lu", tile->snaps.hash_tpool_thread_count )); + FD_LOG_NOTICE(( "Number of threads in hash tpool: %lu", tile->batch.hash_tpool_thread_count )); - if( FD_LIKELY( tile->snaps.hash_tpool_thread_count>1UL ) ) { - tpool_snap_boot( topo, tile->snaps.hash_tpool_thread_count ); - ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->snaps.hash_tpool_thread_count ); + if( FD_LIKELY( tile->batch.hash_tpool_thread_count>1UL ) ) { + tpool_batch_boot( topo, tile->batch.hash_tpool_thread_count ); + ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->batch.hash_tpool_thread_count ); } else { ctx->tpool = NULL; } - if( FD_LIKELY( tile->snaps.hash_tpool_thread_count>1UL ) ) { + if( FD_LIKELY( tile->batch.hash_tpool_thread_count>1UL ) ) { /* Start the tpool workers */ - for( ulong i=1UL; i<tile->snaps.hash_tpool_thread_count; i++ ) { + for( ulong i=1UL; i<tile->batch.hash_tpool_thread_count; i++ ) { if( FD_UNLIKELY( !fd_tpool_worker_push( ctx->tpool, i, (uchar *)tpool_worker_mem + TPOOL_WORKER_MEM_SZ*(i - 1U), TPOOL_WORKER_MEM_SZ ) ) ) { FD_LOG_ERR(( "failed to launch worker" )); } @@ -254,39 +273,22 @@ unprivileged_init( fd_topo_t * topo FD_PARAM_UNUSED, ctx->last_full_snap_slot = 0UL; ctx->last_capitalization = 0UL; fd_memset( &ctx->last_hash, 0, sizeof(fd_hash_t) ); -} - -static void -after_credit( fd_snapshot_tile_ctx_t * ctx FD_PARAM_UNUSED, - fd_stem_context_t * stem FD_PARAM_UNUSED, - int * opt_poll_in FD_PARAM_UNUSED, - int * charge_busy FD_PARAM_UNUSED ) { - ulong is_constipated = fd_fseq_query( ctx->is_constipated ); +
/****************************************************************************/ + /* Replay Tile Link */ + /****************************************************************************/ - if( !is_constipated ) { - return; - } - - if( FD_UNLIKELY( !ctx->is_funk_active ) ) { - ctx->funk = fd_funk_open_file( ctx->funk_file, - 1, - 0, - 0, - 0, - 0, - FD_FUNK_READ_WRITE, - NULL ); - if( FD_UNLIKELY( !ctx->funk ) ) { - FD_LOG_ERR(( "failed to join a funky" )); - } - ctx->is_funk_active = 1; + /* Set up replay output */ + fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ]; + ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp; + ctx->replay_out_chunk = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache ); +} - FD_LOG_WARNING(( "Just joined funk at file=%s", ctx->funk_file )); - } +static void +produce_snapshot( fd_snapshot_tile_ctx_t * ctx, ulong batch_fseq ) { - ulong is_incremental = fd_snapshot_create_get_is_incremental( is_constipated ); - ulong snapshot_slot = fd_snapshot_create_get_slot( is_constipated ); + ulong is_incremental = fd_batch_fseq_is_incremental( batch_fseq ); + ulong snapshot_slot = fd_batch_fseq_get_slot( batch_fseq ); if( !is_incremental ) { ctx->last_full_snap_slot = snapshot_slot; @@ -366,10 +368,136 @@ after_credit( fd_snapshot_tile_ctx_t * ctx FD_PARAM_UNUSED, FD_LOG_NOTICE(( "Done creating a snapshot in %s", snapshot_ctx.out_dir )); + /* At this point the snapshot has been successfully created, so we can + unconstipate funk and any related data structures in the replay tile. */ + + fd_fseq_update( ctx->is_constipated, 0UL ); + +} + +static fd_funk_txn_t* +get_eah_txn( fd_funk_t * funk, ulong slot ) { + + fd_funk_txn_t * txn_map = fd_funk_txn_map( funk, fd_funk_wksp( funk ) ); + for( fd_funk_txn_map_iter_t iter = fd_funk_txn_map_iter_init( txn_map ); + !fd_funk_txn_map_iter_done( txn_map, iter ); + iter = fd_funk_txn_map_iter_next( txn_map, iter ) ) { + fd_funk_txn_t * txn = fd_funk_txn_map_iter_ele( txn_map, iter ); + if( txn->xid.ul[0]==slot ) { + FD_LOG_NOTICE(( "Found transaction for eah" )); + return txn; + } + } + FD_LOG_NOTICE(( "Calculating eah from root" )); + return NULL; +} + +static void +produce_eah( fd_snapshot_tile_ctx_t * ctx, fd_stem_context_t * stem, ulong batch_fseq ) { + + ulong eah_slot = fd_batch_fseq_get_slot( batch_fseq ); + + FD_LOG_WARNING(( "Beginning to produce epoch account hash in background for slot=%lu", eah_slot )); + + /* TODO: Perhaps it makes sense to factor this out into a function in the + runtime as this could technically be considered a layering violation. */ + + /* First, we must retrieve the corresponding slot_bank. We have the guarantee + that the root record is frozen from the replay tile. */ + + fd_funk_t * funk = ctx->funk; + fd_funk_txn_t * eah_txn = get_eah_txn( funk, eah_slot ); + fd_funk_rec_key_t slot_id = fd_runtime_slot_bank_key(); + fd_funk_rec_t const * slot_rec = fd_funk_rec_query( funk, eah_txn, &slot_id ); + if( FD_UNLIKELY( !slot_rec ) ) { + FD_LOG_ERR(( "Failed to read slot bank record: missing record" )); + } + void * slot_val = fd_funk_val( slot_rec, fd_funk_wksp( funk ) ); + + if( FD_UNLIKELY( fd_funk_val_sz( slot_rec )<sizeof(uint) ) ) { + FD_LOG_ERR(( "Failed to read slot bank record: empty record" )); + } + + /* The record is a bincode-encoded slot bank prefixed with a magic number; + decode it into scratch memory before hashing. */ + + fd_bincode_decode_ctx_t slot_decode_ctx = { + .data = (uchar*)slot_val + sizeof(uint), + .dataend = (uchar*)slot_val + fd_funk_val_sz( slot_rec ), + .valloc = fd_scratch_virtual() + }; + + fd_slot_bank_t slot_bank = {0}; + if( FD_UNLIKELY( fd_slot_bank_decode( &slot_bank, &slot_decode_ctx ) ) ) { + FD_LOG_ERR(( "Failed to decode slot bank record" )); + } + + /* Now calculate the epoch account hash. */ + + fd_hash_t epoch_account_hash = {0}; + fd_accounts_hash( funk, &slot_bank, fd_scratch_virtual(), ctx->tpool, &epoch_account_hash ); + + FD_LOG_NOTICE(( "Done computing epoch account hash (%s)", FD_BASE58_ENC_32_ALLOCA( &epoch_account_hash ) )); + + /* Once the hash is calculated, we are ready to push the computed hash + onto the out link to replay.
We don't need to add any other information + as this is the only type of message that is transmitted. */ + + uchar * out_buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk ); + fd_memcpy( out_buf, epoch_account_hash.uc, sizeof(fd_hash_t) ); + fd_stem_publish( stem, 0UL, 0UL, ctx->replay_out_chunk, 1UL, 0UL, 0UL, 0UL ); + + /* Reset the fseq allowing for the un-constipation of funk and allow for + snapshots to be created again. */ + fd_fseq_update( ctx->is_constipated, 0UL ); } +static void +after_credit( fd_snapshot_tile_ctx_t * ctx, + fd_stem_context_t * stem, + int * opt_poll_in FD_PARAM_UNUSED, + int * charge_busy FD_PARAM_UNUSED ) { + + ulong batch_fseq = fd_fseq_query( ctx->is_constipated ); + + /* If batch_fseq == 0, this means that we don't want to calculate/produce + anything. Keep this tile spinning. */ + if( !batch_fseq ) { + return; + } + + if( FD_UNLIKELY( !ctx->is_funk_active ) ) { + /* Setting these parameters is not required because we are joining the + funk that was set up in the replay tile. */ + ctx->funk = fd_funk_open_file( ctx->funk_file, + 1UL, + 0UL, + 0UL, + 0UL, + 0UL, + FD_FUNK_READ_WRITE, + NULL ); + if( FD_UNLIKELY( !ctx->funk ) ) { + FD_LOG_ERR(( "failed to join a funky" )); + } + ctx->is_funk_active = 1; + + FD_LOG_WARNING(( "Just joined funk at file=%s", ctx->funk_file )); + } + + if( fd_batch_fseq_is_snapshot( batch_fseq ) ) { + produce_snapshot( ctx, batch_fseq ); + } else { + produce_eah( ctx, stem, batch_fseq ); + } +} + static ulong populate_allowed_seccomp( fd_topo_t const * topo, fd_topo_tile_t const * tile, @@ -377,14 +505,14 @@ populate_allowed_seccomp( fd_topo_t const * topo, struct sock_filter * out ) { (void)topo; - populate_sock_filter_policy_snapshot( out_cnt, + populate_sock_filter_policy_batch( out_cnt, out, (uint)fd_log_private_logfile_fd(), - (uint)tile->snaps.tmp_fd, - (uint)tile->snaps.tmp_inc_fd, - (uint)tile->snaps.full_snapshot_fd, - (uint)tile->snaps.incremental_snapshot_fd ); - return sock_filter_policy_snapshot_instr_cnt; + (uint)tile->batch.tmp_fd, + (uint)tile->batch.tmp_inc_fd, + (uint)tile->batch.full_snapshot_fd, + (uint)tile->batch.incremental_snapshot_fd ); + return sock_filter_policy_batch_instr_cnt; } static ulong @@ -403,10 +531,10 @@ populate_allowed_fds( fd_topo_t const * topo, if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */ - out_fds[ out_cnt++ ] = tile->snaps.tmp_fd; - out_fds[ out_cnt++ ] = tile->snaps.tmp_inc_fd; - out_fds[ out_cnt++ ] = tile->snaps.full_snapshot_fd; - out_fds[ out_cnt++ ] = tile->snaps.incremental_snapshot_fd; + out_fds[ out_cnt++ ] = tile->batch.tmp_fd; + out_fds[ out_cnt++ ] = tile->batch.tmp_inc_fd; + out_fds[ out_cnt++ ] = tile->batch.full_snapshot_fd; + out_fds[ out_cnt++ ] = tile->batch.incremental_snapshot_fd; return out_cnt; } @@ -419,8 +547,8 @@ populate_allowed_fds( fd_topo_t const * topo, #include "../../../../disco/stem/fd_stem.c" -fd_topo_run_tile_t fd_tile_snaps = { - .name = "snaps", +fd_topo_run_tile_t fd_tile_batch = { + .name = "batch", .populate_allowed_seccomp = populate_allowed_seccomp, .populate_allowed_fds = populate_allowed_fds, .scratch_align = scratch_align, diff --git a/src/app/fdctl/run/tiles/fd_batch_thread.c b/src/app/fdctl/run/tiles/fd_batch_thread.c new file mode 100644 index 0000000000..e3611ee413 --- /dev/null +++ b/src/app/fdctl/run/tiles/fd_batch_thread.c @@ -0,0 +1,3 @@ +#include "../../../../disco/tiles.h" + +fd_topo_run_tile_t fd_tile_batch_thread = { .name = "btpool",
.for_tpool = 1 }; diff --git a/src/app/fdctl/run/tiles/fd_replay.c b/src/app/fdctl/run/tiles/fd_replay.c index 92c8922639..09d65cc56d 100644 --- a/src/app/fdctl/run/tiles/fd_replay.c +++ b/src/app/fdctl/run/tiles/fd_replay.c @@ -56,6 +56,7 @@ #define STORE_IN_IDX (0UL) #define PACK_IN_IDX (1UL) #define GOSSIP_IN_IDX (2UL) +#define BATCH_IN_IDX (3UL) #define STAKE_OUT_IDX (0UL) #define NOTIF_OUT_IDX (1UL) @@ -112,6 +113,11 @@ struct fd_replay_tile_ctx { ulong gossip_in_chunk0; ulong gossip_in_wmark; + // Batch tile input for epoch account hash + fd_wksp_t * batch_in_mem; + ulong batch_in_chunk0; + ulong batch_in_wmark; + // Notification output defs fd_frag_meta_t * notif_out_mcache; ulong * notif_out_sync; @@ -274,13 +280,16 @@ struct fd_replay_tile_ctx { ulong spad_cnt; /* TODO: refactor this all into fd_replay_tile_snapshot_ctx_t. */ - ulong snapshot_interval; /* User defined parameter */ - ulong incremental_interval; /* User defined parameter */ - ulong last_full_snap; /* If a full snapshot has been produced */ - ulong * is_funk_constipated; /* Shared fseq to determine if funk should be constipated */ - ulong prev_full_snapshot_dist; /* Tracking for snapshot creation */ - ulong prev_incr_snapshot_dist; /* Tracking for incremental snapshot creation */ - int first_constipation; + ulong snapshot_interval; /* User defined parameter */ + ulong incremental_interval; /* User defined parameter */ + ulong last_full_snap; /* If a full snapshot has been produced */ + ulong * is_constipated; /* Shared fseq to determine if funk should be constipated */ + ulong prev_full_snapshot_dist; /* Tracking for snapshot creation */ + ulong prev_incr_snapshot_dist; /* Tracking for incremental snapshot creation */ + ulong double_constipation_slot; /* Tracking for double constipation if any */ + + fd_funk_txn_t * false_root; + fd_funk_txn_t * second_false_root; int is_caught_up; }; @@ -507,6 +516,10 @@ during_frag( fd_replay_tile_ctx_t * ctx, FD_LOG_WARNING(( "Received a gossip message for wen-restart while FD is not in wen-restart mode" )); } return; + } else if( in_idx==BATCH_IN_IDX ) { + uchar * src = (uchar *)fd_chunk_to_laddr( ctx->batch_in_mem, chunk ); + fd_memcpy( ctx->slot_ctx->slot_bank.epoch_account_hash.uc, src, sizeof(fd_hash_t) ); + FD_LOG_NOTICE(( "Epoch account hash calculated to be %s", FD_BASE58_ENC_32_ALLOCA( ctx->slot_ctx->slot_bank.epoch_account_hash.uc ) )); } // if( ctx->flags & REPLAY_FLAG_PACKED_MICROBLOCK ) { @@ -587,39 +600,23 @@ blockstore_publish( fd_replay_tile_ctx_t * ctx ) { static void txncache_publish( fd_replay_tile_ctx_t * ctx, fd_funk_txn_t * txn_map, - fd_funk_txn_t * root_txn, - uchar is_funk_constipated ) { + fd_funk_txn_t * to_root_txn, + fd_funk_txn_t * rooted_txn ) { + /* For the status cache, we stop rooting until the status cache has been written out to the current snapshot. We also need to iterate up the - funk transaction tree to figure out what slots should be constipated. + funk transaction tree up until the current "root" to figure out what slots + should be registered. This root can correspond to the latest false root if + one exists. */ - As a note, when funk is constipated we don't want to iterate all the way - up to the root because then we will register previously registered slots - that are in the constipated root. This introduces an edge case where - we will never register the slots that in the original constipated txn. - This currently gets handled in a hacky way by first tracking the first - call to is_funk_constipated. 
This gets set to 1 as soon as funk gets - constipated. By setting this, we are able to register the slot that - corresponds to the constipated root. */ if( FD_UNLIKELY( !ctx->slot_ctx->status_cache ) ) { return; } - if( ctx->first_constipation ) { - FD_LOG_NOTICE(( "Starting constipation at slot=%lu", root_txn->xid.ul[0] )); - fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, root_txn->xid.ul[0] ); - ctx->first_constipation = 0; - } - - fd_funk_txn_t * txn = root_txn; - while( txn ) { - - if( !fd_funk_txn_parent( txn, txn_map ) && is_funk_constipated ) { - break; - } - + fd_funk_txn_t * txn = to_root_txn; + while( txn!=rooted_txn ) { ulong slot = txn->xid.ul[0]; if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) { FD_LOG_INFO(( "Registering slot %lu", slot )); @@ -633,7 +630,7 @@ txncache_publish( fd_replay_tile_ctx_t * ctx, } static void -snapshot_state_update( fd_replay_tile_ctx_t * ctx, ulong smr, uchar is_constipated ) { +snapshot_state_update( fd_replay_tile_ctx_t * ctx, ulong wmk ) { /* We are ready for a snapshot if either we are on or just passed a snapshot interval and no snapshot is currently in progress. This is to handle the @@ -649,6 +646,12 @@ snapshot_state_update( fd_replay_tile_ctx_t * ctx, ulong smr, uchar is_constipat status cache. This will also notify the status cache via the funk constipation fseq. */ + if( ctx->snapshot_interval==ULONG_MAX ) { + return; + } + + uchar is_constipated = fd_fseq_query( ctx->is_constipated ) != 0UL; + if( !ctx->is_caught_up ) { return; } @@ -662,140 +665,372 @@ snapshot_state_update( fd_replay_tile_ctx_t * ctx, ulong smr, uchar is_constipat /* The distance from the last snapshot should only grow until we skip past the last full snapshot. If it has shrunk that means we skipped over the snapshot interval. */ - ulong curr_full_snapshot_dist = smr % ctx->snapshot_interval; + ulong curr_full_snapshot_dist = wmk % ctx->snapshot_interval; uchar is_full_snapshot_ready = curr_full_snapshot_dist < ctx->prev_full_snapshot_dist; ctx->prev_full_snapshot_dist = curr_full_snapshot_dist; /* Do the same for incrementals, only try to create one if there has been a full snapshot. */ - ulong curr_incr_snapshot_dist = smr % ctx->incremental_interval; - uchar is_inc_snapshot_ready = smr % ctx->incremental_interval < ctx->prev_incr_snapshot_dist && ctx->last_full_snap; + + ulong curr_incr_snapshot_dist = wmk % ctx->incremental_interval; + + uchar is_inc_snapshot_ready = wmk % ctx->incremental_interval < ctx->prev_incr_snapshot_dist && ctx->last_full_snap; ctx->prev_incr_snapshot_dist = curr_incr_snapshot_dist; ulong updated_fseq = 0UL; - /* TODO: We need a better check if the smr fell on an epoch boundary due to + /* TODO: We need a better check if the wmk fell on an epoch boundary due to skipped slots. We just don't want to make a snapshot on an epoch boundary */ if( (is_full_snapshot_ready || is_inc_snapshot_ready) && - !fd_runtime_is_epoch_boundary( epoch_bank, smr, smr-1UL ) ) { + !fd_runtime_is_epoch_boundary( epoch_bank, wmk, wmk-1UL ) ) { /* Constipate the status cache when a snapshot is ready to be created. 
*/ if( is_full_snapshot_ready ) { - ctx->last_full_snap = smr; + ctx->last_full_snap = wmk; FD_LOG_NOTICE(( "Ready to create a full snapshot" )); - updated_fseq = fd_snapshot_create_pack_fseq( 0, smr ); + updated_fseq = fd_batch_fseq_pack( 1, 0, wmk ); } else { FD_LOG_NOTICE(( "Ready to create an incremental snapshot" )); - updated_fseq = fd_snapshot_create_pack_fseq( 1, smr ); + updated_fseq = fd_batch_fseq_pack( 1, 1, wmk ); } - ctx->first_constipation = 1; /* TODO: I hate this hack. */ fd_txncache_set_is_constipated( ctx->slot_ctx->status_cache, 1 ); - fd_fseq_update( ctx->is_funk_constipated, updated_fseq ); + fd_fseq_update( ctx->is_constipated, updated_fseq ); } } static void -funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) { +funk_publish( fd_replay_tile_ctx_t * ctx, + fd_funk_txn_t * to_root_txn, + fd_funk_txn_t * txn_map, + ulong wmk, + uchar is_constipated ) { - /* When we are trying to root for an smr that we want a snapshot for, we need - to constipate funk as well as the txncache. The snapshot tile will notify - the replay tile that funk is ready to be unconstipated via the - is_funk_constipated fseq. Txncache constipation will be handled differently. - All operations on the status cache are bounded by a rw lock making - operation atomic. The status cache will internally track if it is in a - constipated state. The snapshot tile will be directly responsible for - unconstipating the txncache. */ - - fd_funk_txn_t * txn_map = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) ); - fd_funk_txn_t * root_txn = fd_funk_txn_query( xid, txn_map ); - - /* Once all of the banking tiles have finished executing, grab a write - lock on funk and publish the transaction. - - The publishing mechanism for funk and the status cache will change - if constipation is enabled. If constipation is enabled, - constipate the current transaction into the constipated root. This means - we will treat the oldest ancestor as the new root of the transaction tree. - All slots that are "rooted" in the constipated state will be published - into the constipated root. When constipation is disabled, flush the backed - up transactions into the root. - - There is some unfortunate behavior we have to consider here with - publishing and constipation. When the status cache is in a constipated - state, we want to register all of the slots except for the slot that - corresponds to the constipated root. However, during the first pass when - we are constipated, we also need to register the constipated root into - the txn cache. If we don't then the constipated root slot will never be - included in the status cache. TODO: There is probably a better way to - hhandle this. - - Constipation can be activated for a variety of reasons including snapshot - creation and epoch account hash generation. - TODO: Currently epoch account hash generation is unimplemented but the - funk fseq should be repurposed to be more generalized. */ - - for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) { - fd_tpool_wait( ctx->tpool, i+1 ); - } fd_funk_start_write( ctx->funk ); - uchar is_funk_constipated = fd_fseq_query( ctx->is_funk_constipated ) != 0; - - txncache_publish( ctx, txn_map, root_txn, is_funk_constipated ); + fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx ); /* Now try to publish into funk, this is handled differently based on if - funk is constipated.
*/ - - if( !is_funk_constipated ) { - FD_LOG_NOTICE(( "Publishing slot=%lu", wmk )); - - ulong rc = fd_funk_txn_publish( ctx->funk, root_txn, 1 ); - if( FD_UNLIKELY( !rc ) ) { - FD_LOG_ERR(( "failed to funk publish slot %lu", wmk )); + funk is constipated or if funk is double-constipated. Even if funk was + double-constipated and now no-longer is we still want to preserve the + root for the epoch account hash. */ + if( ctx->double_constipation_slot ) { + FD_LOG_NOTICE(( "Double constipation publish for wmk=%lu", wmk )); + + fd_funk_txn_t * txn = to_root_txn; + while( txn!=ctx->second_false_root ) { + if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) { + FD_LOG_ERR(( "Can't publish funk transaction" )); + } + txn = fd_funk_txn_parent( txn, txn_map ); } - } else { + } else if( is_constipated ) { FD_LOG_WARNING(( "Publishing slot=%lu while constipated", wmk )); /* At this point, first collapse the current transaction that should be published into the oldest child transaction. */ - fd_funk_txn_t * txn = root_txn; - fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map ); + if( FD_UNLIKELY( wmk>=epoch_bank->eah_start_slot ) ) { + /* We need to double-constipate at this point. */ - while( parent_txn ) { - if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) { - FD_LOG_ERR(( "Can't publish funk transaction" )); + /* First, find the txn where the corresponding slot is the minimum + pending transaction where >= eah_start_slot. */ + + fd_funk_txn_t * txn = to_root_txn; + fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map ); + + while( parent_txn ) { + + int is_curr_gteq_eah_start = txn->xid.ul[0] >= epoch_bank->eah_start_slot; + int is_prev_lt_eah_start = parent_txn->xid.ul[0] < epoch_bank->eah_start_slot; + if( is_curr_gteq_eah_start && is_prev_lt_eah_start ) { + break; + } + txn = parent_txn; + parent_txn = fd_funk_txn_parent( txn, txn_map ); + } + + FD_TEST( parent_txn ); /* We should never get to this point because of + the constipated root. The constipated root + is guaranteed to have a slot that's < eah_start_slot. */ + + /* This transaction will now become the double-constipated root. */ + + FD_LOG_NOTICE(( "Entering a double constipated state eah_start=%lu eah_slot=%lu", + epoch_bank->eah_start_slot, txn->xid.ul[0] )); + + ctx->double_constipation_slot = txn->xid.ul[0]; + + /* Other pending transactions will get published into the child during + the next invocation of funk_publish. */ + } else { + + FD_LOG_NOTICE(( "Publishing into constipated root for wmk=%lu", wmk )); + /* Standard constipated case where we aren't considering the eah. */ + fd_funk_txn_t * txn = to_root_txn; + + while( txn!=ctx->false_root ) { + if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) { + FD_LOG_ERR(( "Can't publish funk transaction" )); + } + txn = fd_funk_txn_parent( txn, txn_map ); + } + } + } else { + + /* This is the case where we are not in the constipated case. We only need + to do special handling in the case where the epoch account hash is about + to be calculated. */ + + FD_LOG_NOTICE(( "Publishing slot=%lu", wmk )); + + // if( FD_UNLIKELY( wmk%11==0) ) { + // epoch_bank->eah_start_slot = wmk; + + if( FD_UNLIKELY( wmk>=epoch_bank->eah_start_slot ) ) { + + FD_LOG_NOTICE(( "EAH is ready to be calculated" )); + + /* This condition means that we want to start producing an epoch account + hash at a slot that is in the set of transactions we are about to + publish.
We only want to publish all slots that are <= the slot that + we will calculate the epoch account hash for. */ + + fd_funk_txn_t * txn = to_root_txn; + fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map ); + while( parent_txn ) { + /* We need to be careful here because the eah start slot may be skipped + so the actual slot that we calculate the eah for may be greater than + the eah start slot. The transaction must correspond to a slot greater + than or equal to the eah start slot, but its parent transaction must + either have been published already or must be less than the eah start + slot. */ + + int is_curr_gteq_eah_start = txn->xid.ul[0] >= epoch_bank->eah_start_slot; + int is_prev_lt_eah_start = parent_txn->xid.ul[0] < epoch_bank->eah_start_slot; + if( is_curr_gteq_eah_start && is_prev_lt_eah_start ) { + break; + } + txn = parent_txn; + parent_txn = fd_funk_txn_parent( txn, txn_map ); + } + + /* At this point, we know txn is the funk txn that we will want to + calculate the eah for since it's the minimum slot that is >= + eah_start_slot. */ + + FD_LOG_NOTICE(( "The eah has an expected start slot of %lu and is being created for slot %lu", epoch_bank->eah_start_slot, txn->xid.ul[0] )); + + if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, txn, 1 ) ) ) { + FD_LOG_ERR(( "failed to funk publish" )); + } + + /* At this point, we have the root that we want to calculate the + epoch account hash for. The other children that are > eah_start_slot + but <= wmk will be published into the constipated root during the next + invocation of funk_and_txncache_publish. + + Notify the batch tile that an eah should be computed. */ + + ulong updated_fseq = fd_batch_fseq_pack( 0UL, 0UL, txn->xid.ul[0] ); + fd_fseq_update( ctx->is_constipated, updated_fseq ); + epoch_bank->eah_start_slot = FD_SLOT_NULL; + + } else { + /* This is the standard case. Publish all transactions up to and + including the watermark. This will publish any in-prep ancestors + of root_txn as well. */ + + if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, to_root_txn, 1 ) ) ) { + FD_LOG_ERR(( "failed to funk publish slot %lu", wmk )); } } } fd_funk_end_write( ctx->funk ); - /* TODO: This needs to get integrated into the snapshot tile. */ - fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx ); - if( wmk >= epoch_bank->eah_start_slot ) { - fd_accounts_hash( ctx->slot_ctx->acc_mgr->funk, &ctx->slot_ctx->slot_bank, - ctx->slot_ctx->valloc, ctx->tpool, &ctx->slot_ctx->slot_bank.epoch_account_hash ); +} + +static fd_funk_txn_t* +get_rooted_txn( fd_replay_tile_ctx_t * ctx, + fd_funk_txn_t * to_root_txn, + fd_funk_txn_t * txn_map, + uchar is_constipated ) { + + /* We need to get the rooted transaction that we are publishing into. This + needs to account for the three different cases: no constipation, single + constipation, double constipation. + + Also, if it's the first time that we are setting the false root(s), then + we must also register them into the status cache because we don't register + the root in txncache_publish to avoid registering the same slot multiple times. */ + + if( FD_UNLIKELY( ctx->double_constipation_slot ) ) { + + if( FD_UNLIKELY( !ctx->second_false_root ) ) { + + /* Set value of second false root, save it and publish to txncache.
*/ + fd_funk_txn_t * txn = to_root_txn; + while( txn->xid.ul[0]>ctx->double_constipation_slot ) { + txn = fd_funk_txn_parent( txn, txn_map ); + } + + if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) { + fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] ); + } else { + fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] ); + } + + if( txn->xid.ul[0] != ctx->double_constipation_slot ) { + FD_LOG_ERR(( "txn->xid.ul[0] = %lu, ctx->double_constipation_slot = %lu", txn->xid.ul[0], ctx->double_constipation_slot )); + } + ctx->second_false_root = txn; + } + return ctx->second_false_root; + } else if( is_constipated ) { + + if( FD_UNLIKELY( !ctx->false_root ) ) { + + fd_funk_txn_t * txn = to_root_txn; + fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map ); + while( parent_txn ) { + txn = parent_txn; + parent_txn = fd_funk_txn_parent( txn, txn_map ); + } + + ctx->false_root = txn; + if( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) { + fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] ); + } else { + fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] ); + } + } + return ctx->false_root; + } else { + return NULL; + } +} + +static void +funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) { + + FD_LOG_NOTICE(( "Entering funk_and_txncache_publish for wmk=%lu", wmk )); + + /* This function is responsible for publishing/registering all in-prep slots + up to and including the watermark slot into funk and the transaction cache. + + However, we need to modify this behavior to support snapshot creation and + epoch account hash generation (which is handled by the batch tile). + Specifically, we need to change the mechanism by introducing the concept of + a constipated root. We want to keep the root of funk/txncache constant + while the batch tile reads from the root of funk. At the same time, we + want to keep publishing into funk. We accomplish this by treating the + oldest in-prep ancestor of funk as the "constipated/false" root. While + the batch tile "works", we will only publish into the false root. Once the + batch tile is done producing a snapshot/eah, we will then flush the + constipated root into the real root of funk as we no longer need a frozen + funk transaction to read from. The batch tile will communicate with the + replay tile via the is_constipated fseq and a link. + + There is a pretty important edge case to consider here: what do we do if + we are currently in the middle of creating a snapshot, but we need to + record our state for the epoch account hash? The epoch account hash must + be created for a specific slot and we can't block execution to calculate + this hash. The solution will be to introduce a second constipation via a + second false root. This new false root will correspond to the oldest + child transaction of the transaction that corresponds to the eah + calculation slot. When the snapshot is done being produced, any further + snapshot creation will be blocked until the epoch account hash is created. + We will use the second false root to publish into while the batch tile + produces the epoch account hash. We do not modify any of the parents of + the second constipated root until we are done producing a snapshot. + + A similar mechanism for txncache constipation is needed only for snapshot + creation. 
This is simpler than for funk because txncache operations are + atomic and we can just register slots into a constipated set while the + txncache is getting copied out. This is a much faster operation and the + txncache will likely get unconstipated before funk. + + Single Funk Constipation Example: + + If we want to create a snapshot/eah for slot n, then we will publish + all transactions up to and including those that correspond to slot n. + We will then publish all transactions into the immediate child of n (let's + assume it's n+1) in this case. So every transaction will be published into + n+1 and NOT n. When the computation is done, we resume publishing as normal. + + Double Funk Constipation Example: + + Let's say we are creating a snapshot for slot n and we want + the epoch account hash for slot m. A snapshot will take x slots to produce + and we can assume that n + x > m. So at some slot y where n < y < m, the + state of funk will be: a root at slot n with a constipated root at + n+1 which gets published into. However, once it is time to publish slot m, + we will now have a root at slot n, a constipated root at slot m, and we will + then start publishing into the second constipated root at slot m + 1. */ + + /* First wait for all tpool threads to finish. */ + + for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) { + fd_tpool_wait( ctx->tpool, i+1 ); + } + + fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx ); + uchar is_constipated = fd_fseq_query( ctx->is_constipated ) != 0; + + /* If the is_constipated fseq is set to 0 that means that the batch tile + is currently in an idle state. However, if there was a double constipation + active, that means that we need to kick off the pending epoch account hash + calculation. */ + if( ctx->double_constipation_slot && !is_constipated ) { + FD_LOG_NOTICE(( "No longer double constipated, ready to start computing the epoch account hash" )); + + /* At this point, the snapshot has been completed, so we are now ready to + start the eah computation. */ + ulong updated_fseq = fd_batch_fseq_pack( 0UL, 0UL, ctx->double_constipation_slot ); + fd_fseq_update( ctx->is_constipated, updated_fseq ); epoch_bank->eah_start_slot = FD_SLOT_NULL; } + /* If the (second) false root is no longer needed, then we should stop + tracking it. */ + if( FD_UNLIKELY( ctx->false_root && !is_constipated ) ) { + FD_LOG_NOTICE(( "Unsetting false root tracking" )); + ctx->false_root = NULL; + } + if( FD_UNLIKELY( ctx->second_false_root && !ctx->double_constipation_slot ) ) { + FD_LOG_NOTICE(( "Unsetting second false root tracking" )); + ctx->second_false_root = NULL; + } + + + /* Handle updates to funk and the status cache. */ + + fd_funk_txn_t * txn_map = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) ); + fd_funk_txn_t * to_root_txn = fd_funk_txn_query( xid, txn_map ); + fd_funk_txn_t * rooted_txn = get_rooted_txn( ctx, to_root_txn, txn_map, is_constipated ); + + txncache_publish( ctx, txn_map, to_root_txn, rooted_txn ); + + funk_publish( ctx, to_root_txn, txn_map, wmk, is_constipated ); + + /* Update the snapshot state and determine if one is ready to be created.
*/ + + snapshot_state_update( ctx, wmk ); if( FD_UNLIKELY( ctx->capture_ctx ) ) { fd_runtime_checkpt( ctx->capture_ctx, ctx->slot_ctx, wmk ); } + } static int suppress_notify( const fd_pubkey_t * prog ) { /* Certain accounts are just noise and a waste of notification bandwidth */ - if ( !memcmp( prog, fd_solana_vote_program_id.key, sizeof( fd_pubkey_t ) ) ) { + if( !memcmp( prog, fd_solana_vote_program_id.key, sizeof(fd_pubkey_t) ) ) { return 1; - } else if ( !memcmp( prog, fd_solana_system_program_id.key, sizeof( fd_pubkey_t ) ) ) { + } else if( !memcmp( prog, fd_solana_system_program_id.key, sizeof(fd_pubkey_t) ) ) { return 1; - } else if ( !memcmp( prog, fd_solana_compute_budget_program_id.key, sizeof( fd_pubkey_t ) ) ) { + } else if( !memcmp( prog, fd_solana_compute_budget_program_id.key, sizeof(fd_pubkey_t) ) ) { return 1; } else { return 0; @@ -1250,7 +1485,7 @@ after_frag( fd_replay_tile_ctx_t * ctx, fd_blockstore_end_read( ctx->blockstore ); fork->slot_ctx.block = block_; - /* TODO:FIXME: This needs to be unhacked. */ + /* FIXME: This needs to be unhacked. */ fork->slot_ctx.slot_bank.max_tick_height += 64UL * (curr_slot - ctx->parent_slot); fork->slot_ctx.slot_bank.tick_height += 64UL * (curr_slot - ctx->parent_slot); @@ -1690,6 +1925,7 @@ init_after_snapshot( fd_replay_tile_ctx_t * ctx ) { FD_LOG_NOTICE( ( "snapshot slot %lu", snapshot_slot ) ); FD_LOG_NOTICE( ( "total stake %lu", bank_hash_cmp->total_stake ) ); + } void @@ -1847,6 +2083,7 @@ after_credit( fd_replay_tile_ctx_t * ctx, static void during_housekeeping( void * _ctx ) { + fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx; /* Update watermark. The publish watermark is the minimum of the tower @@ -1906,8 +2143,17 @@ unprivileged_init( fd_topo_t * topo, FD_LOG_NOTICE(("finished unprivileged init")); void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + if( FD_UNLIKELY( tile->in_cnt < 4 || + strcmp( topo->links[ tile->in_link_id[ STORE_IN_IDX ] ].name, "store_replay" ) || + strcmp( topo->links[ tile->in_link_id[ PACK_IN_IDX ] ].name, "pack_replay") || + strcmp( topo->links[ tile->in_link_id[ GOSSIP_IN_IDX ] ].name, "gossip_repla") || + strcmp( topo->links[ tile->in_link_id[ BATCH_IN_IDX ] ].name, "batch_replay" ) ) ) { + FD_LOG_ERR(( "replay tile has none or unexpected input links %lu %s %s", + tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name )); + } + /**********************************************************************/ - /* scratch (bump)-allocate memory owned by the replay tile */ + /* scratch (bump)-allocate memory owned by the replay tile */ /**********************************************************************/ /* Do not modify order! This is join-order in unprivileged_init. 
*/ @@ -2028,10 +2274,10 @@ unprivileged_init( fd_topo_t * topo, ulong constipated_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "constipate" ); FD_TEST( constipated_obj_id!=ULONG_MAX ); - ctx->is_funk_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) ); - if( FD_UNLIKELY( !ctx->is_funk_constipated ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" )); - fd_fseq_update( ctx->is_funk_constipated, 0UL ); - FD_TEST( 0UL==fd_fseq_query( ctx->is_funk_constipated ) ); + ctx->is_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) ); + if( FD_UNLIKELY( !ctx->is_constipated ) ) FD_LOG_ERR(( "replay tile has no constipated fseq" )); + fd_fseq_update( ctx->is_constipated, 0UL ); + FD_TEST( 0UL==fd_fseq_query( ctx->is_constipated ) ); /**********************************************************************/ /* poh_slot fseq */ /**********************************************************************/ @@ -2156,8 +2402,8 @@ unprivileged_init( fd_topo_t * topo, ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->replay.tpool_thread_count ); if( FD_LIKELY( tile->replay.tpool_thread_count > 1 ) ) { - /* start the tpool workers */ - for( ulong i =1; i<tile->replay.tpool_thread_count; i++ ) { + /* Start the tpool workers */ + for( ulong i=1UL; i<tile->replay.tpool_thread_count; i++ ) { if( fd_tpool_worker_push( ctx->tpool, i, (uchar *)tpool_worker_mem + TPOOL_WORKER_MEM_SZ*(i - 1U), TPOOL_WORKER_MEM_SZ ) == NULL ) { FD_LOG_ERR(( "failed to launch worker" )); } @@ -2257,24 +2503,30 @@ unprivileged_init( fd_topo_t * topo, /**********************************************************************/ /* links */ /**********************************************************************/ - /* Set up store tile input */ + /* Setup store tile input */ fd_topo_link_t * store_in_link = &topo->links[ tile->in_link_id[ STORE_IN_IDX ] ]; ctx->store_in_mem = topo->workspaces[ topo->objs[ store_in_link->dcache_obj_id ].wksp_id ].wksp; ctx->store_in_chunk0 = fd_dcache_compact_chunk0( ctx->store_in_mem, store_in_link->dcache ); ctx->store_in_wmark = fd_dcache_compact_wmark( ctx->store_in_mem, store_in_link->dcache, store_in_link->mtu ); - /* Set up pack tile input */ + /* Setup pack tile input */ fd_topo_link_t * pack_in_link = &topo->links[ tile->in_link_id[ PACK_IN_IDX ] ]; ctx->pack_in_mem = topo->workspaces[ topo->objs[ pack_in_link->dcache_obj_id ].wksp_id ].wksp; ctx->pack_in_chunk0 = fd_dcache_compact_chunk0( ctx->pack_in_mem, pack_in_link->dcache ); ctx->pack_in_wmark = fd_dcache_compact_wmark( ctx->pack_in_mem, pack_in_link->dcache, pack_in_link->mtu ); - /* Set up gossip tile input for wen-restart */ + /* Setup gossip tile input for wen-restart */ fd_topo_link_t * gossip_in_link = &topo->links[ tile->in_link_id[ GOSSIP_IN_IDX ] ]; ctx->gossip_in_mem = topo->workspaces[ topo->objs[ gossip_in_link->dcache_obj_id ].wksp_id ].wksp; ctx->gossip_in_chunk0 = fd_dcache_compact_chunk0( ctx->gossip_in_mem, gossip_in_link->dcache ); ctx->gossip_in_wmark = fd_dcache_compact_wmark( ctx->gossip_in_mem, gossip_in_link->dcache, gossip_in_link->mtu ); + /* Setup batch tile input for epoch account hash */ + fd_topo_link_t * batch_in_link = &topo->links[ tile->in_link_id[ BATCH_IN_IDX ] ]; + ctx->batch_in_mem = topo->workspaces[ topo->objs[ batch_in_link->dcache_obj_id ].wksp_id ].wksp; + ctx->batch_in_chunk0 = fd_dcache_compact_chunk0( ctx->batch_in_mem, batch_in_link->dcache ); + ctx->batch_in_wmark = fd_dcache_compact_wmark( ctx->batch_in_mem, batch_in_link->dcache, batch_in_link->mtu ); + fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ NOTIF_OUT_IDX ] ]; ctx->notif_out_mcache = notif_out->mcache; ctx->notif_out_sync =
fd_mcache_seq_laddr( ctx->notif_out_mcache ); diff --git a/src/app/fdctl/run/tiles/fd_snapshot_thread.c b/src/app/fdctl/run/tiles/fd_snapshot_thread.c deleted file mode 100644 index aabcd666ee..0000000000 --- a/src/app/fdctl/run/tiles/fd_snapshot_thread.c +++ /dev/null @@ -1,3 +0,0 @@ -#include "../../../../disco/tiles.h" - -fd_topo_run_tile_t fd_tile_snaps_thread = { .name = "stpool", .for_tpool = 1 }; diff --git a/src/app/fdctl/run/tiles/generated/snapshot_seccomp.h b/src/app/fdctl/run/tiles/generated/batch_seccomp.h similarity index 93% rename from src/app/fdctl/run/tiles/generated/snapshot_seccomp.h rename to src/app/fdctl/run/tiles/generated/batch_seccomp.h index 7b1baa6abe..f29ba2e5e5 100644 --- a/src/app/fdctl/run/tiles/generated/snapshot_seccomp.h +++ b/src/app/fdctl/run/tiles/generated/batch_seccomp.h @@ -1,6 +1,6 @@ /* THIS FILE WAS GENERATED BY generate_filters.py. DO NOT EDIT BY HAND! */ -#ifndef HEADER_fd_src_app_fdctl_run_tiles_generated_snapshot_seccomp_h -#define HEADER_fd_src_app_fdctl_run_tiles_generated_snapshot_seccomp_h +#ifndef HEADER_fd_src_app_fdctl_run_tiles_generated_batch_seccomp_h +#define HEADER_fd_src_app_fdctl_run_tiles_generated_batch_seccomp_h #include "../../../../../../src/util/fd_util_base.h" #include <linux/audit.h> @@ -21,9 +21,9 @@ #else # error "Target architecture is unsupported by seccomp." #endif -static const unsigned int sock_filter_policy_snapshot_instr_cnt = 51; +static const unsigned int sock_filter_policy_batch_instr_cnt = 51; -static void populate_sock_filter_policy_snapshot( ulong out_cnt, struct sock_filter * out, unsigned int logfile_fd, unsigned int tmp_fd, unsigned int tmp_inc_fd, unsigned int full_snapshot_fd, unsigned int incremental_snapshot_fd) { +static void populate_sock_filter_policy_batch( ulong out_cnt, struct sock_filter * out, unsigned int logfile_fd, unsigned int tmp_fd, unsigned int tmp_inc_fd, unsigned int full_snapshot_fd, unsigned int incremental_snapshot_fd) { FD_TEST( out_cnt >= 51 ); struct sock_filter filter[51] = { /* Check: Jump to RET_KILL_PROCESS if the script's arch != the runtime arch */ diff --git a/src/app/fdctl/run/topos/fd_firedancer.c b/src/app/fdctl/run/topos/fd_firedancer.c index ed987ff651..a50169366c 100644 --- a/src/app/fdctl/run/topos/fd_firedancer.c +++ b/src/app/fdctl/run/topos/fd_firedancer.c @@ -71,7 +71,7 @@ fd_topo_initialize( config_t * config ) { ulong bank_tile_cnt = config->layout.bank_tile_count; ulong replay_tpool_thread_count = config->tiles.replay.tpool_thread_count; - ulong snaps_tpool_thread_count = config->tiles.snaps.hash_tpool_thread_count; + ulong batch_tpool_thread_count = config->tiles.batch.hash_tpool_thread_count; int enable_rpc = ( config->rpc.port != 0 ); @@ -128,6 +128,7 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "gossip_voter" ); fd_topob_wksp( topo, "voter_gossip" ); fd_topob_wksp( topo, "voter_dedup" ); + fd_topob_wksp( topo, "batch_replay" ); fd_topob_wksp( topo, "net" ); fd_topob_wksp( topo, "quic" ); @@ -149,8 +150,8 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "voter" ); fd_topob_wksp( topo, "poh_slot" ); fd_topob_wksp( topo, "eqvoc" ); - fd_topob_wksp( topo, "snaps" ); - fd_topob_wksp( topo, "stpool" ); + fd_topob_wksp( topo, "batch" ); + fd_topob_wksp( topo, "btpool" ); fd_topob_wksp( topo, "constipate" ); @@ -210,6 +211,8 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "voter_sign", "voter_sign", 128UL, FD_TXN_MTU, 1UL ); /**/ fd_topob_link( topo, "sign_voter", "sign_voter", 128UL, 64UL, 1UL ); + /**/
fd_topob_link( topo, "batch_replay", "batch_replay", 128UL, 32UL, 1UL ); + ushort parsed_tile_to_cpu[ FD_TILE_MAX ]; /* Unassigned tiles will be floating, unless auto topology is enabled. */ for( ulong i=0UL; i<FD_TILE_MAX; i++ ) parsed_tile_to_cpu[ i ] = USHORT_MAX; ... /**/ fd_topob_tile( topo, "replay", "replay", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); /* These thread tiles must be defined immediately after the replay tile. We subtract one because the replay tile acts as a thread in the tpool as well. */ FOR(replay_tpool_thread_count-1) fd_topob_tile( topo, "rtpool", "rtpool", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); - /**/ fd_topob_tile( topo, "snaps", "snaps", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); + /**/ fd_topob_tile( topo, "batch", "batch", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); /* These thread tiles must be defined immediately after the snapshot tile. */ - FOR(snaps_tpool_thread_count) fd_topob_tile( topo, "stpool", "stpool", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); + FOR(batch_tpool_thread_count-1) fd_topob_tile( topo, "btpool", "btpool", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); if( enable_rpc ) fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 ); fd_topo_tile_t * store_tile = &topo->tiles[ fd_topo_find_tile( topo, "storei", 0UL ) ]; fd_topo_tile_t * replay_tile = &topo->tiles[ fd_topo_find_tile( topo, "replay", 0UL ) ]; fd_topo_tile_t * repair_tile = &topo->tiles[ fd_topo_find_tile( topo, "repair", 0UL ) ]; fd_topo_tile_t * snaps_tile = &topo->tiles[ fd_topo_find_tile( topo, "batch", 0UL ) ]; /* Create a shared blockstore to be used by store and replay. */ fd_topo_obj_t * blockstore_obj = setup_topo_blockstore( topo, @@ -423,6 +426,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_notif", 0UL ); /**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "pack_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "gossip_repla", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + /**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "batch_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_voter", 0UL ); /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_gossi", 0UL ); /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_store", 0UL ); @@ -459,6 +463,8 @@ fd_topo_initialize( config_t * config ) { FOR(shred_tile_cnt) fd_topob_tile_in( topo, "eqvoc", 0UL, "metric_in", "shred_net", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ + /**/ fd_topob_tile_out( topo, "batch", 0UL, "batch_replay", 0UL ); + if( enable_rpc ) { fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); @@ -638,8 +644,8 @@ fd_topo_initialize( config_t * config ) { memcpy( tile->replay.src_mac_addr, config->tiles.net.mac_addr, 6UL ); tile->replay.vote = config->consensus.vote; strncpy( tile->replay.vote_account_path, config->consensus.vote_account_path, sizeof(tile->replay.vote_account_path) ); - tile->replay.full_interval = config->tiles.snaps.full_interval; - tile->replay.incremental_interval = config->tiles.snaps.incremental_interval; + tile->replay.full_interval = config->tiles.batch.full_interval; + tile->replay.incremental_interval = config->tiles.batch.incremental_interval;
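(Editorial aside: a worked example, with hypothetical numbers, of how the intervals plumbed through here drive the wrap-around check in snapshot_state_update above.)

  /* Suppose full_interval = 100UL and the watermark advances 97 -> 103
     between two calls (slot 100 itself was skipped):

       prev_full_snapshot_dist = 97  % 100 = 97
       curr_full_snapshot_dist = 103 % 100 = 3

     The distance shrank (3 < 97), so the watermark must have crossed a
     multiple of full_interval, and a full snapshot is scheduled even though
     no watermark landed exactly on the interval boundary.  The incremental
     interval is tracked the same way via prev_incr_snapshot_dist. */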
diff --git a/src/app/fddev/main1.c b/src/app/fddev/main1.c
index cd170af60d..081029be14 100644
--- a/src/app/fddev/main1.c
+++ b/src/app/fddev/main1.c
@@ -58,12 +58,12 @@ extern fd_topo_run_tile_t fd_tile_repair;
 extern fd_topo_run_tile_t fd_tile_store_int;
 extern fd_topo_run_tile_t fd_tile_replay;
 extern fd_topo_run_tile_t fd_tile_replay_thread;
-extern fd_topo_run_tile_t fd_tile_snaps_thread;
+extern fd_topo_run_tile_t fd_tile_batch;
+extern fd_topo_run_tile_t fd_tile_batch_thread;
 extern fd_topo_run_tile_t fd_tile_poh_int;
 extern fd_topo_run_tile_t fd_tile_sender;
 extern fd_topo_run_tile_t fd_tile_eqvoc;
 extern fd_topo_run_tile_t fd_tile_rpcserv;
-extern fd_topo_run_tile_t fd_tile_snaps;
 #endif

 fd_topo_run_tile_t * TILES[] = {
@@ -92,12 +92,12 @@ fd_topo_run_tile_t * TILES[] = {
   &fd_tile_store_int,
   &fd_tile_replay,
   &fd_tile_replay_thread,
-  &fd_tile_snaps_thread,
+  &fd_tile_batch,
+  &fd_tile_batch_thread,
   &fd_tile_poh_int,
   &fd_tile_sender,
   &fd_tile_eqvoc,
   &fd_tile_rpcserv,
-  &fd_tile_snaps,
 #endif
   NULL,
 };
diff --git a/src/disco/topo/fd_topo.c b/src/disco/topo/fd_topo.c
index 8213ea09ef..0075d0824e 100644
--- a/src/disco/topo/fd_topo.c
+++ b/src/disco/topo/fd_topo.c
@@ -198,10 +198,10 @@ fd_topo_tile_extra_huge_pages( fd_topo_tile_t const * tile ) {
        pages need to be reserved as well. */
     extra_pages += tile->replay.tpool_thread_count*((FD_TILE_PRIVATE_STACK_SZ/FD_SHMEM_HUGE_PAGE_SZ)+2UL);
   }
-  else if( FD_UNLIKELY ( !strcmp( tile->name, "snaps" ) ) ) {
-    /* Snapshot tile spawns a bunch of extra threads which also require
+  else if( FD_UNLIKELY ( !strcmp( tile->name, "batch" ) ) ) {
+    /* Batch tile spawns a bunch of extra threads which also require
        stack space. These huge pages need to be reserved as well. */
-    extra_pages += tile->snaps.hash_tpool_thread_count *((FD_TILE_PRIVATE_STACK_SZ/FD_SHMEM_HUGE_PAGE_SZ)+2UL);
+    extra_pages += tile->batch.hash_tpool_thread_count*((FD_TILE_PRIVATE_STACK_SZ/FD_SHMEM_HUGE_PAGE_SZ)+2UL);
   }
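To put that reservation in perspective, a worked example (both constants live outside this diff, so the exact values are assumptions: FD_SHMEM_HUGE_PAGE_SZ is the standard 2 MiB huge page, FD_TILE_PRIVATE_STACK_SZ is taken to be 8 MiB, and the +2UL presumably accounts for the guard pages on either side of each stack):

    extra_pages = hash_tpool_thread_count * ( FD_TILE_PRIVATE_STACK_SZ/FD_SHMEM_HUGE_PAGE_SZ + 2 )
                = 3 * ( 8 MiB / 2 MiB + 2 )
                = 3 * 6
                = 18 huge pages (36 MiB)

for a batch tile configured with hash_tpool_thread_count = 3.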
diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h
index dd44c07404..ea2ee4fd13 100644
--- a/src/disco/topo/fd_topo.h
+++ b/src/disco/topo/fd_topo.h
@@ -351,7 +351,7 @@ typedef struct {
       int   full_snapshot_fd;
       int   incremental_snapshot_fd;
       ulong hash_tpool_thread_count;
-    } snaps;
+    } batch;
   };
 } fd_topo_tile_t;
diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c
index fe713ea82f..467f6e9a8d 100644
--- a/src/disco/topo/fd_topob.c
+++ b/src/disco/topo/fd_topob.c
@@ -358,8 +358,8 @@ fd_topob_auto_layout( fd_topo_t * topo ) {
     "sender",  /* FIREDANCER only */
     "eqvoc",   /* FIREDANCER only */
     "rpcsrv",  /* FIREDANCER only */
-    "snaps",   /* FIREDANCER only */
-    "stpool",  /* FIREDANCER only */
+    "batch",   /* FIREDANCER only */
+    "btpool",  /* FIREDANCER only */
 #endif
   };
diff --git a/src/flamenco/runtime/fd_blockstore.c b/src/flamenco/runtime/fd_blockstore.c
index 0d3401b8b2..390d4ab6a8 100644
--- a/src/flamenco/runtime/fd_blockstore.c
+++ b/src/flamenco/runtime/fd_blockstore.c
@@ -554,7 +554,7 @@ fd_blockstore_scan_block( fd_blockstore_t * blockstore, ulong slot, fd_block_t *
   }
   if( FD_UNLIKELY( blockoff < batch_end_off ) ) {
     if( FD_LIKELY( allow_trailing ) ) {
-      FD_LOG_NOTICE(( "ignoring %lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
+      FD_LOG_DEBUG(( "ignoring %lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
     }
     if( FD_UNLIKELY( !allow_trailing ) ) {
       FD_LOG_ERR(( "%lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
diff --git a/src/flamenco/snapshot/fd_snapshot_create.h b/src/flamenco/snapshot/fd_snapshot_create.h
index 57fbae7f2d..1d026a375a 100644
--- a/src/flamenco/snapshot/fd_snapshot_create.h
+++ b/src/flamenco/snapshot/fd_snapshot_create.h
@@ -24,7 +24,12 @@
 #define FD_SNAPSHOT_TMP_FULL_ARCHIVE_ZSTD (".tmp.tar.zst")
 #define FD_SNAPSHOT_TMP_INCR_ARCHIVE_ZSTD (".tmp_inc.tar.zst")

-#define FD_SNAPSHOT_APPEND_VEC_SZ_MAX (16UL * 1024UL * 1024UL * 1024UL) /* 16 GiB */
+/* This is a relatively arbitrary constant. The max size of a snapshot append
+   vec file is 16 GiB, but that value can cause problems in practice according
+   to the Agave team.
+
+   TODO: Figure out exactly what those problems are. */
+#define FD_SNAPSHOT_APPEND_VEC_SZ_MAX (2UL * 1024UL * 1024UL * 1024UL) /* 2 GiB */

 FD_PROTOTYPES_BEGIN
@@ -74,28 +79,54 @@ struct fd_snapshot_ctx {
 };
 typedef struct fd_snapshot_ctx fd_snapshot_ctx_t;

-/* fd_snapshot_create_populate_fseq, fd_snapshot_create_is_incremental, and
+/* TODO: These functions should be moved to a more common file, as they are
+   used by the replay tile and the batch tile for more than just snapshot
+   creation.
+
+   fd_snapshot_create_populate_fseq, fd_snapshot_create_is_incremental, and
    fd_snapshot_create_get_slot are helpers used to pack and unpack the fseq
    used to indicate if a snapshot is ready to be created and if the snapshot
    should be incremental. The other bytes represent the slot that the
    snapshot will be created for. The most significant 8 bits determine if the
    snapshot is incremental and the least significant 56 bits are reserved for
    the slot.
+
+   These functions are used for snapshot creation in the full client.
+
+   fd_batch_fseq_pack, fd_batch_fseq_is_snapshot, fd_batch_fseq_is_eah,
+   fd_batch_fseq_is_incremental, and fd_batch_fseq_get_slot are helpers used
+   by the replay tile and the batch tile to communicate what work the batch
+   tile should do. At the moment of this writing, the batch tile can either
+   calculate the epoch account hash or create a snapshot.
+
+   The msb is used to determine if the batch tile should calculate the epoch
+   account hash or produce a snapshot. The next msb is used to determine if
+   the snapshot is incremental; this bit is ignored if the epoch account hash
+   is being calculated. The remaining 62 bits store the slot for which the
+   snapshot/hash should be calculated. */
+
+static ulong FD_FN_UNUSED
+fd_batch_fseq_pack( ulong is_snapshot, ulong is_incremental, ulong smr ) {
+  return ((is_snapshot & 0x1UL) << 63UL) | ((is_incremental & 0x1UL) << 62UL) | (smr & 0x3FFFFFFFFFFFFFFFUL);
+}

-   These functions are used for snapshot creation in the full client. */
+static ulong FD_FN_UNUSED
+fd_batch_fseq_is_snapshot( ulong fseq ) {
+  return (fseq >> 63UL) & 0x1UL;
+}

 static ulong FD_FN_UNUSED
-fd_snapshot_create_pack_fseq( ulong is_incremental, ulong smr ) {
-  return (is_incremental << 56UL) | (smr & 0xFFFFFFFFFFFFFFUL);
+fd_batch_fseq_is_eah( ulong fseq ) {
+  return !((fseq >> 63UL) & 0x1UL);
 }

 static ulong FD_FN_UNUSED
-fd_snapshot_create_get_is_incremental( ulong fseq ) {
-  return (fseq >> 56UL) & 0xFF;
+fd_batch_fseq_is_incremental( ulong fseq ) {
+  return (fseq >> 62UL) & 0x1UL;
 }

 static ulong FD_FN_UNUSED
-fd_snapshot_create_get_slot( ulong fseq ) {
-  return fseq & 0xFFFFFFFFFFFFFFUL;
+fd_batch_fseq_get_slot( ulong fseq ) {
+  return fseq & 0x3FFFFFFFFFFFFFFFUL;
 }
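To make the encoding concrete, here is a round-trip sketch (not from the patch; batch_fseq is a hypothetical pointer to the fseq the two tiles share, and fd_fseq_update/fd_fseq_query are the stock fd_tango fseq accessors):

    /* Replay side: request an incremental snapshot for slot 123456789. */
    fd_fseq_update( batch_fseq, fd_batch_fseq_pack( 1UL /* is_snapshot */,
                                                    1UL /* is_incremental */,
                                                    123456789UL ) );

    /* Batch side: poll and decode the pending request. */
    ulong fseq = fd_fseq_query( batch_fseq );
    if( fd_batch_fseq_is_snapshot( fseq ) ) {
      ulong slot           = fd_batch_fseq_get_slot( fseq );       /* 123456789UL */
      ulong is_incremental = fd_batch_fseq_is_incremental( fseq ); /* 1UL */
      /* ... produce the (incremental) snapshot for slot ... */
    } else {
      /* fd_batch_fseq_is_eah( fseq ) is nonzero: compute the epoch account hash. */
    }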
 /* fd_snapshot_create_new_snapshot is responsible for creating the different