Skip to content

Commit

Permalink
Increase num users
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Nov 19, 2024
1 parent 648372d commit 3ca32c4
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 14 deletions.
90 changes: 86 additions & 4 deletions crates/transaction-generator-lib/src/accounts_pool_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,99 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{ObjectPool, TransactionGenerator, TransactionGeneratorCreator};
use aptos_sdk::types::{
transaction::{SignedTransaction, TransactionPayload},
LocalAccount,
use crate::{ObjectPool, BucketedAccountPool, TransactionGenerator, TransactionGeneratorCreator};
use aptos_sdk::{
types::{
transaction::{SignedTransaction, TransactionPayload},
LocalAccount,
},
move_types::account_address::AccountAddress,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc},
};

pub struct BucketedAccountsPoolWrapperGenerator {
rng: StdRng,
generator: Box<dyn TransactionGenerator>,
source_accounts_pool: Arc<ObjectPool<LocalAccount>>,
destination_accounts_pool: Option<Arc<BucketedAccountPool<AccountAddress>>>,
}

impl BucketedAccountsPoolWrapperGenerator {
pub fn new(
rng: StdRng,
generator: Box<dyn TransactionGenerator>,
source_accounts_pool: Arc<ObjectPool<LocalAccount>>,
destination_accounts_pool: Option<Arc<BucketedAccountPool<AccountAddress>>>,
) -> Self {
Self {
rng,
generator,
source_accounts_pool,
destination_accounts_pool,
}
}
}

impl TransactionGenerator for BucketedAccountsPoolWrapperGenerator {
fn generate_transactions(
&mut self,
_account: &LocalAccount,
num_to_create: usize,
history: &[String],
market_maker: bool,
) -> Vec<SignedTransaction> {
let accounts_to_use =
self.source_accounts_pool
.take_from_pool(num_to_create, true, &mut self.rng);
if accounts_to_use.is_empty() {
return Vec::new();
}
let txns = accounts_to_use
.iter()
.flat_map(|account| self.generator.generate_transactions(account, num_to_create, history, market_maker))
.collect();
if let Some(destination_accounts_pool) = &self.destination_accounts_pool {
destination_accounts_pool.add_to_pool(accounts_to_use);
}
txns
}
}

pub struct BucketedAccountsPoolWrapperCreator {
creator: Box<dyn TransactionGeneratorCreator>,
source_accounts_pool: Arc<ObjectPool<LocalAccount>>,
destination_accounts_pool: Option<Arc<BucketedAccountPool<AccountAddress>>>,
}

impl BucketedAccountsPoolWrapperCreator {
pub fn new(
creator: Box<dyn TransactionGeneratorCreator>,
source_accounts_pool: Arc<ObjectPool<LocalAccount>>,
destination_accounts_pool: Option<Arc<BucketedAccountPool<AccountAddress>>>,
) -> Self {
Self {
creator,
source_accounts_pool,
destination_accounts_pool,
}
}
}

impl TransactionGeneratorCreator for BucketedAccountsPoolWrapperCreator {
fn create_transaction_generator(&self, txn_counter: Arc<AtomicU64>) -> Box<dyn TransactionGenerator> {
Box::new(BucketedAccountsPoolWrapperGenerator::new(
StdRng::from_entropy(),
self.creator.create_transaction_generator(txn_counter),
self.source_accounts_pool.clone(),
self.destination_accounts_pool.clone(),
))
}
}

