Skip to content

Commit

Permalink
refs #15: Implement ZeroMQ-based latency histogram recorder.
Browse files Browse the repository at this point in the history
 * It records latency histogram automagically.

 * TODO: auto-detect peer pktgen address.
         (currently hard-coded.)

 * TODO: auto-calculate CDF for easier plotting
  • Loading branch information
achimnol committed Mar 20, 2016
1 parent 00b5865 commit 8f551ce
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 10 deletions.
8 changes: 4 additions & 4 deletions scripts/exprlib/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ def fix_ownership(path):
is usually done as plain user accounts. It is strongly suggested to
call this function after data recording is finished.
'''
execute('chown -R {user}:{group} {0}'.format(
path, user=ExperimentEnv.get_user(), group=ExperimentEnv.get_group(),
))
execute(['chown', '-R',
'{user}:{group}'.format(user=ExperimentEnv.get_user(), group=ExperimentEnv.get_group()),
'{0}'.format(path)])

@staticmethod
def get_num_nodes():
Expand All @@ -127,7 +127,7 @@ def get_hyperthreading_degree():

@staticmethod
def get_current_commit(short=True):
commit = execute('git rev-parse HEAD', read=True).strip()
commit = execute(['git', 'rev-parse', 'HEAD'], read=True).strip()
if short:
return commit[:7]
return commit
Expand Down
26 changes: 26 additions & 0 deletions scripts/exprlib/latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio
import aiozmq, zmq

class LatencyHistogramReader:
    '''Subscribes to per-CPU latency histogram publications over ZeroMQ.

    Each received record is appended to ``self.records`` as a tuple of
    ``(cpu_idx, elapsed_sec, histogram_lines)``.
    '''

    def __init__(self, loop=None):
        # Fall back to the default asyncio event loop when none is given.
        self._loop = loop or asyncio.get_event_loop()
        self.records = []

    async def subscribe(self, remote_host, remote_cpu_idx):
        '''Connect to the remote publisher and accumulate histogram
        records until this coroutine is cancelled.
        '''
        self.remote_addr = 'tcp://{}:{}'.format(remote_host, remote_cpu_idx)
        self._conn = await aiozmq.create_zmq_stream(zmq.SUB, loop=self._loop,
                                                    connect=self.remote_addr)
        # An empty subscription filter makes the SUB socket receive
        # every published message.
        self._conn.transport.setsockopt(zmq.SUBSCRIBE, b'')
        while True:
            try:
                frames = await self._conn.read()
            except asyncio.CancelledError:
                # Cancellation is the normal shutdown path; close the
                # socket before leaving the loop.
                self._conn.close()
                break
            # Multipart message layout: [cpu index, elapsed seconds,
            # newline-separated histogram text].
            # TODO: convert to sparse DataFrame
            self.records.append((
                int(frames[0].decode()),
                int(frames[1].decode()),
                frames[2].decode().splitlines(),
            ))

43 changes: 37 additions & 6 deletions scripts/run_throughput.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from exprlib.records import AppThruputRecord, AppThruputReader
from exprlib.plotting.template import plot_thruput
from exprlib.pktgen import PktGenController
from exprlib.latency import LatencyHistogramReader


async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
Expand All @@ -26,13 +27,19 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):

if 'ipv6' in args.element_config_to_use:
# All random ipv6 pkts
pktgen.args = ("-i", "all", "-f", "0", "-v", "6", "-p", str(pktsz))
pktgen.args = ['-i', 'all', '-f', '0', '-v', '6', '-p', str(pktsz)]
if args.latency:
pktgen.args += ['-g', '10', '-l', '--latency-histogram']
elif 'ipsec' in args.element_config_to_use:
# ipv4 pkts with fixed 1K flows
pktgen.args = ("-i", "all", "-f", "1024", "-r", "0", "-v", "4", "-p", str(pktsz))
pktgen.args = ['-i', 'all', '-f', '1024', '-r', '0', '-v', '4', '-p', str(pktsz)]
if args.latency:
pktgen.args += ['-g', '3', '-l', '--latency-histogram']
else:
# All random ipv4 pkts
pktgen.args = ("-i", "all", "-f", "0", "-v", "4", "-p", str(pktsz))
pktgen.args = ['-i', 'all', '-f', '0', '-v', '4', '-p', str(pktsz)]
if args.latency:
pktgen.args += ['-g', '10', '-l', '--latency-histogram']

#cpu_records = env.measure_cpu_usage(interval=2, begin_after=26.0, repeat=True)

Expand All @@ -41,6 +48,12 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
thruput_reader.pktsize_hint = pktsz
thruput_reader.conf_hint = conf_name

# Run latency subscriber. (address = "tcp://generator-host:(54000 + cpu_idx)")
# FIXME: get generator addr
if args.latency:
lhreader = LatencyHistogramReader(loop)
hist_task = loop.create_task(lhreader.subscribe('shader-marcel.anlab', 54001))

# Run.
async with pktgen:
if args.transparent:
Expand All @@ -54,11 +67,15 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
else:
retcode = await env.execute_main(args.sys_config_to_use, conf_name + '.click', running_time=32.0)

if args.latency:
hist_task.cancel()
await asyncio.sleep(0)

if args.transparent:
return

# Fetch results of throughput measurement and compute average.
# TODO: generalize mean calcuation
# FIXME: generalize mean calcuation
thruput_records = thruput_reader.get_records()
for n in range(env.get_num_nodes()):
all_tput_recs.ix[(conf_name, io_batchsz, comp_batchsz, coproc_ppdepth, n, pktsz)] \
Expand All @@ -72,6 +89,13 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
all_tput_recs.ix[(conf_name, io_batchsz, comp_batchsz, coproc_ppdepth, n, pktsz)] \
/= per_node_cnt[n]

# TODO: Store latency histograms
if args.latency:
for r in reversed(lhreader.records):
if len(r[2]) > 10:
print(r)
break

## Fetch results of cpu util measurement and compute average.
#io_usr_avg = io_sys_avg = coproc_usr_avg = coproc_sys_avg = 0
#io_usr_avgs = []; io_sys_avgs = []; coproc_usr_avgs = []; coproc_sys_avgs = []
Expand Down Expand Up @@ -114,6 +138,9 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
parser.add_argument('-t', '--transparent', action='store_true', default=False, help='Pass-through the standard output instead of parsing it. No default timeout is applied.')
parser.add_argument('--timeout', type=int, default=None, help='Set a forced timeout for transparent mode.')
parser.add_argument('--combine-cpu-gpu', action='store_true', default=False, help='Run the same config for CPU-only and GPU-only to compare.')
parser.add_argument('-l', '--latency', action='store_true', default=False, help='Save the latency histogram.'
'The packet generation rate is fixed to'
'3 Gbps (for IPsec) or 10 Gbps (otherwise).')
parser.add_argument('-v', '--verbose', action='store_true', default=False)
args = parser.parse_args()

Expand Down Expand Up @@ -182,8 +209,8 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
# Sum over node_id while preserving other indexes
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.float_format', lambda f: '{:.2f}'.format(f))
system_tput = all_tput_recs.sum(level=['conf','io_batchsz','comp_batchsz',
'coproc_ppdepth','pktsz'])
system_tput = all_tput_recs.sum(level=['conf', 'io_batchsz', 'comp_batchsz',
'coproc_ppdepth', 'pktsz'])
now = datetime.now()
dir_name = 'apptput.{:%Y-%m-%d.%H%M%S}'.format(now)
base_path = os.path.join(os.path.expanduser('~/Dropbox/temp/plots/nba'), dir_name)
Expand All @@ -195,11 +222,15 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs):
print('Throughput per system')
print('=====================')
print(system_tput)
print()
with open(os.path.join(base_path, 'version.txt'), 'w') as fout:
print(env.get_current_commit(short=False), file=fout)
all_tput_recs.to_csv(os.path.join(base_filename + '.pernode.csv'), float_format='%.2f')
system_tput.to_csv(os.path.join(base_filename + '.csv'), float_format='%.2f')
#plot_thruput('apptput', all_tput_recs, args.element_config_to_use,
# base_path='~/Dropbox/temp/plots/nba/',
# combine_cpu_gpu=args.combine_cpu_gpu)
env.fix_ownership(base_path)
print('all done.')
sys.exit(0)

0 comments on commit 8f551ce

Please sign in to comment.