Skip to content

Commit

Permalink
added burst parameter and changes in relevant files
Browse files Browse the repository at this point in the history
  • Loading branch information
naitik-supraoracles committed Apr 15, 2024
1 parent e4a2a86 commit 1c19a44
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 20 deletions.
5 changes: 3 additions & 2 deletions benchmark/benchmark/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ def run_worker(keys, committee, store, parameters, id, debug=False):
f'--store {store} --parameters {parameters} worker --id {id}')

@staticmethod
def run_client(address, size, rate, nodes):
def run_client(address, size, burst, rate, nodes):
assert isinstance(address, str)
assert isinstance(size, int) and size > 0
assert isinstance(burst, int) and burst > 0
assert isinstance(rate, int) and rate >= 0
assert isinstance(nodes, list)
assert all(isinstance(x, str) for x in nodes)
nodes = f'--nodes {" ".join(nodes)}' if nodes else ''
return f'./benchmark_client {address} --size {size} --rate {rate} {nodes}'
return f'./benchmark_client {address} --size {size} --burst {burst} --rate {rate} {nodes}'

@staticmethod
def kill():
Expand Down
3 changes: 3 additions & 0 deletions benchmark/benchmark/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def __init__(self, json):
self.duration = int(json['duration'])

self.runs = int(json['runs']) if 'runs' in json else 1

self.burst = json['burst']

except KeyError as e:
raise ConfigError(f'Malformed bench parameters: missing key {e}')

Expand Down
1 change: 1 addition & 0 deletions benchmark/benchmark/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def run(self, debug=False):
cmd = CommandMaker.run_client(
address,
self.tx_size,
self.burst,
rate_share,
[x for y in workers_addresses for _, x in y]
)
Expand Down
10 changes: 6 additions & 4 deletions benchmark/benchmark/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def _config(self, hosts, node_parameters, bench_parameters):

return committee

def _run_single(self, rate, committee, bench_parameters, debug=False):
def _run_single(self, rate, burst, committee, bench_parameters, debug=False):
faults = bench_parameters.faults

# Kill any potentially unfinished run and delete logs.
Expand All @@ -229,6 +229,7 @@ def _run_single(self, rate, committee, bench_parameters, debug=False):
cmd = CommandMaker.run_client(
address,
bench_parameters.tx_size,
burst,
rate_share,
[x for y in workers_addresses for _, x in y]
)
Expand Down Expand Up @@ -342,15 +343,16 @@ def run(self, bench_parameters_dict, node_parameters_dict, debug=False):
committee_copy = deepcopy(committee)
committee_copy.remove_nodes(committee.size() - n)

for r in bench_parameters.rate:
Print.heading(f'\nRunning {n} nodes (input rate: {r:,} tx/s)')
for burst in bench_parameters.burst:
rate = bench_parameters.rate[0]
Print.heading(f'\nRunning {n} nodes (input rate: {rate:,} tx/s, burst : {burst:,})')

# Run the benchmark.
for i in range(bench_parameters.runs):
Print.heading(f'Run {i+1}/{bench_parameters.runs}')
try:
self._run_single(
r, committee_copy, bench_parameters, debug
rate, burst, committee_copy, bench_parameters, debug
)

faults = bench_parameters.faults
Expand Down
30 changes: 19 additions & 11 deletions benchmark/fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def local(ctx, debug=True):
'rate': 50_000,
'tx_size': 512,
'duration': 20,
"burst" : 50
}
node_params = {
'header_size': 50, # bytes
Expand All @@ -37,7 +38,7 @@ def local(ctx, debug=True):


@task
def create(ctx, nodes=10):
def create(ctx, nodes=1):
''' Create a testbed'''
try:
InstanceManager.make().create_instances(nodes)
Expand All @@ -49,16 +50,16 @@ def create(ctx, nodes=10):
def destroy(ctx):
''' Destroy the testbed '''
try:
InstanceManager.make().terminate_instances()
InstanceManager.make().delete_instances()
except BenchError as e:
Print.error(e)


@task
def start(ctx, max=2):
def start(ctx):
''' Start at most `max` machines per data center '''
try:
InstanceManager.make().start_instances(max)
InstanceManager.make().start_instances()
except BenchError as e:
Print.error(e)

Expand Down Expand Up @@ -91,18 +92,25 @@ def install(ctx):


@task
def remote(ctx, debug=False):
''' Run benchmarks on AWS '''
def remote(ctx, burst = 50, debug=False):
''' Run benchmarks on GCP '''
bench_params = {
'faults': 0,
'nodes': [10, 20],
'nodes': 5,
'workers': 1,
'collocate': True,
'rate': [10_000, 50_000],
'rate': [50000],
'tx_size': 512,
'duration': 300,
'runs': 2,
}
'duration': 20,
'runs': 1,
'burst' : [burst],
}

precision = 20
nodes = bench_params['nodes']
rate = 1000 * nodes * precision
bench_params['rate'] = [rate]

node_params = {
'header_size': 50, # bytes
'max_header_delay': 5_000, # ms
Expand Down
14 changes: 11 additions & 3 deletions node/src/benchmark_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async fn main() -> Result<()> {
.about("Benchmark client for Narwhal and Tusk.")
.args_from_usage("<ADDR> 'The network address of the node where to send txs'")
.args_from_usage("--size=<INT> 'The size of each transaction in bytes'")
.args_from_usage("--burst=<INT> 'Burst duration (in ms)'")
.args_from_usage("--rate=<INT> 'The rate (txs/s) at which to send the transactions'")
.args_from_usage("--nodes=[ADDR]... 'Network addresses that must be reachable before starting the benchmark.'")
.setting(AppSettings::ArgRequiredElseHelp)
Expand All @@ -39,6 +40,11 @@ async fn main() -> Result<()> {
.unwrap()
.parse::<usize>()
.context("The size of transactions must be a non-negative integer")?;
let burst_duration = matches
.value_of("burst")
.unwrap()
.parse::<u64>()
.context("Burst duration must be a non-negative integer")?;
let rate = matches
.value_of("rate")
.unwrap()
Expand All @@ -65,6 +71,7 @@ async fn main() -> Result<()> {
size,
rate,
nodes,
burst_duration
};

// Wait for all nodes to be online and synchronized.
Expand All @@ -79,12 +86,13 @@ struct Client {
size: usize,
rate: u64,
nodes: Vec<SocketAddr>,
burst_duration: u64,
}

impl Client {
pub async fn send(&self) -> Result<()> {
const PRECISION: u64 = 20; // Sample precision.
const BURST_DURATION: u64 = 1000 / PRECISION;
info!("Burst duration {:?}", self.burst_duration);

// The transaction size must be at least 16 bytes to ensure all txs are different.
if self.size < 9 {
Expand All @@ -104,7 +112,7 @@ impl Client {
let mut counter = 0;
let mut r = rand::thread_rng().gen();
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let interval = interval(Duration::from_millis(BURST_DURATION));
let interval = interval(Duration::from_millis(self.burst_duration));
tokio::pin!(interval);

// NOTE: This log entry is used to compute performance.
Expand Down Expand Up @@ -134,7 +142,7 @@ impl Client {
break 'main;
}
}
if now.elapsed().as_millis() > BURST_DURATION as u128 {
if now.elapsed().as_millis() > self.burst_duration as u128 {
// NOTE: This log entry is used to compute performance.
warn!("Transaction rate too high for this client");
}
Expand Down

0 comments on commit 1c19a44

Please sign in to comment.