Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A more efficient implementation of the multi-round block partitioner #9488

Merged
merged 99 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 85 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
d2112ef
init
zjma Jul 28, 2023
06a998b
integrate into executor benchmark
zjma Jul 28, 2023
f29a145
renames
zjma Jul 28, 2023
f41cf0a
update
zjma Jul 28, 2023
8a53af6
more async drop
zjma Jul 28, 2023
71a92cc
update
zjma Jul 28, 2023
d4d722c
update
zjma Jul 28, 2023
6c18dcc
update
zjma Jul 28, 2023
af91bf7
update
zjma Jul 28, 2023
f11c942
update
zjma Jul 28, 2023
0c97ab8
update
zjma Jul 28, 2023
19c87c3
save a vec of storage location
zjma Jul 28, 2023
a28a21c
update
zjma Jul 28, 2023
867c38e
update
zjma Jul 28, 2023
daf76a9
update
zjma Jul 28, 2023
381f0cd
update
zjma Jul 28, 2023
743e649
update
zjma Jul 28, 2023
7c47145
update
zjma Jul 31, 2023
c8a2018
update
zjma Jul 31, 2023
7d036d7
update
zjma Jul 31, 2023
0eaf210
update
zjma Jul 31, 2023
3a223c8
update
zjma Jul 31, 2023
eeab18b
update
zjma Jul 31, 2023
6e3ae72
update
zjma Jul 31, 2023
da39762
exe bench partition timer
zjma Jul 31, 2023
80aa176
parallelize add_edges__init
zjma Jul 31, 2023
04273cb
update
zjma Jul 31, 2023
92ebba0
update
zjma Jul 31, 2023
acbfbcf
prealloc r/wsets_by_txn_id
zjma Jul 31, 2023
4add30e
update
zjma Jul 31, 2023
9981da4
speculative bug fix
zjma Jul 31, 2023
944c21c
add_edges bugfix
zjma Aug 1, 2023
2531f9e
give a large blockstm instance to the last shard
zjma Aug 1, 2023
e8d4c3b
par fix
zjma Aug 2, 2023
e949d05
more v2 tests of equality to unsharded result
zjma Aug 3, 2023
f3400aa
add_edges another impl
zjma Aug 3, 2023
2c24392
tweak
zjma Aug 3, 2023
736969b
determinism tests
zjma Aug 4, 2023
25f2bc3
refactoring
zjma Aug 4, 2023
a952d5e
fmt
zjma Aug 4, 2023
2e40765
tweak
zjma Aug 4, 2023
d92801a
Merge remote-tracking branch 'origin/main' into scalable-partitioner
zjma Aug 4, 2023
9b0a74b
Merge remote-tracking branch 'origin/main' into scalable-partitioner
zjma Aug 9, 2023
7a8fd4d
update
zjma Aug 9, 2023
7dfa022
v2 cmdline args for executor bench
zjma Aug 9, 2023
66b23dd
misc
zjma Aug 9, 2023
d47ca75
update tests
zjma Aug 9, 2023
ac813c4
clean up
zjma Aug 9, 2023
1fb3a90
bugfix
zjma Aug 9, 2023
5cfdf53
clean up
zjma Aug 9, 2023
37702ab
more iterations in determinism tests
zjma Aug 9, 2023
3e0c46c
clean up
zjma Aug 9, 2023
4dc1eee
clean up
zjma Aug 9, 2023
88bbb81
comments
zjma Aug 9, 2023
e3d20fc
API tweak
zjma Aug 9, 2023
9c8be4c
clean up
zjma Aug 9, 2023
82acfa3
doc update
zjma Aug 9, 2023
78a641d
clean up
zjma Aug 9, 2023
8e66752
clean up
zjma Aug 9, 2023
c34a065
rename
zjma Aug 9, 2023
7f9f7f2
polish partitionerv2 (#9619)
zjma Aug 11, 2023
e65671a
tweaks
zjma Aug 11, 2023
cab297c
tweaks
zjma Aug 11, 2023
a63ef84
timers back
zjma Aug 11, 2023
109b517
tweaks
zjma Aug 11, 2023
70c73ce
tweaks
zjma Aug 11, 2023
0a6d205
tweaks
zjma Aug 11, 2023
5a08413
refactor
zjma Aug 11, 2023
30154db
renames
zjma Aug 14, 2023
5b11b7d
update
zjma Aug 15, 2023
6756064
fmt
zjma Aug 15, 2023
ac8f0e2
doc
zjma Aug 15, 2023
0d90589
tweak
zjma Aug 15, 2023
c6e8fc4
tweak
zjma Aug 15, 2023
2699805
comments
zjma Aug 15, 2023
09bc0d1
comments
zjma Aug 15, 2023
fe218f2
tweak
zjma Aug 15, 2023
50426df
tweak
zjma Aug 15, 2023
67811b6
tweak
zjma Aug 15, 2023
b0f66b8
tweak
zjma Aug 15, 2023
e953e06
tweak
zjma Aug 15, 2023
843ec79
comment
zjma Aug 15, 2023
2b2110b
comment
zjma Aug 15, 2023
d0268bd
comment
zjma Aug 15, 2023
ca3d844
fmt
zjma Aug 15, 2023
b4eb4f4
renames
zjma Aug 21, 2023
691ab38
Merge remote-tracking branch 'origin/main' into scalable-partitioner
zjma Aug 21, 2023
7d86690
fmt
zjma Aug 21, 2023
6d55790
ut for start_txn_idxs
zjma Aug 23, 2023
f61deac
renames
zjma Aug 23, 2023
608905c
comments
zjma Aug 23, 2023
09055bd
comments
zjma Aug 23, 2023
664f53b
fmt
zjma Aug 23, 2023
3692f47
comments
zjma Aug 23, 2023
77304d5
comments
zjma Aug 23, 2023
6c82041
Merge remote-tracking branch 'origin/main' into scalable-partitioner
zjma Aug 23, 2023
7b677c6
fmt
zjma Aug 24, 2023
43a3791
fix tests
zjma Aug 24, 2023
98156ea
Merge remote-tracking branch 'origin/main' into scalable-partitioner
zjma Aug 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use aptos_bitvec::BitVec;
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
use aptos_block_partitioner::{
sharded_block_partitioner::ShardedBlockPartitioner, BlockPartitionerConfig,
sharded_block_partitioner::config::PartitionerV1Config, BlockPartitioner,
};
use aptos_crypto::HashValue;
use aptos_language_e2e_tests::{
Expand Down Expand Up @@ -199,7 +199,7 @@ struct TransactionBenchState<S> {
account_universe: AccountUniverse,
parallel_block_executor:
Option<Arc<ShardedBlockExecutor<FakeDataStore, LocalExecutorClient<FakeDataStore>>>>,
block_partitioner: Option<ShardedBlockPartitioner>,
block_partitioner: Option<Box<dyn BlockPartitioner>>,
validator_set: ValidatorSet,
state_view: Arc<FakeDataStore>,
}
Expand Down Expand Up @@ -259,7 +259,7 @@ where
(
Some(parallel_block_executor),
Some(
BlockPartitionerConfig::default()
PartitionerV1Config::default()
.num_shards(num_executor_shards)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
Expand Down Expand Up @@ -381,6 +381,7 @@ where
.into_iter()
.map(|txn| txn.into())
.collect::<Vec<AnalyzedTransaction>>(),
self.parallel_block_executor.as_ref().unwrap().num_shards(),
);
parallel_block_executor
.execute_block(
Expand Down
10 changes: 4 additions & 6 deletions aptos-move/aptos-vm-logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,17 @@ pub fn flush_speculative_logs(num_to_flush: usize) {
/// Clear speculative logs recorded for a specific transction, useful when transaction
/// execution fails validation and aborts - setting stage for the re-execution.
pub fn clear_speculative_txn_logs(txn_idx: usize) {
if speculation_disabled() {
return;
}
match &*BUFFERED_LOG_EVENTS.load() {
Some(log_events) => {
if let Err(e) = log_events.clear_txn_events(txn_idx) {
speculative_alert!("{:?}", e);
};
},
None => {
if !speculation_disabled() {
// Alert only if speculation is not disabled.
speculative_alert!(
"Clear all logs called on uninitialized speculative log storage"
);
}
speculative_alert!("Clear all logs called on uninitialized speculative log storage");
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use aptos_types::{
},
transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
};
use aptos_vm_logging::disable_speculative_logging;
use futures::{channel::oneshot, executor::block_on};
use move_core_types::vm_status::VMStatus;
use std::sync::Arc;
Expand Down Expand Up @@ -64,6 +65,7 @@ impl<S: StateView + Sync + Send + 'static> ShardedExecutorService<S> {
concurrency_level: usize,
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus> {
disable_speculative_logging();
trace!(
"executing sub block for shard {} and round {}",
self.shard_id,
Expand Down
32 changes: 7 additions & 25 deletions aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
AptosVM, VMExecutor,
};
use aptos_block_partitioner::BlockPartitionerConfig;
use aptos_block_partitioner::BlockPartitioner;
use aptos_crypto::hash::CryptoHash;
use aptos_language_e2e_tests::{
account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore,
Expand Down Expand Up @@ -103,8 +103,8 @@ pub fn compare_txn_outputs(
}

pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>(
partitioner: Box<dyn BlockPartitioner>,
sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
partition_last_round: bool,
) {
let num_txns = 400;
let num_shards = 8;
Expand All @@ -113,13 +113,7 @@ pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>
for _ in 0..num_txns {
transactions.push(generate_non_conflicting_p2p(&mut executor).0)
}
let partitioner = BlockPartitionerConfig::default()
.num_shards(num_shards)
.max_partitioning_rounds(2)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(partition_last_round)
.build();
let partitioned_txns = partitioner.partition(transactions.clone());
let partitioned_txns = partitioner.partition(transactions.clone(), num_shards);
let sharded_txn_output = sharded_block_executor
.execute_block(
Arc::new(executor.data_store().clone()),
Expand All @@ -138,9 +132,9 @@ pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>
}

pub fn sharded_block_executor_with_conflict<E: ExecutorClient<FakeDataStore>>(
partitioner: Box<dyn BlockPartitioner>,
sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
concurrency: usize,
partition_last_round: bool,
) {
let num_txns = 800;
let num_shards = sharded_block_executor.num_shards();
Expand All @@ -165,13 +159,7 @@ pub fn sharded_block_executor_with_conflict<E: ExecutorClient<FakeDataStore>>(
}
}

let partitioner = BlockPartitionerConfig::default()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(partition_last_round)
.build();
let partitioned_txns = partitioner.partition(transactions.clone());
let partitioned_txns = partitioner.partition(transactions.clone(), num_shards);

let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone())
.into_iter()
Expand All @@ -192,9 +180,9 @@ pub fn sharded_block_executor_with_conflict<E: ExecutorClient<FakeDataStore>>(
}

pub fn sharded_block_executor_with_random_transfers<E: ExecutorClient<FakeDataStore>>(
partitioner: Box<dyn BlockPartitioner>,
sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
concurrency: usize,
partition_last_round: bool,
) {
let mut rng = OsRng;
let max_accounts = 200;
Expand Down Expand Up @@ -222,13 +210,7 @@ pub fn sharded_block_executor_with_random_transfers<E: ExecutorClient<FakeDataSt
transactions.push(txn)
}

let partitioner = BlockPartitionerConfig::default()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(partition_last_round)
.build();
let partitioned_txns = partitioner.partition(transactions.clone());
let partitioned_txns = partitioner.partition(transactions.clone(), num_shards);

let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone())
.into_iter()
Expand Down
168 changes: 144 additions & 24 deletions aptos-move/aptos-vm/src/sharded_block_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use crate::{
},
ShardedBlockExecutor,
};
use aptos_block_partitioner::{
sharded_block_partitioner::config::PartitionerV1Config, v2::config::PartitionerV2Config,
manudhundi marked this conversation as resolved.
Show resolved Hide resolved
};
use aptos_state_view::StateView;
use rand::{rngs::OsRng, Rng};

Expand All @@ -21,67 +24,184 @@ fn setup_sharded_block_executor<S: StateView + Sync + Send + 'static>(

#[test]
fn test_sharded_block_executor_no_conflict() {
for last_round_partition in [true, false].iter() {
let sharded_block_executor = setup_sharded_block_executor(8, Some(4));
test_utils::test_sharded_block_executor_no_conflict(
sharded_block_executor,
*last_round_partition,
);
let num_shards = 8;
for last_round_partition in [true, false] {
let partitioner = PartitionerV1Config::new()
.num_shards(num_shards)
.max_partitioning_rounds(2)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4));
test_utils::test_sharded_block_executor_no_conflict(partitioner, sharded_block_executor);
}
}

#[test]
// Sharded execution with cross shard conflict doesn't work for now because we don't have
// cross round dependency tracking yet.
fn test_sharded_block_executor_with_conflict_parallel() {
for last_round_partition in [true, false].iter() {
let sharded_block_executor = setup_sharded_block_executor(7, Some(4));
test_utils::sharded_block_executor_with_conflict(
sharded_block_executor,
4,
*last_round_partition,
);
let num_shards = 7;
for last_round_partition in [true, false] {
let partitioner = PartitionerV1Config::default()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4));
test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 4);
}
}

#[test]
fn test_sharded_block_executor_with_conflict_sequential() {
for last_round_partition in [true, false].iter() {
let sharded_block_executor = setup_sharded_block_executor(7, Some(1));
test_utils::sharded_block_executor_with_conflict(
sharded_block_executor,
1,
*last_round_partition,
)
let num_shards = 7;
for last_round_partition in [true, false] {
let partitioner = PartitionerV1Config::default()
.num_shards(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(1));
test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 1)
}
}

#[test]
fn test_sharded_block_executor_with_random_transfers_parallel() {
for last_round_partition in [true, false].iter() {
for last_round_partition in [true, false] {
let mut rng = OsRng;
let max_num_shards = 32;
let num_shards = rng.gen_range(1, max_num_shards);
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4));
let partitioner = PartitionerV1Config::default()
.num_shards(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
test_utils::sharded_block_executor_with_random_transfers(
partitioner,
sharded_block_executor,
4,
*last_round_partition,
);
}
}

#[test]
fn test_sharded_block_executor_with_random_transfers_sequential() {
for last_round_partition in [true, false].iter() {
for last_round_partition in [true, false] {
let mut rng = OsRng;
let max_num_shards = 32;
let num_shards = rng.gen_range(1, max_num_shards);
let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(1));
let partitioner = PartitionerV1Config::new()
.num_shards(num_shards)
.max_partitioning_rounds(8)
.cross_shard_dep_avoid_threshold(0.9)
.partition_last_round(last_round_partition)
.build();
test_utils::sharded_block_executor_with_random_transfers(
partitioner,
sharded_block_executor,
1,
)
}
}

#[test]
fn test_partitioner_v2_sharded_block_executor_no_conflict() {
zjma marked this conversation as resolved.
Show resolved Hide resolved
for merge_discard in [false, true] {
let num_shards = 8;
let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(2));
let sharded_block_executor = ShardedBlockExecutor::new(client);
let partitioner = PartitionerV2Config::default()
.num_threads(2)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
.dashmap_num_shards(64)
.partition_last_round(merge_discard)
.build();
test_utils::test_sharded_block_executor_no_conflict(partitioner, sharded_block_executor);
}
}

#[test]
// Sharded execution with cross shard conflict doesn't work for now because we don't have
// cross round dependency tracking yet.
fn test_partitioner_v2_sharded_block_executor_with_conflict_parallel() {
for merge_discard in [false, true] {
let num_shards = 7;
let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(4));
let sharded_block_executor = ShardedBlockExecutor::new(client);
let partitioner = PartitionerV2Config::default()
.num_threads(8)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
.dashmap_num_shards(64)
.partition_last_round(merge_discard)
.build();
test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 4);
}
}

#[test]
fn test_partitioner_v2_sharded_block_executor_with_conflict_sequential() {
for merge_discard in [false, true] {
let num_shards = 7;
let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1));
let sharded_block_executor = ShardedBlockExecutor::new(client);
let partitioner = PartitionerV2Config::default()
.num_threads(8)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
.dashmap_num_shards(64)
.partition_last_round(merge_discard)
.build();
test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 1)
}
}

#[test]
fn test_partitioner_v2_sharded_block_executor_with_random_transfers_parallel() {
for merge_discard in [false, true] {
let num_shards = 3;
let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(4));
let sharded_block_executor = ShardedBlockExecutor::new(client);
let partitioner = PartitionerV2Config::default()
.num_threads(8)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
.dashmap_num_shards(64)
.partition_last_round(merge_discard)
.build();
test_utils::sharded_block_executor_with_random_transfers(
partitioner,
sharded_block_executor,
4,
)
}
}

#[test]
fn test_partitioner_v2_sharded_block_executor_with_random_transfers_sequential() {
for merge_discard in [false, true] {
let mut rng = OsRng;
let max_num_shards = 32;
let num_shards = rng.gen_range(1, max_num_shards);
let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1));
let sharded_block_executor = ShardedBlockExecutor::new(client);
let partitioner = PartitionerV2Config::default()
.num_threads(8)
.max_partitioning_rounds(4)
.cross_shard_dep_avoid_threshold(0.9)
.dashmap_num_shards(64)
.partition_last_round(merge_discard)
.build();
test_utils::sharded_block_executor_with_random_transfers(
partitioner,
sharded_block_executor,
1,
*last_round_partition,
)
}
}
Loading
Loading