Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into repair-service
Browse files Browse the repository at this point in the history
  • Loading branch information
asiegel-jt committed May 10, 2024
2 parents 78851a3 + 8f3a21d commit 9f3418c
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 101 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_firedancer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
workflow_call:
workflow_dispatch:
pull_request:
merge_group:
jobs:
firedancer-tests:
runs-on: private
Expand Down
20 changes: 3 additions & 17 deletions src/app/fdctl/run/tiles/fd_repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() );
l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
return FD_LAYOUT_FINI( l, scratch_align() );
}
Expand Down Expand Up @@ -471,8 +470,6 @@ unprivileged_init( fd_topo_t * topo,

ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;

void * alloc_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );

ctx->repair_intake_addr.addr = tile->repair.ip_addr;
ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );

Expand Down Expand Up @@ -538,20 +535,9 @@ unprivileged_init( fd_topo_t * topo,
ctx->repair_req_in_chunk0 = fd_dcache_compact_chunk0( ctx->repair_req_in_mem, repair_req_in_link->dcache );
ctx->repair_req_in_wmark = fd_dcache_compact_wmark ( ctx->repair_req_in_mem, repair_req_in_link->dcache, repair_req_in_link->mtu );

/* Valloc setup */
void * alloc_shalloc = fd_alloc_new( alloc_shmem, 3UL );
if( FD_UNLIKELY( !alloc_shalloc ) ) {
FD_LOG_ERR( ( "fd_allow_new failed" ) ); }
fd_alloc_t * alloc = fd_alloc_join( alloc_shalloc, 3UL );
if( FD_UNLIKELY( !alloc ) ) {
FD_LOG_ERR( ( "fd_alloc_join failed" ) );
}

fd_valloc_t valloc = fd_alloc_virtual( alloc );

/* Gossip set up */
/* Repair set up */

ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed, valloc ) );
ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) );

FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
Expand All @@ -566,7 +552,7 @@ unprivileged_init( fd_topo_t * topo,
ctx->repair_config.serv_get_parent_fun = repair_get_parent;

if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) {
FD_LOG_ERR( ( "error setting gossip config" ) );
FD_LOG_ERR( ( "error setting repair config" ) );
}

fd_repair_update_addr( ctx->repair, &ctx->repair_intake_addr, &ctx->repair_serve_addr );
Expand Down
2 changes: 1 addition & 1 deletion src/disco/consensus/test_consensus.c
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ main( int argc, char ** argv ) {
void * repair_mem =
fd_wksp_alloc_laddr( wksp, fd_repair_align(), fd_repair_footprint(), TEST_CONSENSUS_MAGIC );
fd_repair_t * repair =
fd_repair_join( fd_repair_new( repair_mem, TEST_CONSENSUS_MAGIC, valloc ) );
fd_repair_join( fd_repair_new( repair_mem, TEST_CONSENSUS_MAGIC ) );

fd_repair_config_t repair_config;
repair_config.public_key = &public_key;
Expand Down
2 changes: 1 addition & 1 deletion src/disco/tvu/fd_tvu.c
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ fd_tvu_main_setup( fd_runtime_ctx_t * runtime_ctx,
runtime_ctx->repair_config.sign_arg = NULL;

void * repair_mem = fd_valloc_malloc( valloc, fd_repair_align(), fd_repair_footprint() );
fd_repair_t * repair = fd_repair_join( fd_repair_new( repair_mem, funk_setup_out.hashseed, valloc ) );
fd_repair_t * repair = fd_repair_join( fd_repair_new( repair_mem, funk_setup_out.hashseed ) );
runtime_ctx->repair = repair;

if( fd_repair_set_config( repair, &runtime_ctx->repair_config ) ) runtime_ctx->blowup = 1;
Expand Down
164 changes: 88 additions & 76 deletions src/flamenco/repair/fd_repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,31 +226,38 @@ struct fd_repair {
/* Stake weights */
ulong stake_weights_cnt;
fd_stake_weight_t * stake_weights;
/* Heap allocator */
fd_valloc_t valloc;
};

ulong
fd_repair_align ( void ) { return alignof(fd_repair_t); }
fd_repair_align ( void ) { return 128UL; }

ulong
fd_repair_footprint( void ) { return sizeof(fd_repair_t); }
fd_repair_footprint( void ) {
ulong l = FD_LAYOUT_INIT;
l = FD_LAYOUT_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) );
l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
l = FD_LAYOUT_APPEND( l, fd_needed_table_align(), fd_needed_table_footprint(FD_NEEDED_KEY_MAX) );
l = FD_LAYOUT_APPEND( l, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX) );
l = FD_LAYOUT_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) );
l = FD_LAYOUT_APPEND( l, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() );
return FD_LAYOUT_FINI(l, fd_repair_align() );
}

