refactor(leader_schedule): SlotLeaderProvider from PointerClosure to direct SlotLeaders interface (#457)
dnut authored Dec 19, 2024
1 parent 87b1c48 commit debaa86
Showing 6 changed files with 51 additions and 29 deletions.
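
At a glance, consumers of the leader schedule now construct a named SlotLeaders value and call get, instead of building a PointerClosure and calling call. Below is a minimal before/after sketch of the call-site change, assuming a LeaderScheduleCache named leader_schedule_cache and a slot in scope; the result variable names are illustrative and the surrounding code is elided.

```zig
// Before this commit: generic pointer closure, invoked via `call`.
const leader_provider = leader_schedule_cache.slotLeaderProvider();
const old_style_leader: ?Pubkey = leader_provider.call(slot);

// After this commit: dedicated SlotLeaders interface, invoked via `get`.
const slot_leaders = leader_schedule_cache.slotLeaders();
const new_style_leader: ?Pubkey = slot_leaders.get(slot);
```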
4 changes: 2 additions & 2 deletions src/cmd/cmd.zig
@@ -767,7 +767,7 @@ fn validator() !void {
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
- const leader_provider = leader_schedule_cache.slotLeaderProvider();
+ const leader_provider = leader_schedule_cache.slotLeaders();

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -885,7 +885,7 @@ fn shredCollector() !void {
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
- const leader_provider = leader_schedule_cache.slotLeaderProvider();
+ const leader_provider = leader_schedule_cache.slotLeaders();

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
32 changes: 27 additions & 5 deletions src/core/leader_schedule.zig
@@ -14,18 +14,40 @@ const RwMux = sig.sync.RwMux;
pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;
pub const MAX_CACHED_LEADER_SCHEDULES: usize = 10;

- pub const SlotLeaderProvider = sig.utils.closure.PointerClosure(Slot, ?Pubkey);
+ /// interface to express a dependency on slot leaders
+ pub const SlotLeaders = struct {
+     state: *anyopaque,
+     getFn: *const fn (*anyopaque, Slot) ?Pubkey,
+
+     pub fn init(
+         state: anytype,
+         getSlotLeader: fn (@TypeOf(state), Slot) ?Pubkey,
+     ) SlotLeaders {
+         return .{
+             .state = state,
+             .getFn = struct {
+                 fn genericFn(generic_state: *anyopaque, slot: Slot) ?Pubkey {
+                     return getSlotLeader(@alignCast(@ptrCast(generic_state)), slot);
+                 }
+             }.genericFn,
+         };
+     }
+
+     pub fn get(self: SlotLeaders, slot: Slot) ?Pubkey {
+         return self.getFn(self.state, slot);
+     }
+ };

/// LeaderScheduleCache is a cache of leader schedules for each epoch.
/// Leader schedules are expensive to compute, so this cache is used to avoid
/// recomputing leader schedules for the same epoch.
/// LeaderScheduleCache also keeps a copy of the epoch_schedule so that it can
/// compute epoch and slot index from a slot.
/// NOTE: This struct is not really a 'cache', we should consider renaming it
- /// to a SlotLeaderProvider and maybe even moving it outside of the core module.
+ /// to a SlotLeaders and maybe even moving it outside of the core module.
/// This more accurately describes the purpose of this struct as caching is a means
/// to an end, not the end itself. It may then follow that we could remove the
- /// above pointer closure in favor of passing the SlotLeaderProvider directly.
+ /// above pointer closure in favor of passing the SlotLeaders directly.
pub const LeaderScheduleCache = struct {
epoch_schedule: EpochSchedule,
leader_schedules: RwMux(std.AutoArrayHashMap(Epoch, LeaderSchedule)),
@@ -41,8 +63,8 @@ pub const LeaderScheduleCache = struct {
};
}

- pub fn slotLeaderProvider(self: *Self) SlotLeaderProvider {
-     return SlotLeaderProvider.init(self, LeaderScheduleCache.slotLeader);
+ pub fn slotLeaders(self: *Self) SlotLeaders {
+     return SlotLeaders.init(self, LeaderScheduleCache.slotLeader);
}

pub fn put(self: *Self, epoch: Epoch, leader_schedule: LeaderSchedule) !void {
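
Any pointer type with a matching fn (*T, Slot) ?Pubkey method can be wrapped by SlotLeaders.init, which erases the concrete type behind *anyopaque and a generated wrapper function. The following is a hedged sketch of a custom provider modeled on the OneSlotLeaders test helper further down in this diff; the RoundRobinLeaders type is purely illustrative and not part of the commit, and the import paths are assumed from the surrounding code.

```zig
const sig = @import("sig");
const Slot = sig.core.Slot;
const Pubkey = sig.core.Pubkey;
const leader_schedule = sig.core.leader_schedule;
const SlotLeaders = leader_schedule.SlotLeaders;

/// Illustrative provider: rotates through a fixed list of leaders,
/// assigning one leader per group of NUM_CONSECUTIVE_LEADER_SLOTS slots.
const RoundRobinLeaders = struct {
    leaders: []const Pubkey,

    fn getLeader(self: *RoundRobinLeaders, slot: Slot) ?Pubkey {
        if (self.leaders.len == 0) return null;
        const group = slot / leader_schedule.NUM_CONSECUTIVE_LEADER_SLOTS;
        const index: usize = @intCast(group % self.leaders.len);
        return self.leaders[index];
    }

    /// Hides *RoundRobinLeaders behind the SlotLeaders interface.
    fn slotLeaders(self: *RoundRobinLeaders) SlotLeaders {
        return SlotLeaders.init(self, RoundRobinLeaders.getLeader);
    }
};
```

Callers then depend only on SlotLeaders and call provider.get(slot), without knowing which concrete type sits behind it.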
30 changes: 15 additions & 15 deletions src/ledger/shred_inserter/shred_inserter.zig
@@ -23,7 +23,7 @@ const CodeShred = ledger.shred.CodeShred;
const DataShred = ledger.shred.DataShred;
const ReedSolomonCache = lib.recovery.ReedSolomonCache;
const ShredId = ledger.shred.ShredId;
- const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+ const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const SortedSet = sig.utils.collections.SortedSet;
const SortedMap = sig.utils.collections.SortedMap;
const Timer = sig.time.Timer;
@@ -145,7 +145,7 @@ pub const ShredInserter = struct {
self: *Self,
shreds: []const Shred,
is_repaired: []const bool,
- leader_schedule: ?SlotLeaderProvider,
+ maybe_slot_leaders: ?SlotLeaders,
is_trusted: bool,
retransmit_sender: ?PointerClosure([]const []const u8, void),
) !InsertShredsResult {
@@ -195,7 +195,7 @@ pub const ShredInserter = struct {
merkle_root_validator,
write_batch,
is_trusted,
- leader_schedule,
+ maybe_slot_leaders,
shred_source,
)) |completed_data_sets| {
if (is_repair) {
@@ -239,7 +239,7 @@ pub const ShredInserter = struct {
var shred_recovery_timer = try Timer.start();
var valid_recovered_shreds = ArrayList([]const u8).init(allocator);
defer valid_recovered_shreds.deinit();
- if (leader_schedule) |slot_leader_provider| {
+ if (maybe_slot_leaders) |slot_leaders| {
var reed_solomon_cache = try ReedSolomonCache.init(allocator);
defer reed_solomon_cache.deinit();
const recovered_shreds = try self.tryShredRecovery(
@@ -259,7 +259,7 @@
if (shred == .data) {
self.metrics.num_recovered.inc();
}
- const leader = slot_leader_provider.call(shred.commonHeader().slot);
+ const leader = slot_leaders.get(shred.commonHeader().slot);
if (leader == null) {
continue;
}
@@ -280,7 +280,7 @@
merkle_root_validator,
write_batch,
is_trusted,
- leader_schedule,
+ maybe_slot_leaders,
.recovered,
)) |completed_data_sets| {
defer completed_data_sets.deinit();
@@ -590,7 +590,7 @@ pub const ShredInserter = struct {
merkle_root_validator: MerkleRootValidator,
write_batch: *WriteBatch,
is_trusted: bool,
- leader_schedule: ?SlotLeaderProvider,
+ leader_schedule: ?SlotLeaders,
shred_source: ShredSource,
) !ArrayList(CompletedDataSetInfo) {
const slot = shred.common.slot;
@@ -708,7 +708,7 @@ pub const ShredInserter = struct {
slot_meta: *const SlotMeta,
shred_store: ShredWorkingStore,
max_root: Slot,
- leader_schedule: ?SlotLeaderProvider,
+ leader_schedule: ?SlotLeaders,
shred_source: ShredSource,
duplicate_shreds: *ArrayList(PossibleDuplicateShred),
) !bool {
@@ -975,8 +975,8 @@ fn verifyShredSlots(slot: Slot, parent: Slot, root: Slot) bool {
return root <= parent and parent < slot;
}

- fn slotLeader(provider: ?SlotLeaderProvider, slot: Slot) ?Pubkey {
-     return if (provider) |p| if (p.call(slot)) |l| l else null else null;
+ fn slotLeader(provider: ?SlotLeaders, slot: Slot) ?Pubkey {
+     return if (provider) |p| if (p.get(slot)) |l| l else null else null;
}

/// update_slot_meta
@@ -1486,7 +1486,7 @@ test "recovery" {
const data_shreds = shreds[0..34];
const code_shreds = shreds[34..68];

- var leader_schedule = OneSlotLeaderProvider{
+ var leader_schedule = OneSlotLeaders{
.leader = try Pubkey.fromString("2iWGQbhdWWAA15KTBJuqvAxCdKmEvY26BoFRBU4419Sn"),
};

@@ -1512,15 +1512,15 @@
// TODO: verify index integrity
}

- const OneSlotLeaderProvider = struct {
+ const OneSlotLeaders = struct {
leader: Pubkey,

- fn getLeader(self: *OneSlotLeaderProvider, _: Slot) ?Pubkey {
+ fn getLeader(self: *OneSlotLeaders, _: Slot) ?Pubkey {
return self.leader;
}

- fn provider(self: *OneSlotLeaderProvider) SlotLeaderProvider {
-     return SlotLeaderProvider.init(self, OneSlotLeaderProvider.getLeader);
+ fn provider(self: *OneSlotLeaders) SlotLeaders {
+     return SlotLeaders.init(self, OneSlotLeaders.getLeader);
}
};

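
Note that ShredInserter takes the interface as an optional (?SlotLeaders), since shreds can be inserted without attempting recovery. A small illustrative fragment of the unwrapping pattern used in the hunks above, assuming maybe_slot_leaders and slot are in scope:

```zig
if (maybe_slot_leaders) |slot_leaders| {
    // Leader lookups happen only when a provider was supplied.
    if (slot_leaders.get(slot)) |leader| {
        // e.g. verify recovered shred signatures against `leader`
        _ = leader;
    }
}
```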
4 changes: 2 additions & 2 deletions src/shred_network/service.zig
@@ -19,7 +19,7 @@ const RwMux = sig.sync.RwMux;
const Registry = sig.prometheus.Registry;
const ServiceManager = sig.utils.service_manager.ServiceManager;
const Slot = sig.core.Slot;
- const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+ const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache;

const BasicShredTracker = shred_network.shred_tracker.BasicShredTracker;
@@ -52,7 +52,7 @@ pub const ShredCollectorDependencies = struct {
/// Shared state that is read from gossip
my_shred_version: *const Atomic(u16),
my_contact_info: ThreadSafeContactInfo,
- leader_schedule: SlotLeaderProvider,
+ leader_schedule: SlotLeaders,
shred_inserter: sig.ledger.ShredInserter,
n_retransmit_threads: ?usize,
overwrite_turbine_stake_for_testing: bool,
2 changes: 1 addition & 1 deletion src/shred_network/shred_processor.zig
@@ -33,7 +33,7 @@ pub fn runShredProcessor(
verified_shred_receiver: *Channel(Packet),
tracker: *BasicShredTracker,
shred_inserter_: ShredInserter,
- leader_schedule: sig.core.leader_schedule.SlotLeaderProvider,
+ leader_schedule: sig.core.leader_schedule.SlotLeaders,
) !void {
const logger = logger_.withScope(LOG_SCOPE);
var shred_inserter = shred_inserter_;
8 changes: 4 additions & 4 deletions src/shred_network/shred_verifier.zig
@@ -10,7 +10,7 @@ const Counter = sig.prometheus.Counter;
const Histogram = sig.prometheus.Histogram;
const Packet = sig.net.Packet;
const Registry = sig.prometheus.Registry;
- const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+ const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const VariantCounter = sig.prometheus.VariantCounter;

const VerifiedMerkleRoots = sig.common.lru.LruCache(.non_locking, sig.core.Hash, void);
@@ -25,7 +25,7 @@ pub fn runShredVerifier(
verified_shred_sender: *Channel(Packet),
/// me --> retransmit service
maybe_retransmit_shred_sender: ?*Channel(Packet),
- leader_schedule: SlotLeaderProvider,
+ leader_schedule: SlotLeaders,
) !void {
const metrics = try registry.initStruct(Metrics);
var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024);
@@ -53,7 +53,7 @@ pub fn runShredVerifier(
/// Analogous to [verify_shred_cpu](https://github.com/anza-xyz/agave/blob/83e7d84bcc4cf438905d07279bc07e012a49afd9/ledger/src/sigverify_shreds.rs#L35)
fn verifyShred(
packet: *const Packet,
- leader_schedule: SlotLeaderProvider,
+ leader_schedule: SlotLeaders,
verified_merkle_roots: *VerifiedMerkleRoots,
metrics: Metrics,
) ShredVerificationFailure!void {
@@ -66,7 +66,7 @@
return;
}
metrics.cache_miss_count.inc();
- const leader = leader_schedule.call(slot) orelse return error.leader_unknown;
+ const leader = leader_schedule.get(slot) orelse return error.leader_unknown;
const valid = signature.verify(leader, &signed_data.data) catch
return error.failed_verification;
if (!valid) return error.failed_verification;
