Skip to content

Commit

Permalink
A more efficient implementation of the multi-round block partitioner (#…
Browse files Browse the repository at this point in the history
…9488)

* init

* integrate into executor benchmark

* renames

* update

* more async drop

* update

* update

* update

* update

* update

* update

* save a vec of storage location

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* exe bench partition timer

* parallelize add_edges__init

* update

* update

* prealloc r/wsets_by_txn_id

* update

* speculative bug fix

* add_edges bugfix

* give a large blockstm instance to the last shard

* par fix

* more v2 tests of equality to unsharded result

* add_edges another impl

* tweak

* determinism tests

* refactoring

* fmt

* tweak

* update

* v2 cmdline args for executor bench

* misc

* update tests

* clean up

* bugfix

* clean up

* more iterations in determinism tests

* clean up

* clean up

* comments

* API tweak

* clean up

* doc update

* clean up

* clean up

* rename

* polish partitionerv2 (#9619)

* save

* a quick bench...

* update

* update

* timer clean up

* all states in session

* update

* update

* update

* update

* parallel twds

* make_txn_with_dep as a func

* tweak

* tweak

* massive breakup

* tweak

* update

* rename

* update

* update

* tweaks

* tweaks

* timers back

* tweaks

* tweaks

* tweaks

* refactor

* renames

* update

* fmt

* doc

* tweak

* tweak

* comments

* comments

* tweak

* tweak

* tweak

* tweak

* tweak

* comment

* comment

* comment

* fmt

* renames

* fmt

* ut for start_txn_idxs

* renames

* comments

* comments

* fmt

* comments

* comments

* fmt
  • Loading branch information
zjma authored Aug 24, 2023
1 parent fad6e81 commit 3e78e8a
Show file tree
Hide file tree
Showing 32 changed files with 2,007 additions and 202 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use aptos_bitvec::BitVec;
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
use aptos_block_partitioner::{
sharded_block_partitioner::ShardedBlockPartitioner, BlockPartitionerConfig,
sharded_block_partitioner::config::PartitionerV1Config, BlockPartitioner,
};
use aptos_crypto::HashValue;
use aptos_language_e2e_tests::{
Expand Down Expand Up @@ -199,7 +199,7 @@ struct TransactionBenchState<S> {
account_universe: AccountUniverse,
parallel_block_executor:
Option<Arc<ShardedBlockExecutor<FakeDataStore, LocalExecutorClient<FakeDataStore>>>>,
block_partitioner: Option<ShardedBlockPartitioner>,
block_partitioner: Option<Box<dyn BlockPartitioner>>,
validator_set: ValidatorSet,
state_view: Arc<FakeDataStore>,
}
Expand Down Expand Up @@ -259,7 +259,7 @@ where
(
Some(parallel_block_executor),
Some(
BlockPartitionerConfig::default()
PartitionerV1Config::default()
.num_shards(num_executor_shards)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
Expand Down Expand Up @@ -381,6 +381,7 @@ where
.into_iter()
.map(|txn| txn.into())
.collect::<Vec<AnalyzedTransaction>>(),
self.parallel_block_executor.as_ref().unwrap().num_shards(),
);
parallel_block_executor
.execute_block(
Expand Down
10 changes: 4 additions & 6 deletions aptos-move/aptos-vm-logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,17 @@ pub fn flush_speculative_logs(num_to_flush: usize) {
/// Clear speculative logs recorded for a specific transaction, useful when transaction
/// execution fails validation and aborts - setting stage for the re-execution.
///
/// Returns immediately when speculative logging is disabled. Otherwise the buffered
/// events for `txn_idx` are cleared; a failure to clear, or a missing (uninitialized)
/// log storage, is reported through `speculative_alert!`.
pub fn clear_speculative_txn_logs(txn_idx: usize) {
    if speculation_disabled() {
        return;
    }
    match &*BUFFERED_LOG_EVENTS.load() {
        Some(log_events) => {
            if let Err(e) = log_events.clear_txn_events(txn_idx) {
                speculative_alert!("{:?}", e);
            };
        },
        None => {
            // The early return above already handled the "speculation disabled" case, so
            // reaching this arm means the storage should exist: alert exactly once.
            speculative_alert!("Clear all logs called on uninitialized speculative log storage");
        },
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use aptos_types::{
},
transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
};
use aptos_vm_logging::disable_speculative_logging;
use futures::{channel::oneshot, executor::block_on};
use move_core_types::vm_status::VMStatus;
use std::sync::Arc;
Expand Down Expand Up @@ -64,6 +65,7 @@ impl<S: StateView + Sync + Send + 'static> ShardedExecutorService<S> {
concurrency_level: usize,
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus> {
disable_speculative_logging();
trace!(
"executing sub block for shard {} and round {}",
self.shard_id,
Expand Down
32 changes: 7 additions & 25 deletions aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
AptosVM, VMExecutor,
};
use aptos_block_partitioner::BlockPartitionerConfig;
use aptos_block_partitioner::BlockPartitioner;
use aptos_crypto::hash::CryptoHash;
use aptos_language_e2e_tests::{
account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore,
Expand Down Expand Up @@ -103,8 +103,8 @@ pub fn compare_txn_outputs(
}

pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>(
partitioner: Box<dyn BlockPartitioner>,
sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
partition_last_round: bool,
) {
let num_txns = 400;
let num_shards = 8;
Expand All @@ -113,13 +113,7 @@ pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>
for _ in 0..num_txns {
transactions.push(generate_non_conflicting_p2p(&mut executor).0)
}
let partitioner = BlockPartitionerConfig::default()
.num_shards(num_shards)
.max_partitioning_rounds(2)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(partition_last_round)
.build();
let partitioned_txns = partitioner.partition(transactions.clone());
let partitioned_txns = partitioner.partition(transactions.clone(), num_shards);
let sharded_txn_output = sharded_block_executor
.execute_block(
Arc::new(executor.data_store().clone()),
Expand All @@ -138,9 +132,9 @@ pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>
}

pub fn sharded_block_executor_with_conflict<E: ExecutorClient<FakeDataStore>>(
partitioner: Box<dyn BlockPartitioner>,
sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
concurrency: usize,
partition_last_round: bool,
) {
let num_txns = 800;
let num_shards = sharded_block_executor.num_shards();
Expand All @@ -165,13 +159,7 @@ pub fn sharded_block_executor_with_conflict<E: ExecutorClient<FakeDataStore>>(
}
}

let partitioner = BlockPartitionerConfig::default()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(partition_last_round)
.build();
let partitioned_txns = partitioner.partition(transactions.clone());
let partitioned_txns = partitioner.partition(transactions.clone(), num_shards);

let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone())
.into_iter()
Expand All @@ -192,9 +180,9 @@ pub fn sharded_block_executor_with_conflict<E: ExecutorClient<FakeDataStore>>(
}

pub fn sharded_block_executor_with_random_transfers<E: ExecutorClient<FakeDataStore>>(
partitioner: Box<dyn BlockPartitioner>,
sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
concurrency: usize,
partition_last_round: bool,
) {
let mut rng = OsRng;
let max_accounts = 200;
Expand Down Expand Up @@ -222,13 +210,7 @@ pub fn sharded_block_executor_with_random_transfers<E: ExecutorClient<FakeDataSt
transactions.push(txn)
}

let partitioner = BlockPartitionerConfig::default()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(partition_last_round)
.build();
let partitioned_txns = partitioner.partition(transactions.clone());
let partitioned_txns = partitioner.partition(transactions.clone(), num_shards);

let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone())
.into_iter()
Expand Down
168 changes: 144 additions & 24 deletions aptos-move/aptos-vm/src/sharded_block_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use crate::{
},
ShardedBlockExecutor,
};
use aptos_block_partitioner::{
sharded_block_partitioner::config::PartitionerV1Config, v2::config::PartitionerV2Config,
};
use aptos_state_view::StateView;
use rand::{rngs::OsRng, Rng};

Expand All @@ -21,67 +24,184 @@ fn setup_sharded_block_executor<S: StateView + Sync + Send + 'static>(

#[test]
fn test_sharded_block_executor_no_conflict() {
for last_round_partition in [true, false].iter() {
let sharded_block_executor = setup_sharded_block_executor(8, Some(4));
test_utils::test_sharded_block_executor_no_conflict(
sharded_block_executor,
*last_round_partition,
);
let num_shards = 8;
for last_round_partition in [true, false] {
let partitioner = PartitionerV1Config::new()
.num_shards(num_shards)
.max_partitioning_rounds(2)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4));
test_utils::test_sharded_block_executor_no_conflict(partitioner, sharded_block_executor);
}
}

#[test]
// Sharded execution with cross shard conflict doesn't work for now because we don't have
// cross round dependency tracking yet.
fn test_sharded_block_executor_with_conflict_parallel() {
for last_round_partition in [true, false].iter() {
let sharded_block_executor = setup_sharded_block_executor(7, Some(4));
test_utils::sharded_block_executor_with_conflict(
sharded_block_executor,
4,
*last_round_partition,
);
let num_shards = 7;
for last_round_partition in [true, false] {
let partitioner = PartitionerV1Config::default()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4));
test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 4);
}
}

#[test]
fn test_sharded_block_executor_with_conflict_sequential() {
for last_round_partition in [true, false].iter() {
let sharded_block_executor = setup_sharded_block_executor(7, Some(1));
test_utils::sharded_block_executor_with_conflict(
sharded_block_executor,
1,
*last_round_partition,
)
let num_shards = 7;
for last_round_partition in [true, false] {
let partitioner = PartitionerV1Config::default()
.num_shards(num_shards)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(1));
test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 1)
}
}

#[test]
fn test_sharded_block_executor_with_random_transfers_parallel() {
for last_round_partition in [true, false].iter() {
for last_round_partition in [true, false] {
let mut rng = OsRng;
let max_num_shards = 32;
let num_shards = rng.gen_range(1, max_num_shards);
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4));
let partitioner = PartitionerV1Config::default()
.num_shards(num_shards)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
test_utils::sharded_block_executor_with_random_transfers(
partitioner,
sharded_block_executor,
4,
*last_round_partition,
);
}
}

#[test]
fn test_sharded_block_executor_with_random_transfers_sequential() {
for last_round_partition in [true, false].iter() {
for last_round_partition in [true, false] {
let mut rng = OsRng;
let max_num_shards = 32;
let num_shards = rng.gen_range(1, max_num_shards);
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(1));
let partitioner = PartitionerV1Config::new()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
test_utils::sharded_block_executor_with_random_transfers(
partitioner,
sharded_block_executor,
1,
)
}
}

/// Runs the shared no-conflict scenario with a V2 partitioner, once for each value of
/// the `partition_last_round` setting.
#[test]
fn test_partitioner_v2_sharded_block_executor_no_conflict() {
    let shard_count = 8;
    for partition_last_round in [false, true] {
        // A fresh set of local executor shards is created for every iteration.
        let executor = ShardedBlockExecutor::new(
            LocalExecutorService::setup_local_executor_shards(shard_count, Some(2)),
        );
        let v2_partitioner = PartitionerV2Config::default()
            .num_threads(2)
            .max_partitioning_rounds(4)
            .cross_shard_dep_avoid_threshold(0.9)
            .dashmap_num_shards(64)
            .partition_last_round(partition_last_round)
            .build();
        test_utils::test_sharded_block_executor_no_conflict(v2_partitioner, executor);
    }
}

/// Runs the shared conflicting-transactions scenario with a V2 partitioner and a
/// concurrency argument of 4, once for each value of the `partition_last_round` setting.
#[test]
// Sharded execution with cross shard conflict doesn't work for now because we don't have
// cross round dependency tracking yet.
// NOTE(review): the test is not ignored, so the remark above may be stale - confirm.
fn test_partitioner_v2_sharded_block_executor_with_conflict_parallel() {
    let shard_count = 7;
    for partition_last_round in [false, true] {
        // A fresh set of local executor shards is created for every iteration.
        let executor = ShardedBlockExecutor::new(
            LocalExecutorService::setup_local_executor_shards(shard_count, Some(4)),
        );
        let v2_partitioner = PartitionerV2Config::default()
            .num_threads(8)
            .max_partitioning_rounds(4)
            .cross_shard_dep_avoid_threshold(0.9)
            .dashmap_num_shards(64)
            .partition_last_round(partition_last_round)
            .build();
        test_utils::sharded_block_executor_with_conflict(v2_partitioner, executor, 4);
    }
}

/// Runs the shared conflicting-transactions scenario with a V2 partitioner and a
/// concurrency argument of 1, once for each value of the `partition_last_round` setting.
#[test]
fn test_partitioner_v2_sharded_block_executor_with_conflict_sequential() {
    let shard_count = 7;
    for partition_last_round in [false, true] {
        // A fresh set of local executor shards is created for every iteration.
        let executor = ShardedBlockExecutor::new(
            LocalExecutorService::setup_local_executor_shards(shard_count, Some(1)),
        );
        let v2_partitioner = PartitionerV2Config::default()
            .num_threads(8)
            .max_partitioning_rounds(4)
            .cross_shard_dep_avoid_threshold(0.9)
            .dashmap_num_shards(64)
            .partition_last_round(partition_last_round)
            .build();
        test_utils::sharded_block_executor_with_conflict(v2_partitioner, executor, 1);
    }
}

/// Runs the shared random-transfers scenario with a V2 partitioner on 3 shards and a
/// concurrency argument of 4, once for each value of the `partition_last_round` setting.
#[test]
fn test_partitioner_v2_sharded_block_executor_with_random_transfers_parallel() {
    let shard_count = 3;
    for partition_last_round in [false, true] {
        // A fresh set of local executor shards is created for every iteration.
        let executor = ShardedBlockExecutor::new(
            LocalExecutorService::setup_local_executor_shards(shard_count, Some(4)),
        );
        let v2_partitioner = PartitionerV2Config::default()
            .num_threads(8)
            .max_partitioning_rounds(4)
            .cross_shard_dep_avoid_threshold(0.9)
            .dashmap_num_shards(64)
            .partition_last_round(partition_last_round)
            .build();
        test_utils::sharded_block_executor_with_random_transfers(v2_partitioner, executor, 4);
    }
}

/// Runs the shared random-transfers scenario with a V2 partitioner and a concurrency
/// argument of 1, once for each value of the `partition_last_round` setting.
///
/// The shard count is drawn from the OS random number generator on every iteration,
/// so each run may exercise a different number of shards.
#[test]
fn test_partitioner_v2_sharded_block_executor_with_random_transfers_sequential() {
    for merge_discard in [false, true] {
        let mut rng = OsRng;
        let max_num_shards = 32;
        // NOTE(review): with the two-argument `gen_range` the upper bound is presumably
        // exclusive, i.e. 1..=31 shards - confirm against the `rand` version in use.
        let num_shards = rng.gen_range(1, max_num_shards);
        let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1));
        let sharded_block_executor = ShardedBlockExecutor::new(client);
        let partitioner = PartitionerV2Config::default()
            .num_threads(8)
            .max_partitioning_rounds(4)
            .cross_shard_dep_avoid_threshold(0.9)
            .dashmap_num_shards(64)
            .partition_last_round(merge_discard)
            .build();
        // The helper is called with (partitioner, executor, concurrency), matching the
        // other call sites in this file; the last-round flag lives in the partitioner.
        test_utils::sharded_block_executor_with_random_transfers(
            partitioner,
            sharded_block_executor,
            1,
        )
    }
}
Loading

0 comments on commit 3e78e8a

Please sign in to comment.