void *
fd_repair_new ( void * shmem, ulong seed, fd_valloc_t valloc ) {
fd_memset(shmem, 0, sizeof(fd_repair_t));
fd_repair_t * glob = (fd_repair_t *)shmem;
glob->valloc = valloc;
glob->seed = seed;
void * shm = fd_valloc_malloc(valloc, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX));
fd_repair_new ( void * shmem, ulong seed ) {
FD_SCRATCH_ALLOC_INIT(l, shmem);
fd_repair_t * glob = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) );
fd_memset(glob, 0, sizeof(fd_repair_t));
void * shm = FD_SCRATCH_ALLOC_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
glob->actives = fd_active_table_join(fd_active_table_new(shm, FD_ACTIVE_KEY_MAX, seed));
shm = fd_valloc_malloc(valloc, fd_needed_table_align(), fd_needed_table_footprint(FD_NEEDED_KEY_MAX));
glob->seed = seed;
shm = FD_SCRATCH_ALLOC_APPEND( l, fd_needed_table_align(), fd_needed_table_footprint(FD_NEEDED_KEY_MAX) );
glob->needed = fd_needed_table_join(fd_needed_table_new(shm, FD_NEEDED_KEY_MAX, seed));
shm = fd_valloc_malloc(valloc, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX));
shm = FD_SCRATCH_ALLOC_APPEND( l, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX) );
glob->dupdetect = fd_dupdetect_table_join(fd_dupdetect_table_new(shm, FD_NEEDED_KEY_MAX, seed));
shm = fd_valloc_malloc(valloc, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX));
shm = FD_SCRATCH_ALLOC_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) );
glob->pinged = fd_pinged_table_join(fd_pinged_table_new(shm, FD_REPAIR_PINGED_MAX, seed));
glob->stake_weights = fd_valloc_malloc( valloc, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() );
glob->stake_weights = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() );
glob->stake_weights_cnt = 0;
glob->last_sends = 0;
glob->last_decay = 0;
Expand All @@ -261,6 +268,11 @@ fd_repair_new ( void * shmem, ulong seed, fd_valloc_t valloc ) {
glob->actives_sticky_cnt = 0;
glob->actives_random_seed = 0;

ulong scratch_top = FD_SCRATCH_ALLOC_FINI(l, 1UL);
if ( scratch_top > (ulong)shmem + fd_repair_footprint() ) {
FD_LOG_ERR(("Enough space not allocated for repair"));
}

return glob;
}

Expand All @@ -271,13 +283,12 @@ void *
fd_repair_leave ( fd_repair_t * join ) { return join; }

