From 2025ae4621f6640de99cb48249b1ca3419253512 Mon Sep 17 00:00:00 2001 From: Josh Siegel Date: Wed, 24 Apr 2024 15:19:17 +0000 Subject: [PATCH] partial set of repair service updates --- src/app/fdctl/run/tiles/fd_repair.c | 92 +++++++---- src/disco/consensus/test_consensus.c | 4 +- src/disco/tvu/fd_tvu.c | 97 ++++++++++-- src/flamenco/gossip/fd_gossip.c | 37 +++-- src/flamenco/repair/fd_repair.c | 222 ++++++++++++++++++++++++++- src/flamenco/repair/fd_repair.h | 16 +- src/flamenco/repair/fd_repair_tool.c | 53 ++++--- src/flamenco/runtime/fd_blockstore.c | 42 +++++ src/flamenco/runtime/fd_blockstore.h | 16 +- 9 files changed, 491 insertions(+), 88 deletions(-) diff --git a/src/app/fdctl/run/tiles/fd_repair.c b/src/app/fdctl/run/tiles/fd_repair.c index 8682b38852..0fb3920810 100644 --- a/src/app/fdctl/run/tiles/fd_repair.c +++ b/src/app/fdctl/run/tiles/fd_repair.c @@ -1,6 +1,6 @@ /* Repair tile runs the repair protocol for a Firedancer node. */ -#define _GNU_SOURCE +#define _GNU_SOURCE #include "tiles.h" @@ -35,7 +35,7 @@ struct fd_repair_tile_ctx { fd_repair_t * repair; fd_repair_config_t repair_config; - + ulong repair_seed; fd_repair_peer_addr_t repair_intake_addr; @@ -89,7 +89,8 @@ struct fd_repair_tile_ctx { ushort net_id; /* Includes Ethernet, IP, UDP headers */ uchar buffer[ MAX_BUFFER_SIZE ]; - fd_net_hdrs_t hdr[1]; + fd_net_hdrs_t intake_hdr[1]; + fd_net_hdrs_t serve_hdr[1]; fd_stake_ci_t * stake_ci; @@ -126,6 +127,7 @@ mux_ctx( void * scratch ) { static void send_packet( fd_repair_tile_ctx_t * ctx, + int is_intake, uint dst_ip_addr, ushort dst_port, uchar const * payload, @@ -133,7 +135,7 @@ send_packet( fd_repair_tile_ctx_t * ctx, ulong tsorig ) { uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk ); - fd_memcpy( packet, ctx->hdr, sizeof(fd_net_hdrs_t) ); + fd_memcpy( packet, ( is_intake ? 
ctx->intake_hdr : ctx->serve_hdr ), sizeof(fd_net_hdrs_t) ); fd_net_hdrs_t * hdr = (fd_net_hdrs_t *)packet; hdr->udp->net_dport = dst_port; @@ -148,9 +150,9 @@ send_packet( fd_repair_tile_ctx_t * ctx, ulong packet_sz = payload_sz + sizeof(fd_net_hdrs_t); fd_memcpy( packet+sizeof(fd_net_hdrs_t), payload, payload_sz ); hdr->udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) ); - hdr->udp->check = fd_ip4_udp_check( *(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->saddr_c ), - *(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->daddr_c ), - (fd_udp_hdr_t const *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->udp ), + hdr->udp->check = fd_ip4_udp_check( *(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->saddr_c ), + *(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->daddr_c ), + (fd_udp_hdr_t const *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->udp ), packet + sizeof(fd_net_hdrs_t) ); ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() ); @@ -177,7 +179,7 @@ handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx, .addr = in_dests[i].ip4_addr, .port = fd_ushort_bswap( in_dests[i].udp_port ), }; - + fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey ); } } @@ -207,7 +209,7 @@ handle_new_repair_requests( fd_repair_tile_ctx_t * ctx, break; } } - + if( rc < 0 ) { FD_LOG_DEBUG(( "failed to issue repair request" )); } @@ -228,26 +230,35 @@ handle_new_stake_weights( fd_repair_tile_ctx_t * ctx ) { } -static void -repair_send_packet( uchar const * msg, - size_t msglen, - fd_gossip_peer_addr_t const * addr, - void * arg ) { +static void +repair_send_intake_packet( uchar const * msg, + size_t msglen, + fd_gossip_peer_addr_t const * addr, + void * arg ) { + ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); + send_packet( arg, 1, addr->addr, addr->port, msg, msglen, tsorig ); +} + +static void +repair_send_serve_packet( uchar const * msg, + size_t msglen, + fd_gossip_peer_addr_t const * addr, + void * arg ) { ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); 
- send_packet( arg, addr->addr, addr->port, msg, msglen, tsorig ); + send_packet( arg, 0, addr->addr, addr->port, msg, msglen, tsorig ); } static void repair_shred_deliver( fd_shred_t const * shred, - ulong shred_sz, - fd_repair_peer_addr_t const * from FD_PARAM_UNUSED, - fd_pubkey_t const * id FD_PARAM_UNUSED, + ulong shred_sz, + fd_repair_peer_addr_t const * from FD_PARAM_UNUSED, + fd_pubkey_t const * id FD_PARAM_UNUSED, void * arg ) { fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg; - + fd_shred_t * out_shred = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk ); fd_memcpy( out_shred, shred, shred_sz ); - + ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() ); ulong sig = 0UL; fd_mux_publish( ctx->mux, sig, ctx->store_out_chunk, shred_sz, 0UL, 0UL, tspub ); @@ -255,8 +266,8 @@ repair_shred_deliver( fd_shred_t const * shred, } static void -repair_shred_deliver_fail( fd_pubkey_t const * id FD_PARAM_UNUSED, - ulong slot, +repair_shred_deliver_fail( fd_pubkey_t const * id FD_PARAM_UNUSED, + ulong slot, uint shred_index, void * arg FD_PARAM_UNUSED, int reason ) { @@ -367,7 +378,13 @@ after_frag( void * _ctx, peer_addr.addr = FD_LOAD( uint, hdr->ip4->saddr_c ); peer_addr.port = hdr->udp->net_sport; - fd_repair_recv_packet( ctx->repair, ctx->buffer + hdr_sz, *opt_sz - hdr_sz, &peer_addr ); + ushort dport = hdr->udp->net_dport; + if( ctx->repair_intake_addr.port == dport ) + fd_repair_recv_clnt_packet( ctx->repair, ctx->buffer + hdr_sz, *opt_sz - hdr_sz, &peer_addr ); + else if( ctx->repair_serve_addr.port == dport ) + fd_repair_recv_serv_packet( ctx->repair, ctx->buffer + hdr_sz, *opt_sz - hdr_sz, &peer_addr ); + else + FD_LOG_WARNING(( "received packet for port %u, which seems wrong", (uint)fd_ushort_bswap( dport ) )); } static inline void @@ -381,6 +398,21 @@ after_credit( void * _ctx, fd_repair_continue( ctx->repair ); } +static long +repair_get_shred( ulong slot FD_PARAM_UNUSED, + int shred_idx FD_PARAM_UNUSED, + void * buf FD_PARAM_UNUSED, + 
ulong buf_max FD_PARAM_UNUSED, + void * arg FD_PARAM_UNUSED ) { + return -1; /* Always fail for now */ +} + +static long +repair_get_parent( ulong slot FD_PARAM_UNUSED, + void * arg FD_PARAM_UNUSED ) { + return -1; /* Always fail for now */ +} + static void privileged_init( fd_topo_t * topo, fd_topo_tile_t * tile, @@ -435,7 +467,7 @@ unprivileged_init( fd_topo_t * topo, FD_TEST( ( !!smem ) & ( !!fmem ) ); fd_scratch_attach( smem, fmem, FD_REPAIR_SCRATCH_MAX, FD_REPAIR_SCRATCH_DEPTH ); - + ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp; ctx->repair_intake_addr.addr = tile->repair.ip_addr; @@ -443,7 +475,7 @@ unprivileged_init( fd_topo_t * topo, ctx->repair_serve_addr.addr = tile->repair.ip_addr; ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port ); - + ctx->repair_intake_listen_port = tile->repair.repair_intake_listen_port; ctx->repair_serve_listen_port = tile->repair.repair_serve_listen_port; @@ -453,7 +485,8 @@ unprivileged_init( fd_topo_t * topo, ctx->net_id = (ushort)0; fd_memcpy( ctx->src_mac_addr, tile->repair.src_mac_addr, 6 ); - fd_net_create_packet_header_template( ctx->hdr, FD_REPAIR_MAX_PACKET_SIZE, ctx->repair_intake_addr.addr, ctx->src_mac_addr, ctx->repair_intake_listen_port ); + fd_net_create_packet_header_template( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, ctx->repair_intake_addr.addr, ctx->src_mac_addr, ctx->repair_intake_listen_port ); + fd_net_create_packet_header_template( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, ctx->repair_serve_addr.addr, ctx->src_mac_addr, ctx->repair_serve_listen_port ); fd_topo_link_t * netmux_link = &topo->links[ tile->in_link_id[ 0 ] ]; @@ -506,14 +539,17 @@ unprivileged_init( fd_topo_t * topo, ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) ); - FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u", + FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, 
serve_addr: " FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ), FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) )); ctx->repair_config.fun_arg = ctx; ctx->repair_config.deliver_fun = repair_shred_deliver; ctx->repair_config.deliver_fail_fun = repair_shred_deliver_fail; - ctx->repair_config.send_fun = repair_send_packet; + ctx->repair_config.clnt_send_fun = repair_send_intake_packet; + ctx->repair_config.serv_send_fun = repair_send_serve_packet; + ctx->repair_config.serv_get_shred_fun = repair_get_shred; + ctx->repair_config.serv_get_parent_fun = repair_get_parent; if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) { FD_LOG_ERR( ( "error setting repair config" ) ); diff --git a/src/disco/consensus/test_consensus.c b/src/disco/consensus/test_consensus.c index fae5a3c833..b9d15f465f 100644 --- a/src/disco/consensus/test_consensus.c +++ b/src/disco/consensus/test_consensus.c @@ -333,7 +333,7 @@ repair_thread( FD_PARAM_UNUSED int argc, char ** argv ) { for( uint i = 0; i < (uint)repair_rc; ++i ) { fd_repair_peer_addr_t from; from_sockaddr( &from, msgs[i].msg_hdr.msg_name ); - fd_repair_recv_packet( repair, bufs[i], msgs[i].msg_len, &from ); + fd_repair_recv_clnt_packet( repair, bufs[i], msgs[i].msg_len, &from ); } } return 0; @@ -703,7 +703,7 @@ main( int argc, char ** argv ) { snprintf( repair_addr, sizeof( repair_addr ), ":%u", repair_port ); FD_TEST( resolve_hostport( repair_addr, &repair_config.intake_addr ) ); repair_config.deliver_fun = repair_deliver_fun; - repair_config.send_fun = repair_send_fun; + repair_config.clnt_send_fun = repair_send_fun; repair_config.deliver_fail_fun = repair_deliver_fail_fun; repair_arg_t repair_arg = { .replay = replay, .sockfd = create_socket( &repair_config.intake_addr ) }; diff --git a/src/disco/tvu/fd_tvu.c b/src/disco/tvu/fd_tvu.c index 74ee6ddecb..fe8e70a392 100644 --- 
a/src/disco/tvu/fd_tvu.c +++ b/src/disco/tvu/fd_tvu.c @@ -99,7 +99,8 @@ #define FD_TVU_TILE_SLOT_DELAY 32 static int gossip_sockfd = -1; -static int repair_sockfd = -1; +static int repair_clnt_sockfd = -1; +static int repair_serv_sockfd = -1; /* FIXME don't hardcode this */ #define vote_acct_max (2000000UL) @@ -216,12 +217,12 @@ repair_from_sockaddr( fd_repair_peer_addr_t * dst, uchar const * src ) { } static void -send_packet( uchar const * data, size_t sz, fd_repair_peer_addr_t const * addr, void * arg ) { +repair_clnt_send_packet( uchar const * data, size_t sz, fd_repair_peer_addr_t const * addr, void * arg ) { // FD_LOG_HEXDUMP_NOTICE( ( "send: ", data, sz ) ); (void)arg; uchar saddr[sizeof( struct sockaddr_in )]; int saddrlen = repair_to_sockaddr( saddr, addr ); - if( sendto( repair_sockfd, + if( sendto( repair_clnt_sockfd, data, sz, MSG_DONTWAIT, @@ -231,6 +232,59 @@ send_packet( uchar const * data, size_t sz, fd_repair_peer_addr_t const * addr, } } +static void +repair_serv_send_packet( uchar const * data, size_t sz, fd_repair_peer_addr_t const * addr, void * arg ) { + // FD_LOG_HEXDUMP_NOTICE( ( "send: ", data, sz ) ); + (void)arg; + uchar saddr[sizeof( struct sockaddr_in )]; + int saddrlen = repair_to_sockaddr( saddr, addr ); + if( sendto( repair_serv_sockfd, + data, + sz, + MSG_DONTWAIT, + (const struct sockaddr *)saddr, + (socklen_t)saddrlen ) < 0 ) { + FD_LOG_WARNING( ( "sendto failed: %s", strerror( errno ) ) ); + } +} + +static long +repair_serv_get_shred( ulong slot, int shred_idx, void * buf, ulong buf_max, void * arg ) { + fd_replay_t * replay = (fd_replay_t *)arg; + fd_blockstore_t * blockstore = replay->blockstore; + fd_blockstore_start_read( blockstore ); + + if( shred_idx < 0 ) { + fd_slot_meta_t * meta = fd_blockstore_slot_meta_query( blockstore, slot ); + if( meta == NULL ) { + fd_blockstore_end_read( blockstore ); + return -1L; + } + shred_idx = (int)meta->last_index; + } + long sz = fd_blockstore_shred_query_copy_data( blockstore, slot, 
(uint)shred_idx, buf, buf_max ); + + fd_blockstore_end_read( blockstore ); + return sz; +} + +static long +repair_serv_get_parent( ulong slot, void * arg ) { + fd_replay_t * replay = (fd_replay_t *)arg; + fd_blockstore_t * blockstore = replay->blockstore; + fd_blockstore_start_read( blockstore ); + + fd_slot_meta_t * meta = fd_blockstore_slot_meta_query( blockstore, slot ); + if( meta == NULL ) { + fd_blockstore_end_read( blockstore ); + return -1L; + } + long res = (long)meta->parent_slot; + + fd_blockstore_end_read( blockstore ); + return res; +} + /* Convert a host:port string to a repair network address. If host is * missing, it assumes the local hostname. */ static fd_repair_peer_addr_t * @@ -330,7 +384,8 @@ struct fd_turbine_thread_args { static int fd_turbine_thread( int argc, char ** argv ); struct fd_repair_thread_args { - int repair_fd; + int repair_clnt_fd; + int repair_serv_fd; fd_replay_t * replay; }; @@ -367,8 +422,10 @@ fd_tvu_main( fd_runtime_ctx_t * runtime_ctx, fd_gossip_start( runtime_ctx->gossip ); /* initialize repair */ - int repair_fd = fd_tvu_create_socket( &runtime_ctx->repair_config.intake_addr ); - repair_sockfd = repair_fd; + int repair_clnt_fd = fd_tvu_create_socket( &runtime_ctx->repair_config.intake_addr ); + repair_clnt_sockfd = repair_clnt_fd; + int repair_serv_fd = fd_tvu_create_socket( &runtime_ctx->repair_config.service_addr ); + repair_serv_sockfd = repair_serv_fd; fd_repair_update_addr( replay->repair, &runtime_ctx->repair_config.intake_addr, &runtime_ctx->repair_config.service_addr ); if( fd_gossip_update_repair_addr( runtime_ctx->gossip, &runtime_ctx->repair_config.service_addr ) ) @@ -414,7 +471,7 @@ fd_tvu_main( fd_runtime_ctx_t * runtime_ctx, /* FIXME: replace with real tile */ struct fd_repair_thread_args reparg = - { .repair_fd = repair_fd, .replay = replay }; + { .repair_clnt_fd = repair_clnt_fd, .repair_serv_fd = repair_serv_fd, .replay = replay }; tile = fd_tile_exec_new( 2, fd_repair_thread, 0, fd_type_pun( &reparg ) 
); if( tile == NULL ) FD_LOG_ERR( ( "error creating repair thread:" ) ); @@ -482,7 +539,8 @@ fd_tvu_main( fd_runtime_ctx_t * runtime_ctx, // shutdown: close( gossip_fd ); - close( repair_fd ); + close( repair_clnt_fd ); + close( repair_serv_fd ); close( tvu_fd ); return 0; } @@ -546,7 +604,8 @@ static int fd_repair_thread( int argc, char ** argv ) { (void)argc; struct fd_repair_thread_args * args = (struct fd_repair_thread_args *)argv; - int repair_fd = args->repair_fd; + int repair_clnt_fd = args->repair_clnt_fd; + int repair_serv_fd = args->repair_serv_fd; fd_repair_t * repair = args->replay->repair; fd_tvu_setup_scratch( args->replay->valloc ); @@ -555,7 +614,11 @@ fd_repair_thread( int argc, char ** argv ) { struct iovec iovecs[VLEN]; uchar bufs[VLEN][FD_ETH_PAYLOAD_MAX]; uchar sockaddrs[VLEN][sizeof( struct sockaddr_in6 )]; /* sockaddr is smaller than sockaddr_in6 */ - while( FD_LIKELY( 1 /* !fd_tile_shutdown_flag */ ) ) { + int which_fd = 0; + for (;;) { + /* Alternate handling client and service packets */ + which_fd = ~which_fd; + long now = fd_log_wallclock(); fd_repair_settime( repair, now ); @@ -564,7 +627,7 @@ fd_repair_thread( int argc, char ** argv ) { /* Read more packets */ CLEAR_MSGS; - int repair_rc = recvmmsg( repair_fd, msgs, VLEN, MSG_DONTWAIT, NULL ); + int repair_rc = recvmmsg( (which_fd ? 
repair_clnt_fd : repair_serv_fd) , msgs, VLEN, MSG_DONTWAIT, NULL ); if( repair_rc < 0 ) { if( errno == EINTR || errno == EWOULDBLOCK ) continue; FD_LOG_ERR( ( "recvmmsg failed: %s", strerror( errno ) ) ); @@ -574,7 +637,10 @@ fd_repair_thread( int argc, char ** argv ) { for( uint i = 0; i < (uint)repair_rc; ++i ) { fd_repair_peer_addr_t from; repair_from_sockaddr( &from, msgs[i].msg_hdr.msg_name ); - fd_repair_recv_packet( repair, bufs[i], msgs[i].msg_len, &from ); + if( which_fd ) + fd_repair_recv_clnt_packet( repair, bufs[i], msgs[i].msg_len, &from ); + else + fd_repair_recv_serv_packet( repair, bufs[i], msgs[i].msg_len, &from ); } } return 0; @@ -1170,9 +1236,12 @@ fd_tvu_main_setup( fd_runtime_ctx_t * runtime_ctx, runtime_ctx->repair_config.service_addr.port = 0; /* pick a port */ runtime_ctx->repair_config.deliver_fun = repair_deliver_fun; - runtime_ctx->repair_config.send_fun = send_packet; + runtime_ctx->repair_config.clnt_send_fun = repair_clnt_send_packet; + runtime_ctx->repair_config.serv_send_fun = repair_serv_send_packet; runtime_ctx->repair_config.deliver_fail_fun = repair_deliver_fail_fun; - runtime_ctx->repair_config.fun_arg = replay_setup_out.replay; + runtime_ctx->repair_config.serv_get_shred_fun = repair_serv_get_shred; + runtime_ctx->repair_config.serv_get_parent_fun = repair_serv_get_parent; + runtime_ctx->repair_config.fun_arg = replay_setup_out.replay; runtime_ctx->repair_config.sign_fun = NULL; runtime_ctx->repair_config.sign_arg = NULL; diff --git a/src/flamenco/gossip/fd_gossip.c b/src/flamenco/gossip/fd_gossip.c index 61dab4d106..de70496c7f 100644 --- a/src/flamenco/gossip/fd_gossip.c +++ b/src/flamenco/gossip/fd_gossip.c @@ -288,6 +288,8 @@ struct fd_gossip { fd_rng_t rng[1]; /* RNG seed */ ulong seed; + /* Total number of packets received */ + ulong recv_pkt_cnt; /* Total number of duplicate values received */ ulong recv_dup_cnt; /* Total number of non-duplicate values received */ @@ -312,7 +314,7 @@ ulong fd_gossip_footprint( void ) { 
return sizeof(fd_gossip_t); } void * -fd_gossip_new ( void * shmem, ulong seed, fd_valloc_t valloc ) { +fd_gossip_new ( void * shmem, ulong seed, fd_valloc_t valloc ) { fd_memset(shmem, 0, sizeof(fd_gossip_t)); fd_gossip_t * glob = (fd_gossip_t *)shmem; glob->valloc = valloc; @@ -417,7 +419,7 @@ const char * fd_gossip_addr_str( char * dst, size_t dstlen, fd_gossip_peer_addr_ int fd_gossip_set_config( fd_gossip_t * glob, const fd_gossip_config_t * config ) { fd_gossip_lock( glob ); - + char tmp[100]; char keystr[ FD_BASE58_ENCODED_32_SZ ]; fd_base58_encode_32( config->public_key->uc, NULL, keystr ); @@ -445,6 +447,9 @@ fd_gossip_set_config( fd_gossip_t * glob, const fd_gossip_config_t * config ) { fd_base58_encode_32( glob->tmp_contact_info.id.uc, NULL, keystr ); FD_LOG_NOTICE(("configuring temporary id %s", keystr)); + + fd_base58_encode_32( glob->my_contact_info.id.uc, NULL, keystr ); + FD_LOG_NOTICE(("permanent id will be %s", keystr)); } fd_gossip_unlock( glob ); @@ -498,7 +503,7 @@ fd_gossip_update_tpu_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tpu fd_gossip_lock( glob ); fd_gossip_to_soladdr(&glob->my_contact_info.tpu, tpu); fd_gossip_unlock( glob ); - + return 0; } @@ -510,7 +515,7 @@ fd_gossip_update_tpu_vote_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t fd_gossip_lock( glob ); fd_gossip_to_soladdr(&glob->my_contact_info.tpu_vote, tpu_vote); fd_gossip_unlock( glob ); - + return 0; } @@ -626,7 +631,7 @@ fd_gossip_make_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) { } else { (*glob->sign_fun)(glob->sign_arg, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ); } - + fd_gossip_send( glob, key, &gmsg ); } @@ -1198,8 +1203,8 @@ fd_gossip_recv_crds_value(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from fd_hash_copy(&val->id, &info->id); } } - - fd_gossip_peer_addr_t peer_addr = { .addr = crd->data.inner.contact_info_v1.gossip.addr.inner.ip4, + + fd_gossip_peer_addr_t peer_addr = { .addr = 
crd->data.inner.contact_info_v1.gossip.addr.inner.ip4, .port = crd->data.inner.contact_info_v1.gossip.port }; if (glob->my_contact_info.shred_version == 0U && fd_gossip_is_allowed_entrypoint( glob, &peer_addr )) { FD_LOG_NOTICE(("using shred version %lu", (ulong)crd->data.inner.contact_info_v1.shred_version)); @@ -1293,7 +1298,7 @@ fd_gossip_push_updated_contact(fd_gossip_t * glob) { fd_value_table_remove( glob->values, &glob->last_contact_key ); } } - + glob->last_contact_time = glob->now; fd_crds_data_t crd; @@ -1497,7 +1502,7 @@ fd_gossip_refresh_push_states( fd_gossip_t * glob, fd_pending_event_arg_t * arg while (glob->push_states_cnt < FD_PUSH_LIST_MAX && failcnt < 5) { fd_active_elem_t * a = fd_gossip_random_active( glob ); if( a == NULL ) break; - + for (ulong i = 0; i < glob->push_states_cnt; ++i) { fd_push_state_t* s = glob->push_states[i]; if (fd_gossip_peer_addr_eq(&s->addr, &a->key)) @@ -1759,6 +1764,11 @@ fd_gossip_log_stats( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) { ev->fun = fd_gossip_log_stats; } + if( glob->recv_pkt_cnt == 0 ) + FD_LOG_WARNING(("received no gossip packets!!")); + else + FD_LOG_NOTICE(("received %lu packets", glob->recv_pkt_cnt)); + glob->recv_pkt_cnt = 0; FD_LOG_NOTICE(("received %lu dup values and %lu new", glob->recv_dup_cnt, glob->recv_nondup_cnt)); glob->recv_dup_cnt = glob->recv_nondup_cnt = 0; FD_LOG_NOTICE(("pushed %lu values and filtered %lu", glob->push_cnt, glob->not_push_cnt)); @@ -1848,6 +1858,7 @@ fd_gossip_continue( fd_gossip_t * glob ) { int fd_gossip_recv_packet( fd_gossip_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from ) { fd_gossip_lock( glob ); + glob->recv_pkt_cnt++; /* Deserialize the message */ fd_gossip_msg_t gmsg; fd_bincode_decode_ctx_t ctx; @@ -1883,8 +1894,8 @@ fd_gossip_get_shred_version( fd_gossip_t const * glob ) { return glob->my_contact_info.shred_version; } -void -fd_gossip_set_stake_weights( fd_gossip_t * gossip, +void +fd_gossip_set_stake_weights( 
fd_gossip_t * gossip, fd_stake_weight_t const * stake_weights, ulong stake_weights_cnt ) { if( stake_weights == NULL ) { @@ -1918,7 +1929,7 @@ fd_gossip_set_stake_weights( fd_gossip_t * gossip, void fd_gossip_set_entrypoints( fd_gossip_t * gossip, - uint entrypoints[static 16], + uint entrypoints[static 16], ulong entrypoints_cnt, ushort * ports ) { gossip->entrypoints_cnt = entrypoints_cnt; @@ -1926,7 +1937,7 @@ fd_gossip_set_entrypoints( fd_gossip_t * gossip, fd_gossip_peer_addr_t addr; addr.addr = entrypoints[i]; addr.port = fd_ushort_bswap( ports[i] ); - FD_LOG_NOTICE(( "gossip initial peer - addr: " FD_IP4_ADDR_FMT ":%u", + FD_LOG_NOTICE(( "gossip initial peer - addr: " FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) )); fd_gossip_add_active_peer( gossip, &addr ); gossip->entrypoints[i] = addr; diff --git a/src/flamenco/repair/fd_repair.c b/src/flamenco/repair/fd_repair.c index 0765638c88..c82ca0d722 100644 --- a/src/flamenco/repair/fd_repair.c +++ b/src/flamenco/repair/fd_repair.c @@ -23,6 +23,8 @@ #define FD_REPAIR_STICKY_MAX 32 /* Max number of validator identities in stake weights */ #define FD_STAKE_WEIGHTS_MAX (1<<14) +/* Max number of validator clients that we ping */ +#define FD_REPAIR_PINGED_MAX (1<<14) /* Test if two hash values are equal */ static int fd_hash_eq( const fd_hash_t * key1, const fd_hash_t * key2 ) { @@ -154,6 +156,22 @@ typedef struct fd_needed_elem fd_needed_elem_t; #define MAP_T fd_needed_elem_t #include "../../util/tmpl/fd_map_giant.c" +struct fd_pinged_elem { + fd_repair_peer_addr_t key; + ulong next; + fd_pubkey_t id; + fd_hash_t token; + int good; +}; +typedef struct fd_pinged_elem fd_pinged_elem_t; +#define MAP_NAME fd_pinged_table +#define MAP_KEY_T fd_repair_peer_addr_t +#define MAP_KEY_EQ fd_repair_peer_addr_eq +#define MAP_KEY_HASH fd_repair_peer_addr_hash +#define MAP_KEY_COPY fd_repair_peer_addr_copy +#define MAP_T fd_pinged_elem_t +#include "../../util/tmpl/fd_map_giant.c" + /* 
Global data for repair service */ struct fd_repair { /* Concurrency lock */ @@ -168,8 +186,12 @@ struct fd_repair { fd_repair_peer_addr_t intake_addr; /* Function used to deliver repair messages to the application */ fd_repair_shred_deliver_fun deliver_fun; + /* Functions used to handle repair requests */ + fd_repair_serv_get_shred_fun serv_get_shred_fun; + fd_repair_serv_get_parent_fun serv_get_parent_fun; /* Function used to send raw packets on the network */ - fd_repair_send_packet_fun send_fun; + fd_repair_send_packet_fun clnt_send_fun; /* Client requests */ + fd_repair_send_packet_fun serv_send_fun; /* Service responses */ /* Function used to send packets for signing to remote tile */ fd_repair_sign_fun sign_fun; /* Argument to fd_repair_sign_fun */ @@ -189,6 +211,8 @@ struct fd_repair { fd_repair_nonce_t oldest_nonce; fd_repair_nonce_t current_nonce; fd_repair_nonce_t next_nonce; + /* Table of validator clients that we have pinged */ + fd_pinged_elem_t * pinged; /* Last batch of sends */ long last_sends; /* Last statistics decay */ @@ -214,6 +238,7 @@ fd_repair_footprint( void ) { l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) ); l = FD_LAYOUT_APPEND( l, fd_needed_table_align(), fd_needed_table_footprint(FD_NEEDED_KEY_MAX) ); l = FD_LAYOUT_APPEND( l, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX) ); + l = FD_LAYOUT_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) ); l = FD_LAYOUT_APPEND( l, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() ); return FD_LAYOUT_FINI(l, fd_repair_align() ); } @@ -230,6 +255,8 @@ fd_repair_new ( void * shmem, ulong seed ) { glob->needed = fd_needed_table_join(fd_needed_table_new(shm, FD_NEEDED_KEY_MAX, seed)); shm = FD_SCRATCH_ALLOC_APPEND( l, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX) ); glob->dupdetect = fd_dupdetect_table_join(fd_dupdetect_table_new(shm, 
FD_NEEDED_KEY_MAX, seed)); + shm = FD_SCRATCH_ALLOC_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) ); + glob->pinged = fd_pinged_table_join(fd_pinged_table_new(shm, FD_REPAIR_PINGED_MAX, seed)); glob->stake_weights = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() ); glob->stake_weights_cnt = 0; glob->last_sends = 0; @@ -261,6 +288,7 @@ fd_repair_delete ( void * shmap ) { fd_active_table_delete( fd_active_table_leave( glob->actives ) ); fd_needed_table_delete( fd_needed_table_leave( glob->needed ) ); fd_dupdetect_table_delete( fd_dupdetect_table_leave( glob->dupdetect ) ); + fd_pinged_table_delete( fd_pinged_table_leave( glob->pinged ) ); return glob; } @@ -299,7 +327,10 @@ fd_repair_set_config( fd_repair_t * glob, const fd_repair_config_t * config ) { fd_repair_peer_addr_copy(&glob->intake_addr, &config->intake_addr); fd_repair_peer_addr_copy(&glob->service_addr, &config->service_addr); glob->deliver_fun = config->deliver_fun; - glob->send_fun = config->send_fun; + glob->serv_get_shred_fun = config->serv_get_shred_fun; + glob->serv_get_parent_fun = config->serv_get_parent_fun; + glob->clnt_send_fun = config->clnt_send_fun; + glob->serv_send_fun = config->serv_send_fun; glob->fun_arg = config->fun_arg; glob->sign_fun = config->sign_fun; glob->sign_arg = config->sign_arg; @@ -384,7 +415,7 @@ fd_repair_sign_and_send( fd_repair_t * glob, fd_repair_protocol_t * protocol, fd } fd_memcpy(buf + 4U, &sig, 64U); - (*glob->send_fun)(buf, buflen, addr, glob->fun_arg); + (*glob->clnt_send_fun)(buf, buflen, addr, glob->fun_arg); } static void @@ -558,11 +589,11 @@ fd_repair_recv_ping(fd_repair_t * glob, fd_gossip_ping_t const * ping, fd_gossip FD_TEST(0 == fd_repair_protocol_encode(&protocol, &ctx)); ulong buflen = (ulong)((uchar*)ctx.data - buf); - (*glob->send_fun)(buf, buflen, from, glob->fun_arg); + (*glob->clnt_send_fun)(buf, buflen, from, glob->fun_arg); } int 
-fd_repair_recv_packet(fd_repair_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from) { +fd_repair_recv_clnt_packet(fd_repair_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from) { fd_repair_lock( glob ); FD_SCRATCH_SCOPE_BEGIN { while (1) { @@ -900,3 +931,184 @@ fd_repair_set_stake_weights( fd_repair_t * repair, fd_repair_unlock( repair ); } + +static void +fd_repair_send_ping(fd_repair_t * glob, fd_gossip_peer_addr_t const * addr, fd_pinged_elem_t * val) { + fd_repair_response_t gmsg; + fd_repair_response_new_disc( &gmsg, fd_repair_response_enum_ping ); + fd_gossip_ping_t * ping = &gmsg.inner.ping; + fd_hash_copy( &ping->from, glob->public_key ); + for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); ++i ) + ping->token.ul[i] = val->token.ul[i] = fd_rng_ulong(glob->rng); + + if( glob->sign_fun ) { + (*glob->sign_fun)( glob->sign_arg, ping->signature.uc, ping->token.uc, 32UL ); + } else { + fd_sha512_t sha[1]; + fd_ed25519_sign( /* sig */ ping->signature.uc, + /* msg */ ping->token.uc, + /* sz */ 32UL, + /* public_key */ glob->public_key->key, + /* private_key */ glob->private_key, + sha ); + } + + fd_bincode_encode_ctx_t ctx; + uchar buf[1024]; + ctx.data = buf; + ctx.dataend = buf + sizeof(buf); + FD_TEST(0 == fd_repair_response_encode(&gmsg, &ctx)); + ulong buflen = (ulong)((uchar*)ctx.data - buf); + + (*glob->serv_send_fun)(buf, buflen, addr, glob->fun_arg); +} + +static void +fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_gossip_peer_addr_t const * from) { + fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL); + if( val == NULL || !fd_hash_eq( &val->id, &pong->from ) ) + return; + + /* Verify response hash token */ + fd_sha256_t sha[1]; + fd_sha256_init( sha ); + fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL ); + fd_sha256_append( sha, val->token.uc, 32UL ); + fd_hash_t golden; + fd_sha256_fini( sha, golden.uc ); + + fd_sha512_t sha2[1]; + if( 
fd_ed25519_verify( /* msg */ golden.uc, + /* sz */ 32U, + /* sig */ pong->signature.uc, + /* public_key */ pong->from.uc, + sha2 )) { + return; + } + + val->good = 1; +} + +int +fd_repair_recv_serv_packet(fd_repair_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from) { + FD_SCRATCH_SCOPE_BEGIN { + fd_repair_protocol_t protocol; + fd_bincode_decode_ctx_t ctx; + ctx.data = msg; + ctx.dataend = msg + msglen; + ctx.valloc = fd_scratch_virtual(); + if( fd_repair_protocol_decode(&protocol, &ctx) || + ctx.data != ctx.dataend ) { + FD_LOG_WARNING(( "failed to decode repair request packet" )); + return 0; + } + + fd_repair_request_header_t * header; + switch (protocol.discriminant) { + case fd_repair_protocol_enum_pong: + fd_repair_lock( glob ); + fd_repair_recv_pong( glob, &protocol.inner.pong, from ); + fd_repair_unlock( glob ); + return 0; + case fd_repair_protocol_enum_window_index: { + fd_repair_window_index_t * wi = &protocol.inner.window_index; + header = &wi->header; + break; + } + case fd_repair_protocol_enum_highest_window_index: { + fd_repair_highest_window_index_t * wi = &protocol.inner.highest_window_index; + header = &wi->header; + break; + } + case fd_repair_protocol_enum_orphan: { + fd_repair_orphan_t * wi = &protocol.inner.orphan; + header = &wi->header; + break; + } + + default: + FD_LOG_WARNING(( "received repair request of unknown type: %d", (int)protocol.discriminant )); + return 0; + } + + if( !fd_hash_eq( &header->recipient, glob->public_key ) ) { + FD_LOG_WARNING(( "received repair request with wrong recipient, %32J instead of %32J", header->recipient.uc, glob->public_key )); + return 0; + } + + /* Verify the signature */ + fd_sha512_t sha2[1]; + fd_signature_t sig; + fd_memcpy( &sig, header->signature.uc, sizeof(sig) ); + fd_memcpy( (uchar *)msg + 64U, msg, 4U ); + if( fd_ed25519_verify( /* msg */ msg + 64U, + /* sz */ msglen - 64U, + /* sig */ sig.uc, + /* public_key */ header->sender.uc, + sha2 )) { + 
FD_LOG_WARNING(("received repair request with with invalid signature")); + return 0; + } + + fd_repair_lock( glob ); + + fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL); + if( val == NULL || !val->good || !fd_hash_eq( &val->id, &header->sender ) ) { + /* Need to ping this client */ + if( val == NULL ) { + if( fd_pinged_table_is_full(glob->pinged) ) { + FD_LOG_WARNING(("pinged table is full")); + fd_repair_unlock( glob ); + return 0; + } + val = fd_pinged_table_insert(glob->pinged, from); + } + fd_hash_copy( &val->id, &header->sender ); + val->good = 0; + fd_repair_send_ping( glob, from, val ); + + } else { + uchar buf[FD_SHRED_MAX_SZ + sizeof(uint)]; + switch (protocol.discriminant) { + case fd_repair_protocol_enum_window_index: { + fd_repair_window_index_t const * wi = &protocol.inner.window_index; + long sz = (*glob->serv_get_shred_fun)( wi->slot, (int)wi->shred_index, buf, FD_SHRED_MAX_SZ, glob->fun_arg ); + if( sz < 0 ) break; + *(uint *)(buf + sz) = wi->header.nonce; + (*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg ); + break; + } + + case fd_repair_protocol_enum_highest_window_index: { + fd_repair_highest_window_index_t const * wi = &protocol.inner.highest_window_index; + long sz = (*glob->serv_get_shred_fun)( wi->slot, -1, buf, FD_SHRED_MAX_SZ, glob->fun_arg ); + if( sz < 0 ) break; + *(uint *)(buf + sz) = wi->header.nonce; + (*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg ); + break; + } + + case fd_repair_protocol_enum_orphan: { + fd_repair_orphan_t const * wi = &protocol.inner.orphan; + ulong slot = wi->slot; + for(unsigned i = 0; i < 10; ++i) { + long r = (*glob->serv_get_parent_fun)( slot, glob->fun_arg ); + if( r < 0 ) break; + slot = (ulong)r; + long sz = (*glob->serv_get_shred_fun)( slot, -1, buf, FD_SHRED_MAX_SZ, glob->fun_arg ); + if( sz < 0 ) continue; + *(uint *)(buf + sz) = wi->header.nonce; + (*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg 
); + } + break; + } + + default: + break; + } + } + + fd_repair_unlock( glob ); + } FD_SCRATCH_SCOPE_END; + return 0; +} diff --git a/src/flamenco/repair/fd_repair.h b/src/flamenco/repair/fd_repair.h index 4c9e3c0494..54f32bba9e 100644 --- a/src/flamenco/repair/fd_repair.h +++ b/src/flamenco/repair/fd_repair.h @@ -33,6 +33,10 @@ typedef fd_gossip_peer_addr_t fd_repair_peer_addr_t; /* Callback when a new shred is received */ typedef void (*fd_repair_shred_deliver_fun)( fd_shred_t const * shred, ulong shred_len, fd_repair_peer_addr_t const * from, fd_pubkey_t const * id, void * arg ); +/* Callbacks when a repair is requested. shred_idx==-1 means the last index. The serving code treats a non-negative return from the get_shred callback as the number of bytes placed in buf, a non-negative return from the get_parent callback as the parent slot, and a negative return from either one as not available. */ +typedef long (*fd_repair_serv_get_shred_fun)( ulong slot, int shred_idx, void * buf, ulong buf_max, void * arg ); +typedef long (*fd_repair_serv_get_parent_fun)( ulong slot, void * arg ); + /* Callback for sending a packet. addr is the address of the destination. */ typedef void (*fd_repair_send_packet_fun)( uchar const * msg, size_t msglen, fd_repair_peer_addr_t const * addr, void * arg ); @@ -48,7 +52,10 @@ struct fd_repair_config { fd_repair_peer_addr_t service_addr; fd_repair_peer_addr_t intake_addr; fd_repair_shred_deliver_fun deliver_fun; - fd_repair_send_packet_fun send_fun; + fd_repair_serv_get_shred_fun serv_get_shred_fun; /* fetches shred bytes when serving repair requests */ + fd_repair_serv_get_parent_fun serv_get_parent_fun; /* steps to the parent of a slot when serving orphan requests */ + fd_repair_send_packet_fun clnt_send_fun; /* sending client requests */ + fd_repair_send_packet_fun serv_send_fun; /* sending service responses */ fd_repair_shred_deliver_fail_fun deliver_fail_fun; void * fun_arg; fd_repair_sign_fun sign_fun; @@ -78,8 +85,11 @@ int fd_repair_start( fd_repair_t * glob ); * called inside the main spin loop. calling settime first is recommended. */ int fd_repair_continue( fd_repair_t * glob ); -/* Pass a raw repair packet into the protocol. 
addr is the address of the sender */ -int fd_repair_recv_packet( fd_repair_t * glob, uchar const * msg, ulong msglen, fd_repair_peer_addr_t const * addr ); +/* Pass a raw client response packet into the protocol. addr is the address of the sender */ +int fd_repair_recv_clnt_packet( fd_repair_t * glob, uchar const * msg, ulong msglen, fd_repair_peer_addr_t const * addr ); + +/* Pass a raw service request packet into the protocol. addr is the address of the sender */ +int fd_repair_recv_serv_packet( fd_repair_t * glob, uchar const * msg, ulong msglen, fd_repair_peer_addr_t const * addr ); /* Determine if the request queue is full */ int fd_repair_is_full( fd_repair_t * glob ); diff --git a/src/flamenco/repair/fd_repair_tool.c b/src/flamenco/repair/fd_repair_tool.c index f8a0ec23b0..39115ec19a 100644 --- a/src/flamenco/repair/fd_repair_tool.c +++ b/src/flamenco/repair/fd_repair_tool.c @@ -147,24 +147,12 @@ main_loop( int * argc, char *** argv, fd_repair_t * glob, fd_repair_config_t * c fd_repair_peer_addr_t peeraddr; if ( fd_repair_add_active_peer(glob, resolve_hostport(addr_cstr, &peeraddr), &id) ) return -1; + fd_repair_add_sticky(glob, &id); + fd_repair_set_permanent(glob, &id); /* NOTE(review): sticky and permanent presumably keep the command-line peer in use for the whole run -- confirm against fd_repair.c */ char const * slot_cstr = fd_env_strip_cmdline_cstr ( argc, argv, "--slot", NULL, NULL ); if ( slot_cstr == NULL ) FD_LOG_ERR(("--slot command line argument required")); - do { - ulong slot = strtoul(slot_cstr, (char **)&slot_cstr, 10); - if ( *slot_cstr != ':' ) - FD_LOG_ERR(("--slot takes :,:,:...")); - ++slot_cstr; - ulong idx = strtoul(slot_cstr, (char **)&slot_cstr, 10); - if ( fd_repair_need_highest_window_index(glob, slot, (uint)idx) ) - return -1; - if ( *slot_cstr == '\0' ) - break; - if ( *slot_cstr != ',' ) - FD_LOG_ERR(("--slot takes :,:,:...")); - ++slot_cstr; - } while (1); #define VLEN 32U struct mmsghdr msgs[VLEN]; @@ -172,8 +160,38 @@ main_loop( int * argc, char *** argv, fd_repair_t * glob, fd_repair_config_t * c uchar bufs[VLEN][FD_ETH_PAYLOAD_MAX]; uchar 
sockaddrs[VLEN][sizeof(struct sockaddr_in6)]; /* sockaddr is smaller than sockaddr_in6 */ + long lastreq = 0; /* wallclock value at the last batch of requests */ while ( !*stopflag ) { - fd_repair_settime(glob, fd_log_wallclock()); + long now = fd_log_wallclock(); + + if (now - lastreq > (long)3e9) { /* re-issue every --slot request once more than 3e9 clock units have passed (3 seconds if fd_log_wallclock is in ns -- TODO confirm) */ + lastreq = now; + char* cstr = (char*)slot_cstr; /* parse from the start on every pass; slot_cstr itself is never advanced */ + do { + ulong slot = strtoul(cstr, &cstr, 10); + if ( *cstr != ':' ) + FD_LOG_ERR(("--slot takes :,:,:...")); + ++cstr; + long idx = strtol(cstr, &cstr, 10); /* signed so that -1 and -2 can select the request type */ + if( idx == -1 ) { /* -1: request the highest window index of slot */ + if( fd_repair_need_highest_window_index(glob, slot, 0U) ) + return -1; + } else if( idx == -2 ) { /* -2: orphan request for slot */ + if( fd_repair_need_orphan(glob, slot) ) + return -1; + } else if( idx >= 0 ) { /* a specific shred index; an idx below -2 matches no branch and issues no request */ + if( fd_repair_need_window_index(glob, slot, (uint)idx) != 1 ) /* NOTE(review): compared against 1 while the two calls above treat any nonzero value as failure -- confirm the return conventions */ + return -1; + } + if ( *cstr == '\0' ) + break; + if ( *cstr != ',' ) + FD_LOG_ERR(("--slot takes :,:,:...")); + ++cstr; + } while (1); + } + + fd_repair_settime(glob, now); fd_repair_continue(glob); fd_memset(msgs, 0, sizeof(msgs)); @@ -201,7 +219,7 @@ main_loop( int * argc, char *** argv, fd_repair_t * glob, fd_repair_config_t * c fd_repair_peer_addr_t from; repair_from_sockaddr( &from, msgs[i].msg_hdr.msg_name ); FD_LOG_HEXDUMP_NOTICE(("recv: ", bufs[i], msgs[i].msg_len)); - fd_repair_recv_packet(glob, bufs[i], msgs[i].msg_len, &from); + fd_repair_recv_clnt_packet(glob, bufs[i], msgs[i].msg_len, &from); } } @@ -258,8 +276,7 @@ int main(int argc, char **argv) { FD_TEST( resolve_hostport(my_addr, &config.intake_addr) ); config.deliver_fun = recv_shred; - config.deliver_fail_fun = deliver_fail_fun; - config.send_fun = send_packet; + config.clnt_send_fun = send_packet; config.deliver_fail_fun = deliver_fail_fun; ulong seed = fd_hash(0, hostname, strnlen(hostname, sizeof(hostname))); diff --git a/src/flamenco/runtime/fd_blockstore.c b/src/flamenco/runtime/fd_blockstore.c index bd91ce1829..392fa488cb 100644 --- a/src/flamenco/runtime/fd_blockstore.c +++ b/src/flamenco/runtime/fd_blockstore.c @@ -600,6 +600,11 @@ fd_blockstore_deshred( fd_blockstore_t * 
blockstore, ulong slot ) { FD_TEST( rc >= 0 ); shreds_laddr[i].hdr = *shred; + ulong merkle_sz = shreds_laddr[i].merkle_sz = fd_shred_merkle_sz( shred->variant ); /* save the merkle bytes of the shred next to its header so fd_blockstore_shred_query_copy_data can append them again */ + FD_TEST( merkle_sz <= sizeof(shreds_laddr[i].merkle) ); + if( merkle_sz ) { + fd_memcpy( shreds_laddr[i].merkle, (uchar const*)shred + fd_shred_merkle_off( shred ), merkle_sz ); + } shreds_laddr[i].off = off; FD_TEST( !memcmp( &shreds_laddr[i].hdr, shred, sizeof( fd_shred_t ) ) ); @@ -815,6 +820,43 @@ fd_blockstore_shred_query( fd_blockstore_t * blockstore, ulong slot, uint shred_ return &query->hdr; } +long +fd_blockstore_shred_query_copy_data( fd_blockstore_t * blockstore, ulong slot, uint shred_idx, void * buf, ulong buf_max ) { /* Copies the shred at (slot, shred_idx) into buf and returns its size in bytes, or -1 if it cannot be produced. */ + if( buf_max < FD_SHRED_MAX_SZ ) return -1; /* buf must have room for a maximum-size shred regardless of the actual size */ + + fd_blockstore_shred_t * shred_pool = fd_blockstore_shred_pool( blockstore ); + fd_blockstore_shred_map_t * shred_map = fd_blockstore_shred_map( blockstore ); + fd_shred_key_t key = { .slot = slot, .idx = shred_idx }; + fd_blockstore_shred_t * shred = + fd_blockstore_shred_map_ele_query( shred_map, &key, NULL, shred_pool ); + if( shred ) { /* found in the shred map: copy the stored raw bytes as they are */ + ulong sz = fd_shred_sz( &shred->hdr ); + fd_memcpy( buf, shred->raw, sz); + return (long)sz; + } + + fd_blockstore_slot_map_t * blk = + fd_blockstore_slot_map_query( fd_blockstore_slot_map( blockstore ), slot, NULL ); /* not in the shred map: fall back to the block stored for this slot */ + if( FD_UNLIKELY( !blk || blk->block.data_gaddr == 0 ) ) return -1; + if( shred_idx > blk->slot_meta.last_index ) return -1; + fd_block_shred_t * shreds = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), blk->block.shreds_gaddr ); + ulong sz = fd_shred_payload_sz( &shreds[shred_idx].hdr ); + if( FD_SHRED_DATA_HEADER_SZ + sz > buf_max ) return -1L; + fd_memcpy( buf, &shreds[shred_idx].hdr, FD_SHRED_DATA_HEADER_SZ ); /* rebuild the shred: header first, then the payload taken from the block data region at shreds[shred_idx].off */ + fd_memcpy( (uchar*)buf + FD_SHRED_DATA_HEADER_SZ, fd_blockstore_block_data_laddr( blockstore, &blk->block ) + shreds[shred_idx].off, sz ); + ulong tot_sz = FD_SHRED_DATA_HEADER_SZ + sz; + ulong merkle_sz = shreds[shred_idx].merkle_sz; + if( merkle_sz ) { /* NOTE(review): the saved merkle bytes are appended directly after the payload, while fd_blockstore_deshred read them from fd_shred_merkle_off( shred ) -- confirm both offsets agree when the payload is short */ + 
if( tot_sz + merkle_sz > buf_max ) return -1; + fd_memcpy( (uchar*)buf + tot_sz, shreds[shred_idx].merkle, merkle_sz ); + tot_sz += merkle_sz; + } + if( tot_sz >= FD_SHRED_MIN_SZ ) return (long)tot_sz; + /* Zero pad so the rebuilt shred is never shorter than FD_SHRED_MIN_SZ */ + memset( (uchar*)buf + tot_sz, 0, FD_SHRED_MIN_SZ - tot_sz ); + return (long)FD_SHRED_MIN_SZ; +} + fd_block_t * fd_blockstore_block_query( fd_blockstore_t * blockstore, ulong slot ) { fd_blockstore_slot_map_t * query = diff --git a/src/flamenco/runtime/fd_blockstore.h b/src/flamenco/runtime/fd_blockstore.h index e15485f13e..0d95ec523e 100644 --- a/src/flamenco/runtime/fd_blockstore.h +++ b/src/flamenco/runtime/fd_blockstore.h @@ -112,6 +112,8 @@ typedef struct fd_blockstore_shred fd_blockstore_shred_t; /* A shred that has been deshredded and is part of a block (beginning at off) */ struct fd_block_shred { fd_shred_t hdr; /* ptr to the data shred header */ + uchar merkle[FD_SHRED_MERKLE_ROOT_SZ + FD_SHRED_MERKLE_NODE_SZ*9U /* FD_FEC_SET_MAX_BMTREE_DEPTH */]; /* merkle bytes copied out of the shred by fd_blockstore_deshred */ + ulong merkle_sz; /* number of bytes of merkle in use; 0 means none were saved */ ulong off; /* offset to the payload relative to the start of the block's data region */ }; typedef struct fd_block_shred fd_block_shred_t; @@ -424,11 +426,7 @@ fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shr /* Query blockstore for shred at slot, shred_idx. Returns a pointer to the shred or NULL if not in * blockstore. The returned pointer lifetime is until the shred is removed. Check return value for - * error info. - * - * If the block corresponding to that shred is incomplete, the returned pointer's lifetime is only - * as long as the slot remains incomplete. Otherwise, the returned pointer's lifetime is as long as - * the slot remains in the blockstore. + * error info. This API only works for shreds from incomplete blocks. * * Callers should hold the read lock during the entirety of its read to ensure the pointer remains * valid. 
@@ -436,6 +434,14 @@ fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shr fd_shred_t * fd_blockstore_shred_query( fd_blockstore_t * blockstore, ulong slot, uint shred_idx ); +/* Query blockstore for shred at slot, shred_idx. Copies the shred + * data to the given buffer and returns the data size in bytes. Returns -1 on failure, which includes buf_max being less than FD_SHRED_MAX_SZ and the shred being found in neither the shred map nor a stored block for slot. A shred rebuilt from a stored block is zero padded up to FD_SHRED_MIN_SZ. + * + * Callers should hold the read lock during the entirety of this call. + */ +long +fd_blockstore_shred_query_copy_data( fd_blockstore_t * blockstore, ulong slot, uint shred_idx, void * buf, ulong buf_max ); + /* Query blockstore for block at slot. Returns a pointer to the block or NULL if not in * blockstore. The returned pointer lifetime is until the block is removed. Check return value for * error info. */