Skip to content

Commit

Permalink
partial set of repair service updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpsiegel authored and asiegel-jt committed May 13, 2024
1 parent 74a9973 commit 43c9ccd
Show file tree
Hide file tree
Showing 9 changed files with 491 additions and 88 deletions.
92 changes: 64 additions & 28 deletions src/app/fdctl/run/tiles/fd_repair.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* Repair tile runs the repair protocol for a Firedancer node. */

#define _GNU_SOURCE
#define _GNU_SOURCE

#include "tiles.h"

Expand Down Expand Up @@ -35,7 +35,7 @@
struct fd_repair_tile_ctx {
fd_repair_t * repair;
fd_repair_config_t repair_config;

ulong repair_seed;

fd_repair_peer_addr_t repair_intake_addr;
Expand Down Expand Up @@ -89,7 +89,8 @@ struct fd_repair_tile_ctx {
ushort net_id;
/* Includes Ethernet, IP, UDP headers */
uchar buffer[ MAX_BUFFER_SIZE ];
fd_net_hdrs_t hdr[1];
fd_net_hdrs_t intake_hdr[1];
fd_net_hdrs_t serve_hdr[1];

fd_stake_ci_t * stake_ci;

Expand Down Expand Up @@ -126,14 +127,15 @@ mux_ctx( void * scratch ) {

static void
send_packet( fd_repair_tile_ctx_t * ctx,
int is_intake,
uint dst_ip_addr,
ushort dst_port,
uchar const * payload,
ulong payload_sz,
ulong tsorig ) {
uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );

fd_memcpy( packet, ctx->hdr, sizeof(fd_net_hdrs_t) );
fd_memcpy( packet, ( is_intake ? ctx->intake_hdr : ctx->serve_hdr ), sizeof(fd_net_hdrs_t) );
fd_net_hdrs_t * hdr = (fd_net_hdrs_t *)packet;

hdr->udp->net_dport = dst_port;
Expand All @@ -148,9 +150,9 @@ send_packet( fd_repair_tile_ctx_t * ctx,
ulong packet_sz = payload_sz + sizeof(fd_net_hdrs_t);
fd_memcpy( packet+sizeof(fd_net_hdrs_t), payload, payload_sz );
hdr->udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
hdr->udp->check = fd_ip4_udp_check( *(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->saddr_c ),
*(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->daddr_c ),
(fd_udp_hdr_t const *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->udp ),
hdr->udp->check = fd_ip4_udp_check( *(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->saddr_c ),
*(uint *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->ip4->daddr_c ),
(fd_udp_hdr_t const *)FD_ADDRESS_OF_PACKED_MEMBER( hdr->udp ),
packet + sizeof(fd_net_hdrs_t) );

ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
Expand All @@ -177,7 +179,7 @@ handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx,
.addr = in_dests[i].ip4_addr,
.port = fd_ushort_bswap( in_dests[i].udp_port ),
};

fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey );
}
}
Expand Down Expand Up @@ -207,7 +209,7 @@ handle_new_repair_requests( fd_repair_tile_ctx_t * ctx,
break;
}
}

if( rc < 0 ) {
FD_LOG_DEBUG(( "failed to issue repair request" ));
}
Expand All @@ -228,35 +230,44 @@ handle_new_stake_weights( fd_repair_tile_ctx_t * ctx ) {
}


static void
repair_send_packet( uchar const * msg,
size_t msglen,
fd_gossip_peer_addr_t const * addr,
void * arg ) {
static void
repair_send_intake_packet( uchar const * msg,
size_t msglen,
fd_gossip_peer_addr_t const * addr,
void * arg ) {
ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
send_packet( arg, 1, addr->addr, addr->port, msg, msglen, tsorig );
}

static void
repair_send_serve_packet( uchar const * msg,
size_t msglen,
fd_gossip_peer_addr_t const * addr,
void * arg ) {
ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
send_packet( arg, addr->addr, addr->port, msg, msglen, tsorig );
send_packet( arg, 0, addr->addr, addr->port, msg, msglen, tsorig );
}

static void
repair_shred_deliver( fd_shred_t const * shred,
ulong shred_sz,
fd_repair_peer_addr_t const * from FD_PARAM_UNUSED,
fd_pubkey_t const * id FD_PARAM_UNUSED,
ulong shred_sz,
fd_repair_peer_addr_t const * from FD_PARAM_UNUSED,
fd_pubkey_t const * id FD_PARAM_UNUSED,
void * arg ) {
fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg;

fd_shred_t * out_shred = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk );
fd_memcpy( out_shred, shred, shred_sz );

ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
ulong sig = 0UL;
fd_mux_publish( ctx->mux, sig, ctx->store_out_chunk, shred_sz, 0UL, 0UL, tspub );
ctx->store_out_chunk = fd_dcache_compact_next( ctx->store_out_chunk, shred_sz, ctx->store_out_chunk0, ctx->store_out_wmark );
}