void *
fd_repair_delete ( void * shmap, fd_valloc_t valloc ) {
fd_repair_delete ( void * shmap ) {
fd_repair_t * glob = (fd_repair_t *)shmap;
fd_valloc_free(valloc, fd_active_table_delete(fd_active_table_leave(glob->actives)));
fd_valloc_free(valloc, fd_needed_table_delete(fd_needed_table_leave(glob->needed)));
fd_valloc_free(valloc, fd_dupdetect_table_delete(fd_dupdetect_table_leave(glob->dupdetect)));
fd_valloc_free(valloc, fd_pinged_table_delete(fd_pinged_table_leave(glob->pinged)));
fd_valloc_free( valloc, glob->stake_weights );
fd_active_table_delete( fd_active_table_leave( glob->actives ) );
fd_needed_table_delete( fd_needed_table_leave( glob->needed ) );
fd_dupdetect_table_delete( fd_dupdetect_table_leave( glob->dupdetect ) );
fd_pinged_table_delete( fd_pinged_table_leave( glob->pinged ) );
return glob;
}

Expand Down Expand Up @@ -584,63 +595,64 @@ fd_repair_recv_ping(fd_repair_t * glob, fd_gossip_ping_t const * ping, fd_gossip
int
fd_repair_recv_clnt_packet(fd_repair_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from) {
fd_repair_lock( glob );
while (1) {
fd_repair_response_t gmsg;
fd_bincode_decode_ctx_t ctx;
ctx.data = msg;
ctx.dataend = msg + msglen;
ctx.valloc = glob->valloc;
if (fd_repair_response_decode(&gmsg, &ctx)) {
/* Solana falls back to assuming we got a shred in this case
https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
break;
}
fd_bincode_destroy_ctx_t ctx2;
ctx2.valloc = glob->valloc;
if (ctx.data != ctx.dataend) {
FD_SCRATCH_SCOPE_BEGIN {
while (1) {
fd_repair_response_t gmsg;
fd_bincode_decode_ctx_t ctx;
ctx.data = msg;
ctx.dataend = msg + msglen;
ctx.valloc = fd_scratch_virtual();
if (fd_repair_response_decode(&gmsg, &ctx)) {
/* Solana falls back to assuming we got a shred in this case
https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
break;
}
fd_bincode_destroy_ctx_t ctx2;
ctx2.valloc = fd_scratch_virtual();
if (ctx.data != ctx.dataend) {
fd_repair_response_destroy(&gmsg, &ctx2);
break;
}

switch (gmsg.discriminant) {
case fd_repair_response_enum_ping:
fd_repair_recv_ping(glob, &gmsg.inner.ping, from);
break;
}

fd_repair_response_destroy(&gmsg, &ctx2);
break;
fd_repair_unlock( glob );
return 0;
}

switch (gmsg.discriminant) {
case fd_repair_response_enum_ping:
fd_repair_recv_ping(glob, &gmsg.inner.ping, from);
break;
/* Look at the nonse */
if ( msglen < sizeof(fd_repair_nonce_t) ) {
fd_repair_unlock( glob );
return 0;
}
ulong shredlen = msglen - sizeof(fd_repair_nonce_t); /* Nonce is at the end */
fd_repair_nonce_t key = *(fd_repair_nonce_t const *)(msg + shredlen);
fd_needed_elem_t * val = fd_needed_table_query(glob->needed, &key, NULL);
if ( NULL == val ) {
fd_repair_unlock( glob );
return 0;
}

fd_repair_response_destroy(&gmsg, &ctx2);
fd_repair_unlock( glob );
return 0;
}
fd_active_elem_t * active = fd_active_table_query( glob->actives, &val->id, NULL );
if ( NULL != active ) {
/* Update statistics */
active->avg_reps++;
active->avg_lat += glob->now - val->when;
}

/* Look at the nonse */
if ( msglen < sizeof(fd_repair_nonce_t) ) {
fd_shred_t const * shred = fd_shred_parse(msg, shredlen);
fd_repair_unlock( glob );
return 0;
}
ulong shredlen = msglen - sizeof(fd_repair_nonce_t); /* Nonce is at the end */
fd_repair_nonce_t key = *(fd_repair_nonce_t const *)(msg + shredlen);
fd_needed_elem_t * val = fd_needed_table_query(glob->needed, &key, NULL);
if ( NULL == val ) {
fd_repair_unlock( glob );
return 0;
}

fd_active_elem_t * active = fd_active_table_query( glob->actives, &val->id, NULL );
if ( NULL != active ) {
/* Update statistics */
active->avg_reps++;
active->avg_lat += glob->now - val->when;
}

fd_shred_t const * shred = fd_shred_parse(msg, shredlen);
fd_repair_unlock( glob );
if (shred == NULL) {
FD_LOG_WARNING(("invalid shread"));
} else {
(*glob->deliver_fun)(shred, shredlen, from, &val->id, glob->fun_arg);
}

if (shred == NULL) {
FD_LOG_WARNING(("invalid shread"));
} else {
(*glob->deliver_fun)(shred, shredlen, from, &val->id, glob->fun_arg);
}
} FD_SCRATCH_SCOPE_END;
return 0;
}

Expand Down Expand Up @@ -928,7 +940,7 @@ fd_repair_send_ping(fd_repair_t * glob, fd_gossip_peer_addr_t const * addr, fd_p
fd_hash_copy( &ping->from, glob->public_key );
for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); ++i )
ping->token.ul[i] = val->token.ul[i] = fd_rng_ulong(glob->rng);

if( glob->sign_fun ) {
(*glob->sign_fun)( glob->sign_arg, ping->signature.uc, ping->token.uc, 32UL );
} else {
Expand All @@ -940,14 +952,14 @@ fd_repair_send_ping(fd_repair_t * glob, fd_gossip_peer_addr_t const * addr, fd_p
/* private_key */ glob->private_key,
sha );
}

fd_bincode_encode_ctx_t ctx;
uchar buf[1024];
ctx.data = buf;
ctx.dataend = buf + sizeof(buf);
FD_TEST(0 == fd_repair_response_encode(&gmsg, &ctx));
ulong buflen = (ulong)((uchar*)ctx.data - buf);

(*glob->serv_send_fun)(buf, buflen, addr, glob->fun_arg);
}

