From 3e78e8a78b788f85bb84906a568e0d24242dda83 Mon Sep 17 00:00:00 2001 From: zhoujunma Date: Thu, 24 Aug 2023 01:52:16 -0700 Subject: [PATCH] A more efficient implementation of the multi-round block partitioner (#9488) * init * integrate into executor benchmark * renames * update * more async drop * update * update * update * update * update * update * save a vec of storage location * update * update * update * update * update * update * update * update * update * update * update * update * exe bench partition timer * parallelize add_edges__init * update * update * prealloc r/wsets_by_txn_id * update * speculative bug fix * add_edges bugfix * give a large blockstm instance to the last shard * par fix * more v2 tests of equality to unsharded result * add_edges another impl * tweak * determinism tests * refactoring * fmt * tweak * update * v2 cmdline args for executor bench * misc * update tests * clean up * bugfix * clean up * more iterations in determinism tests * clean up * clean up * comments * API tweak * clean up * doc update * clean up * clean up * rename * polish partitionerv2 (#9619) * save * a quick bench... * update * update * timer clean up * all states in session * update * update * update * update * parallel twds * make_txn_with_dep as a func * tweak * tweak * massive breakup * tweak * update * rename * update * update * tweaks * tweaks * timers back * tweaks * tweaks * tweaks * refactor * renames * update * fmt * doc * tweak * tweak * comments * comments * tweak * tweak * tweak * tweak * tweak * comment * comment * comment * fmt * renames * fmt * ut for start_txn_idxs * renames * comments * comments * fmt * comments * comments * fmt --- Cargo.lock | 3 + .../src/transactions.rs | 7 +- aptos-move/aptos-vm-logging/src/lib.rs | 10 +- .../sharded_executor_service.rs | 2 + .../src/sharded_block_executor/test_utils.rs | 32 +- .../src/sharded_block_executor/tests.rs | 168 ++++++++-- execution/block-partitioner/Cargo.toml | 12 + execution/block-partitioner/benches/v2.rs | 52 +++ execution/block-partitioner/src/lib.rs | 89 +++-- execution/block-partitioner/src/main.rs | 64 ++-- .../src/pre_partition/mod.rs | 33 ++ .../src/pre_partition/uniform_partitioner.rs | 59 ++++ .../src/sharded_block_partitioner/config.rs | 60 ++++ .../conflict_detector.rs | 23 +- .../src/sharded_block_partitioner/mod.rs | 35 +- execution/block-partitioner/src/test_utils.rs | 241 ++++++++++++++ .../block-partitioner/src/v2/build_edge.rs | 90 +++++ execution/block-partitioner/src/v2/config.rs | 61 ++++ .../src/v2/conflicting_txn_tracker.rs | 151 +++++++++ .../block-partitioner/src/v2/counters.rs | 16 + execution/block-partitioner/src/v2/init.rs | 59 ++++ execution/block-partitioner/src/v2/mod.rs | 165 ++++++++++ .../src/v2/partition_to_matrix.rs | 224 +++++++++++++ execution/block-partitioner/src/v2/state.rs | 311 ++++++++++++++++++ execution/block-partitioner/src/v2/types.rs | 88 +++++ .../src/block_partitioning.rs | 25 +- execution/executor-benchmark/src/lib.rs | 6 +- execution/executor-benchmark/src/main.rs | 34 ++ execution/executor-benchmark/src/pipeline.rs | 6 +- execution/executor-service/src/test_utils.rs | 6 +- types/src/block_executor/partitioner.rs | 73 +++- types/src/transaction/analyzed_transaction.rs | 4 +- 32 files changed, 2007 insertions(+), 202 deletions(-) create mode 100644 execution/block-partitioner/benches/v2.rs create mode 100644 execution/block-partitioner/src/pre_partition/mod.rs create mode 100644 execution/block-partitioner/src/pre_partition/uniform_partitioner.rs create mode 100644 
execution/block-partitioner/src/sharded_block_partitioner/config.rs create mode 100644 execution/block-partitioner/src/v2/build_edge.rs create mode 100644 execution/block-partitioner/src/v2/config.rs create mode 100644 execution/block-partitioner/src/v2/conflicting_txn_tracker.rs create mode 100644 execution/block-partitioner/src/v2/counters.rs create mode 100644 execution/block-partitioner/src/v2/init.rs create mode 100644 execution/block-partitioner/src/v2/mod.rs create mode 100644 execution/block-partitioner/src/v2/partition_to_matrix.rs create mode 100644 execution/block-partitioner/src/v2/state.rs create mode 100644 execution/block-partitioner/src/v2/types.rs diff --git a/Cargo.lock b/Cargo.lock index 7e5439df690c1..00e7e1fa7f6aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -599,12 +599,15 @@ dependencies = [ "aptos-types", "bcs 0.1.4", "clap 4.3.21", + "criterion", "dashmap", "itertools", + "jemallocator", "move-core-types", "once_cell", "rand 0.7.3", "rayon", + "serde 1.0.149", ] [[package]] diff --git a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs index 255e7739d4a44..6aa326191b2af 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs @@ -5,7 +5,7 @@ use aptos_bitvec::BitVec; use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook; use aptos_block_partitioner::{ - sharded_block_partitioner::ShardedBlockPartitioner, BlockPartitionerConfig, + sharded_block_partitioner::config::PartitionerV1Config, BlockPartitioner, }; use aptos_crypto::HashValue; use aptos_language_e2e_tests::{ @@ -199,7 +199,7 @@ struct TransactionBenchState { account_universe: AccountUniverse, parallel_block_executor: Option>>>, - block_partitioner: Option, + block_partitioner: Option>, validator_set: ValidatorSet, state_view: Arc, } @@ -259,7 +259,7 @@ where ( Some(parallel_block_executor), Some( - BlockPartitionerConfig::default() + PartitionerV1Config::default() .num_shards(num_executor_shards) .max_partitioning_rounds(4) .cross_shard_dep_avoid_threshold(0.9) @@ -381,6 +381,7 @@ where .into_iter() .map(|txn| txn.into()) .collect::>(), + self.parallel_block_executor.as_ref().unwrap().num_shards(), ); parallel_block_executor .execute_block( diff --git a/aptos-move/aptos-vm-logging/src/lib.rs b/aptos-move/aptos-vm-logging/src/lib.rs index e27a21b60362b..b757e578a02de 100644 --- a/aptos-move/aptos-vm-logging/src/lib.rs +++ b/aptos-move/aptos-vm-logging/src/lib.rs @@ -143,6 +143,9 @@ pub fn flush_speculative_logs(num_to_flush: usize) { /// Clear speculative logs recorded for a specific transction, useful when transaction /// execution fails validation and aborts - setting stage for the re-execution. pub fn clear_speculative_txn_logs(txn_idx: usize) { + if speculation_disabled() { + return; + } match &*BUFFERED_LOG_EVENTS.load() { Some(log_events) => { if let Err(e) = log_events.clear_txn_events(txn_idx) { @@ -150,12 +153,7 @@ pub fn clear_speculative_txn_logs(txn_idx: usize) { }; }, None => { - if !speculation_disabled() { - // Alert only if speculation is not disabled. 
- speculative_alert!( - "Clear all logs called on uninitialized speculative log storage" - ); - } + speculative_alert!("Clear all logs called on uninitialized speculative log storage"); }, } } diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index c0944f866753d..0356ad5c7dcd9 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -19,6 +19,7 @@ use aptos_types::{ }, transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput}, }; +use aptos_vm_logging::disable_speculative_logging; use futures::{channel::oneshot, executor::block_on}; use move_core_types::vm_status::VMStatus; use std::sync::Arc; @@ -64,6 +65,7 @@ impl ShardedExecutorService { concurrency_level: usize, maybe_block_gas_limit: Option, ) -> Result, VMStatus> { + disable_speculative_logging(); trace!( "executing sub block for shard {} and round {}", self.shard_id, diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs b/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs index e66e272007870..551e5434cc053 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs @@ -4,7 +4,7 @@ use crate::{ sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, AptosVM, VMExecutor, }; -use aptos_block_partitioner::BlockPartitionerConfig; +use aptos_block_partitioner::BlockPartitioner; use aptos_crypto::hash::CryptoHash; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, @@ -103,8 +103,8 @@ pub fn compare_txn_outputs( } pub fn test_sharded_block_executor_no_conflict>( + partitioner: Box, sharded_block_executor: ShardedBlockExecutor, - partition_last_round: bool, ) { let num_txns = 400; let num_shards = 8; @@ -113,13 +113,7 @@ pub fn test_sharded_block_executor_no_conflict> for _ in 0..num_txns { transactions.push(generate_non_conflicting_p2p(&mut executor).0) } - let partitioner = BlockPartitionerConfig::default() - .num_shards(num_shards) - .max_partitioning_rounds(2) - .cross_shard_dep_avoid_threshold(0.9) - .partition_last_round(partition_last_round) - .build(); - let partitioned_txns = partitioner.partition(transactions.clone()); + let partitioned_txns = partitioner.partition(transactions.clone(), num_shards); let sharded_txn_output = sharded_block_executor .execute_block( Arc::new(executor.data_store().clone()), @@ -138,9 +132,9 @@ pub fn test_sharded_block_executor_no_conflict> } pub fn sharded_block_executor_with_conflict>( + partitioner: Box, sharded_block_executor: ShardedBlockExecutor, concurrency: usize, - partition_last_round: bool, ) { let num_txns = 800; let num_shards = sharded_block_executor.num_shards(); @@ -165,13 +159,7 @@ pub fn sharded_block_executor_with_conflict>( } } - let partitioner = BlockPartitionerConfig::default() - .num_shards(num_shards) - .max_partitioning_rounds(8) - .cross_shard_dep_avoid_threshold(0.9) - .partition_last_round(partition_last_round) - .build(); - let partitioned_txns = partitioner.partition(transactions.clone()); + let partitioned_txns = partitioner.partition(transactions.clone(), num_shards); let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone()) .into_iter() @@ -192,9 +180,9 @@ pub fn 
sharded_block_executor_with_conflict>( } pub fn sharded_block_executor_with_random_transfers>( + partitioner: Box, sharded_block_executor: ShardedBlockExecutor, concurrency: usize, - partition_last_round: bool, ) { let mut rng = OsRng; let max_accounts = 200; @@ -222,13 +210,7 @@ pub fn sharded_block_executor_with_random_transfers( #[test] fn test_sharded_block_executor_no_conflict() { - for last_round_partition in [true, false].iter() { - let sharded_block_executor = setup_sharded_block_executor(8, Some(4)); - test_utils::test_sharded_block_executor_no_conflict( - sharded_block_executor, - *last_round_partition, - ); + let num_shards = 8; + for last_round_partition in [true, false] { + let partitioner = PartitionerV1Config::new() + .num_shards(num_shards) + .max_partitioning_rounds(2) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(last_round_partition) + .build(); + let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4)); + test_utils::test_sharded_block_executor_no_conflict(partitioner, sharded_block_executor); } } @@ -34,54 +41,167 @@ fn test_sharded_block_executor_no_conflict() { // Sharded execution with cross shard conflict doesn't work for now because we don't have // cross round dependency tracking yet. fn test_sharded_block_executor_with_conflict_parallel() { - for last_round_partition in [true, false].iter() { - let sharded_block_executor = setup_sharded_block_executor(7, Some(4)); - test_utils::sharded_block_executor_with_conflict( - sharded_block_executor, - 4, - *last_round_partition, - ); + let num_shards = 7; + for last_round_partition in [true, false] { + let partitioner = PartitionerV1Config::default() + .num_shards(num_shards) + .max_partitioning_rounds(8) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(last_round_partition) + .build(); + let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4)); + test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 4); } } #[test] fn test_sharded_block_executor_with_conflict_sequential() { - for last_round_partition in [true, false].iter() { - let sharded_block_executor = setup_sharded_block_executor(7, Some(1)); - test_utils::sharded_block_executor_with_conflict( - sharded_block_executor, - 1, - *last_round_partition, - ) + let num_shards = 7; + for last_round_partition in [true, false] { + let partitioner = PartitionerV1Config::default() + .num_shards(num_shards) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(last_round_partition) + .build(); + let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(1)); + test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 1) } } #[test] fn test_sharded_block_executor_with_random_transfers_parallel() { - for last_round_partition in [true, false].iter() { + for last_round_partition in [true, false] { let mut rng = OsRng; let max_num_shards = 32; let num_shards = rng.gen_range(1, max_num_shards); let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(4)); + let partitioner = PartitionerV1Config::default() + .num_shards(num_shards) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(last_round_partition) + .build(); test_utils::sharded_block_executor_with_random_transfers( + partitioner, sharded_block_executor, 4, - *last_round_partition, ); } } #[test] fn test_sharded_block_executor_with_random_transfers_sequential() { - for last_round_partition in [true, false].iter() { + for 
last_round_partition in [true, false] { let mut rng = OsRng; let max_num_shards = 32; let num_shards = rng.gen_range(1, max_num_shards); let sharded_block_executor = setup_sharded_block_executor(num_shards, Some(1)); + let partitioner = PartitionerV1Config::new() + .num_shards(num_shards) + .max_partitioning_rounds(8) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(last_round_partition) + .build(); + test_utils::sharded_block_executor_with_random_transfers( + partitioner, + sharded_block_executor, + 1, + ) + } +} + +#[test] +fn test_partitioner_v2_sharded_block_executor_no_conflict() { + for merge_discard in [false, true] { + let num_shards = 8; + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(2)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + let partitioner = PartitionerV2Config::default() + .num_threads(2) + .max_partitioning_rounds(4) + .cross_shard_dep_avoid_threshold(0.9) + .dashmap_num_shards(64) + .partition_last_round(merge_discard) + .build(); + test_utils::test_sharded_block_executor_no_conflict(partitioner, sharded_block_executor); + } +} + +#[test] +// Sharded execution with cross shard conflict doesn't work for now because we don't have +// cross round dependency tracking yet. +fn test_partitioner_v2_sharded_block_executor_with_conflict_parallel() { + for merge_discard in [false, true] { + let num_shards = 7; + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(4)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + let partitioner = PartitionerV2Config::default() + .num_threads(8) + .max_partitioning_rounds(4) + .cross_shard_dep_avoid_threshold(0.9) + .dashmap_num_shards(64) + .partition_last_round(merge_discard) + .build(); + test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 4); + } +} + +#[test] +fn test_partitioner_v2_sharded_block_executor_with_conflict_sequential() { + for merge_discard in [false, true] { + let num_shards = 7; + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + let partitioner = PartitionerV2Config::default() + .num_threads(8) + .max_partitioning_rounds(4) + .cross_shard_dep_avoid_threshold(0.9) + .dashmap_num_shards(64) + .partition_last_round(merge_discard) + .build(); + test_utils::sharded_block_executor_with_conflict(partitioner, sharded_block_executor, 1) + } +} + +#[test] +fn test_partitioner_v2_sharded_block_executor_with_random_transfers_parallel() { + for merge_discard in [false, true] { + let num_shards = 3; + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(4)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + let partitioner = PartitionerV2Config::default() + .num_threads(8) + .max_partitioning_rounds(4) + .cross_shard_dep_avoid_threshold(0.9) + .dashmap_num_shards(64) + .partition_last_round(merge_discard) + .build(); + test_utils::sharded_block_executor_with_random_transfers( + partitioner, + sharded_block_executor, + 4, + ) + } +} + +#[test] +fn test_partitioner_v2_sharded_block_executor_with_random_transfers_sequential() { + for merge_discard in [false, true] { + let mut rng = OsRng; + let max_num_shards = 32; + let num_shards = rng.gen_range(1, max_num_shards); + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + let partitioner = 
PartitionerV2Config::default() + .num_threads(8) + .max_partitioning_rounds(4) + .cross_shard_dep_avoid_threshold(0.9) + .dashmap_num_shards(64) + .partition_last_round(merge_discard) + .build(); test_utils::sharded_block_executor_with_random_transfers( + partitioner, sharded_block_executor, 1, - *last_round_partition, ) } } diff --git a/execution/block-partitioner/Cargo.toml b/execution/block-partitioner/Cargo.toml index c8d022fa52401..353dae1739889 100644 --- a/execution/block-partitioner/Cargo.toml +++ b/execution/block-partitioner/Cargo.toml @@ -27,6 +27,18 @@ move-core-types = { workspace = true } once_cell = { workspace = true } rand = { workspace = true } rayon = { workspace = true } +serde = { workspace = true } + +[dev-dependencies] +criterion = { workspace = true } + +[target.'cfg(unix)'.dependencies] +jemallocator = { workspace = true } [features] default = [] + +[[bench]] +name = "v2" +harness = false + diff --git a/execution/block-partitioner/benches/v2.rs b/execution/block-partitioner/benches/v2.rs new file mode 100644 index 0000000000000..58f2d19f2ae38 --- /dev/null +++ b/execution/block-partitioner/benches/v2.rs @@ -0,0 +1,52 @@ +// Copyright © Aptos Foundation + +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +#[macro_use] +extern crate criterion; + +use aptos_block_partitioner::{test_utils::P2PBlockGenerator, v2::PartitionerV2, BlockPartitioner}; +use criterion::Criterion; +use rand::thread_rng; + +fn bench_group(c: &mut Criterion) { + let mut group = c.benchmark_group("v2"); + + let num_accounts = 10000; + let block_size = 1000; + let num_shards = 5; + + let num_threads = 8; + let num_rounds_limit = 4; + let avoid_pct = 0.9; + let dashmap_num_shards = 64; + let merge_discards = true; + + let mut rng = thread_rng(); + let block_gen = P2PBlockGenerator::new(num_accounts); + let partitioner = PartitionerV2::new( + num_threads, + num_rounds_limit, + avoid_pct, + dashmap_num_shards, + merge_discards, + ); + group.bench_function(format!("acc={num_accounts},blk={block_size},shd={num_shards}/thr={num_threads},rnd={num_rounds_limit},avd={avoid_pct},mds={merge_discards}"), move |b| { + b.iter_with_setup( + || { + block_gen.rand_block(&mut rng, block_size) + }, + |txns| { + let _txns = partitioner.partition(txns, num_shards); + }, + ) + }); + group.finish(); +} + +criterion_group!( + name = v2_benches; + config = Criterion::default(); //.measurement_time(Duration::from_secs(100)); + targets = bench_group); +criterion_main!(v2_benches); diff --git a/execution/block-partitioner/src/lib.rs b/execution/block-partitioner/src/lib.rs index 1fc28664f4f9d..302382eb7544a 100644 --- a/execution/block-partitioner/src/lib.rs +++ b/execution/block-partitioner/src/lib.rs @@ -2,61 +2,60 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::sharded_block_partitioner::ShardedBlockPartitioner; -use aptos_types::block_executor::partitioner::RoundId; +pub mod sharded_block_partitioner; //TODO: maybe v1 is a better name. 
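+// `v2` below is the more efficient reimplementation of the multi-round partitioner above (see `v2::PartitionerV2`).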
+pub mod v2;

-pub mod sharded_block_partitioner;
 pub mod test_utils;

-pub struct BlockPartitionerConfig {
-    num_shards: usize,
-    max_partitioning_rounds: RoundId,
-    cross_shard_dep_avoid_threshold: f32,
-    partition_last_round: bool,
+use aptos_types::{
+    block_executor::partitioner::{PartitionedTransactions, ShardId},
+    transaction::analyzed_transaction::{AnalyzedTransaction, StorageLocation},
+};
+use move_core_types::account_address::AccountAddress;
+use sharded_block_partitioner::config::PartitionerV1Config;
+use std::{
+    collections::hash_map::DefaultHasher,
+    hash::{Hash, Hasher},
+};
+use v2::config::PartitionerV2Config;
+mod pre_partition;
+
+pub trait BlockPartitioner: Send {
+    fn partition(
+        &self,
+        transactions: Vec<AnalyzedTransaction>,
+        num_shards: usize, //TODO: rethink whether this is needed as part of the `BlockPartitioner` API.
+    ) -> PartitionedTransactions;
 }

-impl BlockPartitionerConfig {
-    pub fn new() -> Self {
-        BlockPartitionerConfig {
-            num_shards: 0,
-            max_partitioning_rounds: 3,
-            cross_shard_dep_avoid_threshold: 0.9,
-            partition_last_round: false,
-        }
-    }
-
-    pub fn num_shards(mut self, num_shards: usize) -> Self {
-        self.num_shards = num_shards;
-        self
-    }
+/// When multiple transactions access the same storage location,
+/// use this function to pick a shard as the anchor/leader and resolve conflicts.
+/// Used by `ShardedBlockPartitioner` and `PartitionerV2`.
+fn get_anchor_shard_id(storage_location: &StorageLocation, num_shards: usize) -> ShardId {
+    let mut hasher = DefaultHasher::new();
+    storage_location.hash(&mut hasher);
+    (hasher.finish() % num_shards as u64) as usize
+}

-    pub fn max_partitioning_rounds(mut self, max_partitioning_rounds: RoundId) -> Self {
-        self.max_partitioning_rounds = max_partitioning_rounds;
-        self
-    }
+type Sender = Option<AccountAddress>;

-    pub fn cross_shard_dep_avoid_threshold(mut self, threshold: f32) -> Self {
-        self.cross_shard_dep_avoid_threshold = threshold;
-        self
-    }
-
-    pub fn partition_last_round(mut self, partition_last_round: bool) -> Self {
-        self.partition_last_round = partition_last_round;
-        self
-    }
+#[derive(Clone, Copy, Debug)]
+pub enum PartitionerConfig {
+    V1(PartitionerV1Config),
+    V2(PartitionerV2Config),
+}

-    pub fn build(self) -> ShardedBlockPartitioner {
-        ShardedBlockPartitioner::new(
-            self.num_shards,
-            self.max_partitioning_rounds,
-            self.cross_shard_dep_avoid_threshold,
-            self.partition_last_round,
-        )
+impl Default for PartitionerConfig {
+    fn default() -> Self {
+        PartitionerConfig::V2(PartitionerV2Config::default())
     }
 }

-impl Default for BlockPartitionerConfig {
-    fn default() -> Self {
-        Self::new()
+impl PartitionerConfig {
+    pub fn build(self) -> Box<dyn BlockPartitioner> {
+        match self {
+            PartitionerConfig::V1(c) => c.build(),
+            PartitionerConfig::V2(c) => c.build(),
+        }
     }
 }
diff --git a/execution/block-partitioner/src/main.rs b/execution/block-partitioner/src/main.rs
index 74247304215d9..d5723a89aeb3b 100644
--- a/execution/block-partitioner/src/main.rs
+++ b/execution/block-partitioner/src/main.rs
@@ -1,66 +1,50 @@
 // Copyright © Aptos Foundation

-use aptos_block_partitioner::{
-    test_utils::{create_signed_p2p_transaction, generate_test_account, TestAccount},
-    BlockPartitionerConfig,
-};
-use aptos_types::transaction::analyzed_transaction::AnalyzedTransaction;
+use aptos_block_partitioner::{test_utils::P2PBlockGenerator, v2::config::PartitionerV2Config};
+use aptos_logger::info;
 use clap::Parser;
-use rand::rngs::OsRng;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
-use std::{sync::Mutex, time::Instant};
+use rand::thread_rng;
+use std::time::Instant; + +#[cfg(unix)] +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; #[derive(Debug, Parser)] struct Args { - #[clap(long, default_value_t = 2000000)] + #[clap(long, default_value_t = 1000000)] pub num_accounts: usize, #[clap(long, default_value_t = 100000)] pub block_size: usize, - #[clap(long, default_value_t = 10)] + #[clap(long, default_value_t = 9)] pub num_blocks: usize, - #[clap(long, default_value_t = 12)] + #[clap(long, default_value_t = 48)] pub num_shards: usize, } fn main() { - println!("Starting the block partitioning benchmark"); + aptos_logger::Logger::new().init(); + info!("Starting the block partitioning benchmark"); let args = Args::parse(); - let num_accounts = args.num_accounts; - println!("Creating {} accounts", num_accounts); - let accounts: Vec> = (0..num_accounts) - .into_par_iter() - .map(|_i| Mutex::new(generate_test_account())) - .collect(); - println!("Created {} accounts", num_accounts); - println!("Creating {} transactions", args.block_size); - let transactions: Vec = (0..args.block_size) - .map(|_| { - // randomly select a sender and receiver from accounts - let mut rng = OsRng; - - let indices = rand::seq::index::sample(&mut rng, num_accounts, 2); - let receiver = accounts[indices.index(1)].lock().unwrap(); - let mut sender = accounts[indices.index(0)].lock().unwrap(); - create_signed_p2p_transaction(&mut sender, vec![&receiver]).remove(0) - }) - .collect(); - - let partitioner = BlockPartitionerConfig::default() - .num_shards(args.num_shards) - .max_partitioning_rounds(2) + let block_gen = P2PBlockGenerator::new(args.num_accounts); + let partitioner = PartitionerV2Config::default() + .max_partitioning_rounds(4) + .num_threads(8) .cross_shard_dep_avoid_threshold(0.9) - .partition_last_round(true) + .dashmap_num_shards(64) + .partition_last_round(false) .build(); + let mut rng = thread_rng(); for _ in 0..args.num_blocks { - let transactions = transactions.clone(); - println!("Starting to partition"); + let transactions = block_gen.rand_block(&mut rng, args.block_size); + info!("Starting to partition"); let now = Instant::now(); - partitioner.partition(transactions); + let _partitioned = partitioner.partition(transactions.clone(), args.num_shards); let elapsed = now.elapsed(); - println!("Time taken to partition: {:?}", elapsed); + info!("Time taken to partition: {:?}", elapsed); } } diff --git a/execution/block-partitioner/src/pre_partition/mod.rs b/execution/block-partitioner/src/pre_partition/mod.rs new file mode 100644 index 0000000000000..cbadae0b8d1d3 --- /dev/null +++ b/execution/block-partitioner/src/pre_partition/mod.rs @@ -0,0 +1,33 @@ +// Copyright © Aptos Foundation + +use crate::v2::types::PrePartitionedTxnIdx; +use aptos_types::transaction::analyzed_transaction::AnalyzedTransaction; + +pub trait PrePartitioner: Send { + /// The initial partitioning phase for `ShardedBlockPartitioner`/`PartitionerV2` to divide a block into `num_shards` sub-blocks. + /// See `PartitionerV2::partition()` for more details. 
+    fn pre_partition(
+        &self,
+        transactions: &[AnalyzedTransaction],
+        num_shards: usize,
+    ) -> Vec<Vec<PrePartitionedTxnIdx>>;
+}
+
+pub mod uniform_partitioner;
+
+pub fn start_txn_idxs(
+    pre_partitioned: &Vec<Vec<PrePartitionedTxnIdx>>,
+) -> Vec<PrePartitionedTxnIdx> {
+    let num_shards = pre_partitioned.len();
+    let mut ret: Vec<PrePartitionedTxnIdx> = vec![0; num_shards];
+    for shard_id in 1..num_shards {
+        ret[shard_id] = ret[shard_id - 1] + pre_partitioned[shard_id - 1].len();
+    }
+    ret
+}
+
+#[test]
+fn test_start_txn_idxs() {
+    let pre_partitioned = vec![vec![0, 1], vec![2, 3, 4], vec![5, 6, 7, 8]];
+    assert_eq!(vec![0, 2, 5], start_txn_idxs(&pre_partitioned));
+}
diff --git a/execution/block-partitioner/src/pre_partition/uniform_partitioner.rs b/execution/block-partitioner/src/pre_partition/uniform_partitioner.rs
new file mode 100644
index 0000000000000..941caeb385071
--- /dev/null
+++ b/execution/block-partitioner/src/pre_partition/uniform_partitioner.rs
@@ -0,0 +1,59 @@
+// Copyright © Aptos Foundation
+
+#[cfg(test)]
+use crate::test_utils::P2PBlockGenerator;
+use crate::{pre_partition::PrePartitioner, v2::types::PrePartitionedTxnIdx};
+use aptos_types::transaction::analyzed_transaction::AnalyzedTransaction;
+#[cfg(test)]
+use rand::thread_rng;
+
+/// Evenly divide txns. Example: processing txns 0..11 results in [[0,1,2,3],[4,5,6,7],[8,9,10]].
+pub struct UniformPartitioner {}
+
+impl PrePartitioner for UniformPartitioner {
+    fn pre_partition(
+        &self,
+        transactions: &[AnalyzedTransaction],
+        num_shards: usize,
+    ) -> Vec<Vec<PrePartitionedTxnIdx>> {
+        let num_items = transactions.len();
+        let num_chunks = num_shards;
+        let num_chunks_with_overflow = num_items % num_chunks;
+        let chunk_size = num_items / num_chunks;
+        let mut ret = Vec::with_capacity(num_chunks);
+        let mut next_chunk_start = 0;
+        for chunk_id in 0..num_chunks {
+            let extra = if chunk_id < num_chunks_with_overflow {
+                1
+            } else {
+                0
+            };
+            let next_chunk_end = next_chunk_start + chunk_size + extra;
+            let chunk: Vec<PrePartitionedTxnIdx> = (next_chunk_start..next_chunk_end).collect();
+            next_chunk_start = next_chunk_end;
+            ret.push(chunk);
+        }
+        ret
+    }
+}
+
+#[test]
+fn test_uniform_partitioner() {
+    let block_gen = P2PBlockGenerator::new(10);
+    let mut rng = thread_rng();
+    let txns = block_gen.rand_block(&mut rng, 18);
+    let partitioner = UniformPartitioner {};
+    let actual = partitioner.pre_partition(txns.as_slice(), 5);
+    assert_eq!(
+        vec![4, 4, 4, 3, 3],
+        actual.iter().map(|v| v.len()).collect::<Vec<usize>>()
+    );
+    assert_eq!((0..18).collect::<Vec<PrePartitionedTxnIdx>>(), actual.concat());
+
+    let actual = partitioner.pre_partition(txns.as_slice(), 3);
+    assert_eq!(
+        vec![6, 6, 6],
+        actual.iter().map(|v| v.len()).collect::<Vec<usize>>()
+    );
+    assert_eq!((0..18).collect::<Vec<PrePartitionedTxnIdx>>(), actual.concat());
+}
diff --git a/execution/block-partitioner/src/sharded_block_partitioner/config.rs b/execution/block-partitioner/src/sharded_block_partitioner/config.rs
new file mode 100644
index 0000000000000..93d3ab772b45a
--- /dev/null
+++ b/execution/block-partitioner/src/sharded_block_partitioner/config.rs
@@ -0,0 +1,60 @@
+// Copyright © Aptos Foundation
+
+use crate::{sharded_block_partitioner::ShardedBlockPartitioner, BlockPartitioner};
+use aptos_types::block_executor::partitioner::RoundId;
+
+/// The configuration for `aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner`,
+/// which is also referred to as `V1` in executor-benchmark after `aptos_block_partitioner::v2` is added.
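+///
+/// Illustrative usage (`txns` being a hypothetical `Vec<AnalyzedTransaction>` to split across 8 shards):
+/// ```ignore
+/// let partitioner = PartitionerV1Config::new()
+///     .num_shards(8)
+///     .max_partitioning_rounds(3)
+///     .cross_shard_dep_avoid_threshold(0.9)
+///     .partition_last_round(false)
+///     .build();
+/// let partitioned = partitioner.partition(txns, 8);
+/// ```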
+#[derive(Clone, Copy, Debug)] +pub struct PartitionerV1Config { + pub num_shards: usize, + pub max_partitioning_rounds: RoundId, + pub cross_shard_dep_avoid_threshold: f32, + pub partition_last_round: bool, +} + +impl PartitionerV1Config { + pub fn new() -> Self { + PartitionerV1Config { + num_shards: 0, + max_partitioning_rounds: 3, + cross_shard_dep_avoid_threshold: 0.9, + partition_last_round: false, + } + } + + pub fn num_shards(mut self, num_shards: usize) -> Self { + self.num_shards = num_shards; + self + } + + pub fn max_partitioning_rounds(mut self, max_partitioning_rounds: RoundId) -> Self { + self.max_partitioning_rounds = max_partitioning_rounds; + self + } + + pub fn cross_shard_dep_avoid_threshold(mut self, threshold: f32) -> Self { + self.cross_shard_dep_avoid_threshold = threshold; + self + } + + pub fn partition_last_round(mut self, partition_last_round: bool) -> Self { + self.partition_last_round = partition_last_round; + self + } + + pub fn build(self) -> Box { + Box::new(ShardedBlockPartitioner::new( + self.num_shards, + self.max_partitioning_rounds, + self.cross_shard_dep_avoid_threshold, + self.partition_last_round, + )) + } +} + +impl Default for PartitionerV1Config { + fn default() -> Self { + Self::new() + } +} diff --git a/execution/block-partitioner/src/sharded_block_partitioner/conflict_detector.rs b/execution/block-partitioner/src/sharded_block_partitioner/conflict_detector.rs index d0b9c45912e7c..cfe4e11f1d7c9 100644 --- a/execution/block-partitioner/src/sharded_block_partitioner/conflict_detector.rs +++ b/execution/block-partitioner/src/sharded_block_partitioner/conflict_detector.rs @@ -1,18 +1,17 @@ // Copyright © Aptos Foundation -use crate::sharded_block_partitioner::dependency_analysis::{RWSet, WriteSetWithTxnIndex}; +use crate::{ + get_anchor_shard_id, + sharded_block_partitioner::dependency_analysis::{RWSet, WriteSetWithTxnIndex}, +}; use aptos_types::{ block_executor::partitioner::{ CrossShardDependencies, RoundId, ShardId, ShardedTxnIndex, SubBlock, TransactionWithDependencies, TxnIndex, }, - transaction::analyzed_transaction::{AnalyzedTransaction, StorageLocation}, -}; -use std::{ - collections::{hash_map::DefaultHasher, HashSet}, - hash::{Hash, Hasher}, - sync::Arc, + transaction::analyzed_transaction::AnalyzedTransaction, }; +use std::{collections::HashSet, sync::Arc}; pub struct CrossShardConflictDetector { shard_id: ShardId, @@ -171,12 +170,6 @@ impl CrossShardConflictDetector { false } - fn get_anchor_shard_id(&self, storage_location: &StorageLocation) -> ShardId { - let mut hasher = DefaultHasher::new(); - storage_location.hash(&mut hasher); - (hasher.finish() % self.num_shards as u64) as usize - } - fn check_for_read_conflict( &self, current_shard_id: ShardId, @@ -188,7 +181,7 @@ impl CrossShardConflictDetector { // During conflict resolution, shards starts scanning from the anchor shard id and // first shard id that has taken a read/write lock on this storage location is the owner of this storage location. // Please note another alternative is scan from first shard id, but this will result in non-uniform load across shards in case of conflicts. 
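        // In other words, the probing order is
        //   (anchor_shard_id + 0) % num_shards, (anchor_shard_id + 1) % num_shards, ...
        // so ownership checks start from a per-location hashed anchor instead of always from shard 0.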
- let anchor_shard_id = self.get_anchor_shard_id(read_location); + let anchor_shard_id = get_anchor_shard_id(read_location, self.num_shards); for offset in 0..self.num_shards { let shard_id = (anchor_shard_id + offset) % self.num_shards; // Ignore if this is from the same shard @@ -211,7 +204,7 @@ impl CrossShardConflictDetector { cross_shard_rw_set: &[RWSet], ) -> bool { for write_location in txn.write_hints().iter() { - let anchor_shard_id = self.get_anchor_shard_id(write_location); + let anchor_shard_id = get_anchor_shard_id(write_location, self.num_shards); for offset in 0..self.num_shards { let shard_id = (anchor_shard_id + offset) % self.num_shards; // Ignore if this is from the same shard diff --git a/execution/block-partitioner/src/sharded_block_partitioner/mod.rs b/execution/block-partitioner/src/sharded_block_partitioner/mod.rs index b8caa92f1b38d..a3e9b320d42ea 100644 --- a/execution/block-partitioner/src/sharded_block_partitioner/mod.rs +++ b/execution/block-partitioner/src/sharded_block_partitioner/mod.rs @@ -1,16 +1,19 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::sharded_block_partitioner::{ - counters::BLOCK_PARTITIONING_MISC_TIMERS_SECONDS, - cross_shard_messages::CrossShardMsg, - dependency_analysis::WriteSetWithTxnIndex, - messages::{ - AddWithCrossShardDep, ControlMsg, - ControlMsg::{AddCrossShardDepReq, DiscardCrossShardDepReq}, - DiscardCrossShardDep, PartitioningResp, +use crate::{ + sharded_block_partitioner::{ + counters::BLOCK_PARTITIONING_MISC_TIMERS_SECONDS, + cross_shard_messages::CrossShardMsg, + dependency_analysis::WriteSetWithTxnIndex, + messages::{ + AddWithCrossShardDep, ControlMsg, + ControlMsg::{AddCrossShardDepReq, DiscardCrossShardDepReq}, + DiscardCrossShardDep, PartitioningResp, + }, + partitioning_shard::PartitioningShard, }, - partitioning_shard::PartitioningShard, + BlockPartitioner, }; use aptos_logger::{error, info}; use aptos_types::{ @@ -31,6 +34,7 @@ use std::{ thread, }; +pub mod config; mod conflict_detector; mod counters; mod cross_shard_messages; @@ -124,7 +128,7 @@ pub struct ShardedBlockPartitioner { } impl ShardedBlockPartitioner { - pub(crate) fn new( + pub fn new( num_shards: usize, max_partitioning_rounds: RoundId, cross_shard_dep_avoid_threshold: f32, @@ -464,6 +468,17 @@ impl Drop for ShardedBlockPartitioner { } } +impl BlockPartitioner for ShardedBlockPartitioner { + fn partition( + &self, + transactions: Vec, + num_shards: usize, + ) -> PartitionedTransactions { + assert_eq!(self.num_shards, num_shards); + ShardedBlockPartitioner::partition(self, transactions) + } +} + fn spawn_partitioning_shard( shard_id: ShardId, control_rx: Receiver, diff --git a/execution/block-partitioner/src/test_utils.rs b/execution/block-partitioner/src/test_utils.rs index b1766e7f73566..5f259b5664cb1 100644 --- a/execution/block-partitioner/src/test_utils.rs +++ b/execution/block-partitioner/src/test_utils.rs @@ -1,7 +1,29 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +#[cfg(test)] +use crate::{BlockPartitioner, Sender}; +#[cfg(test)] +use aptos_crypto::hash::CryptoHash; +#[cfg(test)] +use aptos_crypto::hash::TestOnlyHash; +#[cfg(test)] +use aptos_crypto::HashValue; use aptos_crypto::{ed25519::ed25519_keys::Ed25519PrivateKey, PrivateKey, SigningKey, Uniform}; +#[cfg(test)] +use aptos_types::block_executor::partitioner::PartitionedTransactions; +#[cfg(test)] +use aptos_types::block_executor::partitioner::RoundId; +#[cfg(test)] +use aptos_types::block_executor::partitioner::ShardId; 
+#[cfg(test)]
+use aptos_types::block_executor::partitioner::TransactionWithDependencies;
+#[cfg(test)]
+use aptos_types::block_executor::partitioner::GLOBAL_ROUND_ID;
+#[cfg(test)]
+use aptos_types::block_executor::partitioner::GLOBAL_SHARD_ID;
+#[cfg(test)]
+use aptos_types::state_store::state_key::StateKey;
 use aptos_types::{
     chain_id::ChainId,
     transaction::{
@@ -13,6 +35,15 @@ use aptos_types::{
 use move_core_types::{
     account_address::AccountAddress, identifier::Identifier, language_storage::ModuleId,
 };
+#[cfg(test)]
+use rand::thread_rng;
+use rand::Rng;
+use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator};
+#[cfg(test)]
+use std::collections::HashMap;
+#[cfg(test)]
+use std::collections::HashSet;
+use std::sync::{Arc, Mutex};

 #[derive(Debug)]
 pub struct TestAccount {
@@ -79,3 +110,213 @@ pub fn create_signed_p2p_transaction(
     }
     transactions
 }
+
+pub struct P2PBlockGenerator {
+    accounts: Arc<Vec<Mutex<TestAccount>>>,
+}
+
+impl P2PBlockGenerator {
+    pub fn new(num_accounts: usize) -> Self {
+        let accounts = (0..num_accounts)
+            .into_par_iter()
+            .map(|_i| Mutex::new(generate_test_account()))
+            .collect();
+        Self {
+            accounts: Arc::new(accounts),
+        }
+    }
+
+    pub fn rand_block<R>(&self, rng: &mut R, block_size: usize) -> Vec<AnalyzedTransaction>
+    where
+        R: Rng,
+    {
+        (0..block_size)
+            .map(|_| {
+                let indices = rand::seq::index::sample(rng, self.accounts.len(), 2);
+                let receiver = self.accounts[indices.index(1)].lock().unwrap();
+                let mut sender = self.accounts[indices.index(0)].lock().unwrap();
+                create_signed_p2p_transaction(&mut sender, vec![&receiver]).remove(0)
+            })
+            .collect()
+    }
+}
+
+/// Assert partitioner correctness for `ShardedBlockPartitioner` and `PartitionerV2`:
+/// - Transaction set remains the same after partitioning.
+/// - The relative order of the txns from the same sender is preserved.
+/// - For a cross-shard dependency, the consumer txn always comes after the provider txn in the sharded block.
+/// - Required edge set matches dependency edge set.
+/// - Before the last round, there is no in-round cross-shard dependency.
+///
+/// Also print a summary of the partitioning result.
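+///
+/// Illustrative test-only usage, `partitioner` being any `BlockPartitioner`:
+/// ```ignore
+/// let block_gen = P2PBlockGenerator::new(100);
+/// let txns = block_gen.rand_block(&mut rand::thread_rng(), 20);
+/// let partitioned = partitioner.partition(txns.clone(), 4);
+/// verify_partitioner_output(&txns, &partitioned);
+/// ```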
+#[cfg(test)] +pub fn verify_partitioner_output( + input: &Vec, + output: &PartitionedTransactions, +) { + let old_txn_id_by_txn_hash: HashMap = HashMap::from_iter( + input + .iter() + .enumerate() + .map(|(tid, txn)| (txn.test_only_hash(), tid)), + ); + + let mut total_comm_cost = 0; + let num_txns = input.len(); + let num_shards = output.sharded_txns().len(); + let num_rounds = output + .sharded_txns() + .first() + .map(|sbs| sbs.sub_blocks.len()) + .unwrap_or(0); + for sub_block_list in output.sharded_txns().iter().take(num_shards).skip(1) { + assert_eq!(num_rounds, sub_block_list.sub_blocks.len()); + } + let mut old_txn_idxs_by_sender: HashMap> = HashMap::new(); + let mut old_txn_idxs_seen: HashSet = HashSet::new(); + let mut edge_set_from_src_view: HashSet<(usize, usize, usize, HashValue, usize, usize, usize)> = + HashSet::new(); + let mut edge_set_from_dst_view: HashSet<(usize, usize, usize, HashValue, usize, usize, usize)> = + HashSet::new(); + + let mut for_each_sub_block = |round_id: usize, + shard_id: usize, + start_txn_idx: usize, + sub_block_txns: &[TransactionWithDependencies< + AnalyzedTransaction, + >]| { + let mut cur_sub_block_inbound_costs: HashMap<(RoundId, ShardId, StateKey), u64> = + HashMap::new(); + let mut cur_sub_block_outbound_costs: HashMap<(RoundId, ShardId, StateKey), u64> = + HashMap::new(); + for (pos_in_sub_block, txn_with_dep) in sub_block_txns.iter().enumerate() { + let sender = txn_with_dep.txn.sender(); + let old_txn_idx = *old_txn_id_by_txn_hash + .get(&txn_with_dep.txn().test_only_hash()) + .unwrap(); + old_txn_idxs_seen.insert(old_txn_idx); + old_txn_idxs_by_sender + .entry(sender) + .or_insert_with(Vec::new) + .push(old_txn_idx); + let new_txn_idx = start_txn_idx + pos_in_sub_block; + for loc in txn_with_dep.txn.write_hints().iter() { + let key = loc.clone().into_state_key(); + let key_str = CryptoHash::hash(&key).to_hex(); + println!( + "MATRIX_REPORT - round={}, shard={}, old_tid={}, new_tid={}, write_hint={}", + round_id, shard_id, old_txn_idx, new_txn_idx, key_str + ); + } + for (src_txn_idx, locs) in txn_with_dep + .cross_shard_dependencies + .required_edges() + .iter() + { + for loc in locs.iter() { + let key = loc.clone().into_state_key(); + if round_id != num_rounds - 1 { + assert_ne!(src_txn_idx.round_id, round_id); + } + assert!((src_txn_idx.round_id, src_txn_idx.shard_id) < (round_id, shard_id)); + edge_set_from_dst_view.insert(( + src_txn_idx.round_id, + src_txn_idx.shard_id, + src_txn_idx.txn_index, + CryptoHash::hash(&key), + round_id, + shard_id, + new_txn_idx, + )); + let value = cur_sub_block_inbound_costs + .entry((src_txn_idx.round_id, src_txn_idx.shard_id, key)) + .or_insert_with(|| 0); + *value += 1; + } + } + for (dst_tid, locs) in txn_with_dep + .cross_shard_dependencies + .dependent_edges() + .iter() + { + for loc in locs.iter() { + let key = loc.clone().into_state_key(); + if round_id != num_rounds - 1 { + assert_ne!(dst_tid.round_id, round_id); + } + assert!((round_id, shard_id) < (dst_tid.round_id, dst_tid.shard_id)); + edge_set_from_src_view.insert(( + round_id, + shard_id, + new_txn_idx, + CryptoHash::hash(&key), + dst_tid.round_id, + dst_tid.shard_id, + dst_tid.txn_index, + )); + let value = cur_sub_block_outbound_costs + .entry((dst_tid.round_id, dst_tid.shard_id, key)) + .or_insert_with(|| 0); + *value += 1; + } + } + } + let inbound_cost: u64 = cur_sub_block_inbound_costs.values().copied().sum(); + let outbound_cost: u64 = cur_sub_block_outbound_costs.values().copied().sum(); + println!("MATRIX_REPORT: round={}, 
shard={}, sub_block_size={}, inbound_cost={}, outbound_cost={}", round_id, shard_id, sub_block_txns.len(), inbound_cost, outbound_cost); + if round_id == 0 { + assert_eq!(0, inbound_cost); + } + total_comm_cost += inbound_cost + outbound_cost; + }; + + for round_id in 0..num_rounds { + for (shard_id, sub_block_list) in output.sharded_txns().iter().enumerate() { + let sub_block = sub_block_list.get_sub_block(round_id).unwrap(); + for_each_sub_block( + round_id, + shard_id, + sub_block.start_index, + sub_block.transactions_with_deps().as_slice(), + ) + } + } + for_each_sub_block( + GLOBAL_ROUND_ID, + GLOBAL_SHARD_ID, + output.num_sharded_txns(), + output.global_txns.as_slice(), + ); + + assert_eq!(HashSet::from_iter(0..num_txns), old_txn_idxs_seen); + assert_eq!(edge_set_from_src_view, edge_set_from_dst_view); + for (_sender, old_tids) in old_txn_idxs_by_sender { + assert!(is_sorted(&old_tids)); + } + println!("MATRIX_REPORT: total_comm_cost={}", total_comm_cost); +} + +#[cfg(test)] +fn is_sorted(arr: &Vec) -> bool { + let num = arr.len(); + for i in 1..num { + if arr[i - 1] >= arr[i] { + return false; + } + } + true +} + +#[cfg(test)] +pub fn assert_deterministic_result(partitioner: Arc) { + let mut rng = thread_rng(); + let block_gen = P2PBlockGenerator::new(1000); + for _ in 0..10 { + let txns = block_gen.rand_block(&mut rng, 100); + let result_0 = partitioner.partition(txns.clone(), 10); + for _ in 0..2 { + let result_1 = partitioner.partition(txns.clone(), 10); + assert_eq!(result_1, result_0); + } + } +} diff --git a/execution/block-partitioner/src/v2/build_edge.rs b/execution/block-partitioner/src/v2/build_edge.rs new file mode 100644 index 0000000000000..7a94d83b7ec2e --- /dev/null +++ b/execution/block-partitioner/src/v2/build_edge.rs @@ -0,0 +1,90 @@ +// Copyright © Aptos Foundation + +use crate::v2::{counters::MISC_TIMERS_SECONDS, state::PartitionState, PartitionerV2}; +use aptos_types::{ + block_executor::partitioner::{ + PartitionedTransactions, SubBlock, SubBlocksForShard, TransactionWithDependencies, + }, + transaction::analyzed_transaction::AnalyzedTransaction, +}; +use rayon::{ + iter::ParallelIterator, + prelude::{IntoParallelIterator, IntoParallelRefIterator}, +}; +use std::sync::Mutex; + +impl PartitionerV2 { + pub(crate) fn add_edges(state: &mut PartitionState) -> PartitionedTransactions { + let _timer = MISC_TIMERS_SECONDS + .with_label_values(&["add_edges"]) + .start_timer(); + + state.sub_block_matrix = state.thread_pool.install(|| { + (0..state.num_rounds()) + .into_par_iter() + .map(|_round_id| { + (0..state.num_executor_shards) + .into_par_iter() + .map(|_shard_id| Mutex::new(None)) + .collect() + }) + .collect() + }); + + state.thread_pool.install(|| { + (0..state.num_rounds()) + .into_par_iter() + .for_each(|round_id| { + (0..state.num_executor_shards) + .into_par_iter() + .for_each(|shard_id| { + let twds = state.finalized_txn_matrix[round_id][shard_id] + .par_iter() + .map(|&ori_txn_idx| { + state.take_txn_with_dep(round_id, shard_id, ori_txn_idx) + }) + .collect(); + let sub_block = + SubBlock::new(state.start_index_matrix[round_id][shard_id], twds); + *state.sub_block_matrix[round_id][shard_id].lock().unwrap() = + Some(sub_block); + }); + }); + }); + + let global_txns: Vec> = + if !state.partition_last_round { + state + .sub_block_matrix + .pop() + .unwrap() + .last() + .unwrap() + .lock() + .unwrap() + .take() + .unwrap() + .into_transactions_with_deps() + } else { + vec![] + }; + + let final_num_rounds = state.sub_block_matrix.len(); + let sharded_txns = 
(0..state.num_executor_shards) + .map(|shard_id| { + let sub_blocks: Vec> = (0..final_num_rounds) + .map(|round_id| { + state.sub_block_matrix[round_id][shard_id] + .lock() + .unwrap() + .take() + .unwrap() + }) + .collect(); + SubBlocksForShard::new(shard_id, sub_blocks) + }) + .collect(); + + PartitionedTransactions::new(sharded_txns, global_txns) + } +} diff --git a/execution/block-partitioner/src/v2/config.rs b/execution/block-partitioner/src/v2/config.rs new file mode 100644 index 0000000000000..340775be61088 --- /dev/null +++ b/execution/block-partitioner/src/v2/config.rs @@ -0,0 +1,61 @@ +// Copyright © Aptos Foundation + +use crate::{v2::PartitionerV2, BlockPartitioner}; + +#[derive(Clone, Copy, Debug)] +pub struct PartitionerV2Config { + pub num_threads: usize, + pub max_partitioning_rounds: usize, + pub cross_shard_dep_avoid_threshold: f32, + pub dashmap_num_shards: usize, + pub partition_last_round: bool, +} + +impl PartitionerV2Config { + pub fn build(self) -> Box { + Box::new(PartitionerV2::new( + self.num_threads, + self.max_partitioning_rounds, + self.cross_shard_dep_avoid_threshold, + self.dashmap_num_shards, + self.partition_last_round, + )) + } + + pub fn num_threads(mut self, val: usize) -> Self { + self.num_threads = val; + self + } + + pub fn max_partitioning_rounds(mut self, val: usize) -> Self { + self.max_partitioning_rounds = val; + self + } + + pub fn cross_shard_dep_avoid_threshold(mut self, val: f32) -> Self { + self.cross_shard_dep_avoid_threshold = val; + self + } + + pub fn dashmap_num_shards(mut self, val: usize) -> Self { + self.dashmap_num_shards = val; + self + } + + pub fn partition_last_round(mut self, val: bool) -> Self { + self.partition_last_round = val; + self + } +} + +impl Default for PartitionerV2Config { + fn default() -> Self { + Self { + num_threads: 8, + max_partitioning_rounds: 4, + cross_shard_dep_avoid_threshold: 0.9, + dashmap_num_shards: 64, + partition_last_round: false, + } + } +} diff --git a/execution/block-partitioner/src/v2/conflicting_txn_tracker.rs b/execution/block-partitioner/src/v2/conflicting_txn_tracker.rs new file mode 100644 index 0000000000000..487a42f464917 --- /dev/null +++ b/execution/block-partitioner/src/v2/conflicting_txn_tracker.rs @@ -0,0 +1,151 @@ +// Copyright © Aptos Foundation + +use crate::v2::types::{PrePartitionedTxnIdx, ShardedTxnIndexV2}; +#[cfg(test)] +use aptos_types::state_store::state_key::StateKey; +use aptos_types::{ + block_executor::partitioner::{RoundId, ShardId}, + transaction::analyzed_transaction::StorageLocation, +}; +use serde::{Deserialize, Serialize}; +use std::collections::btree_set::BTreeSet; + +/// This structure is only used in `V2Partitioner`. +/// For txns that claimed to access the same storage location, +/// it caches some metadata about the location and also keeps track of their status (pending or position finalized) throughout the partitioning process. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct ConflictingTxnTracker { + /// The storage location on which conflicting txns are being tracked by this tracker. + pub storage_location: StorageLocation, + /// A randomly chosen owner shard of the storage location, for conflict resolution purpose. + pub anchor_shard_id: ShardId, + /// Txns that (1) read the current storage location and (2) have not been accepted. + pending_reads: BTreeSet, + /// Txns that (1) write the current storage location and (2) have not been accepted. + pending_writes: BTreeSet, + /// Txns that have been accepted. 
+ pub finalized: BTreeSet, + /// Txns that (1) write the current storage location and (2) have been accepted. + pub finalized_writes: BTreeSet, +} + +impl ConflictingTxnTracker { + pub fn new(storage_location: StorageLocation, anchor_shard_id: ShardId) -> Self { + Self { + storage_location, + anchor_shard_id, + pending_reads: Default::default(), + pending_writes: Default::default(), + finalized: Default::default(), + finalized_writes: Default::default(), + } + } + + pub fn add_read_candidate(&mut self, txn_id: PrePartitionedTxnIdx) { + self.pending_reads.insert(txn_id); + } + + pub fn add_write_candidate(&mut self, txn_id: PrePartitionedTxnIdx) { + self.pending_writes.insert(txn_id); + } + + /// Partitioner has finalized the position of a txn. Remove it from the pending txn list. + pub fn mark_txn_ordered( + &mut self, + txn_id: PrePartitionedTxnIdx, + round_id: RoundId, + shard_id: ShardId, + ) { + let sharded_txn_idx = ShardedTxnIndexV2::new(round_id, shard_id, txn_id); + if self.pending_writes.remove(&txn_id) { + self.finalized_writes.insert(sharded_txn_idx); + } else { + assert!(self.pending_reads.remove(&txn_id)); + } + self.finalized.insert(sharded_txn_idx); + } + + /// Check if there is a txn writing to the current storage location and its txn_id in the given wrapped range [start, end). + pub fn has_write_in_range( + &self, + start_txn_id: PrePartitionedTxnIdx, + end_txn_id: PrePartitionedTxnIdx, + ) -> bool { + if start_txn_id <= end_txn_id { + self.pending_writes + .range(start_txn_id..end_txn_id) + .next() + .is_some() + } else { + self.pending_writes.range(start_txn_id..).next().is_some() + || self.pending_writes.range(..end_txn_id).next().is_some() + } + } +} + +#[test] +fn test_conflicting_txn_tracker() { + let mut tracker = + ConflictingTxnTracker::new(StorageLocation::Specific(StateKey::raw(vec![])), 0); + tracker.add_write_candidate(4); + tracker.add_write_candidate(10); + tracker.add_write_candidate(7); + tracker.add_read_candidate(8); + tracker.add_write_candidate(9); + // candidates: T4(W), T7(W), T8(R), T9(W), T10(W) + // promoted: - + assert!(!tracker.has_write_in_range(4, 4)); // 0-length interval + assert!(tracker.has_write_in_range(4, 5)); // 0-length interval + assert!(tracker.has_write_in_range(5, 10)); + assert!(!tracker.has_write_in_range(8, 9)); + assert!(tracker.has_write_in_range(11, 5)); // wrapped range + assert!(!tracker.has_write_in_range(11, 4)); // wrapped range + tracker.mark_txn_ordered(9, 99, 10); + // candidates: T4(W), T7(W), T8(R), T10(W) + // promoted: (99,10)/T9(W) + assert!(tracker.has_write_in_range(5, 10)); + tracker.mark_txn_ordered(7, 99, 20); + // candidates: T4(W), T8(R), T10(W) + // promoted: (99,10)/T9(W), (99,20)/T7(W) + assert!(!tracker.has_write_in_range(5, 10)); + tracker.mark_txn_ordered(4, 99, 20); + tracker.mark_txn_ordered(8, 99, 30); + tracker.mark_txn_ordered(10, 99, 30); + // candidates: - + // promoted: (99,10)/T9(W), (99,20)/T4(W), (99,20)/T7(W), (99,30)/T8(R), (99,30)/T10(W) + assert_eq!( + vec![ + ShardedTxnIndexV2::new(99, 10, 9), + ShardedTxnIndexV2::new(99, 20, 4), + ShardedTxnIndexV2::new(99, 20, 7) + ], + tracker + .finalized + .range(ShardedTxnIndexV2::new(98, 0, 0)..ShardedTxnIndexV2::new(99, 20, 8)) + .copied() + .collect::>() + ); + assert_eq!( + vec![ + ShardedTxnIndexV2::new(99, 20, 7), + ShardedTxnIndexV2::new(99, 30, 8), + ShardedTxnIndexV2::new(99, 30, 10) + ], + tracker + .finalized + .range(ShardedTxnIndexV2::new(99, 20, 7)..) 
+ .copied() + .collect::>() + ); + assert_eq!( + vec![ + ShardedTxnIndexV2::new(99, 20, 7), + ShardedTxnIndexV2::new(99, 30, 10) + ], + tracker + .finalized_writes + .range(ShardedTxnIndexV2::new(99, 20, 7)..ShardedTxnIndexV2::new(99, 40, 0)) + .copied() + .collect::>() + ); +} diff --git a/execution/block-partitioner/src/v2/counters.rs b/execution/block-partitioner/src/v2/counters.rs new file mode 100644 index 0000000000000..c37b7400f8c96 --- /dev/null +++ b/execution/block-partitioner/src/v2/counters.rs @@ -0,0 +1,16 @@ +// Copyright © Aptos Foundation + +use aptos_metrics_core::{exponential_buckets, register_histogram_vec, HistogramVec}; +use once_cell::sync::Lazy; + +pub static MISC_TIMERS_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "aptos_block_partitioner_v2_misc_timers_seconds", + // metric description + "The time spent in seconds of miscellaneous phases of block partitioner v2.", + &["name"], + exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), + ) + .unwrap() +}); diff --git a/execution/block-partitioner/src/v2/init.rs b/execution/block-partitioner/src/v2/init.rs new file mode 100644 index 0000000000000..54bf1517806bc --- /dev/null +++ b/execution/block-partitioner/src/v2/init.rs @@ -0,0 +1,59 @@ +// Copyright © Aptos Foundation + +use crate::{ + get_anchor_shard_id, + v2::{ + conflicting_txn_tracker::ConflictingTxnTracker, counters::MISC_TIMERS_SECONDS, + state::PartitionState, types::PrePartitionedTxnIdx, PartitionerV2, + }, +}; +use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator}; +use std::sync::RwLock; + +impl PartitionerV2 { + pub(crate) fn init(state: &mut PartitionState) { + let _timer = MISC_TIMERS_SECONDS + .with_label_values(&["init"]) + .start_timer(); + + state.thread_pool.install(|| { + (0..state.num_txns()) + .into_par_iter() + .for_each(|txn_idx: PrePartitionedTxnIdx| { + let txn_read_guard = state.txns[txn_idx].read().unwrap(); + let txn = txn_read_guard.as_ref().unwrap(); + let sender_idx = state.add_sender(txn.sender()); + *state.sender_idxs[txn_idx].write().unwrap() = Some(sender_idx); + + let reads = txn.read_hints.iter().map(|loc| (loc, false)); + let writes = txn.write_hints.iter().map(|loc| (loc, true)); + reads + .chain(writes) + .for_each(|(storage_location, is_write)| { + let key_idx = state.add_key(storage_location.state_key()); + if is_write { + state.write_sets[txn_idx].write().unwrap().insert(key_idx); + } else { + state.read_sets[txn_idx].write().unwrap().insert(key_idx); + } + let tracker_ref = state.trackers.entry(key_idx).or_insert_with(|| { + let anchor_shard_id = get_anchor_shard_id( + storage_location, + state.num_executor_shards, + ); + RwLock::new(ConflictingTxnTracker::new( + storage_location.clone(), + anchor_shard_id, + )) + }); + let mut tracker = tracker_ref.write().unwrap(); + if is_write { + tracker.add_write_candidate(txn_idx); + } else { + tracker.add_read_candidate(txn_idx); + } + }); + }); + }); + } +} diff --git a/execution/block-partitioner/src/v2/mod.rs b/execution/block-partitioner/src/v2/mod.rs new file mode 100644 index 0000000000000..2dc44c7a24bfb --- /dev/null +++ b/execution/block-partitioner/src/v2/mod.rs @@ -0,0 +1,165 @@ +// Copyright © Aptos Foundation + +use crate::{ + pre_partition::{uniform_partitioner::UniformPartitioner, PrePartitioner}, + v2::counters::MISC_TIMERS_SECONDS, + BlockPartitioner, +}; +use aptos_types::{ + block_executor::partitioner::{PartitionedTransactions, RoundId}, + 
    transaction::analyzed_transaction::AnalyzedTransaction,
};
+use rayon::{ThreadPool, ThreadPoolBuilder};
+use state::PartitionState;
+use std::sync::{Arc, RwLock};
+use types::PrePartitionedTxnIdx;
+
+mod build_edge;
+pub mod config;
+mod conflicting_txn_tracker;
+mod counters;
+mod init;
+mod partition_to_matrix;
+pub(crate) mod state;
+pub mod types;
+
+/// Basically `ShardedBlockPartitioner` but:
+/// - Not pre-partitioned by txn sender.
+/// - Implemented more efficiently.
+pub struct PartitionerV2 {
+    pre_partitioner: Box<dyn PrePartitioner>,
+    thread_pool: Arc<ThreadPool>,
+    max_partitioning_rounds: RoundId,
+    cross_shard_dep_avoid_threshold: f32,
+    dashmap_num_shards: usize,
+    partition_last_round: bool,
+}
+
+impl PartitionerV2 {
+    pub fn new(
+        num_threads: usize,
+        num_rounds_limit: usize,
+        cross_shard_dep_avoid_threshold: f32,
+        dashmap_num_shards: usize,
+        partition_last_round: bool,
+    ) -> Self {
+        let thread_pool = Arc::new(
+            ThreadPoolBuilder::new()
+                .num_threads(num_threads)
+                .build()
+                .unwrap(),
+        );
+        Self {
+            pre_partitioner: Box::new(UniformPartitioner {}), //TODO: parameterize it.
+            thread_pool,
+            max_partitioning_rounds: num_rounds_limit,
+            cross_shard_dep_avoid_threshold,
+            dashmap_num_shards,
+            partition_last_round,
+        }
+    }
+
+    fn pre_partition(
+        &self,
+        txns: &[AnalyzedTransaction],
+        num_shards: usize,
+    ) -> Vec<Vec<PrePartitionedTxnIdx>> {
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&["pre_partition"])
+            .start_timer();
+        self.pre_partitioner.pre_partition(txns, num_shards)
+    }
+}
+
+impl BlockPartitioner for PartitionerV2 {
+    fn partition(
+        &self,
+        txns: Vec<AnalyzedTransaction>,
+        num_executor_shards: usize,
+    ) -> PartitionedTransactions {
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&["total"])
+            .start_timer();
+
+        // Step 0: pre-partition. Divide a list of transactions into `num_executor_shards` chunks.
+        let pre_partitioned = self.pre_partition(txns.as_slice(), num_executor_shards);
+
+        let mut state = PartitionState::new(
+            self.thread_pool.clone(),
+            self.dashmap_num_shards,
+            txns,
+            num_executor_shards,
+            pre_partitioned,
+            self.max_partitioning_rounds,
+            self.cross_shard_dep_avoid_threshold,
+            self.partition_last_round,
+        );
+        // Step 1: build some necessary indices for txn senders/storage locations.
+        Self::init(&mut state);
+
+        // Step 2: remove cross-shard dependencies by moving some txns into new rounds.
+        // As a result, we get a txn matrix of no more than `self.max_partitioning_rounds` rows and exactly `num_executor_shards` columns.
+        // It's guaranteed that inside every round other than the last round, there's no cross-shard dependency. (But cross-round dependencies are always possible.)
+        Self::remove_cross_shard_dependencies(&mut state);
+
+        // Step 3: build some additional indices of the resulting txn matrix from Step 2.
+        Self::build_index_from_txn_matrix(&mut state);
+
+        // Step 4: calculate all the cross-shard dependencies and prepare the input for sharded execution.
+        let ret = Self::add_edges(&mut state);
+
+        // Async clean-up.
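+        // `state` owns large structures built during partitioning (per-txn
+        // read/write sets, conflict trackers, the sub-block matrix indices),
+        // so its drop is handed to the thread pool below instead of blocking
+        // the caller.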
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        test_utils::{assert_deterministic_result, P2PBlockGenerator},
+        v2::PartitionerV2,
+        BlockPartitioner,
+    };
+    use rand::{thread_rng, Rng};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_partitioner_v2_correctness() {
+        for merge_discarded in [false, true] {
+            let block_generator = P2PBlockGenerator::new(100);
+            let partitioner = PartitionerV2::new(8, 4, 0.9, 64, merge_discarded);
+            let mut rng = thread_rng();
+            for _run_id in 0..20 {
+                let block_size = 10_u64.pow(rng.gen_range(0, 4)) as usize;
+                let num_shards = rng.gen_range(1, 10);
+                let block = block_generator.rand_block(&mut rng, block_size);
+                let block_clone = block.clone();
+                let partitioned = partitioner.partition(block, num_shards);
+                crate::test_utils::verify_partitioner_output(&block_clone, &partitioned);
+            }
+        }
+    }
+
+    #[test]
+    fn test_partitioner_v2_determinism() {
+        for merge_discarded in [false, true] {
+            let partitioner = Arc::new(PartitionerV2::new(4, 4, 0.9, 64, merge_discarded));
+            assert_deterministic_result(partitioner);
+        }
+    }
+}
+
+fn extract_and_sort(
+    arr_2d: Vec<RwLock<Vec<PrePartitionedTxnIdx>>>,
+) -> Vec<Vec<PrePartitionedTxnIdx>> {
+    arr_2d
+        .into_iter()
+        .map(|arr_1d| {
+            let mut arr_1d_guard = arr_1d.write().unwrap();
+            let mut arr_1d_value = std::mem::take(&mut *arr_1d_guard);
+            arr_1d_value.sort();
+            arr_1d_value
+        })
+        .collect::<Vec<_>>()
+}
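
The final `sort()` in `extract_and_sort` is what makes the output independent of thread scheduling, which `assert_deterministic_result` above relies on. A self-contained sketch of the effect, using plain rayon and no partitioner types:

use rayon::prelude::*;
use std::sync::Mutex;

fn main() {
    let out = Mutex::new(Vec::new());
    (0..1000usize).into_par_iter().for_each(|i| {
        out.lock().unwrap().push(i); // arrival order is nondeterministic
    });
    let mut v = out.into_inner().unwrap();
    v.sort(); // canonical order, independent of thread scheduling
    assert_eq!(v, (0..1000usize).collect::<Vec<_>>());
}
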
diff --git a/execution/block-partitioner/src/v2/partition_to_matrix.rs b/execution/block-partitioner/src/v2/partition_to_matrix.rs
new file mode 100644
index 0000000000000..39d90fb23d64a
--- /dev/null
+++ b/execution/block-partitioner/src/v2/partition_to_matrix.rs
@@ -0,0 +1,224 @@
+// Copyright © Aptos Foundation
+
+use crate::v2::{
+    counters::MISC_TIMERS_SECONDS,
+    extract_and_sort,
+    state::PartitionState,
+    types::{PrePartitionedTxnIdx, SenderIdx},
+    PartitionerV2,
+};
+use aptos_logger::trace;
+use aptos_types::block_executor::partitioner::{RoundId, TxnIndex};
+use dashmap::DashMap;
+use rayon::{
+    iter::ParallelIterator,
+    prelude::{IntoParallelIterator, IntoParallelRefIterator},
+};
+use std::{
+    mem,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        RwLock,
+    },
+};
+
+impl PartitionerV2 {
+    /// Populate `state.finalized_txn_matrix` with txns flattened into a matrix (num_rounds by num_shards),
+    /// in a way that avoids in-round cross-shard conflicts.
+    pub(crate) fn remove_cross_shard_dependencies(state: &mut PartitionState) {
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&["remove_cross_shard_dependencies"])
+            .start_timer();
+
+        let mut remaining_txns = mem::take(&mut state.pre_partitioned);
+        assert_eq!(state.num_executor_shards, remaining_txns.len());
+
+        let mut num_remaining_txns: usize;
+        for round_id in 0..(state.num_rounds_limit - 1) {
+            let (accepted, discarded) = Self::discarding_round(state, round_id, remaining_txns);
+            state.finalized_txn_matrix.push(accepted);
+            remaining_txns = discarded;
+            num_remaining_txns = remaining_txns.iter().map(|ts| ts.len()).sum();
+
+            if num_remaining_txns
+                < ((1.0 - state.cross_shard_dep_avoid_threshold) * state.num_txns() as f32)
+                    as usize
+            {
+                break;
+            }
+        }
+
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&["last_round"])
+            .start_timer();
+
+        if !state.partition_last_round {
+            trace!("Merging txns after discarding stopped.");
+            let last_round_txns: Vec<PrePartitionedTxnIdx> =
+                remaining_txns.into_iter().flatten().collect();
+            remaining_txns = vec![vec![]; state.num_executor_shards];
+            remaining_txns[state.num_executor_shards - 1] = last_round_txns;
+        }
+
+        let last_round_id = state.finalized_txn_matrix.len();
+        state.thread_pool.install(|| {
+            (0..state.num_executor_shards)
+                .into_par_iter()
+                .for_each(|shard_id| {
+                    remaining_txns[shard_id]
+                        .par_iter()
+                        .for_each(|&ori_txn_idx| {
+                            state.update_trackers_on_accepting(
+                                ori_txn_idx,
+                                last_round_id,
+                                shard_id,
+                            );
+                        });
+                });
+        });
+        state.finalized_txn_matrix.push(remaining_txns);
+    }
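
To make the early-exit arithmetic above concrete: with `cross_shard_dep_avoid_threshold = 0.9` and a 10,000-txn block, discarding rounds stop as soon as fewer than (1.0 - 0.9) * 10,000 = 1,000 txns remain discarded; everything left goes to the last round. A tiny sketch of the rule, with `should_stop` as a hypothetical helper:

fn should_stop(num_remaining_txns: usize, num_txns: usize, avoid_threshold: f32) -> bool {
    num_remaining_txns < ((1.0 - avoid_threshold) * num_txns as f32) as usize
}

fn main() {
    assert!(!should_stop(1_500, 10_000, 0.9)); // still too many: run another round
    assert!(should_stop(999, 10_000, 0.9));    // stop; leftovers form the last round
}
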
+
+    /// Given some pre-partitioned txns, pull some off from each shard to avoid cross-shard conflicts.
+    /// The pulled-off txns become the pre-partitioned txns for the next round.
+    pub(crate) fn discarding_round(
+        state: &mut PartitionState,
+        round_id: RoundId,
+        remaining_txns: Vec<Vec<PrePartitionedTxnIdx>>,
+    ) -> (
+        Vec<Vec<PrePartitionedTxnIdx>>,
+        Vec<Vec<PrePartitionedTxnIdx>>,
+    ) {
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&[format!("round_{round_id}").as_str()])
+            .start_timer();
+
+        let num_shards = remaining_txns.len();
+
+        // Overview of the logic:
+        // 1. Key conflicts are analyzed and a txn from `remaining_txns` either goes to `discarded` or `tentatively_accepted`.
+        // 2. Relative orders of txns from the same sender are analyzed and a txn from `tentatively_accepted` either goes to `finally_accepted` or `discarded`.
+        let mut discarded: Vec<RwLock<Vec<PrePartitionedTxnIdx>>> = Vec::with_capacity(num_shards);
+        let mut tentatively_accepted: Vec<RwLock<Vec<PrePartitionedTxnIdx>>> =
+            Vec::with_capacity(num_shards);
+        let mut finally_accepted: Vec<RwLock<Vec<PrePartitionedTxnIdx>>> =
+            Vec::with_capacity(num_shards);
+
+        for txns in remaining_txns.iter() {
+            tentatively_accepted.push(RwLock::new(Vec::with_capacity(txns.len())));
+            finally_accepted.push(RwLock::new(Vec::with_capacity(txns.len())));
+            discarded.push(RwLock::new(Vec::with_capacity(txns.len())));
+        }
+
+        let min_discard_table: DashMap<SenderIdx, AtomicUsize> =
+            DashMap::with_shard_amount(state.dashmap_num_shards);
+
+        state.thread_pool.install(|| {
+            // Move some txns to the next round (stored in `discarded`).
+            // Txns that remain in the current round (`tentatively_accepted`) are guaranteed
+            // to have no in-round cross-shard conflicts.
+            remaining_txns
+                .into_iter()
+                .enumerate()
+                .collect::<Vec<_>>()
+                .into_par_iter()
+                .for_each(|(shard_id, txn_idxs)| {
+                    txn_idxs.into_par_iter().for_each(|txn_idx| {
+                        let mut in_round_conflict_detected = false;
+                        let write_set = state.write_sets[txn_idx].read().unwrap();
+                        let read_set = state.read_sets[txn_idx].read().unwrap();
+                        for &key_idx in write_set.iter().chain(read_set.iter()) {
+                            if state.key_owned_by_another_shard(shard_id, key_idx) {
+                                in_round_conflict_detected = true;
+                                break;
+                            }
+                        }
+
+                        if in_round_conflict_detected {
+                            let sender = state.sender_idx(txn_idx);
+                            min_discard_table
+                                .entry(sender)
+                                .or_insert_with(|| AtomicUsize::new(usize::MAX))
+                                .fetch_min(txn_idx, Ordering::SeqCst);
+                            discarded[shard_id].write().unwrap().push(txn_idx);
+                        } else {
+                            tentatively_accepted[shard_id]
+                                .write()
+                                .unwrap()
+                                .push(txn_idx);
+                        }
+                    });
+                });
+
+            // Additional discarding to preserve relative txn order for the same sender.
+            tentatively_accepted
+                .into_iter()
+                .enumerate()
+                .collect::<Vec<_>>()
+                .into_par_iter()
+                .for_each(|(shard_id, txn_idxs)| {
+                    let txn_idxs = mem::take(&mut *txn_idxs.write().unwrap());
+                    txn_idxs.into_par_iter().for_each(|ori_txn_idx| {
+                        let sender_idx = state.sender_idx(ori_txn_idx);
+                        let min_discarded = min_discard_table
+                            .get(&sender_idx)
+                            .map(|kv| kv.load(Ordering::SeqCst))
+                            .unwrap_or(usize::MAX);
+                        if ori_txn_idx < min_discarded {
+                            state.update_trackers_on_accepting(ori_txn_idx, round_id, shard_id);
+                            finally_accepted[shard_id]
+                                .write()
+                                .unwrap()
+                                .push(ori_txn_idx);
+                        } else {
+                            discarded[shard_id].write().unwrap().push(ori_txn_idx);
+                        }
+                    });
+                });
+        });
+
+        state.thread_pool.spawn(move || {
+            drop(min_discard_table);
+        });
+
+        (
+            extract_and_sort(finally_accepted),
+            extract_and_sort(discarded),
+        )
+    }
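
A sketch of the per-sender rule that `min_discard_table` implements above: once the smallest discarded index of a sender is known, every later txn from that sender must follow it to the next round, even without a key conflict, or the sender's txns could execute out of order across rounds. `keep` is a hypothetical helper:

fn keep(txn_idx: usize, min_discarded_for_sender: Option<usize>) -> bool {
    txn_idx < min_discarded_for_sender.unwrap_or(usize::MAX)
}

fn main() {
    assert!(keep(5, Some(7)));  // ordered before the first discarded txn: keep
    assert!(!keep(9, Some(7))); // ordered after it: must move to the next round
    assert!(keep(9, None));     // the sender had no discards: keep
}
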
+
+    pub(crate) fn build_index_from_txn_matrix(state: &mut PartitionState) {
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&["build_index_from_txn_matrix"])
+            .start_timer();
+
+        let num_rounds = state.finalized_txn_matrix.len();
+        state.start_index_matrix = vec![vec![0; state.num_executor_shards]; num_rounds];
+        let mut global_counter: TxnIndex = 0;
+        for (round_id, row) in state.finalized_txn_matrix.iter().enumerate() {
+            for (shard_id, txns) in row.iter().enumerate() {
+                state.start_index_matrix[round_id][shard_id] = global_counter;
+                global_counter += txns.len();
+            }
+        }
+
+        state.new_txn_idxs = (0..state.num_txns()).map(|_tid| RwLock::new(0)).collect();
+
+        state.thread_pool.install(|| {
+            (0..num_rounds).into_par_iter().for_each(|round_id| {
+                (0..state.num_executor_shards)
+                    .into_par_iter()
+                    .for_each(|shard_id| {
+                        let sub_block_size = state.finalized_txn_matrix[round_id][shard_id].len();
+                        (0..sub_block_size)
+                            .into_par_iter()
+                            .for_each(|pos_in_sub_block| {
+                                let txn_idx = state.finalized_txn_matrix[round_id][shard_id]
+                                    [pos_in_sub_block];
+                                *state.new_txn_idxs[txn_idx].write().unwrap() =
+                                    state.start_index_matrix[round_id][shard_id] + pos_in_sub_block;
+                            });
+                    });
+            });
+        });
+    }
+}
diff --git a/execution/block-partitioner/src/v2/state.rs b/execution/block-partitioner/src/v2/state.rs
new file mode 100644
index 0000000000000..7a9528cd29d93
--- /dev/null
+++ b/execution/block-partitioner/src/v2/state.rs
@@ -0,0 +1,311 @@
+// Copyright © Aptos Foundation
+
+#![allow(unused_variables)]
+
+use crate::{
+    pre_partition::start_txn_idxs,
+    v2::{
+        conflicting_txn_tracker::ConflictingTxnTracker,
+        counters::MISC_TIMERS_SECONDS,
+        types::{PrePartitionedTxnIdx, SenderIdx, ShardedTxnIndexV2, StorageKeyIdx, SubBlockIdx},
+    },
+    Sender,
+};
+use aptos_types::{
+    block_executor::partitioner::{
+        CrossShardDependencies, RoundId, ShardId, ShardedTxnIndex, SubBlock,
+        TransactionWithDependencies, TxnIndex,
+    },
+    state_store::state_key::StateKey,
+    transaction::analyzed_transaction::{AnalyzedTransaction, StorageLocation},
+};
+use dashmap::DashMap;
+use rayon::{
+    iter::{IntoParallelIterator, ParallelIterator},
+    ThreadPool,
+};
+use std::{
+    collections::HashSet,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex, RwLock,
+    },
+};
+
+/// All the parameters, indexes and temporary states needed in a `PartitionerV2` session,
+/// wrapped in a single struct to make async drop easy.
+pub struct PartitionState {
+    // Params/utils from the partitioner.
+    pub(crate) num_executor_shards: ShardId,
+    pub(crate) num_rounds_limit: usize,
+    pub(crate) dashmap_num_shards: usize,
+    pub(crate) cross_shard_dep_avoid_threshold: f32,
+    pub(crate) partition_last_round: bool,
+    pub(crate) thread_pool: Arc<ThreadPool>,
+
+    /// Holds all the txns.
+    /// Each is wrapped in a `RwLock` so it can be taken out in parallel in the `add_edges` phase,
+    /// with parallel reads in the other phases.
+    pub(crate) txns: Vec<RwLock<Option<AnalyzedTransaction>>>,
+
+    // Pre-partitioning results.
+    pub(crate) pre_partitioned: Vec<Vec<PrePartitionedTxnIdx>>,
+    pub(crate) start_txn_idxs_by_shard: Vec<PrePartitionedTxnIdx>,
+
+    // The discretized txn info, populated in `init()`.
+    /// Sender index by `PrePartitionedTxnIdx`.
+    pub(crate) sender_idxs: Vec<RwLock<Option<SenderIdx>>>,
+    /// Write key indices by `PrePartitionedTxnIdx`.
+    pub(crate) write_sets: Vec<RwLock<HashSet<StorageKeyIdx>>>,
+    /// Read key indices by `PrePartitionedTxnIdx`.
+    pub(crate) read_sets: Vec<RwLock<HashSet<StorageKeyIdx>>>,
+
+    // Used in `init()` to discretize senders.
+    pub(crate) sender_counter: AtomicUsize,
+    pub(crate) sender_idx_table: DashMap<Sender, SenderIdx>,
+
+    // Used in `init()` to discretize storage locations.
+    pub(crate) storage_key_counter: AtomicUsize,
+    pub(crate) key_idx_table: DashMap<StateKey, StorageKeyIdx>,
+
+    // A `ConflictingTxnTracker` for each key that helps resolve conflicts and speed up edge creation.
+    pub(crate) trackers: DashMap<StorageKeyIdx, RwLock<ConflictingTxnTracker>>,
+
+    // Results of `remove_cross_shard_dependencies()`.
+    pub(crate) finalized_txn_matrix: Vec<Vec<Vec<PrePartitionedTxnIdx>>>,
+    pub(crate) start_index_matrix: Vec<Vec<TxnIndex>>,
+    pub(crate) new_txn_idxs: Vec<RwLock<TxnIndex>>,
+
+    // Temporary sub-block matrix used in `add_edges()`.
+    pub(crate) sub_block_matrix: Vec<Vec<Mutex<Option<SubBlock<AnalyzedTransaction>>>>>,
+}
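
The "async drop" the doc comment refers to is a simple trick: because the whole session lives in one struct, `partition()` can hand it to a background thread and let the (potentially large) deallocation happen off the critical path. A standalone sketch under that assumption; the real code uses the rayon pool rather than `std::thread`, and `BigState`/`finish` are hypothetical:

use std::thread;

struct BigState {
    data: Vec<Vec<u64>>, // stand-in for the many indices above
}

fn finish(state: BigState) -> usize {
    let result = state.data.len(); // compute the return value first
    thread::spawn(move || drop(state)); // detached; frees memory off the critical path
    result
}

fn main() {
    let s = BigState { data: vec![vec![0; 1024]; 1024] };
    println!("rows: {}", finish(s));
}
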
+
+/// Some small operations.
+impl PartitionState {
+    pub fn new(
+        thread_pool: Arc<ThreadPool>,
+        dashmap_num_shards: usize,
+        txns: Vec<AnalyzedTransaction>,
+        num_executor_shards: ShardId,
+        pre_partitioned: Vec<Vec<PrePartitionedTxnIdx>>,
+        num_rounds_limit: usize,
+        cross_shard_dep_avoid_threshold: f32,
+        merge_discarded: bool,
+    ) -> Self {
+        let _timer = MISC_TIMERS_SECONDS
+            .with_label_values(&["new"])
+            .start_timer();
+        let num_txns = txns.len();
+        let sender_counter = AtomicUsize::new(0);
+        let key_counter = AtomicUsize::new(0);
+        let mut senders: Vec<RwLock<Option<SenderIdx>>> = Vec::with_capacity(num_txns);
+        let mut wsets: Vec<RwLock<HashSet<StorageKeyIdx>>> = Vec::with_capacity(num_txns);
+        let mut rsets: Vec<RwLock<HashSet<StorageKeyIdx>>> = Vec::with_capacity(num_txns);
+        let sender_idx_table: DashMap<Sender, SenderIdx> =
+            DashMap::with_shard_amount(dashmap_num_shards);
+        let key_idx_table: DashMap<StateKey, StorageKeyIdx> =
+            DashMap::with_shard_amount(dashmap_num_shards);
+        let trackers: DashMap<StorageKeyIdx, RwLock<ConflictingTxnTracker>> =
+            DashMap::with_shard_amount(dashmap_num_shards);
+        for txn in txns.iter() {
+            senders.push(RwLock::new(None));
+            wsets.push(RwLock::new(HashSet::with_capacity(txn.write_hints().len())));
+            rsets.push(RwLock::new(HashSet::with_capacity(txn.read_hints().len())));
+        }
+        let takable_txns = thread_pool.install(|| {
+            txns.into_par_iter()
+                .map(|txn| RwLock::new(Some(txn)))
+                .collect()
+        });
+        let start_txn_idxs_by_shard = start_txn_idxs(&pre_partitioned);
+        Self {
+            dashmap_num_shards,
+            partition_last_round: merge_discarded,
+            thread_pool,
+            num_executor_shards,
+            pre_partitioned,
+            start_txn_idxs_by_shard,
+            sender_counter,
+            storage_key_counter: key_counter,
+            sender_idxs: senders,
+            write_sets: wsets,
+            read_sets: rsets,
+            sender_idx_table,
+            key_idx_table,
+            trackers,
+            cross_shard_dep_avoid_threshold,
+            num_rounds_limit,
+            finalized_txn_matrix: Vec::with_capacity(num_rounds_limit),
+            new_txn_idxs: vec![],
+            start_index_matrix: vec![],
+            txns: takable_txns,
+            sub_block_matrix: vec![],
+        }
+    }
+
+    pub(crate) fn num_txns(&self) -> usize {
+        self.txns.len()
+    }
+
+    pub(crate) fn add_key(&self, key: &StateKey) -> StorageKeyIdx {
+        *self
+            .key_idx_table
+            .entry(key.clone())
+            .or_insert_with(|| self.storage_key_counter.fetch_add(1, Ordering::SeqCst))
+    }
+
+    pub(crate) fn storage_location(&self, key_idx: StorageKeyIdx) -> StorageLocation {
+        let tracker_ref = self.trackers.get(&key_idx).unwrap();
+        let tracker = tracker_ref.read().unwrap();
+        tracker.storage_location.clone()
+    }
+
+    pub(crate) fn sender_idx(&self, txn_idx: PrePartitionedTxnIdx) -> SenderIdx {
+        *self.sender_idxs[txn_idx].read().unwrap().as_ref().unwrap()
+    }
+
+    pub(crate) fn add_sender(&self, sender: Sender) -> SenderIdx {
+        *self
+            .sender_idx_table
+            .entry(sender)
+            .or_insert_with(|| self.sender_counter.fetch_add(1, Ordering::SeqCst))
+    }
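
`add_key` and `add_sender` above are two instances of one concurrent interning pattern: a `DashMap` plus an atomic counter assign each distinct value a dense index exactly once, because `entry().or_insert_with` runs under the map shard's lock. A self-contained sketch with strings (`Interner` is a hypothetical name):

use dashmap::DashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Interner {
    counter: AtomicUsize,
    table: DashMap<String, usize>,
}

impl Interner {
    fn intern(&self, s: &str) -> usize {
        *self
            .table
            .entry(s.to_string())
            .or_insert_with(|| self.counter.fetch_add(1, Ordering::SeqCst))
    }
}

fn main() {
    let interner = Interner { counter: AtomicUsize::new(0), table: DashMap::new() };
    let a = interner.intern("alice");
    assert_eq!(a, interner.intern("alice")); // idempotent: same value, same index
    assert_ne!(a, interner.intern("bob"));   // distinct values get distinct indices
}
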
+
+    /// For a key, check if there is any write between the anchor shard and a given shard.
+    pub(crate) fn key_owned_by_another_shard(&self, shard_id: ShardId, key: StorageKeyIdx) -> bool {
+        let tracker_ref = self.trackers.get(&key).unwrap();
+        let tracker = tracker_ref.read().unwrap();
+        let range_start = self.start_txn_idxs_by_shard[tracker.anchor_shard_id];
+        let range_end = self.start_txn_idxs_by_shard[shard_id];
+        tracker.has_write_in_range(range_start, range_end)
+    }
+
+    pub(crate) fn update_trackers_on_accepting(
+        &self,
+        ori_txn_idx: PrePartitionedTxnIdx,
+        round_id: RoundId,
+        shard_id: ShardId,
+    ) {
+        let write_set = self.write_sets[ori_txn_idx].read().unwrap();
+        let read_set = self.read_sets[ori_txn_idx].read().unwrap();
+        for &key_idx in write_set.iter().chain(read_set.iter()) {
+            self.trackers
+                .get(&key_idx)
+                .unwrap()
+                .write()
+                .unwrap()
+                .mark_txn_ordered(ori_txn_idx, round_id, shard_id);
+        }
+    }
+
+    /// Get the last txn inside `sub_block` that writes a given key.
+    pub(crate) fn last_writer(
+        &self,
+        key: StorageKeyIdx,
+        sub_block: SubBlockIdx,
+    ) -> Option<PrePartitionedTxnIdx> {
+        let tracker_ref = self.trackers.get(&key).unwrap();
+        let tracker = tracker_ref.read().unwrap();
+        let start = ShardedTxnIndexV2::new(sub_block.round_id, sub_block.shard_id, 0);
+        let end = ShardedTxnIndexV2::new(sub_block.round_id, sub_block.shard_id + 1, 0);
+        let ret = tracker
+            .finalized_writes
+            .range(start..end)
+            .last()
+            .map(|t| t.ori_txn_idx);
+        ret
+    }
+
+    /// Get the 1st txn after `since` that writes a given key.
+    pub(crate) fn first_writer(
+        &self,
+        key: StorageKeyIdx,
+        since: ShardedTxnIndexV2,
+    ) -> Option<ShardedTxnIndexV2> {
+        let tracker_ref = self.trackers.get(&key).unwrap();
+        let tracker = tracker_ref.read().unwrap();
+        tracker.finalized_writes.range(since..).next().copied()
+    }
+
+    /// Get all txns that access a certain key in a sub-block range.
+    pub(crate) fn all_txns_in_sub_block_range(
+        &self,
+        key: StorageKeyIdx,
+        start: ShardedTxnIndexV2,
+        end: ShardedTxnIndexV2,
+    ) -> Vec<ShardedTxnIndexV2> {
+        let tracker_ref = self.trackers.get(&key).unwrap();
+        let tracker = tracker_ref.read().unwrap();
+        tracker.finalized.range(start..end).copied().collect()
+    }
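
These range queries work because `ShardedTxnIndexV2` orders by (round, shard, txn index), so each sub-block occupies a contiguous run of any sorted container, delimited by the half-open range [(r, s, 0), (r, s + 1, 0)). A sketch with plain tuples, which compare the same way:

use std::collections::BTreeSet;

fn main() {
    // (round_id, shard_id, ori_txn_idx) tuples order exactly like ShardedTxnIndexV2.
    let finalized: BTreeSet<(usize, usize, usize)> =
        [(0, 1, 3), (0, 1, 8), (0, 2, 5), (1, 0, 9)].into_iter().collect();

    // Everything in sub-block (round 0, shard 1):
    let in_sub_block: Vec<_> = finalized.range((0, 1, 0)..(0, 2, 0)).copied().collect();
    assert_eq!(in_sub_block, vec![(0, 1, 3), (0, 1, 8)]);

    // The last writer in that sub-block:
    assert_eq!(finalized.range((0, 1, 0)..(0, 2, 0)).last(), Some(&(0, 1, 8)));
}
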
+
+    pub(crate) fn num_rounds(&self) -> usize {
+        self.finalized_txn_matrix.len()
+    }
+
+    pub(crate) fn final_sub_block_idx(&self, sub_blk_idx: SubBlockIdx) -> SubBlockIdx {
+        if !self.partition_last_round && sub_blk_idx.round_id == self.num_rounds() - 1 {
+            SubBlockIdx::global()
+        } else {
+            sub_blk_idx
+        }
+    }
+
+    /// Take a txn out and wrap it as a `TransactionWithDependencies`.
+    pub(crate) fn take_txn_with_dep(
+        &self,
+        round_id: RoundId,
+        shard_id: ShardId,
+        ori_txn_idx: PrePartitionedTxnIdx,
+    ) -> TransactionWithDependencies<AnalyzedTransaction> {
+        let txn = self.txns[ori_txn_idx].write().unwrap().take().unwrap();
+        let mut deps = CrossShardDependencies::default();
+
+        // Build required edges.
+        let write_set = self.write_sets[ori_txn_idx].read().unwrap();
+        let read_set = self.read_sets[ori_txn_idx].read().unwrap();
+        for &key_idx in write_set.iter().chain(read_set.iter()) {
+            let tracker_ref = self.trackers.get(&key_idx).unwrap();
+            let tracker = tracker_ref.read().unwrap();
+            if let Some(txn_idx) = tracker
+                .finalized_writes
+                .range(..ShardedTxnIndexV2::new(round_id, shard_id, 0))
+                .last()
+            {
+                let src_txn_idx = ShardedTxnIndex {
+                    txn_index: *self.new_txn_idxs[txn_idx.ori_txn_idx].read().unwrap(),
+                    shard_id: txn_idx.shard_id(),
+                    round_id: txn_idx.round_id(),
+                };
+                deps.add_required_edge(src_txn_idx, tracker.storage_location.clone());
+            }
+        }
+
+        // Build dependent edges.
+        for &key_idx in self.write_sets[ori_txn_idx].read().unwrap().iter() {
+            if Some(ori_txn_idx) == self.last_writer(key_idx, SubBlockIdx { round_id, shard_id }) {
+                let start_of_next_sub_block = ShardedTxnIndexV2::new(round_id, shard_id + 1, 0);
+                let next_writer = self.first_writer(key_idx, start_of_next_sub_block);
+                let end_follower = match next_writer {
+                    None => ShardedTxnIndexV2::new(self.num_rounds(), self.num_executor_shards, 0), // Guaranteed to be greater than any valid idx.
+                    Some(idx) => ShardedTxnIndexV2::new(idx.round_id(), idx.shard_id() + 1, 0),
+                };
+                for follower_txn_idx in
+                    self.all_txns_in_sub_block_range(key_idx, start_of_next_sub_block, end_follower)
+                {
+                    let final_sub_blk_idx =
+                        self.final_sub_block_idx(follower_txn_idx.sub_block_idx);
+                    let dst_txn_idx = ShardedTxnIndex {
+                        txn_index: *self.new_txn_idxs[follower_txn_idx.ori_txn_idx]
+                            .read()
+                            .unwrap(),
+                        shard_id: final_sub_blk_idx.shard_id,
+                        round_id: final_sub_blk_idx.round_id,
+                    };
+                    deps.add_dependent_edge(dst_txn_idx, vec![self.storage_location(key_idx)]);
+                }
+            }
+        }
+
+        TransactionWithDependencies::new(txn, deps)
+    }
+}
diff --git a/execution/block-partitioner/src/v2/types.rs b/execution/block-partitioner/src/v2/types.rs
new file mode 100644
index 0000000000000..14a71bdb86835
--- /dev/null
+++ b/execution/block-partitioner/src/v2/types.rs
@@ -0,0 +1,88 @@
+// Copyright © Aptos Foundation
+
+use aptos_types::block_executor::partitioner::{
+    RoundId, ShardId, GLOBAL_ROUND_ID, GLOBAL_SHARD_ID,
+};
+use serde::{Deserialize, Serialize};
+use std::cmp;
+
+/// Represents which sub-block a txn is assigned to.
+/// TODO: switch to an enum to better represent the sub-block assigned to the global executor.
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct SubBlockIdx {
+    pub round_id: RoundId,
+    pub shard_id: ShardId,
+}
+
+impl Ord for SubBlockIdx {
+    fn cmp(&self, other: &Self) -> cmp::Ordering {
+        (self.round_id, self.shard_id).cmp(&(other.round_id, other.shard_id))
+    }
+}
+
+impl PartialOrd for SubBlockIdx {
+    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+        (self.round_id, self.shard_id).partial_cmp(&(other.round_id, other.shard_id))
+    }
+}
+
+impl SubBlockIdx {
+    pub fn new(round_id: RoundId, shard_id: ShardId) -> Self {
+        SubBlockIdx { round_id, shard_id }
+    }
+
+    pub fn global() -> SubBlockIdx {
+        SubBlockIdx::new(GLOBAL_ROUND_ID, GLOBAL_SHARD_ID)
+    }
+}
+
+/// The position of a txn in the block after `pre_partition` and before `partition_to_matrix`.
+pub type PrePartitionedTxnIdx = usize;
+
+/// Represents a specific storage location in a partitioning session.
+/// TODO: ensure this type can support the max num of unique state keys in a block.
+pub type StorageKeyIdx = usize;
+
+/// Represents a sender in a partitioning session.
+pub type SenderIdx = usize;
+
+/// Represents the position of a txn after it is assigned to a sub-block.
+///
+/// Different from `aptos_types::block_executor::partitioner::ShardedTxnIndex`,
+/// it keeps the txn's pre-partitioning index and is totally ordered by (round, shard, txn).
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct ShardedTxnIndexV2 {
+    pub sub_block_idx: SubBlockIdx,
+    pub ori_txn_idx: PrePartitionedTxnIdx,
+}
+
+impl Ord for ShardedTxnIndexV2 {
+    fn cmp(&self, other: &Self) -> cmp::Ordering {
+        (self.sub_block_idx, self.ori_txn_idx).cmp(&(other.sub_block_idx, other.ori_txn_idx))
+    }
+}
+
+impl PartialOrd for ShardedTxnIndexV2 {
+    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+        (self.sub_block_idx, self.ori_txn_idx)
+            .partial_cmp(&(other.sub_block_idx, other.ori_txn_idx))
+    }
+}
+
+impl ShardedTxnIndexV2 {
+    pub fn round_id(&self) -> RoundId {
+        self.sub_block_idx.round_id
+    }
+
+    pub fn shard_id(&self) -> ShardId {
+        self.sub_block_idx.shard_id
+    }
+}
+
+impl ShardedTxnIndexV2 {
+    pub fn new(round_id: RoundId, shard_id: ShardId, ori_txn_idx: PrePartitionedTxnIdx) -> Self {
+        Self {
+            sub_block_idx: SubBlockIdx::new(round_id, shard_id),
+            ori_txn_idx,
+        }
+    }
+}
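
A few assertions spelling out the total order the impls above define: round first, then shard, then the pre-partitioned txn index, mirroring plain tuple comparison. A sketch, assuming the crate-public path `aptos_block_partitioner::v2::types`:

use aptos_block_partitioner::v2::types::ShardedTxnIndexV2;

fn main() {
    assert!(ShardedTxnIndexV2::new(0, 9, 99) < ShardedTxnIndexV2::new(1, 0, 0)); // round wins
    assert!(ShardedTxnIndexV2::new(1, 2, 99) < ShardedTxnIndexV2::new(1, 3, 0)); // then shard
    assert!(ShardedTxnIndexV2::new(1, 3, 4) < ShardedTxnIndexV2::new(1, 3, 5));  // then txn idx
}
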
diff --git a/execution/executor-benchmark/src/block_partitioning.rs b/execution/executor-benchmark/src/block_partitioning.rs
index f98c0f197a5ff..1b474fb3a0234 100644
--- a/execution/executor-benchmark/src/block_partitioning.rs
+++ b/execution/executor-benchmark/src/block_partitioning.rs
@@ -1,9 +1,7 @@
 // Copyright © Aptos Foundation
 
-use crate::pipeline::ExecuteBlockMessage;
-use aptos_block_partitioner::{
-    sharded_block_partitioner::ShardedBlockPartitioner, BlockPartitionerConfig,
-};
+use crate::{metrics::TIMER, pipeline::ExecuteBlockMessage};
+use aptos_block_partitioner::{BlockPartitioner, PartitionerConfig};
 use aptos_crypto::HashValue;
 use aptos_logger::info;
 use aptos_types::{
@@ -13,26 +11,22 @@ use aptos_types::{
 use std::time::Instant;
 
 pub(crate) struct BlockPartitioningStage {
+    num_executor_shards: usize,
     num_blocks_processed: usize,
-    maybe_partitioner: Option<ShardedBlockPartitioner>,
+    maybe_partitioner: Option<Box<dyn BlockPartitioner>>,
 }
 
 impl BlockPartitioningStage {
-    pub fn new(num_shards: usize, partition_last_round: bool) -> Self {
+    pub fn new(num_shards: usize, partitioner_config: PartitionerConfig) -> Self {
         let maybe_partitioner = if num_shards <= 1 {
             None
         } else {
-            info!("Starting a sharded block partitioner with {} shards and last round partitioning {}", num_shards, partition_last_round);
-            let partitioner = BlockPartitionerConfig::default()
-                .num_shards(num_shards)
-                .max_partitioning_rounds(4)
-                .cross_shard_dep_avoid_threshold(0.95)
-                .partition_last_round(partition_last_round)
-                .build();
+            let partitioner = partitioner_config.build();
             Some(partitioner)
         };
 
         Self {
+            num_executor_shards: num_shards,
             num_blocks_processed: 0,
            maybe_partitioner,
         }
@@ -51,7 +45,10 @@ impl BlockPartitioningStage {
             Some(partitioner) => {
                 let last_txn = txns.pop().unwrap();
                 let analyzed_transactions = txns.into_iter().map(|t| t.into()).collect();
-                let mut partitioned_txns = partitioner.partition(analyzed_transactions);
+                let timer = TIMER.with_label_values(&["partition"]).start_timer();
+                let mut partitioned_txns =
+                    partitioner.partition(analyzed_transactions, self.num_executor_shards);
+                timer.stop_and_record();
                 partitioned_txns.add_checkpoint_txn(last_txn);
                 ExecutableBlock::new(block_id, ExecutableTransactions::Sharded(partitioned_txns))
             },
diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs
index a5ffa0e8cca75..3c82c5e73934a 100644
--- a/execution/executor-benchmark/src/lib.rs
+++ b/execution/executor-benchmark/src/lib.rs
@@ -19,6 +19,7 @@ use crate::{
transaction_executor::TransactionExecutor, transaction_generator::TransactionGenerator, }; use aptos_block_executor::counters as block_executor_counters; +use aptos_block_partitioner::PartitionerConfig; use aptos_config::config::{NodeConfig, PrunerConfig}; use aptos_db::AptosDB; use aptos_executor::{ @@ -171,6 +172,7 @@ pub fn run_benchmark( num_executor_shards: 1, async_partitioning: false, use_global_executor: false, + partitioner_config: PartitionerConfig::default(), }, ) }); @@ -178,7 +180,7 @@ pub fn run_benchmark( let version = db.reader.get_latest_version().unwrap(); let (pipeline, block_sender) = - Pipeline::new(executor, version, pipeline_config.clone(), Some(num_blocks)); + Pipeline::new(executor, version, pipeline_config, Some(num_blocks)); let mut num_accounts_to_load = num_main_signer_accounts; if let Some(mix) = &transaction_mix { @@ -584,6 +586,7 @@ mod tests { num_executor_shards: 1, async_partitioning: false, use_global_executor: false, + partitioner_config: Default::default(), }, ); @@ -614,6 +617,7 @@ mod tests { num_executor_shards: 1, async_partitioning: false, use_global_executor: false, + partitioner_config: Default::default(), }, ); } diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs index 4d62cc5d739a4..722bd2e1fd862 100644 --- a/execution/executor-benchmark/src/main.rs +++ b/execution/executor-benchmark/src/main.rs @@ -2,6 +2,10 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 +use aptos_block_partitioner::{ + sharded_block_partitioner::config::PartitionerV1Config, v2::config::PartitionerV2Config, + PartitionerConfig, +}; use aptos_config::config::{ EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig, }; @@ -100,6 +104,16 @@ pub struct PipelineOpt { async_partitioning: bool, #[clap(long)] use_global_executor: bool, + #[clap(long, default_value = "4")] + max_partitioning_rounds: usize, + #[clap(long, default_value = "0.90")] + partitioner_cross_shard_dep_avoid_threshold: f32, + #[clap(long, default_value = "2")] + partitioner_version: usize, + #[clap(long, default_value = "8")] + partitioner_v2_num_threads: usize, + #[clap(long, default_value = "64")] + partitioner_v2_dashmap_num_shards: usize, } impl PipelineOpt { @@ -113,6 +127,26 @@ impl PipelineOpt { num_executor_shards: self.num_executor_shards, async_partitioning: self.async_partitioning, use_global_executor: self.use_global_executor, + partitioner_config: self.partitioner_config(), + } + } + + fn partitioner_config(&self) -> PartitionerConfig { + match self.partitioner_version { + 1 => PartitionerConfig::V1(PartitionerV1Config { + num_shards: self.num_executor_shards, + max_partitioning_rounds: self.max_partitioning_rounds, + cross_shard_dep_avoid_threshold: self.partitioner_cross_shard_dep_avoid_threshold, + partition_last_round: !self.use_global_executor, + }), + 2 => PartitionerConfig::V2(PartitionerV2Config { + num_threads: self.partitioner_v2_num_threads, + max_partitioning_rounds: self.max_partitioning_rounds, + cross_shard_dep_avoid_threshold: self.partitioner_cross_shard_dep_avoid_threshold, + dashmap_num_shards: self.partitioner_v2_dashmap_num_shards, + partition_last_round: self.use_global_executor, + }), + _ => panic!("Unknown partitioner version: {}", self.partitioner_version), } } } diff --git a/execution/executor-benchmark/src/pipeline.rs b/execution/executor-benchmark/src/pipeline.rs index 3a37e13212851..876eabf4f23f5 100644 --- 
a/execution/executor-benchmark/src/pipeline.rs +++ b/execution/executor-benchmark/src/pipeline.rs @@ -5,6 +5,7 @@ use crate::{ block_partitioning::BlockPartitioningStage, GasMesurement, TransactionCommitter, TransactionExecutor, }; +use aptos_block_partitioner::PartitionerConfig; use aptos_crypto::HashValue; use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor}; use aptos_executor_types::BlockExecutorTrait; @@ -23,7 +24,7 @@ use std::{ time::{Duration, Instant}, }; -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct PipelineConfig { pub delay_execution_start: bool, pub split_stages: bool, @@ -33,6 +34,7 @@ pub struct PipelineConfig { pub num_executor_shards: usize, pub async_partitioning: bool, pub use_global_executor: bool, + pub partitioner_config: PartitionerConfig, } pub struct Pipeline { @@ -92,7 +94,7 @@ where let mut join_handles = vec![]; let mut partitioning_stage = - BlockPartitioningStage::new(num_partitioner_shards, !config.use_global_executor); + BlockPartitioningStage::new(num_partitioner_shards, config.partitioner_config); let mut exe = TransactionExecutor::new( executor_1, diff --git a/execution/executor-service/src/test_utils.rs b/execution/executor-service/src/test_utils.rs index 39efe84cdc966..b8c3a4a8e2ec2 100644 --- a/execution/executor-service/src/test_utils.rs +++ b/execution/executor-service/src/test_utils.rs @@ -1,6 +1,6 @@ // Copyright © Aptos Foundation -use aptos_block_partitioner::BlockPartitionerConfig; +use aptos_block_partitioner::sharded_block_partitioner::config::PartitionerV1Config; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, executor::FakeExecutor, @@ -101,13 +101,13 @@ pub fn test_sharded_block_executor_no_conflict> for _ in 0..num_txns { transactions.push(generate_non_conflicting_p2p(&mut executor).0) } - let partitioner = BlockPartitionerConfig::default() + let partitioner = PartitionerV1Config::default() .num_shards(num_shards) .max_partitioning_rounds(2) .cross_shard_dep_avoid_threshold(0.9) .partition_last_round(true) .build(); - let partitioned_txns = partitioner.partition(transactions.clone()); + let partitioned_txns = partitioner.partition(transactions.clone(), num_shards); let sharded_txn_output = sharded_block_executor .execute_block( Arc::new(executor.data_store().clone()), diff --git a/types/src/block_executor/partitioner.rs b/types/src/block_executor/partitioner.rs index 873a86db6022d..79be45014514c 100644 --- a/types/src/block_executor/partitioner.rs +++ b/types/src/block_executor/partitioner.rs @@ -6,7 +6,10 @@ use crate::transaction::{ }; use aptos_crypto::HashValue; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, +}; pub type ShardId = usize; pub type TxnIndex = usize; @@ -14,14 +17,25 @@ pub type RoundId = usize; pub static MAX_ALLOWED_PARTITIONING_ROUNDS: usize = 8; pub static GLOBAL_ROUND_ID: usize = MAX_ALLOWED_PARTITIONING_ROUNDS + 1; +pub static GLOBAL_SHARD_ID: usize = usize::MAX; -#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct ShardedTxnIndex { pub txn_index: TxnIndex, pub shard_id: ShardId, pub round_id: RoundId, } +impl PartialOrd for ShardedTxnIndex { + fn partial_cmp(&self, other: &Self) -> Option { + (self.round_id, self.shard_id, self.txn_index).partial_cmp(&( + other.round_id, + other.shard_id, + other.txn_index, + )) 
+ } +} + impl ShardedTxnIndex { pub fn new(txn_index: TxnIndex, shard_id: ShardId, round_id: RoundId) -> Self { Self { @@ -36,9 +50,41 @@ impl ShardedTxnIndex { /// Denotes a set of cross shard edges, which contains the set (required or dependent) transaction /// indices and the relevant storage locations that are conflicting. pub struct CrossShardEdges { - edges: HashMap>, + pub edges: HashMap>, } +impl PartialEq for CrossShardEdges { + fn eq(&self, other: &Self) -> bool { + let my_key_set = self.edges.keys().copied().collect::>(); + let other_key_set = other.edges.keys().copied().collect::>(); + if my_key_set != other_key_set { + return false; + } + for key in my_key_set { + let my_value = self + .edges + .get(&key) + .unwrap() + .clone() + .into_iter() + .collect::>(); + let other_value = other + .edges + .get(&key) + .unwrap() + .clone() + .into_iter() + .collect::>(); + if my_value != other_value { + return false; + } + } + true + } +} + +impl Eq for CrossShardEdges {} + impl CrossShardEdges { pub fn new(txn_idx: ShardedTxnIndex, storage_locations: Vec) -> Self { let mut edges = HashMap::new(); @@ -79,7 +125,7 @@ impl IntoIterator for CrossShardEdges { } } -#[derive(Default, Debug, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] /// Represents the dependencies of a transaction on other transactions across shards. Two types /// of dependencies are supported: /// 1. `required_edges`: The transaction depends on the execution of the transactions in the set. In this @@ -89,8 +135,8 @@ impl IntoIterator for CrossShardEdges { /// Dependent edge is a reverse of required edge, for example if txn 20 in shard 2 requires txn 10 in shard 1, /// then txn 10 in shard 1 will have a dependent edge to txn 20 in shard 2. pub struct CrossShardDependencies { - required_edges: CrossShardEdges, - dependent_edges: CrossShardEdges, + pub required_edges: CrossShardEdges, + pub dependent_edges: CrossShardEdges, } impl CrossShardDependencies { @@ -166,7 +212,7 @@ impl CrossShardDependencies { /// | Transaction 3 | Transaction 6 | Transaction 9 | /// +----------------+------------------+------------------+ /// ``` -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct SubBlock { // This is the index of first transaction relative to the block. pub start_index: TxnIndex, @@ -252,7 +298,7 @@ impl IntoIterator for SubBlock { } // A set of sub blocks assigned to a shard. 
-#[derive(Default, Clone, Debug, Serialize, Deserialize)] +#[derive(Default, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub struct SubBlocksForShard { pub shard_id: ShardId, pub sub_blocks: Vec>, @@ -346,7 +392,7 @@ impl SubBlocksForShard { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct TransactionWithDependencies { pub txn: T, pub cross_shard_dependencies: CrossShardDependencies, @@ -402,7 +448,7 @@ impl From<(HashValue, Vec)> for ExecutableBlock { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct PartitionedTransactions { pub sharded_txns: Vec>, pub global_txns: Vec>, @@ -443,12 +489,15 @@ impl PartitionedTransactions { &self.sharded_txns } - pub fn num_txns(&self) -> usize { + pub fn num_sharded_txns(&self) -> usize { self.sharded_txns .iter() .map(|sub_blocks| sub_blocks.num_txns()) .sum::() - + self.global_txns.len() + } + + pub fn num_txns(&self) -> usize { + self.num_sharded_txns() + self.global_txns.len() } pub fn add_checkpoint_txn(&mut self, last_txn: Transaction) { diff --git a/types/src/transaction/analyzed_transaction.rs b/types/src/transaction/analyzed_transaction.rs index 6f6fb502ff180..6b98c2d663e3c 100644 --- a/types/src/transaction/analyzed_transaction.rs +++ b/types/src/transaction/analyzed_transaction.rs @@ -24,10 +24,10 @@ pub struct AnalyzedTransaction { /// Set of storage locations that are read by the transaction - this doesn't include location /// that are written by the transactions to avoid duplication of locations across read and write sets /// This can be accurate or strictly overestimated. - read_hints: Vec, + pub read_hints: Vec, /// Set of storage locations that are written by the transaction. This can be accurate or strictly /// overestimated. - write_hints: Vec, + pub write_hints: Vec, /// A transaction is predictable if neither the read_hint or the write_hint have wildcards. predictable_transaction: bool, /// The hash of the transaction - this is cached for performance reasons.
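
A small sketch of the invariant behind the `num_txns`/`num_sharded_txns` split introduced above: the total count is the sharded txns plus the global txns. `check_txn_counts` is a hypothetical helper, not part of the patch:

use aptos_types::block_executor::partitioner::PartitionedTransactions;

fn check_txn_counts(partitioned: &PartitionedTransactions) {
    assert_eq!(
        partitioned.num_txns(),
        partitioned.num_sharded_txns() + partitioned.global_txns.len()
    );
}
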