static void
repair_shred_deliver_fail( fd_pubkey_t const * id FD_PARAM_UNUSED,
ulong slot,
repair_shred_deliver_fail( fd_pubkey_t const * id FD_PARAM_UNUSED,
ulong slot,
uint shred_index,
void * arg FD_PARAM_UNUSED,
int reason ) {
Expand Down Expand Up @@ -367,7 +378,13 @@ after_frag( void * _ctx,
peer_addr.addr = FD_LOAD( uint, hdr->ip4->saddr_c );
peer_addr.port = hdr->udp->net_sport;

fd_repair_recv_packet( ctx->repair, ctx->buffer + hdr_sz, *opt_sz - hdr_sz, &peer_addr );
ushort dport = hdr->udp->net_dport;
if( ctx->repair_intake_addr.port == dport )
fd_repair_recv_clnt_packet( ctx->repair, ctx->buffer + hdr_sz, *opt_sz - hdr_sz, &peer_addr );
else if( ctx->repair_serve_addr.port == dport )
fd_repair_recv_serv_packet( ctx->repair, ctx->buffer + hdr_sz, *opt_sz - hdr_sz, &peer_addr );
else
FD_LOG_WARNING(( "received packet for port %u, which seems wrong", (uint)fd_ushort_bswap( dport ) ));
}

static inline void
Expand All @@ -381,6 +398,21 @@ after_credit( void * _ctx,
fd_repair_continue( ctx->repair );
}

static long
repair_get_shred( ulong slot FD_PARAM_UNUSED,
int shred_idx FD_PARAM_UNUSED,
void * buf FD_PARAM_UNUSED,
ulong buf_max FD_PARAM_UNUSED,
void * arg FD_PARAM_UNUSED ) {
return -1; /* Always fail for now */
}

static long
repair_get_parent( ulong slot FD_PARAM_UNUSED,
void * arg FD_PARAM_UNUSED ) {
return -1; /* Always fail for now */
}

static void
privileged_init( fd_topo_t * topo,
fd_topo_tile_t * tile,
Expand Down Expand Up @@ -435,15 +467,15 @@ unprivileged_init( fd_topo_t * topo,

FD_TEST( ( !!smem ) & ( !!fmem ) );
fd_scratch_attach( smem, fmem, FD_REPAIR_SCRATCH_MAX, FD_REPAIR_SCRATCH_DEPTH );

ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;

ctx->repair_intake_addr.addr = tile->repair.ip_addr;
ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );

ctx->repair_serve_addr.addr = tile->repair.ip_addr;
ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );

ctx->repair_intake_listen_port = tile->repair.repair_intake_listen_port;
ctx->repair_serve_listen_port = tile->repair.repair_serve_listen_port;

Expand All @@ -453,7 +485,8 @@ unprivileged_init( fd_topo_t * topo,
ctx->net_id = (ushort)0;
fd_memcpy( ctx->src_mac_addr, tile->repair.src_mac_addr, 6 );

fd_net_create_packet_header_template( ctx->hdr, FD_REPAIR_MAX_PACKET_SIZE, ctx->repair_intake_addr.addr, ctx->src_mac_addr, ctx->repair_intake_listen_port );
fd_net_create_packet_header_template( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, ctx->repair_intake_addr.addr, ctx->src_mac_addr, ctx->repair_intake_listen_port );
fd_net_create_packet_header_template( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, ctx->repair_serve_addr.addr, ctx->src_mac_addr, ctx->repair_serve_listen_port );

fd_topo_link_t * netmux_link = &topo->links[ tile->in_link_id[ 0 ] ];

Expand Down Expand Up @@ -506,14 +539,17 @@ unprivileged_init( fd_topo_t * topo,

ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) );

FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));

ctx->repair_config.fun_arg = ctx;
ctx->repair_config.deliver_fun = repair_shred_deliver;
ctx->repair_config.deliver_fail_fun = repair_shred_deliver_fail;
ctx->repair_config.send_fun = repair_send_packet;
ctx->repair_config.clnt_send_fun = repair_send_intake_packet;
ctx->repair_config.serv_send_fun = repair_send_serve_packet;
ctx->repair_config.serv_get_shred_fun = repair_get_shred;
ctx->repair_config.serv_get_parent_fun = repair_get_parent;

if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) {
FD_LOG_ERR( ( "error setting repair config" ) );
Expand Down
4 changes: 2 additions & 2 deletions src/disco/consensus/test_consensus.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ repair_thread( FD_PARAM_UNUSED int argc, char ** argv ) {
for( uint i = 0; i < (uint)repair_rc; ++i ) {
fd_repair_peer_addr_t from;
from_sockaddr( &from, msgs[i].msg_hdr.msg_name );
fd_repair_recv_packet( repair, bufs[i], msgs[i].msg_len, &from );
fd_repair_recv_clnt_packet( repair, bufs[i], msgs[i].msg_len, &from );
}
}
return 0;
Expand Down Expand Up @@ -703,7 +703,7 @@ main( int argc, char ** argv ) {
snprintf( repair_addr, sizeof( repair_addr ), ":%u", repair_port );
FD_TEST( resolve_hostport( repair_addr, &repair_config.intake_addr ) );
repair_config.deliver_fun = repair_deliver_fun;
repair_config.send_fun = repair_send_fun;
repair_config.clnt_send_fun = repair_send_fun;
repair_config.deliver_fail_fun = repair_deliver_fail_fun;
repair_arg_t repair_arg = { .replay = replay,
.sockfd = create_socket( &repair_config.intake_addr ) };
Expand Down
Loading

0 comments on commit 43c9ccd

Please sign in to comment.