Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockstore: fix and document locking #3734

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -1131,17 +1131,19 @@ after_frag( fd_replay_tile_ctx_t * ctx,
ulong parent_slot = ctx->parent_slot;
ulong flags = ctx->flags;
ulong bank_idx = ctx->bank_idx;
if ( FD_UNLIKELY( curr_slot < ctx->tower->root ) ) {
if( FD_UNLIKELY( curr_slot < ctx->tower->root ) ) {
FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our root %lu.", curr_slot, parent_slot, ctx->tower->root ));
return;
}

if ( FD_UNLIKELY( parent_slot < ctx->tower->root ) ) {
if( FD_UNLIKELY( parent_slot < ctx->tower->root ) ) {
FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our root %lu.", curr_slot, parent_slot, ctx->tower->root ));
return;
}

fd_blockstore_start_write( ctx->blockstore );
fd_block_map_t * parent_block_map_entry = fd_blockstore_block_map_query( ctx->blockstore, parent_slot );
fd_blockstore_end_write( ctx->blockstore );
if( FD_UNLIKELY( !parent_block_map_entry ) ) {
FD_LOG_WARNING(( "[%s] unable to find slot %lu's parent block_map_entry", __func__, curr_slot ));
return;
Expand All @@ -1163,7 +1165,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,
fork->frozen = 0). */

fd_fork_t * parent_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->parent_slot, NULL, ctx->forks->pool );
if( FD_UNLIKELY ( parent_fork && parent_fork->lock ) ) {
if( FD_UNLIKELY( parent_fork && parent_fork->lock ) ) {
FD_LOG_ERR(
( "parent slot is frozen in frontier. cannot execute. slot: %lu, parent_slot: %lu",
curr_slot,
Expand Down Expand Up @@ -1211,10 +1213,11 @@ after_frag( fd_replay_tile_ctx_t * ctx,
if( res != 0UL && !( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) ) {
FD_LOG_WARNING(( "block invalid - slot: %lu", curr_slot ));

fd_blockstore_start_write( ctx->blockstore );

fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( ctx->blockstore, curr_slot );
fd_block_t * block_ = fd_blockstore_block_query( ctx->blockstore, curr_slot );

fd_blockstore_start_write( ctx->blockstore );

if( FD_LIKELY( block_ ) ) {
block_map_entry->flags = fd_uchar_set_bit( block_map_entry->flags, FD_BLOCK_FLAG_DEADBLOCK );
Expand Down Expand Up @@ -1248,8 +1251,11 @@ after_frag( fd_replay_tile_ctx_t * ctx,
for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) {
fd_tpool_wait( ctx->tpool, i+1 );
}

fd_blockstore_start_read( ctx->blockstore );
fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( ctx->blockstore, curr_slot );
fd_block_t * block_ = fd_blockstore_block_query( ctx->blockstore, curr_slot );
fd_blockstore_end_read( ctx->blockstore );
fork->slot_ctx.block = block_;

/* TODO:FIXME: This needs to be unhacked. */
Expand Down
6 changes: 4 additions & 2 deletions src/app/fdctl/run/tiles/fd_store_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,

uchar * out_buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );

fd_blockstore_start_read( ctx->blockstore );
fd_block_t * block = fd_blockstore_block_query( ctx->blockstore, slot );
if( block == NULL ) {
FD_LOG_ERR(( "could not find block - slot: %lu", slot ));
Expand All @@ -396,6 +397,7 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
}

fd_hash_t const * block_hash = fd_blockstore_block_hash_query( ctx->blockstore, slot );
fd_blockstore_end_read( ctx->blockstore );
if( block_hash == NULL ) {
FD_LOG_ERR(( "could not find slot meta" ));
}
Expand Down Expand Up @@ -743,8 +745,8 @@ unprivileged_init( fd_topo_t * topo,
while( fgets( buf, sizeof( buf ), file ) ) {
char * endptr;
ulong slot = strtoul( buf, &endptr, 10 );
fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( ctx->blockstore, slot );
block_map_entry->flags = 0;
fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( ctx->blockstore, slot );
block_map_entry->flags = 0;
fd_store_add_pending( ctx->store, slot, (long)cnt++, 0, 0 );
}
fd_blockstore_end_write( ctx->blockstore );
Expand Down
2 changes: 2 additions & 0 deletions src/choreo/forks/fd_forks.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ fd_forks_prepare( fd_forks_t const * forks,

/* Check the parent block is present in the blockstore and executed. */

fd_blockstore_start_read( blockstore );
fd_block_t * block = fd_blockstore_block_query( blockstore, parent_slot );
fd_blockstore_end_read( blockstore );
if( FD_UNLIKELY( !block ) ) {
FD_LOG_WARNING(( "fd_forks_prepare missing parent_slot %lu", parent_slot ));
}
Expand Down
9 changes: 8 additions & 1 deletion src/disco/store/fd_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ fd_store_slot_prepare( fd_store_t * store,
goto end;
}

fd_blockstore_start_read( store->blockstore );
fd_block_t * parent_block = fd_blockstore_block_query( store->blockstore, parent_slot );
fd_blockstore_end_read( store->blockstore );

/* We have a parent slot meta, and therefore have at least one shred of the parent block, so we
have the ancestry and need to repair that block directly (as opposed to calling repair orphan).
Expand Down Expand Up @@ -351,6 +353,7 @@ fd_store_slot_repair( fd_store_t * store,
backoff->last_repair_time = store->now;

ulong repair_req_cnt = 0;
fd_blockstore_start_read( store->blockstore );
fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( store->blockstore, slot );

if( FD_LIKELY( !block_map_entry ) ) {
Expand All @@ -377,6 +380,7 @@ fd_store_slot_repair( fd_store_t * store,
if( repair_req_cnt==out_repair_reqs_sz ) {
backoff->last_backoff_duration += backoff->last_backoff_duration>>2;
FD_LOG_INFO( ( "[repair] MAX need %lu [%u, %u], sent %lu requests (backoff: %ld ms)", slot, block_map_entry->consumed_idx + 1, complete_idx, repair_req_cnt, backoff->last_backoff_duration/(long)1e6 ) );
fd_blockstore_end_read( store->blockstore );
return repair_req_cnt;
}

Expand All @@ -395,12 +399,13 @@ fd_store_slot_repair( fd_store_t * store,
}

if( !good ) {
fd_blockstore_end_read( store->blockstore );
return repair_req_cnt;
}

/* Fill in what's missing */
for( uint i = block_map_entry->consumed_idx + 1; i <= complete_idx; i++ ) {
if( fd_buf_shred_query( store->blockstore, slot, i ) != NULL ) continue;
if( FD_UNLIKELY( fd_buf_shred_query( store->blockstore, slot, i ) != NULL) ) continue;

fd_repair_request_t * repair_req = &out_repair_reqs[repair_req_cnt++];
repair_req->shred_index = i;
Expand All @@ -410,9 +415,11 @@ fd_store_slot_repair( fd_store_t * store,
if( repair_req_cnt == out_repair_reqs_sz ) {
backoff->last_backoff_duration += backoff->last_backoff_duration>>2;
FD_LOG_INFO( ( "[repair] MAX need %lu [%u, %u], sent %lu requests (backoff: %ld ms)", slot, block_map_entry->consumed_idx + 1, complete_idx, repair_req_cnt, backoff->last_backoff_duration/(long)1e6 ) );
fd_blockstore_end_read( store->blockstore );
return repair_req_cnt;
}
}
fd_blockstore_end_read( store->blockstore );
if( repair_req_cnt ) {
backoff->last_backoff_duration += backoff->last_backoff_duration>>2;
FD_LOG_INFO( ( "[repair] need %lu [%u, %u], sent %lu requests (backoff: %ld ms)", slot, block_map_entry->consumed_idx + 1, complete_idx, repair_req_cnt, backoff->last_backoff_duration/(long)1e6 ) );
Expand Down
1 change: 0 additions & 1 deletion src/flamenco/runtime/fd_blockstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,6 @@ fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) {
return;
}

/* Remove all the unassembled shreds for a slot */
int
fd_blockstore_buffered_shreds_remove( fd_blockstore_t * blockstore, ulong slot ) {
fd_block_map_t * block_map = fd_blockstore_block_map( blockstore );
Expand Down
129 changes: 88 additions & 41 deletions src/flamenco/runtime/fd_blockstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ fd_blockstore_delete( void * shblockstore );
fd_blockstore_t *
fd_blockstore_init( fd_blockstore_t * blockstore, int fd, ulong fd_size_max, fd_slot_bank_t const * slot_bank );

/* fd_blockstore_fini finalizes a blockstore. */
/* fd_blockstore_fini finalizes a blockstore.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */

void
fd_blockstore_fini( fd_blockstore_t * blockstore );
Expand Down Expand Up @@ -555,8 +558,8 @@ fd_blockstore_alloc( fd_blockstore_t * blockstore ) {
return fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore), blockstore->alloc_gaddr );
}

/* fd_blockstore_block_data_laddr returns a local pointer to the block's data. The returned pointer
* lifetime is until the block is removed. Check return value for error info. */
/* fd_blockstore_block_data_laddr returns a local pointer to the block's
data. The returned pointer lifetime is until the block is removed. */

FD_FN_PURE static inline uchar *
fd_blockstore_block_data_laddr( fd_blockstore_t * blockstore, fd_block_t * block ) {
Expand All @@ -568,50 +571,70 @@ fd_blockstore_block_batch_laddr( fd_blockstore_t * blockstore, fd_block_t * bloc
return fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block->batch_gaddr );
}

/* Query blockstore for shred at slot, shred_idx. Returns a pointer to the shred or NULL if not in
* blockstore. The returned pointer lifetime is until the shred is removed. Check return value for
* error info. This API only works for shreds from incomplete blocks.
*
* Callers should hold the read lock during the entirety of its read to ensure the pointer remains
* valid.
*/
/* fd_buf_shred_query queries the blockstore for shred at slot,
shred_idx. Returns a pointer to the shred or NULL if not in
blockstore. The returned pointer lifetime is until the shred is
removed. Check return value for error info. This API only works for
shreds from incomplete blocks.

Callers should hold the read lock during the entirety of its read to
ensure the pointer remains valid. */
fd_shred_t *
fd_buf_shred_query( fd_blockstore_t * blockstore, ulong slot, uint shred_idx );

/* Query blockstore for shred at slot, shred_idx. Copies the shred
* data to the given buffer and returns the data size. Returns -1 on failure.
*
* Callers should hold the read lock during the entirety of this call.
*/
/* fd_buf_shred_query_copy_data queries the blockstore for shred at
slot, shred_idx. Copies the shred data to the given buffer and
returns the data size. Returns -1 on failure.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
long
fd_buf_shred_query_copy_data( fd_blockstore_t * blockstore,
ulong slot,
uint shred_idx,
void * buf,
ulong buf_max );

/* Query blockstore for block at slot. Returns a pointer to the block or NULL if not in
* blockstore. The returned pointer lifetime is until the block is removed. Check return value for
* error info. */
/* fd_blockstore_block_query queries blockstore for block at slot.
Returns a pointer to the block or NULL if not in blockstore. The
returned pointer lifetime is until the block is removed. Check
return value for error info.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
fd_block_t *
fd_blockstore_block_query( fd_blockstore_t * blockstore, ulong slot );

/* Query blockstore for the block hash at slot. This is the final poh
hash for a slot. */
/* fd_blockstore_block_hash_query queries blockstore for the block hash
at slot. This is the final poh hash for a slot.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
fd_hash_t const *
fd_blockstore_block_hash_query( fd_blockstore_t * blockstore, ulong slot );

/* Query blockstore for the bank hash for a given slot. */
/* fd_blockstore_bank_hash_query queries blockstore for the bank hash for
a given slot.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
fd_hash_t const *
fd_blockstore_bank_hash_query( fd_blockstore_t * blockstore, ulong slot );

/* Query blockstore for the block map entry at slot. Returns a pointer
to the slot meta or NULL if not in blockstore. The returned pointer
lifetime is until the slot meta is removed. */
/* fd_blockstore_block_map_query queries the blockstore for the block
map entry at slot. Returns a pointer to the slot meta or NULL if not
in blockstore. The returned pointer lifetime is until the slot meta
is removed.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
fd_block_map_t *
fd_blockstore_block_map_query( fd_blockstore_t * blockstore, ulong slot );

/* Query the parent slot of slot. */
/* fd_blockstore_parent_slot_query queries the parent slot of slot.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
ulong
fd_blockstore_parent_slot_query( fd_blockstore_t * blockstore, ulong slot );

Expand All @@ -620,13 +643,20 @@ fd_blockstore_parent_slot_query( fd_blockstore_t * blockstore, ulong slot );
on success, FD_BLOCKSTORE_ERR_SLOT_MISSING if slot is not in the
blockstore. The returned slot array is always <= the max size
FD_BLOCKSTORE_CHILD_SLOT_MAX and contiguous. Empty slots in the
array are set to FD_SLOT_NULL. */
array are set to FD_SLOT_NULL.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */

int
fd_blockstore_child_slots_query( fd_blockstore_t * blockstore, ulong slot, ulong ** slots_out, ulong * slot_cnt );

/* Query the frontier ie. all the blocks that need to be replayed that haven't been. These are the
slot children of the current frontier that are shred complete. */
/* fd_blockstore_block_frontier_query queries the frontier i.e. all the
blocks that need to be replayed that haven't been. These are the
slot children of the current frontier that are shred complete.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
fd_block_t *
fd_blockstore_block_frontier_query( fd_blockstore_t * blockstore,
ulong * parents,
Expand Down Expand Up @@ -665,7 +695,11 @@ fd_blockstore_block_map_query_volatile( fd_blockstore_t * blockstore,
ulong slot,
fd_block_map_t * block_map_entry_out );

/* Query the transaction data for the given signature */
/* fd_blockstore_txn_query queries the transaction data for the given
signature.

IMPORTANT! Caller MUST hold the read lock when calling this
function. */
fd_txn_map_t *
fd_blockstore_txn_query( fd_blockstore_t * blockstore, uchar const sig[static FD_ED25519_SIG_SZ] );

Expand All @@ -681,25 +715,38 @@ fd_blockstore_txn_query_volatile( fd_blockstore_t * blockstore,
uchar * blk_flags,
uchar txn_data_out[FD_TXN_MTU] );

/* Remove slot from blockstore, including all relevant internal structures. */
/* fd_blockstore_slot_remove removes slot from blockstore, including all
relevant internal structures.

IMPORTANT! Caller MUST hold the write lock when calling this
function. */
void
fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot );

/* Operations */

/* Insert shred into the blockstore, fast O(1). Fail if this shred is already in the blockstore or
* the blockstore is full. Returns an error code indicating success or failure.
*
* TODO eventually this will need to support "upsert" duplicate shred handling.
*/
/* fd_buf_shred_insert inserts shred into the blockstore, fast O(1).
Fail if this shred is already in the blockstore or the blockstore is
full. Returns an error code indicating success or failure.
TODO eventually this will need to support "upsert" duplicate shred handling.

IMPORTANT! Caller MUST hold the write lock when calling this
function. */
int
fd_buf_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shred );

/* Remove all the unassembled shreds for a slot */
/* fd_blockstore_buffered_shreds_remove removes all the unassembled shreds
for a slot.

IMPORTANT! Caller MUST hold the write lock when calling this
function. */
int
fd_blockstore_buffered_shreds_remove( fd_blockstore_t * blockstore, ulong slot );

/* Set the block height. */
/* fd_blockstore_block_height_update sets the block height.

IMPORTANT! Caller MUST hold the write lock when calling this
function. */
void
fd_blockstore_block_height_update( fd_blockstore_t * blockstore, ulong slot, ulong block_height );

Expand All @@ -720,25 +767,25 @@ fd_blockstore_block_height_update( fd_blockstore_t * blockstore, ulong slot, ulo
void
fd_blockstore_publish( fd_blockstore_t * blockstore, int fd, ulong smr );

/* Acquire a read lock */
/* fd_blockstore_start_read acquires the read lock */
static inline void
fd_blockstore_start_read( fd_blockstore_t * blockstore ) {
/* Thin wrapper: hands the address of the blockstore's lock member to
fd_rwseq_start_read.  Callers in this code base pair it with
fd_blockstore_end_read once they are done reading. */
fd_rwseq_start_read( &blockstore->lock );
}

/* Release a read lock */
/* fd_blockstore_end_read releases the read lock */
static inline void
fd_blockstore_end_read( fd_blockstore_t * blockstore ) {
/* Thin wrapper: hands the address of the blockstore's lock member to
fd_rwseq_end_read.  Counterpart of fd_blockstore_start_read. */
fd_rwseq_end_read( &blockstore->lock );
}

/* Acquire a write lock */
/* fd_blockstore_start_write acquires the write lock */
static inline void
fd_blockstore_start_write( fd_blockstore_t * blockstore ) {
/* Thin wrapper: hands the address of the blockstore's lock member to
fd_rwseq_start_write.  Callers in this code base pair it with
fd_blockstore_end_write once they are done mutating. */
fd_rwseq_start_write( &blockstore->lock );
}

/* Release a write lock */
/* fd_blockstore_end_write releases the write lock */
static inline void
fd_blockstore_end_write( fd_blockstore_t * blockstore ) {
fd_rwseq_end_write( &blockstore->lock );
Expand Down
4 changes: 2 additions & 2 deletions src/flamenco/runtime/fd_rocksdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,12 +739,12 @@ fd_rocksdb_import_block_blockstore( fd_rocksdb_t * db,
FD_BLOCK_FLAG_FINALIZED );
}

fd_blockstore_end_write(blockstore);
fd_blockstore_end_write( blockstore );
return 0;
}

int
fd_rocksdb_import_block_shredcap( fd_rocksdb_t * db,
fd_rocksdb_import_block_shredcap( fd_rocksdb_t * db,
fd_slot_meta_t * metadata,
fd_io_buffered_ostream_t * ostream,
fd_io_buffered_ostream_t * bank_hash_ostream ) {
Expand Down
Loading
Loading