fix(ci/cd): provision benchmark collection (#485)
* fix(ci/cd): provision benchmark collection

* fix delete logic

* change const name

* build fix
0xNineteen authored Jan 15, 2025
1 parent 86cfa71 commit 4f799b4
Showing 11 changed files with 239 additions and 216 deletions.
4 changes: 4 additions & 0 deletions scripts/benchmark_server.py
@@ -1,3 +1,7 @@
# doc:
# this script scrapes the results/metrics directory for json files
# and generates a graph for each metric in each json file

import os
import json
import plotly.express as px
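
A minimal sketch of the scrape-and-plot loop the doc comment above describes, assuming each results/metrics/*.json file is a flat map of metric name to numeric value (the real schema may differ) and writing one HTML graph per metric:

import glob
import json
import os

import plotly.express as px

# metric name -> list of (file tag, value) points across commits
metrics = {}
for path in sorted(glob.glob("results/metrics/output-*.json")):
    tag = os.path.basename(path)  # filename encodes commit + timestamp
    with open(path) as f:
        for name, value in json.load(f).items():
            metrics.setdefault(name, []).append((tag, value))

for name, points in metrics.items():
    tags, values = zip(*points)
    fig = px.line(x=list(tags), y=list(values), title=name)
    fig.write_html(f"{name}.html")  # one graph per metric
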
13 changes: 10 additions & 3 deletions scripts/collect_benchmarks.sh
@@ -1,7 +1,13 @@
#!/usr/bin/env bash

# crontab -e
# 0 5 * * * bash /home/ubuntu/benchmarks/sig/scripts/collect_benchmarks.sh
# doc:
# this script will pull the latest changes to the local repo
# and run the benchmark to collect metrics, which are
# saved to the results/output.json file. they are then
# moved to results/metrics/output-{commit}-{timestamp}.json
#
# these output files are then compared/visualized using the
# scripts/benchmark_server.py script

# now in the scripts/ dir
cd "$(dirname "$0")"
@@ -22,7 +28,8 @@ if ls $result_file 1> /dev/null 2>&1; then
echo "Results for commit $git_commit already exist. Skipping benchmark."
else
# Run the benchmark only if the result file doesn't exist
zig build -Doptimize=ReleaseSafe benchmark -- --metrics all
zig build -Doptimize=ReleaseSafe -Dno-run benchmark
./zig-out/bin/benchmark --metrics -e -f all

mv results/output.json "${result_dir}/output-${git_commit}-${timestamp}.json"
echo "Benchmark results saved to ${result_dir}/output-${git_commit}-${timestamp}.json"
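
The same collection flow, sketched in Python for clarity. The two-step invocation (build with -Dno-run, then run the binary with --metrics -e -f all) comes straight from the script above; the git commands, paths, and skip check are simplified assumptions:

import glob
import shutil
import subprocess
import time

subprocess.run(["git", "pull"], check=True)
commit = subprocess.run(
    ["git", "rev-parse", "--short", "HEAD"],
    check=True, capture_output=True, text=True,
).stdout.strip()
timestamp = time.strftime("%Y%m%d_%H%M%S")

# skip the run if results for this commit already exist
if glob.glob(f"results/metrics/output-{commit}-*.json"):
    print(f"Results for commit {commit} already exist. Skipping benchmark.")
else:
    # build the benchmark binary without running it, then run it directly
    subprocess.run(
        ["zig", "build", "-Doptimize=ReleaseSafe", "-Dno-run", "benchmark"],
        check=True,
    )
    subprocess.run(
        ["./zig-out/bin/benchmark", "--metrics", "-e", "-f", "all"],
        check=True,
    )
    shutil.move(
        "results/output.json",
        f"results/metrics/output-{commit}-{timestamp}.json",
    )
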
13 changes: 13 additions & 0 deletions scripts/cron_jobs/setup_benchmarks.sh
@@ -0,0 +1,13 @@
# doc:
# this script will modify your server's crontab
# to run the collect_benchmarks.sh script at 6am every day

SCRIPT_DIR=$(dirname "$(readlink -f "$0")")/..

# 6am every day
(crontab -l; echo "\
0 6 * * * . $HOME/.bashrc; (bash $SCRIPT_DIR/collect_benchmarks.sh) 2>&1 | logger -t sig_bench \
") | crontab

echo "Cron job added. Current crontab:"
crontab -l
192 changes: 122 additions & 70 deletions src/accountsdb/db.zig
@@ -291,25 +291,23 @@ pub const AccountsDB = struct {
var timer = try sig.time.Timer.start();
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();
self.logger.info().log("fast loading accountsdb...");
try self.fastload(fastload_dir, collapsed_manifest.accounts_db_fields);
self.logger.info().logf("loaded from snapshot in {s}", .{timer.read()});
self.logger.info().logf("fastload: total time: {s}", .{timer.read()});
} else {
const load_duration = try self.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
n_threads,
allocator,
accounts_per_file_estimate,
);
self.logger.info().logf("loaded from snapshot in {s}", .{load_duration});
self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_duration});
}

// no need to re-save if we just loaded from a fastload
if (!should_fastload and save_index) {
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();

try self.account_index.saveToDisk(fastload_dir);
if (save_index and !should_fastload) {
var timer = try sig.time.Timer.start();
_ = try self.saveStateForFastload();
self.logger.info().logf("saveStateForFastload: total time: {s}", .{timer.read()});
}

if (validate) {
@@ -331,17 +329,28 @@ pub const AccountsDB = struct {
.capitalization = inc_persistence.incremental_capitalization,
} else null,
});
self.logger.info().logf("validated from snapshot in {s}", .{validate_timer.read()});
self.logger.info().logf("validateLoadFromSnapshot: total time: {s}", .{validate_timer.read()});
}

return collapsed_manifest;
}

pub fn saveStateForFastload(
self: *Self,
) !void {
self.logger.info().log("running saveStateForFastload...");
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();
try self.account_index.saveToDisk(fastload_dir);
}
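
// a hypothetical usage sketch (names like `fields` and `estimate` are
// placeholders, not part of this commit): persist the index once after a
// slow snapshot load so later startups can skip index building entirely:
//
//     _ = try accounts_db.loadFromSnapshot(fields, n_threads, allocator, estimate);
//     try accounts_db.saveStateForFastload();
//     // ...on a later startup...
//     var dir = try snapshot_dir.makeOpenPath("fastload_state", .{});
//     defer dir.close();
//     try accounts_db.fastload(dir, fields);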

pub fn fastload(
self: *Self,
dir: std.fs.Dir,
snapshot_manifest: AccountsDbFields,
) !void {
self.logger.info().log("running fastload...");

var accounts_dir = try self.snapshot_dir.openDir("accounts", .{});
defer accounts_dir.close();

@@ -383,7 +392,6 @@ pub const AccountsDB = struct {
}

// NOTE: index loading is the most expensive part, which is what we fastload here
self.logger.info().log("loading account index");
try self.account_index.loadFromDisk(dir);
}

Expand All @@ -397,7 +405,7 @@ pub const AccountsDB = struct {
per_thread_allocator: std.mem.Allocator,
accounts_per_file_estimate: u64,
) !sig.time.Duration {
self.logger.info().log("loading from snapshot...");
self.logger.info().log("running loadFromSnapshot...");

// used to read account files
const n_parse_threads = n_threads;
@@ -472,12 +480,11 @@ pub const AccountsDB = struct {
try geyser_writer.writePayloadToPipe(end_of_snapshot);
}

self.logger.info().logf("[{d} threads]: merging thread indexes...", .{n_combine_threads});
var merge_timer = try sig.time.Timer.start();
try self.mergeMultipleDBs(loading_threads, n_combine_threads);
self.logger.debug().logf("merging thread indexes took: {}", .{merge_timer.read()});
self.logger.debug().logf("mergeMultipleDBs: total time: {}", .{merge_timer.read()});

self.logger.debug().logf("total time: {s}", .{timer.read()});
self.logger.debug().logf("loadFromSnapshot: total time: {s}", .{timer.read()});
return timer.read();
}

@@ -768,6 +775,8 @@ pub const AccountsDB = struct {
thread_dbs: []AccountsDB,
n_threads: usize,
) !void {
self.logger.info().logf("[{d} threads]: running mergeMultipleDBs...", .{n_threads});

var merge_indexes_wg: std.Thread.WaitGroup = .{};
defer merge_indexes_wg.wait();
try spawnThreadTasks(mergeThreadIndexesMultiThread, .{
@@ -928,8 +937,8 @@ pub const AccountsDB = struct {
) !struct { Hash, u64 } {
var timer = try sig.time.Timer.start();
// TODO: make cli arg
const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount())) * 2;
// const n_threads = 1;
const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount()));
// const n_threads = 4;

// alloc the result
const hashes = try self.allocator.alloc(std.ArrayListUnmanaged(Hash), n_threads);
@@ -944,7 +953,10 @@ pub const AccountsDB = struct {
@memset(lamports, 0);

// split processing the bins over multiple threads
self.logger.info().logf("collecting hashes from accounts...", .{});
self.logger.info().logf(
"collecting hashes from accounts using {} threads...",
.{n_threads},
);
if (n_threads == 1) {
try getHashesFromIndex(
self,
@@ -1044,17 +1056,17 @@ pub const AccountsDB = struct {

if (params.expected_full.accounts_hash.order(&accounts_hash) != .eq) {
self.logger.err().logf(
\\ incorrect accounts hash
\\ expected vs calculated: {d} vs {d}
, .{ params.expected_full.accounts_hash, accounts_hash });
"incorrect accounts hash: expected vs calculated: {d} vs {d}",
.{ params.expected_full.accounts_hash, accounts_hash },
);
return error.IncorrectAccountsHash;
}

if (params.expected_full.capitalization != total_lamports) {
self.logger.err().logf(
\\ incorrect total lamports
\\ expected vs calculated: {d} vs {d}
, .{ params.expected_full.capitalization, total_lamports });
"incorrect total lamports: expected vs calculated: {d} vs {d}",
.{ params.expected_full.capitalization, total_lamports },
);
return error.IncorrectTotalLamports;
}

@@ -1093,17 +1105,17 @@ pub const AccountsDB = struct {

if (expected_incremental.capitalization != incremental_lamports) {
self.logger.err().logf(
\\ incorrect incremental lamports
\\ expected vs calculated: {d} vs {d}
, .{ expected_incremental.capitalization, incremental_lamports });
"incorrect incremental lamports: expected vs calculated: {d} vs {d}",
.{ expected_incremental.capitalization, incremental_lamports },
);
return error.IncorrectIncrementalLamports;
}

if (expected_incremental.accounts_hash.order(&accounts_delta_hash) != .eq) {
self.logger.err().logf(
\\ incorrect accounts delta hash
\\ expected vs calculated: {d} vs {d}
, .{ expected_incremental.accounts_hash, accounts_delta_hash });
"incorrect accounts delta hash: expected vs calculated: {d} vs {d}",
.{ expected_incremental.accounts_hash, accounts_delta_hash },
);
return error.IncorrectAccountsDeltaHash;
}

@@ -3199,6 +3211,15 @@ pub fn indexAndValidateAccountFile(
accounts_file.number_of_accounts = number_of_accounts;
}

pub fn getAccountPerFileEstimateFromCluster(
cluster: sig.core.Cluster,
) error{NotImplementedYet}!u64 {
return switch (cluster) {
.testnet => 1_000,
else => error.NotImplementedYet,
};
}
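
// hypothetical call site: `try getAccountPerFileEstimateFromCluster(.testnet)`
// yields 1_000; any other cluster currently returns error.NotImplementedYet.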

/// All entries in `manifest.accounts_db_fields.file_map` must correspond to an entry in `file_map`,
/// with the association defined by the file id (a field of the value of the former, the key of the latter).
pub fn writeSnapshotTarWithFields(
@@ -4390,15 +4411,6 @@ test "generate snapshot & update gossip snapshot hashes" {
}
}

pub fn getAccountPerFileEstimateFromCluster(
cluster: sig.core.Cluster,
) error{NotImplementedYet}!u64 {
return switch (cluster) {
.testnet => 500,
else => error.NotImplementedYet,
};
}

pub const BenchmarkAccountsDBSnapshotLoad = struct {
pub const min_iterations = 1;
pub const max_iterations = 1;
@@ -4425,13 +4437,18 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
pub fn loadAndVerifySnapshot(units: BenchTimeUnit, bench_args: BenchArgs) !struct {
load_time: u64,
validate_time: u64,
fastload_save_time: u64,
fastload_time: u64,
} {
const allocator = std.heap.c_allocator;
var print_logger = sig.trace.DirectPrintLogger.init(allocator, .debug);
const logger = print_logger.logger();

// unpack the snapshot
var snapshot_dir = std.fs.cwd().openDir(SNAPSHOT_DIR_PATH, .{ .iterate = true }) catch {
var snapshot_dir = std.fs.cwd().openDir(
SNAPSHOT_DIR_PATH,
.{ .iterate = true },
) catch {
// no snapshot found -> early exit
std.debug.print(
"need to setup a snapshot in {s} for this benchmark...\n",
Expand All @@ -4441,6 +4458,8 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
return .{
.load_time = zero_duration.asNanos(),
.validate_time = zero_duration.asNanos(),
.fastload_save_time = zero_duration.asNanos(),
.fastload_time = zero_duration.asNanos(),
};
};
defer snapshot_dir.close();
@@ -4455,43 +4474,76 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
defer full_inc_manifest.deinit(allocator);
const collapsed_manifest = try full_inc_manifest.collapse(allocator);

var accounts_db = try AccountsDB.init(.{
.allocator = allocator,
.logger = logger,
.snapshot_dir = snapshot_dir,
.geyser_writer = null,
.gossip_view = null,
.index_allocation = if (bench_args.use_disk) .disk else .ram,
.number_of_index_shards = 32,
.lru_size = null,
});
defer accounts_db.deinit();
const loading_duration, const fastload_save_duration, const validate_duration = duration_blk: {
var accounts_db = try AccountsDB.init(.{
.allocator = allocator,
.logger = logger,
.snapshot_dir = snapshot_dir,
.geyser_writer = null,
.gossip_view = null,
.index_allocation = if (bench_args.use_disk) .disk else .ram,
.number_of_index_shards = 32,
.lru_size = null,
});
defer accounts_db.deinit();

const loading_duration = try accounts_db.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
bench_args.n_threads,
allocator,
try getAccountPerFileEstimateFromCluster(bench_args.cluster),
);
const loading_duration = try accounts_db.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
bench_args.n_threads,
allocator,
try getAccountPerFileEstimateFromCluster(bench_args.cluster),
);

const full_snapshot = full_inc_manifest.full;
var validate_timer = try sig.time.Timer.start();
try accounts_db.validateLoadFromSnapshot(.{
.full_slot = full_snapshot.bank_fields.slot,
.expected_full = .{
.accounts_hash = collapsed_manifest.accounts_db_fields.bank_hash_info.accounts_hash,
.capitalization = full_snapshot.bank_fields.capitalization,
},
.expected_incremental = if (collapsed_manifest.bank_extra.snapshot_persistence) |inc_persistence| .{
.accounts_hash = inc_persistence.incremental_hash,
.capitalization = inc_persistence.incremental_capitalization,
} else null,
});
const validate_duration = validate_timer.read();
const fastload_save_duration = blk: {
var timer = try sig.time.Timer.start();
try accounts_db.saveStateForFastload();
break :blk timer.read();
};

const full_snapshot = full_inc_manifest.full;
var validate_timer = try sig.time.Timer.start();
try accounts_db.validateLoadFromSnapshot(.{
.full_slot = full_snapshot.bank_fields.slot,
.expected_full = .{
.accounts_hash = collapsed_manifest.accounts_db_fields.bank_hash_info.accounts_hash,
.capitalization = full_snapshot.bank_fields.capitalization,
},
.expected_incremental = if (collapsed_manifest.bank_extra.snapshot_persistence) |inc_persistence| .{
.accounts_hash = inc_persistence.incremental_hash,
.capitalization = inc_persistence.incremental_capitalization,
} else null,
});
const validate_duration = validate_timer.read();

break :duration_blk .{ loading_duration, fastload_save_duration, validate_duration };
};

const fastload_duration = blk: {
var fastload_accounts_db = try AccountsDB.init(.{
.allocator = allocator,
.logger = logger,
.snapshot_dir = snapshot_dir,
.geyser_writer = null,
.gossip_view = null,
.index_allocation = if (bench_args.use_disk) .disk else .ram,
.number_of_index_shards = 32,
.lru_size = null,
});
defer fastload_accounts_db.deinit();

var fastload_dir = try snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();

var fastload_timer = try sig.time.Timer.start();
try fastload_accounts_db.fastload(fastload_dir, collapsed_manifest.accounts_db_fields);
break :blk fastload_timer.read();
};

return .{
.load_time = units.convertDuration(loading_duration),
.validate_time = units.convertDuration(validate_duration),
.fastload_save_time = units.convertDuration(fastload_save_duration),
.fastload_time = units.convertDuration(fastload_duration),
};
}
};