/// Wrapper that allows inner transaction generator to have unique accounts
/// for all transactions (instead of having 5-20 transactions per account, as default)
/// This is achieved via using accounts from the pool that account creatin can fill,
Expand Down
20 changes: 10 additions & 10 deletions crates/transaction-generator-lib/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaBasic1MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Basic,
num_markets: 1,
reuse_accounts_for_orders: true,
Expand All @@ -529,7 +529,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaMixed1MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Mixed,
num_markets: 1,
reuse_accounts_for_orders: true,
Expand All @@ -544,7 +544,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaMixed10MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Mixed,
num_markets: 10,
reuse_accounts_for_orders: true,
Expand All @@ -559,7 +559,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaMixed100MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Mixed,
num_markets: 100,
reuse_accounts_for_orders: true,
Expand All @@ -574,7 +574,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaReal => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Real,
num_markets: 2,
reuse_accounts_for_orders: true,
Expand All @@ -589,7 +589,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaRealNoPublish => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Real,
num_markets: 2,
reuse_accounts_for_orders: true,
Expand All @@ -604,7 +604,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaMarket1MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Market,
num_markets: 1,
reuse_accounts_for_orders: true,
Expand All @@ -619,7 +619,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaMarket10MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 200000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Market,
num_markets: 10,
reuse_accounts_for_orders: true,
Expand All @@ -634,7 +634,7 @@ progress_type: WorkflowProgress::WhenDone {
},
TransactionTypeArg::EconiaMarket100MarketReuseAccounts => TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 2000000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Market,
num_markets: 100,
reuse_accounts_for_orders: true,
Expand All @@ -650,7 +650,7 @@ progress_type: WorkflowProgress::WhenDone {
TransactionTypeArg::EconiaMarket1MarketReuseAccountsNoPublish => {
TransactionType::Workflow {
workflow_kind: WorkflowKind::Econia {
num_users: 20000,
num_users: 600000,
flow_type: crate::EconiaFlowType::Market,
num_markets: 1,
reuse_accounts_for_orders: true,
Expand Down
102 changes: 102 additions & 0 deletions crates/transaction-generator-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
Arc,
},
time::Duration,
hash::Hash,
};

mod account_generator;
Expand Down Expand Up @@ -418,6 +419,107 @@ pub async fn create_txn_generator_creator(
)
}

pub struct BucketedAccountPool<Bucket> {
pool: RwLock<HashMap<Bucket, Vec<LocalAccount>>>,
all_buckets: Arc<Vec<Bucket>>,
current_index: AtomicUsize,
object_to_bucket_map: RwLock<HashMap<AccountAddress, Bucket>>,
}

impl<Bucket: Clone + Eq + PartialEq + Hash> BucketedAccountPool<Bucket> {
pub(crate) fn new(buckets: Arc<Vec<Bucket>>) -> Self {
let mut pool = HashMap::new();
for bucket in buckets.iter() {
pool.insert(bucket.clone(), Vec::new());
}
Self {
pool: RwLock::new(pool),
all_buckets: buckets,
current_index: AtomicUsize::new(0),
object_to_bucket_map: RwLock::new(HashMap::new()),
}
}

pub(crate) fn add_to_bucket(&self, bucket: Bucket, mut addition: Vec<LocalAccount>) {
assert!(!addition.is_empty());
let mut current = self.pool.write();
let mut object_to_bucket_map = self.object_to_bucket_map.write();
object_to_bucket_map.extend(addition.iter().map(|object| (object.address(), bucket.clone())));
current
.entry(bucket)
.or_insert_with(Vec::new)
.append(&mut addition);
}


pub(crate) fn add_to_pool(&self, addition: Vec<LocalAccount>) {
assert!(!addition.is_empty());
let mut current = self.pool.write();
let mut object_to_bucket_map = self.object_to_bucket_map.write();
for object in addition {
let current_index = self.current_index.load(Ordering::Relaxed);
let current_bucket = self.all_buckets[current_index].clone();
let object_address = object.address();
current
.entry(current_bucket.clone())
.or_insert_with(Vec::new)
.append(&mut vec![object]);
self.current_index.store((current_index + 1) % self.all_buckets.len(), Ordering::Relaxed);
object_to_bucket_map.insert(object_address, current_bucket);
}
}

pub(crate) fn take_from_pool(
&self,
bucket: Bucket,
needed: usize,
return_partial: bool,
rng: &mut StdRng,
) -> Vec<LocalAccount> {
let mut current = self.pool.write();
let num_in_pool = current.get_mut(&bucket).map_or(0, |v| v.len());
if !return_partial && num_in_pool < needed {
sample!(
SampleRate::Duration(Duration::from_secs(10)),
warn!("Cannot fetch enough from shared pool, left in pool {}, needed {}", num_in_pool, needed);
);
return Vec::new();
}
let num_to_return = std::cmp::min(num_in_pool, needed);
let current_bucket = current.get_mut(&bucket).unwrap();
let mut result = current_bucket
.drain((num_in_pool - num_to_return)..)
.collect::<Vec<_>>();

if current_bucket.len() > num_to_return {
let start = rng.gen_range(0, current_bucket.len() - num_to_return);
current_bucket[start..start + num_to_return].swap_with_slice(&mut result);
}
result
}

pub(crate) fn update_sequence_number(
&self,
object_address: &AccountAddress,
sequence_number: u64,
) {
info!("Called update sequence number for {} {}", object_address, sequence_number);
let mut current = self.pool.write();
if let Some(bucket) = self.object_to_bucket_map.read().get(object_address).and_then(|bucket| current.get_mut(bucket)) {
for object in bucket.iter_mut() {
if object.address() == *object_address {
if sequence_number < object.sequence_number() {
info!("Sequence number for {} decreased from {} to {}", object_address, object.sequence_number(), sequence_number);
object.set_sequence_number(sequence_number);
} else {
info!("Sequence number for {} not updated", object_address);
}
}
}
}
}
}

/// Simple object pool structure, that you can add and remove from multiple threads.
/// Taking is done at random positions, but sequentially.
/// Overflow replaces at random positions as well.
Expand Down

0 comments on commit 3ca32c4

Please sign in to comment.