From debaa86142ab98d62f554abde19e4ccdffab678a Mon Sep 17 00:00:00 2001
From: Drew Nutter
Date: Thu, 19 Dec 2024 17:40:02 -0300
Subject: [PATCH] refactor(leader_schedule): SlotLeaderProvider from PointerClosure to direct SlotLeaders interface (#457)

---
 src/cmd/cmd.zig                              |  4 +--
 src/core/leader_schedule.zig                 | 32 +++++++++++++++++---
 src/ledger/shred_inserter/shred_inserter.zig | 30 +++++++++---------
 src/shred_network/service.zig                |  4 +--
 src/shred_network/shred_processor.zig        |  2 +-
 src/shred_network/shred_verifier.zig         |  8 ++---
 6 files changed, 51 insertions(+), 29 deletions(-)

diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig
index b6f33d692..d412b94f4 100644
--- a/src/cmd/cmd.zig
+++ b/src/cmd/cmd.zig
@@ -767,7 +767,7 @@ fn validator() !void {
     }
     // This provider will fail at epoch boundary unless another thread updated the leader schedule cache
     // i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
-    const leader_provider = leader_schedule_cache.slotLeaderProvider();
+    const leader_provider = leader_schedule_cache.slotLeaders();
 
     // blockstore
     var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -885,7 +885,7 @@ fn shredCollector() !void {
     }
     // This provider will fail at epoch boundary unless another thread updated the leader schedule cache
     // i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
-    const leader_provider = leader_schedule_cache.slotLeaderProvider();
+    const leader_provider = leader_schedule_cache.slotLeaders();
 
     // blockstore
     var blockstore_db = try sig.ledger.BlockstoreDB.open(
diff --git a/src/core/leader_schedule.zig b/src/core/leader_schedule.zig
index f1236acb5..c21815932 100644
--- a/src/core/leader_schedule.zig
+++ b/src/core/leader_schedule.zig
@@ -14,7 +14,29 @@ const RwMux = sig.sync.RwMux;
 pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;
 pub const MAX_CACHED_LEADER_SCHEDULES: usize = 10;
 
-pub const SlotLeaderProvider = sig.utils.closure.PointerClosure(Slot, ?Pubkey);
+/// interface to express a dependency on slot leaders
+pub const SlotLeaders = struct {
+    state: *anyopaque,
+    getFn: *const fn (*anyopaque, Slot) ?Pubkey,
+
+    pub fn init(
+        state: anytype,
+        getSlotLeader: fn (@TypeOf(state), Slot) ?Pubkey,
+    ) SlotLeaders {
+        return .{
+            .state = state,
+            .getFn = struct {
+                fn genericFn(generic_state: *anyopaque, slot: Slot) ?Pubkey {
+                    return getSlotLeader(@alignCast(@ptrCast(generic_state)), slot);
+                }
+            }.genericFn,
+        };
+    }
+
+    pub fn get(self: SlotLeaders, slot: Slot) ?Pubkey {
+        return self.getFn(self.state, slot);
+    }
+};
 
 /// LeaderScheduleCache is a cache of leader schedules for each epoch.
 /// Leader schedules are expensive to compute, so this cache is used to avoid
@@ -22,10 +44,10 @@ pub const SlotLeaderProvider = sig.utils.closure.PointerClosure(Slot, ?Pubkey);
 /// LeaderScheduleCache also keeps a copy of the epoch_schedule so that it can
 /// compute epoch and slot index from a slot.
 /// NOTE: This struct is not really a 'cache', we should consider renaming it
-/// to a SlotLeaderProvider and maybe even moving it outside of the core module.
+/// to a SlotLeaders and maybe even moving it outside of the core module.
 /// This more accurately describes the purpose of this struct as caching is a means
 /// to an end, not the end itself. It may then follow that we could remove the
-/// above pointer closure in favor of passing the SlotLeaderProvider directly.
+/// above pointer closure in favor of passing the SlotLeaders directly.
 pub const LeaderScheduleCache = struct {
     epoch_schedule: EpochSchedule,
     leader_schedules: RwMux(std.AutoArrayHashMap(Epoch, LeaderSchedule)),
@@ -41,8 +63,8 @@ pub const LeaderScheduleCache = struct {
         };
     }
 
-    pub fn slotLeaderProvider(self: *Self) SlotLeaderProvider {
-        return SlotLeaderProvider.init(self, LeaderScheduleCache.slotLeader);
+    pub fn slotLeaders(self: *Self) SlotLeaders {
+        return SlotLeaders.init(self, LeaderScheduleCache.slotLeader);
     }
 
     pub fn put(self: *Self, epoch: Epoch, leader_schedule: LeaderSchedule) !void {
diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig
index b3536783d..f4cc130b6 100644
--- a/src/ledger/shred_inserter/shred_inserter.zig
+++ b/src/ledger/shred_inserter/shred_inserter.zig
@@ -23,7 +23,7 @@ const CodeShred = ledger.shred.CodeShred;
 const DataShred = ledger.shred.DataShred;
 const ReedSolomonCache = lib.recovery.ReedSolomonCache;
 const ShredId = ledger.shred.ShredId;
-const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
 const SortedSet = sig.utils.collections.SortedSet;
 const SortedMap = sig.utils.collections.SortedMap;
 const Timer = sig.time.Timer;
@@ -145,7 +145,7 @@ pub const ShredInserter = struct {
         self: *Self,
         shreds: []const Shred,
         is_repaired: []const bool,
-        leader_schedule: ?SlotLeaderProvider,
+        maybe_slot_leaders: ?SlotLeaders,
         is_trusted: bool,
         retransmit_sender: ?PointerClosure([]const []const u8, void),
     ) !InsertShredsResult {
@@ -195,7 +195,7 @@ pub const ShredInserter = struct {
                 merkle_root_validator,
                 write_batch,
                 is_trusted,
-                leader_schedule,
+                maybe_slot_leaders,
                 shred_source,
             )) |completed_data_sets| {
                 if (is_repair) {
@@ -239,7 +239,7 @@ pub const ShredInserter = struct {
         var shred_recovery_timer = try Timer.start();
         var valid_recovered_shreds = ArrayList([]const u8).init(allocator);
         defer valid_recovered_shreds.deinit();
-        if (leader_schedule) |slot_leader_provider| {
+        if (maybe_slot_leaders) |slot_leaders| {
            var reed_solomon_cache = try ReedSolomonCache.init(allocator);
            defer reed_solomon_cache.deinit();
            const recovered_shreds = try self.tryShredRecovery(
@@ -259,7 +259,7 @@ pub const ShredInserter = struct {
                 if (shred == .data) {
                     self.metrics.num_recovered.inc();
                 }
-                const leader = slot_leader_provider.call(shred.commonHeader().slot);
+                const leader = slot_leaders.get(shred.commonHeader().slot);
                 if (leader == null) {
                     continue;
                 }
@@ -280,7 +280,7 @@ pub const ShredInserter = struct {
                     merkle_root_validator,
                     write_batch,
                     is_trusted,
-                    leader_schedule,
+                    maybe_slot_leaders,
                     .recovered,
                 )) |completed_data_sets| {
                     defer completed_data_sets.deinit();
@@ -590,7 +590,7 @@ pub const ShredInserter = struct {
         merkle_root_validator: MerkleRootValidator,
         write_batch: *WriteBatch,
         is_trusted: bool,
-        leader_schedule: ?SlotLeaderProvider,
+        leader_schedule: ?SlotLeaders,
         shred_source: ShredSource,
     ) !ArrayList(CompletedDataSetInfo) {
         const slot = shred.common.slot;
@@ -708,7 +708,7 @@ pub const ShredInserter = struct {
         slot_meta: *const SlotMeta,
         shred_store: ShredWorkingStore,
         max_root: Slot,
-        leader_schedule: ?SlotLeaderProvider,
+        leader_schedule: ?SlotLeaders,
         shred_source: ShredSource,
         duplicate_shreds: *ArrayList(PossibleDuplicateShred),
     ) !bool {
@@ -975,8 +975,8 @@ fn verifyShredSlots(slot: Slot, parent: Slot, root: Slot) bool {
     return root <= parent and parent < slot;
 }
 
-fn slotLeader(provider: ?SlotLeaderProvider, slot: Slot) ?Pubkey {
-    return if (provider) |p| if (p.call(slot)) |l| l else null else null;
+fn slotLeader(provider: ?SlotLeaders, slot: Slot) ?Pubkey {
+    return if (provider) |p| if (p.get(slot)) |l| l else null else null;
 }
 
 /// update_slot_meta
@@ -1486,7 +1486,7 @@ test "recovery" {
     const data_shreds = shreds[0..34];
     const code_shreds = shreds[34..68];
 
-    var leader_schedule = OneSlotLeaderProvider{
+    var leader_schedule = OneSlotLeaders{
         .leader = try Pubkey.fromString("2iWGQbhdWWAA15KTBJuqvAxCdKmEvY26BoFRBU4419Sn"),
     };
 
@@ -1512,15 +1512,15 @@ test "recovery" {
     // TODO: verify index integrity
 }
 
-const OneSlotLeaderProvider = struct {
+const OneSlotLeaders = struct {
     leader: Pubkey,
 
-    fn getLeader(self: *OneSlotLeaderProvider, _: Slot) ?Pubkey {
+    fn getLeader(self: *OneSlotLeaders, _: Slot) ?Pubkey {
         return self.leader;
     }
 
-    fn provider(self: *OneSlotLeaderProvider) SlotLeaderProvider {
-        return SlotLeaderProvider.init(self, OneSlotLeaderProvider.getLeader);
+    fn provider(self: *OneSlotLeaders) SlotLeaders {
+        return SlotLeaders.init(self, OneSlotLeaders.getLeader);
     }
 };
 
diff --git a/src/shred_network/service.zig b/src/shred_network/service.zig
index ab15f671a..ced4594b6 100644
--- a/src/shred_network/service.zig
+++ b/src/shred_network/service.zig
@@ -19,7 +19,7 @@ const RwMux = sig.sync.RwMux;
 const Registry = sig.prometheus.Registry;
 const ServiceManager = sig.utils.service_manager.ServiceManager;
 const Slot = sig.core.Slot;
-const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
 const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache;
 
 const BasicShredTracker = shred_network.shred_tracker.BasicShredTracker;
@@ -52,7 +52,7 @@ pub const ShredCollectorDependencies = struct {
     /// Shared state that is read from gossip
     my_shred_version: *const Atomic(u16),
     my_contact_info: ThreadSafeContactInfo,
-    leader_schedule: SlotLeaderProvider,
+    leader_schedule: SlotLeaders,
     shred_inserter: sig.ledger.ShredInserter,
     n_retransmit_threads: ?usize,
     overwrite_turbine_stake_for_testing: bool,
diff --git a/src/shred_network/shred_processor.zig b/src/shred_network/shred_processor.zig
index 0e1681df9..9b018c0e4 100644
--- a/src/shred_network/shred_processor.zig
+++ b/src/shred_network/shred_processor.zig
@@ -33,7 +33,7 @@ pub fn runShredProcessor(
     verified_shred_receiver: *Channel(Packet),
     tracker: *BasicShredTracker,
     shred_inserter_: ShredInserter,
-    leader_schedule: sig.core.leader_schedule.SlotLeaderProvider,
+    leader_schedule: sig.core.leader_schedule.SlotLeaders,
 ) !void {
     const logger = logger_.withScope(LOG_SCOPE);
     var shred_inserter = shred_inserter_;
diff --git a/src/shred_network/shred_verifier.zig b/src/shred_network/shred_verifier.zig
index 9aee2d3b1..1c7332df1 100644
--- a/src/shred_network/shred_verifier.zig
+++ b/src/shred_network/shred_verifier.zig
@@ -10,7 +10,7 @@ const Counter = sig.prometheus.Counter;
 const Histogram = sig.prometheus.Histogram;
 const Packet = sig.net.Packet;
 const Registry = sig.prometheus.Registry;
-const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
 const VariantCounter = sig.prometheus.VariantCounter;
 
 const VerifiedMerkleRoots = sig.common.lru.LruCache(.non_locking, sig.core.Hash, void);
@@ -25,7 +25,7 @@ pub fn runShredVerifier(
     verified_shred_sender: *Channel(Packet),
     /// me --> retransmit service
     maybe_retransmit_shred_sender: ?*Channel(Packet),
-    leader_schedule: SlotLeaderProvider,
+    leader_schedule: SlotLeaders,
 ) !void {
     const metrics = try registry.initStruct(Metrics);
     var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024);
@@ -53,7 +53,7 @@ pub fn runShredVerifier(
 /// Analogous to [verify_shred_cpu](https://github.com/anza-xyz/agave/blob/83e7d84bcc4cf438905d07279bc07e012a49afd9/ledger/src/sigverify_shreds.rs#L35)
 fn verifyShred(
     packet: *const Packet,
-    leader_schedule: SlotLeaderProvider,
+    leader_schedule: SlotLeaders,
     verified_merkle_roots: *VerifiedMerkleRoots,
     metrics: Metrics,
 ) ShredVerificationFailure!void {
@@ -66,7 +66,7 @@ fn verifyShred(
         return;
     }
     metrics.cache_miss_count.inc();
-    const leader = leader_schedule.call(slot) orelse return error.leader_unknown;
+    const leader = leader_schedule.get(slot) orelse return error.leader_unknown;
 
     const valid = signature.verify(leader, &signed_data.data) catch return error.failed_verification;
     if (!valid) return error.failed_verification;
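
Editor's note, not part of the commit: the snippet below is a minimal usage sketch of the
SlotLeaders interface added in src/core/leader_schedule.zig, mirroring the OneSlotLeaders
test helper in shred_inserter.zig. The module name in @import("sig"), the FixedLeader type,
and the test name are illustrative assumptions; SlotLeaders.init, SlotLeaders.get, and
Pubkey.fromString are taken from the patch itself.

    const std = @import("std");
    const sig = @import("sig"); // assumption: the sig package is importable under this name
    const Slot = sig.core.Slot;
    const Pubkey = sig.core.Pubkey;
    const SlotLeaders = sig.core.leader_schedule.SlotLeaders;

    /// Hypothetical provider for illustration: returns the same leader for
    /// every slot, just like the OneSlotLeaders helper in the test above.
    const FixedLeader = struct {
        leader: Pubkey,

        fn getLeader(self: *FixedLeader, _: Slot) ?Pubkey {
            return self.leader;
        }

        /// SlotLeaders.init erases the concrete type: it stores `self` as
        /// *anyopaque and wraps getLeader behind the getFn pointer.
        fn slotLeaders(self: *FixedLeader) SlotLeaders {
            return SlotLeaders.init(self, FixedLeader.getLeader);
        }
    };

    test "SlotLeaders usage sketch" {
        var fixed = FixedLeader{
            .leader = try Pubkey.fromString("2iWGQbhdWWAA15KTBJuqvAxCdKmEvY26BoFRBU4419Sn"),
        };
        const leaders: SlotLeaders = fixed.slotLeaders();

        // Callers such as the shred verifier now depend only on this interface
        // and fetch leaders with get(), replacing PointerClosure's call().
        const maybe_leader = leaders.get(123);
        try std.testing.expect(maybe_leader != null);
    }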