diff --git a/benchmark/benchmark/commands.py b/benchmark/benchmark/commands.py index cbffcac..f453eb3 100644 --- a/benchmark/benchmark/commands.py +++ b/benchmark/benchmark/commands.py @@ -46,14 +46,15 @@ def run_worker(keys, committee, store, parameters, id, debug=False): f'--store {store} --parameters {parameters} worker --id {id}') @staticmethod - def run_client(address, size, rate, nodes): + def run_client(address, size, burst, rate, nodes): assert isinstance(address, str) assert isinstance(size, int) and size > 0 + assert isinstance(burst, int) and burst > 0 assert isinstance(rate, int) and rate >= 0 assert isinstance(nodes, list) assert all(isinstance(x, str) for x in nodes) nodes = f'--nodes {" ".join(nodes)}' if nodes else '' - return f'./benchmark_client {address} --size {size} --rate {rate} {nodes}' + return f'./benchmark_client {address} --size {size} --burst {burst} --rate {rate} {nodes}' @staticmethod def kill(): diff --git a/benchmark/benchmark/config.py b/benchmark/benchmark/config.py index eb39bb8..07da728 100644 --- a/benchmark/benchmark/config.py +++ b/benchmark/benchmark/config.py @@ -216,6 +216,9 @@ def __init__(self, json): self.duration = int(json['duration']) self.runs = int(json['runs']) if 'runs' in json else 1 + + self.burst = json['burst'] + except KeyError as e: raise ConfigError(f'Malformed bench parameters: missing key {e}') diff --git a/benchmark/benchmark/local.py b/benchmark/benchmark/local.py index 07cc17a..a37c674 100644 --- a/benchmark/benchmark/local.py +++ b/benchmark/benchmark/local.py @@ -81,6 +81,7 @@ def run(self, debug=False): cmd = CommandMaker.run_client( address, self.tx_size, + self.burst, rate_share, [x for y in workers_addresses for _, x in y] ) diff --git a/benchmark/benchmark/remote.py b/benchmark/benchmark/remote.py index 30b1532..30114d5 100644 --- a/benchmark/benchmark/remote.py +++ b/benchmark/benchmark/remote.py @@ -210,7 +210,7 @@ def _config(self, hosts, node_parameters, bench_parameters): return committee - def _run_single(self, rate, committee, bench_parameters, debug=False): + def _run_single(self, rate, burst, committee, bench_parameters, debug=False): faults = bench_parameters.faults # Kill any potentially unfinished run and delete logs. @@ -229,6 +229,7 @@ def _run_single(self, rate, committee, bench_parameters, debug=False): cmd = CommandMaker.run_client( address, bench_parameters.tx_size, + burst, rate_share, [x for y in workers_addresses for _, x in y] ) @@ -342,15 +343,16 @@ def run(self, bench_parameters_dict, node_parameters_dict, debug=False): committee_copy = deepcopy(committee) committee_copy.remove_nodes(committee.size() - n) - for r in bench_parameters.rate: - Print.heading(f'\nRunning {n} nodes (input rate: {r:,} tx/s)') + for burst in bench_parameters.burst: + rate = bench_parameters.rate[0] + Print.heading(f'\nRunning {n} nodes (input rate: {rate:,} tx/s, burst : {burst:,})') # Run the benchmark. for i in range(bench_parameters.runs): Print.heading(f'Run {i+1}/{bench_parameters.runs}') try: self._run_single( - r, committee_copy, bench_parameters, debug + rate, burst, committee_copy, bench_parameters, debug ) faults = bench_parameters.faults diff --git a/benchmark/fabfile.py b/benchmark/fabfile.py index eef2d28..b162c6d 100644 --- a/benchmark/fabfile.py +++ b/benchmark/fabfile.py @@ -19,6 +19,7 @@ def local(ctx, debug=True): 'rate': 50_000, 'tx_size': 512, 'duration': 20, + "burst" : 50 } node_params = { 'header_size': 50, # bytes @@ -37,7 +38,7 @@ def local(ctx, debug=True): @task -def create(ctx, nodes=10): +def create(ctx, nodes=1): ''' Create a testbed''' try: InstanceManager.make().create_instances(nodes) @@ -49,16 +50,16 @@ def create(ctx, nodes=10): def destroy(ctx): ''' Destroy the testbed ''' try: - InstanceManager.make().terminate_instances() + InstanceManager.make().delete_instances() except BenchError as e: Print.error(e) @task -def start(ctx, max=2): +def start(ctx): ''' Start at most `max` machines per data center ''' try: - InstanceManager.make().start_instances(max) + InstanceManager.make().start_instances() except BenchError as e: Print.error(e) @@ -91,18 +92,25 @@ def install(ctx): @task -def remote(ctx, debug=False): - ''' Run benchmarks on AWS ''' +def remote(ctx, burst = 50, debug=False): + ''' Run benchmarks on GCP ''' bench_params = { 'faults': 0, - 'nodes': [10, 20], + 'nodes': 5, 'workers': 1, 'collocate': True, - 'rate': [10_000, 50_000], + 'rate': [50000], 'tx_size': 512, - 'duration': 300, - 'runs': 2, - } + 'duration': 20, + 'runs': 1, + 'burst' : [burst], + } + + precision = 20 + nodes = bench_params['nodes'] + rate = 1000 * nodes * precision + bench_params['rate'] = [rate] + node_params = { 'header_size': 50, # bytes 'max_header_delay': 5_000, # ms diff --git a/node/src/benchmark_client.rs b/node/src/benchmark_client.rs index 35703bf..a3b1c55 100644 --- a/node/src/benchmark_client.rs +++ b/node/src/benchmark_client.rs @@ -20,6 +20,7 @@ async fn main() -> Result<()> { .about("Benchmark client for Narwhal and Tusk.") .args_from_usage(" 'The network address of the node where to send txs'") .args_from_usage("--size= 'The size of each transaction in bytes'") + .args_from_usage("--burst= 'Burst duration (in ms)'") .args_from_usage("--rate= 'The rate (txs/s) at which to send the transactions'") .args_from_usage("--nodes=[ADDR]... 'Network addresses that must be reachable before starting the benchmark.'") .setting(AppSettings::ArgRequiredElseHelp) @@ -39,6 +40,11 @@ async fn main() -> Result<()> { .unwrap() .parse::() .context("The size of transactions must be a non-negative integer")?; + let burst_duration = matches + .value_of("burst") + .unwrap() + .parse::() + .context("Burst duration must be a non-negative integer")?; let rate = matches .value_of("rate") .unwrap() @@ -65,6 +71,7 @@ async fn main() -> Result<()> { size, rate, nodes, + burst_duration }; // Wait for all nodes to be online and synchronized. @@ -79,12 +86,13 @@ struct Client { size: usize, rate: u64, nodes: Vec, + burst_duration: u64, } impl Client { pub async fn send(&self) -> Result<()> { const PRECISION: u64 = 20; // Sample precision. - const BURST_DURATION: u64 = 1000 / PRECISION; + info!("Burst duration {:?}", self.burst_duration); // The transaction size must be at least 16 bytes to ensure all txs are different. if self.size < 9 { @@ -104,7 +112,7 @@ impl Client { let mut counter = 0; let mut r = rand::thread_rng().gen(); let mut transport = Framed::new(stream, LengthDelimitedCodec::new()); - let interval = interval(Duration::from_millis(BURST_DURATION)); + let interval = interval(Duration::from_millis(self.burst_duration)); tokio::pin!(interval); // NOTE: This log entry is used to compute performance. @@ -134,7 +142,7 @@ impl Client { break 'main; } } - if now.elapsed().as_millis() > BURST_DURATION as u128 { + if now.elapsed().as_millis() > self.burst_duration as u128 { // NOTE: This log entry is used to compute performance. warn!("Transaction rate too high for this client"); }