Expand Down Expand Up @@ -1037,7 +1049,7 @@ fd_repair_recv_serv_packet(fd_repair_t * glob, uchar const * msg, ulong msglen,
FD_LOG_WARNING(("received repair request with with invalid signature"));
return 0;
}

fd_repair_lock( glob );

fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL);
Expand Down Expand Up @@ -1066,7 +1078,7 @@ fd_repair_recv_serv_packet(fd_repair_t * glob, uchar const * msg, ulong msglen,
(*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg );
break;
}

case fd_repair_protocol_enum_highest_window_index: {
fd_repair_highest_window_index_t const * wi = &protocol.inner.highest_window_index;
long sz = (*glob->serv_get_shred_fun)( wi->slot, -1, buf, FD_SHRED_MAX_SZ, glob->fun_arg );
Expand All @@ -1075,7 +1087,7 @@ fd_repair_recv_serv_packet(fd_repair_t * glob, uchar const * msg, ulong msglen,
(*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg );
break;
}

case fd_repair_protocol_enum_orphan: {
fd_repair_orphan_t const * wi = &protocol.inner.orphan;
ulong slot = wi->slot;
Expand Down
5 changes: 2 additions & 3 deletions src/flamenco/repair/fd_repair.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#define HEADER_fd_src_flamenco_repair_fd_repair_h

#include "../types/fd_types.h"
#include "../../util/valloc/fd_valloc.h"
#include "../gossip/fd_gossip.h"
#include "../../ballet/shred/fd_shred.h"
#include "../runtime/context/fd_exec_epoch_ctx.h"
Expand All @@ -24,10 +23,10 @@
typedef struct fd_repair fd_repair_t;
ulong fd_repair_align ( void );
ulong fd_repair_footprint( void );
void * fd_repair_new ( void * shmem, ulong seed, fd_valloc_t valloc );
void * fd_repair_new ( void * shmem, ulong seed );
fd_repair_t * fd_repair_join ( void * shmap );
void * fd_repair_leave ( fd_repair_t * join );
void * fd_repair_delete ( void * shmap, fd_valloc_t valloc );
void * fd_repair_delete ( void * shmap );

typedef fd_gossip_peer_addr_t fd_repair_peer_addr_t;

Expand Down
4 changes: 2 additions & 2 deletions src/flamenco/repair/fd_repair_tool.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ int main(int argc, char **argv) {
ulong seed = fd_hash(0, hostname, strnlen(hostname, sizeof(hostname)));

void * shm = fd_valloc_malloc(valloc, fd_repair_align(), fd_repair_footprint());
fd_repair_t * glob = fd_repair_join(fd_repair_new(shm, seed, valloc));
fd_repair_t * glob = fd_repair_join(fd_repair_new(shm, seed ));

if ( fd_repair_set_config(glob, &config) )
return 1;
Expand All @@ -293,7 +293,7 @@ int main(int argc, char **argv) {
if ( main_loop(&argc, &argv, glob, &config, &stopflag) )
return 1;

fd_valloc_free(valloc, fd_repair_delete(fd_repair_leave(glob), valloc));
fd_valloc_free(valloc, fd_repair_delete(fd_repair_leave(glob) ));

fd_halt();

Expand Down
Loading

0 comments on commit 9f3418c

Please sign in to comment.