Skip to content

Commit

Permalink
Generate transactions in multiple threads. (#9842)
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored Aug 31, 2023
1 parent 40f80e3 commit 7e7d8c8
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 172 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions execution/executor-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async-trait = { workspace = true }
bcs = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
derivative = { workspace = true }
indicatif = { workspace = true }
itertools = { workspace = true }
move-core-types = { workspace = true }
Expand Down
45 changes: 21 additions & 24 deletions execution/executor-benchmark/src/account_generator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_sdk::{move_types::account_address::AccountAddress, types::LocalAccount};
use crate::transaction_generator::get_progress_bar;
use aptos_sdk::types::LocalAccount;
use rand::{rngs::StdRng, RngCore, SeedableRng};
use std::{collections::VecDeque, sync::mpsc};

Expand Down Expand Up @@ -60,18 +61,25 @@ impl AccountGenerator {
}

pub struct AccountCache {
generator: AccountGenerator,
pub accounts: VecDeque<LocalAccount>,
pub rng: StdRng,
}

impl AccountCache {
const SEED: Seed = [1; 32];

pub fn new(generator: AccountGenerator) -> Self {
pub fn new(mut generator: AccountGenerator, num_accounts: usize) -> Self {
let bar = get_progress_bar(num_accounts);
let accounts = (0..num_accounts)
.map(|_| {
let account = generator.generate();
bar.inc(1);
account
})
.collect();
bar.finish();
Self {
generator,
accounts: VecDeque::new(),
accounts,
rng: StdRng::from_seed(Self::SEED),
}
}
Expand All @@ -89,30 +97,19 @@ impl AccountCache {
&self.accounts
}

pub fn grow(&mut self, n: usize) {
let accounts: Vec<_> = (0..n).map(|_| self.generator.generate()).collect();
self.accounts.extend(accounts);
}

pub fn get_random(&mut self) -> &mut LocalAccount {
let indices = rand::seq::index::sample(&mut self.rng, self.accounts.len(), 1);
let index = indices.index(0);

let index = self.get_random_index();
&mut self.accounts[index]
}

pub fn get_random_transfer_batch(
&mut self,
batch_size: usize,
) -> (&LocalAccount, Vec<AccountAddress>) {
pub fn get_random_index(&mut self) -> usize {
rand::seq::index::sample(&mut self.rng, self.accounts.len(), 1).index(0)
}

pub fn get_random_transfer_batch(&mut self, batch_size: usize) -> (usize, Vec<usize>) {
let indices = rand::seq::index::sample(&mut self.rng, self.accounts.len(), batch_size + 1);
let sender_idx = indices.index(0);
let receivers = indices
.iter()
.skip(1)
.map(|i| self.accounts[i].address())
.collect();
let sender = &self.accounts[sender_idx];
(sender, receivers)
let receivers = indices.iter().skip(1).collect();
(sender_idx, receivers)
}
}
50 changes: 10 additions & 40 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::{
transaction_executor::TransactionExecutor, transaction_generator::TransactionGenerator,
};
use aptos_block_executor::counters as block_executor_counters;
use aptos_block_partitioner::PartitionerConfig;
use aptos_config::config::{NodeConfig, PrunerConfig};
use aptos_db::AptosDB;
use aptos_executor::{
Expand Down Expand Up @@ -166,17 +165,7 @@ pub fn run_benchmark<V>(
db.clone(),
// Initialization pipeline is temporary, so needs to be fully committed.
// No discards/aborts allowed during initialization, even if they are allowed later.
PipelineConfig {
delay_execution_start: false,
split_stages: false,
skip_commit: false,
allow_discards: false,
allow_aborts: false,
num_executor_shards: 1,
async_partitioning: false,
use_global_executor: false,
partitioner_config: PartitionerConfig::default(),
},
PipelineConfig::default(),
)
});

Expand Down Expand Up @@ -207,8 +196,8 @@ pub fn run_benchmark<V>(
genesis_key,
block_sender,
source_dir,
version,
Some(num_accounts_to_load),
pipeline_config.num_generator_workers,
);

let mut start_time = Instant::now();
Expand Down Expand Up @@ -446,11 +435,11 @@ fn add_accounts_impl<V>(
config.storage.rocksdb_configs.skip_index_and_usage = skip_index_and_usage;
let (db, executor) = init_db_and_executor::<V>(&config);

let version = db.reader.get_latest_version().unwrap();
let start_version = db.reader.get_latest_version().unwrap();

let (pipeline, block_sender) = Pipeline::new(
executor,
version,
start_version,
pipeline_config,
Some(1 + num_new_accounts / block_size * 101 / 100),
);
Expand All @@ -460,8 +449,8 @@ fn add_accounts_impl<V>(
genesis_key,
block_sender,
&source_dir,
version,
None,
pipeline_config.num_generator_workers,
);

let start_time = Instant::now();
Expand All @@ -477,7 +466,8 @@ fn add_accounts_impl<V>(
pipeline.join();

let elapsed = start_time.elapsed().as_secs_f32();
let delta_v = db.reader.get_latest_version().unwrap() - version;
let now_version = db.reader.get_latest_version().unwrap();
let delta_v = now_version - start_version;
info!(
"Overall TPS: account creation: {} txn/s",
delta_v as f32 / elapsed,
Expand All @@ -492,7 +482,7 @@ fn add_accounts_impl<V>(
println!(
"Created {} new accounts. Now at version {}, total # of accounts {}.",
num_new_accounts,
generator.version(),
now_version,
generator.num_existing_accounts() + num_new_accounts,
);

Expand Down Expand Up @@ -587,17 +577,7 @@ mod tests {
false,
false,
false,
PipelineConfig {
delay_execution_start: false,
split_stages: false,
skip_commit: false,
allow_discards: false,
allow_aborts: false,
num_executor_shards: 1,
async_partitioning: false,
use_global_executor: false,
partitioner_config: Default::default(),
},
PipelineConfig::default(),
);

println!("run_benchmark");
Expand All @@ -619,17 +599,7 @@ mod tests {
false,
false,
false,
PipelineConfig {
delay_execution_start: false,
split_stages: true,
skip_commit: false,
allow_discards: false,
allow_aborts: false,
num_executor_shards: 1,
async_partitioning: false,
use_global_executor: false,
partitioner_config: Default::default(),
},
PipelineConfig::default(),
);
}

Expand Down
3 changes: 3 additions & 0 deletions execution/executor-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub struct PipelineOpt {
#[clap(long)]
use_global_executor: bool,
#[clap(long, default_value = "4")]
num_generator_workers: usize,
#[clap(long, default_value = "4")]
max_partitioning_rounds: usize,
#[clap(long, default_value = "0.90")]
partitioner_cross_shard_dep_avoid_threshold: f32,
Expand All @@ -127,6 +129,7 @@ impl PipelineOpt {
num_executor_shards: self.num_executor_shards,
async_partitioning: self.async_partitioning,
use_global_executor: self.use_global_executor,
num_generator_workers: self.num_generator_workers,
partitioner_config: self.partitioner_config(),
}
}
Expand Down
9 changes: 7 additions & 2 deletions execution/executor-benchmark/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use aptos_types::{
block_executor::partitioner::ExecutableBlock,
transaction::{Transaction, Version},
};
use derivative::Derivative;
use std::{
marker::PhantomData,
sync::{
Expand All @@ -24,16 +25,20 @@ use std::{
time::{Duration, Instant},
};

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
pub struct PipelineConfig {
pub delay_execution_start: bool,
pub split_stages: bool,
pub skip_commit: bool,
pub allow_discards: bool,
pub allow_aborts: bool,
#[derivative(Default(value = "1"))]
pub num_executor_shards: usize,
pub async_partitioning: bool,
pub use_global_executor: bool,
#[derivative(Default(value = "4"))]
pub num_generator_workers: usize,
pub partitioner_config: PartitionerConfig,
}

Expand Down Expand Up @@ -63,7 +68,7 @@ where
if config.delay_execution_start {
(num_blocks.unwrap() + 1).max(50)
} else {
50
10
}, /* bound */
);

Expand Down
Loading

0 comments on commit 7e7d8c8

Please sign in to comment.