feat(shred-network): minimize dependencies (#462)
This is an alternative implementation for #423. It accomplishes the same goals, but implements the interface between `shred-network` and an epoch context provider using shared memory instead of a vtable.

The static context of an epoch is defined by `EpochContext`, which contains the epoch's staked nodes and leader schedule.

Storage of the relevant `EpochContext`s is managed by `EpochContextManager`. This dedicated struct is needed because it must be aware of the epoch schedule in order to map a slot to the epoch whose context applies.
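
A minimal consumer-side sketch, based on the `EpochContextManager` API added in `src/adapter.zig` below (the wrapper function and its parameters are illustrative, not part of this change):

```zig
const std = @import("std");
const sig = @import("sig.zig");

// Hedged sketch of the consumer-side flow; only the EpochContextManager
// calls mirror src/adapter.zig.
fn exampleEpochContextUsage(
    allocator: std.mem.Allocator,
    epoch_schedule: sig.core.EpochSchedule,
    epoch: sig.core.Epoch,
    slot: sig.core.Slot,
    context: sig.core.EpochContext,
) !void {
    var manager = try sig.adapter.EpochContextManager.init(allocator, epoch_schedule);
    defer manager.deinit();

    // One context per epoch; the manager consults the epoch schedule to
    // decide which epoch a given slot belongs to.
    try manager.put(epoch, context);

    // Slot -> leader lookup resolves the epoch and slot index internally.
    _ = manager.getLeader(slot);

    // Raw access to a context is reference counted: release when done.
    if (manager.get(epoch)) |ctx| {
        defer manager.release(ctx);
        _ = ctx.leader_schedule.len;
    }
}
```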

Depending on the run mode, the initial `EpochContext` may be created from the bank or from RPC. There is currently only one way to update the `EpochContextManager` while the validator is running: via RPC, using the `RpcEpochContextService`.

One limitation: epoch staked nodes cannot be fetched over RPC; they can only be acquired from the bank. So the bank is used when available; otherwise the staked nodes are left unpopulated.
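
Condensed from the diff below, the two creation paths look roughly like this (names such as `bank_fields`, `rpc_client`, `LeaderSchedule`, and `epoch_context_manager` are assumed to already be in scope):

```zig
// Bank path (validator mode): staked nodes are available, so both fields
// of the EpochContext are populated.
const staked_nodes = try bank_fields.getStakedNodes(allocator, epoch);
try epoch_context_manager.put(epoch, .{
    .staked_nodes = try staked_nodes.clone(allocator),
    .leader_schedule = try LeaderSchedule
        .fromStakedNodes(allocator, epoch, epoch_schedule.slots_per_epoch, staked_nodes),
});

// RPC path (used by RpcEpochContextService): only the leader schedule can be
// fetched over RPC, so staked_nodes is left empty.
const response = try rpc_client.getLeaderSchedule(allocator, slot, .{});
defer response.deinit();
const schedule = try LeaderSchedule.fromMap(allocator, try response.result());
try epoch_context_manager.put(epoch, .{
    .staked_nodes = .{},
    .leader_schedule = schedule.slot_leaders,
});
```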

## commits

* refactor(leader_schedule): SlotLeaderProvider from PointerClosure to direct SlotLeaders interface

* refactor(sync): adapt RcSlice for single-item pointers

* feat: add Window and SharedPointerWindow data structures

* feat(shred-network): minimize dependencies

* fix(collections): window integer overflow when realigning from 0. add test to reproduce issue and fix bug

* fix(sync): shared pointer window defer lock should be unlock

* fix(rpc epoch context): wrong epoch calculation
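
The window realign fix listed above is not part of the files shown in this diff; the following is a hypothetical sketch of that class of bug, assuming the `Window` computes its start index with unsigned arithmetic:

```zig
const std = @import("std");

// Hypothetical sketch: a window of `len` slots centered near `center`.
// With unsigned integers, `center - len / 2` underflows when center < len / 2
// (e.g. realigning from 0), so the start must be computed with saturating
// subtraction (`-|`) or an explicit check.
fn windowStart(center: u64, len: u64) u64 {
    return center -| len / 2; // saturates at 0 instead of wrapping around
}

test "realigning from 0 does not overflow" {
    try std.testing.expectEqual(@as(u64, 0), windowStart(0, 3));
    try std.testing.expectEqual(@as(u64, 9), windowStart(10, 3));
}
```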
dnut authored Jan 2, 2025
1 parent e439c9d commit e7495af
Showing 8 changed files with 265 additions and 90 deletions.
149 changes: 149 additions & 0 deletions src/adapter.zig
@@ -0,0 +1,149 @@
//! Links dependencies with dependents. Connects components from distant regions of the code.

const std = @import("std");
const sig = @import("sig.zig");

const leader_schedule = sig.core.leader_schedule;

const Allocator = std.mem.Allocator;

const Epoch = sig.core.Epoch;
const EpochContext = sig.core.EpochContext;
const EpochSchedule = sig.core.EpochSchedule;
const Slot = sig.core.Slot;

pub const EpochContextManager = struct {
    schedule: sig.core.EpochSchedule,
    contexts: ContextWindow,

    const ContextWindow = sig.sync.SharedPointerWindow(
        sig.core.EpochContext,
        sig.core.EpochContext.deinit,
        std.mem.Allocator,
    );

    const Self = @This();

    /// all contexts that are `put` into this context manager must be
    /// allocated using the same allocator passed here.
    pub fn init(allocator: Allocator, schedule: EpochSchedule) Allocator.Error!Self {
        return .{
            .schedule = schedule,
            .contexts = try ContextWindow.init(allocator, 3, 0, allocator),
        };
    }

    pub fn deinit(self: Self) void {
        self.contexts.deinit();
    }

    pub fn put(self: *Self, epoch: Epoch, context: sig.core.EpochContext) !void {
        try self.contexts.put(epoch, context);
    }

    /// call `release` when done with pointer
    pub fn get(self: *Self, epoch: Epoch) ?*const sig.core.EpochContext {
        return self.contexts.get(@intCast(epoch));
    }

    pub fn contains(self: *Self, epoch: Epoch) bool {
        return self.contexts.contains(@intCast(epoch));
    }

    pub fn setEpoch(self: *Self, epoch: Epoch) void {
        self.contexts.realign(@intCast(epoch));
    }

    pub fn setSlot(self: *Self, slot: Slot) void {
        self.contexts.realign(@intCast(self.schedule.getEpoch(slot)));
    }

    pub fn release(self: *Self, context: *const sig.core.EpochContext) void {
        self.contexts.release(context);
    }

    pub fn getLeader(self: *Self, slot: Slot) ?sig.core.Pubkey {
        const epoch, const slot_index = self.schedule.getEpochAndSlotIndex(slot);
        const context = self.contexts.get(epoch) orelse return null;
        defer self.contexts.release(context);
        return context.leader_schedule[slot_index];
    }

    pub fn slotLeaders(self: *Self) sig.core.leader_schedule.SlotLeaders {
        return sig.core.leader_schedule.SlotLeaders.init(self, getLeader);
    }
};

pub const RpcEpochContextService = struct {
    allocator: std.mem.Allocator,
    logger: sig.trace.ScopedLogger(@typeName(Self)),
    rpc_client: sig.rpc.Client,
    state: *EpochContextManager,

    const Self = @This();

    pub fn init(
        allocator: Allocator,
        logger: sig.trace.Logger,
        state: *EpochContextManager,
        rpc_client: sig.rpc.Client,
    ) Self {
        return .{
            .allocator = allocator,
            .logger = logger.withScope(@typeName(Self)),
            .rpc_client = rpc_client,
            .state = state,
        };
    }

    pub fn run(self: *Self, exit: *std.atomic.Value(bool)) void {
        var i: usize = 0;
        while (!exit.load(.monotonic)) {
            // Refresh roughly every 100 seconds (1000 iterations of a 100 ms
            // sleep), while checking the exit flag on every iteration.
            if (i % 1000 == 0) {
                self.refresh() catch |e| {
                    self.logger.err().logf("failed to refresh epoch context via rpc: {}", .{e});
                };
            }
            std.time.sleep(100 * std.time.ns_per_ms);
            i += 1;
        }
    }

    fn refresh(self: *Self) !void {
        const response = try self.rpc_client.getSlot(self.allocator, .{});
        defer response.deinit();
        // Anchor roughly one epoch behind the current slot so the loop below
        // covers the previous, current, and next epochs.
        const old_slot = try response.result() - self.state.schedule.slots_per_epoch;
        const last_epoch = self.state.schedule.getEpoch(old_slot);

        self.state.setEpoch(last_epoch);

        const ls1 = try self.getLeaderSchedule(old_slot);
        const ctx1 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls1 };
        try self.state.put(last_epoch, ctx1);

        for (0..3) |epoch_offset| {
            const selected_slot = old_slot + epoch_offset * self.state.schedule.slots_per_epoch;
            const selected_epoch = last_epoch + epoch_offset;
            std.debug.assert(selected_epoch == self.state.schedule.getEpoch(selected_slot));

            if (self.state.contains(selected_epoch)) {
                continue;
            }

            if (self.getLeaderSchedule(selected_slot)) |ls2| {
                const ctx2 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls2 };
                try self.state.put(selected_epoch, ctx2);
            } else |e| if (selected_epoch == last_epoch) {
                return e;
            }
        }
    }

    fn getLeaderSchedule(self: *Self, slot: sig.core.Slot) ![]const sig.core.Pubkey {
        const response = try self.rpc_client.getLeaderSchedule(self.allocator, slot, .{});
        defer response.deinit();
        const rpc_schedule = try response.result();
        const schedule = try leader_schedule.LeaderSchedule.fromMap(self.allocator, rpc_schedule);
        return schedule.slot_leaders;
    }
};
76 changes: 46 additions & 30 deletions src/cmd/cmd.zig
@@ -765,9 +765,6 @@ fn validator() !void {
errdefer schedule.deinit();
try leader_schedule_cache.put(loaded_snapshot.collapsed_manifest.bank_fields.epoch, schedule);
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
const leader_provider = leader_schedule_cache.slotLeaders();

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -817,6 +814,28 @@ fn validator() !void {

// shred networking
const my_contact_info = sig.gossip.data.ThreadSafeContactInfo.fromContactInfo(gossip_service.my_contact_info);

const epoch_schedule = loaded_snapshot.collapsed_manifest.bank_fields.epoch_schedule;
const epoch = loaded_snapshot.collapsed_manifest.bank_fields.epoch;
const staked_nodes = try loaded_snapshot.collapsed_manifest.bank_fields.getStakedNodes(allocator, epoch);

var epoch_context_manager = try sig.adapter.EpochContextManager.init(allocator, epoch_schedule);
try epoch_context_manager.put(epoch, .{
.staked_nodes = try staked_nodes.clone(allocator),
.leader_schedule = try LeaderSchedule
.fromStakedNodes(allocator, epoch, epoch_schedule.slots_per_epoch, staked_nodes),
});
var rpc_client = sig.rpc.Client.init(allocator, loaded_snapshot.genesis_config.cluster_type, .{});
defer rpc_client.deinit();
var rpc_epoch_ctx_service = sig.adapter.RpcEpochContextService
.init(allocator, app_base.logger.unscoped(), &epoch_context_manager, rpc_client);
const rpc_epoch_ctx_service_thread = try std.Thread.spawn(
.{},
sig.adapter.RpcEpochContextService.run,
.{ &rpc_epoch_ctx_service, &app_base.exit },
);

// shred collector
var shred_col_conf = config.current.shred_network;
shred_col_conf.start_slot = shred_col_conf.start_slot orelse loaded_snapshot.collapsed_manifest.bank_fields.slot;
var shred_network_manager = try sig.shred_network.start(
@@ -830,17 +849,16 @@
.exit = &app_base.exit,
.gossip_table_rw = &gossip_service.gossip_table_rw,
.my_shred_version = &gossip_service.my_shred_version,
.leader_schedule = leader_provider,
.epoch_context_mgr = &epoch_context_manager,
.shred_inserter = shred_inserter,
.my_contact_info = my_contact_info,
.n_retransmit_threads = config.current.turbine.num_retransmit_threads,
.overwrite_turbine_stake_for_testing = config.current.turbine.overwrite_stake_for_testing,
.leader_schedule_cache = &leader_schedule_cache,
.bank_fields = &loaded_snapshot.collapsed_manifest.bank_fields,
},
);
defer shred_network_manager.deinit();

rpc_epoch_ctx_service_thread.join();
gossip_service.service_manager.join();
shred_network_manager.join();
}
@@ -853,6 +871,13 @@ fn shredCollector() !void {
app_base.deinit();
}

const genesis_path = try config.current.genesisFilePath() orelse
return error.GenesisPathNotProvided;
const genesis_config = try GenesisConfig.init(allocator, genesis_path);

var rpc_client = sig.rpc.Client.init(allocator, genesis_config.cluster_type, .{});
defer rpc_client.deinit();

const repair_port: u16 = config.current.shred_network.repair_port;
const turbine_recv_port: u16 = config.current.shred_network.turbine_recv_port;

@@ -866,26 +891,15 @@ fn shredCollector() !void {
allocator.destroy(gossip_service);
}

var loaded_snapshot = try loadSnapshot(allocator, app_base.logger.unscoped(), .{
.gossip_service = gossip_service,
.geyser_writer = null,
.validate_snapshot = true,
.metadata_only = config.current.accounts_db.snapshot_metadata_only,
});
defer loaded_snapshot.deinit();

// leader schedule
var leader_schedule_cache = LeaderScheduleCache.init(allocator, loaded_snapshot.collapsed_manifest.bank_fields.epoch_schedule);
if (try getLeaderScheduleFromCli(allocator)) |leader_schedule| {
try leader_schedule_cache.put(loaded_snapshot.collapsed_manifest.bank_fields.epoch, leader_schedule[1]);
} else {
const schedule = try loaded_snapshot.collapsed_manifest.bank_fields.leaderSchedule(allocator);
errdefer schedule.deinit();
try leader_schedule_cache.put(loaded_snapshot.collapsed_manifest.bank_fields.epoch, schedule);
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
const leader_provider = leader_schedule_cache.slotLeaders();
var epoch_context_manager = try sig.adapter.EpochContextManager
.init(allocator, genesis_config.epoch_schedule);
var rpc_epoch_ctx_service = sig.adapter.RpcEpochContextService
.init(allocator, app_base.logger.unscoped(), &epoch_context_manager, rpc_client);
const rpc_epoch_ctx_service_thread = try std.Thread.spawn(
.{},
sig.adapter.RpcEpochContextService.run,
.{ &rpc_epoch_ctx_service, &app_base.exit },
);

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -936,7 +950,10 @@ fn shredCollector() !void {

// shred networking
var shred_col_conf = config.current.shred_network;
shred_col_conf.start_slot = shred_col_conf.start_slot orelse @panic("No start slot found");
shred_col_conf.start_slot = shred_col_conf.start_slot orelse blk: {
const response = try rpc_client.getSlot(allocator, .{});
break :blk try response.result();
};
var shred_network_manager = try sig.shred_network.start(
shred_col_conf,
.{
@@ -948,17 +965,16 @@ fn shredCollector() !void {
.exit = &app_base.exit,
.gossip_table_rw = &gossip_service.gossip_table_rw,
.my_shred_version = &gossip_service.my_shred_version,
.leader_schedule = leader_provider,
.epoch_context_mgr = &epoch_context_manager,
.shred_inserter = shred_inserter,
.my_contact_info = my_contact_info,
.n_retransmit_threads = config.current.turbine.num_retransmit_threads,
.overwrite_turbine_stake_for_testing = config.current.turbine.overwrite_stake_for_testing,
.leader_schedule_cache = &leader_schedule_cache,
.bank_fields = &loaded_snapshot.collapsed_manifest.bank_fields,
},
);
defer shred_network_manager.deinit();

rpc_epoch_ctx_service_thread.join();
gossip_service.service_manager.join();
shred_network_manager.join();
}
16 changes: 16 additions & 0 deletions src/core/epoch_context.zig
@@ -0,0 +1,16 @@
const std = @import("std");
const core = @import("lib.zig");

/// constant data about a particular epoch.
/// this can be computed before the epoch begins, and does not change during the epoch
pub const EpochContext = struct {
    /// the staked nodes for this particular cluster to use for the leader schedule and turbine tree
    staked_nodes: std.AutoArrayHashMapUnmanaged(core.Pubkey, u64),
    /// the leader schedule for this epoch
    leader_schedule: []const core.Pubkey,

    pub fn deinit(self: *EpochContext, allocator: std.mem.Allocator) void {
        self.staked_nodes.deinit(allocator);
        allocator.free(self.leader_schedule);
    }
};
2 changes: 2 additions & 0 deletions src/core/lib.zig
@@ -1,6 +1,7 @@
pub const account = @import("account.zig");
pub const entry = @import("entry.zig");
pub const epoch_schedule = @import("epoch_schedule.zig");
pub const epoch_context = @import("epoch_context.zig");
pub const hard_forks = @import("hard_forks.zig");
pub const hash = @import("hash.zig");
pub const leader_schedule = @import("leader_schedule.zig");
@@ -13,6 +14,7 @@ pub const transaction = @import("transaction.zig");
pub const Account = account.Account;
pub const Entry = entry.Entry;
pub const EpochSchedule = epoch_schedule.EpochSchedule;
pub const EpochContext = epoch_context.EpochContext;
pub const HardForks = hard_forks.HardForks;
pub const HardFork = hard_forks.HardFork;
pub const Hash = hash.Hash;
16 changes: 6 additions & 10 deletions src/shred_network/service.zig
@@ -10,17 +10,16 @@ const Random = std.rand.Random;
const Socket = network.Socket;

const Channel = sig.sync.Channel;
const EpochContextManager = sig.adapter.EpochContextManager;
const GossipTable = sig.gossip.GossipTable;
const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo;
const Logger = sig.trace.Logger;
const Packet = sig.net.Packet;
const Pubkey = sig.core.Pubkey;
const RwMux = sig.sync.RwMux;
const Registry = sig.prometheus.Registry;
const ServiceManager = sig.utils.service_manager.ServiceManager;
const Slot = sig.core.Slot;
const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache;
const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo;

const BasicShredTracker = shred_network.shred_tracker.BasicShredTracker;
const RepairPeerProvider = shred_network.repair_service.RepairPeerProvider;
@@ -52,12 +51,10 @@ pub const ShredCollectorDependencies = struct {
/// Shared state that is read from gossip
my_shred_version: *const Atomic(u16),
my_contact_info: ThreadSafeContactInfo,
leader_schedule: SlotLeaders,
epoch_context_mgr: *EpochContextManager,
shred_inserter: sig.ledger.ShredInserter,
n_retransmit_threads: ?usize,
overwrite_turbine_stake_for_testing: bool,
leader_schedule_cache: *LeaderScheduleCache,
bank_fields: *const sig.accounts_db.snapshots.BankFields,
};

/// Start the Shred Collector.
@@ -121,7 +118,7 @@ pub fn start(
unverified_shred_channel,
verified_shred_channel,
&retransmit_channel,
deps.leader_schedule,
deps.epoch_context_mgr.slotLeaders(),
},
);

@@ -145,7 +142,7 @@
verified_shred_channel,
shred_tracker,
deps.shred_inserter,
deps.leader_schedule,
deps.epoch_context_mgr.slotLeaders(),
},
);

@@ -156,8 +153,7 @@
.{.{
.allocator = deps.allocator,
.my_contact_info = deps.my_contact_info,
.bank_fields = deps.bank_fields,
.leader_schedule_cache = deps.leader_schedule_cache,
.epoch_context_mgr = deps.epoch_context_mgr,
.gossip_table_rw = deps.gossip_table_rw,
.receiver = &retransmit_channel,
.maybe_num_retransmit_threads = deps.n_retransmit_threads,
