diff --git a/scripts/exprlib/env.py b/scripts/exprlib/env.py index 582d646..554d0a8 100644 --- a/scripts/exprlib/env.py +++ b/scripts/exprlib/env.py @@ -106,9 +106,9 @@ def fix_ownership(path): is usually done as plain user accounts. It is strongly suggested to call this function after data recording is finished. ''' - execute('chown -R {user}:{group} {0}'.format( - path, user=ExperimentEnv.get_user(), group=ExperimentEnv.get_group(), - )) + execute(['chown', '-R', + '{user}:{group}'.format(user=ExperimentEnv.get_user(), group=ExperimentEnv.get_group()), + '{0}'.format(path)]) @staticmethod def get_num_nodes(): @@ -127,7 +127,7 @@ def get_hyperthreading_degree(): @staticmethod def get_current_commit(short=True): - commit = execute('git rev-parse HEAD', read=True).strip() + commit = execute(['git', 'rev-parse', 'HEAD'], read=True).strip() if short: return commit[:7] return commit diff --git a/scripts/exprlib/latency.py b/scripts/exprlib/latency.py new file mode 100644 index 0000000..2cf186c --- /dev/null +++ b/scripts/exprlib/latency.py @@ -0,0 +1,26 @@ +import asyncio +import aiozmq, zmq + +class LatencyHistogramReader: + + def __init__(self, loop=None): + self._loop = loop if loop else asyncio.get_event_loop() + self.records = [] + + async def subscribe(self, remote_host, remote_cpu_idx): + self.remote_addr = 'tcp://{}:{}'.format(remote_host, remote_cpu_idx) + self._conn = await aiozmq.create_zmq_stream(zmq.SUB, loop=self._loop, + connect=self.remote_addr) + self._conn.transport.setsockopt(zmq.SUBSCRIBE, b'') + while True: + try: + recv_data = await self._conn.read() + except asyncio.CancelledError: + self._conn.close() + break + cpu_idx = int(recv_data[0].decode()) + elapsed_sec = int(recv_data[1].decode()) + # TODO: convert to sparse DataFrame + histogram = recv_data[2].decode().splitlines() + self.records.append((cpu_idx, elapsed_sec, histogram)) + diff --git a/scripts/run_throughput.py b/scripts/run_throughput.py index 1d7aef8..88125e0 100755 --- a/scripts/run_throughput.py +++ b/scripts/run_throughput.py @@ -15,6 +15,7 @@ from exprlib.records import AppThruputRecord, AppThruputReader from exprlib.plotting.template import plot_thruput from exprlib.pktgen import PktGenController +from exprlib.latency import LatencyHistogramReader async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): @@ -26,13 +27,19 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): if 'ipv6' in args.element_config_to_use: # All random ipv6 pkts - pktgen.args = ("-i", "all", "-f", "0", "-v", "6", "-p", str(pktsz)) + pktgen.args = ['-i', 'all', '-f', '0', '-v', '6', '-p', str(pktsz)] + if args.latency: + pktgen.args += ['-g', '10', '-l', '--latency-histogram'] elif 'ipsec' in args.element_config_to_use: # ipv4 pkts with fixed 1K flows - pktgen.args = ("-i", "all", "-f", "1024", "-r", "0", "-v", "4", "-p", str(pktsz)) + pktgen.args = ['-i', 'all', '-f', '1024', '-r', '0', '-v', '4', '-p', str(pktsz)] + if args.latency: + pktgen.args += ['-g', '3', '-l', '--latency-histogram'] else: # All random ipv4 pkts - pktgen.args = ("-i", "all", "-f", "0", "-v", "4", "-p", str(pktsz)) + pktgen.args = ['-i', 'all', '-f', '0', '-v', '4', '-p', str(pktsz)] + if args.latency: + pktgen.args += ['-g', '10', '-l', '--latency-histogram'] #cpu_records = env.measure_cpu_usage(interval=2, begin_after=26.0, repeat=True) @@ -41,6 +48,12 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): thruput_reader.pktsize_hint = pktsz thruput_reader.conf_hint = conf_name + # Run latency subscriber. (address = "tcp://generator-host:(54000 + cpu_idx)") + # FIXME: get generator addr + if args.latency: + lhreader = LatencyHistogramReader(loop) + hist_task = loop.create_task(lhreader.subscribe('shader-marcel.anlab', 54001)) + # Run. async with pktgen: if args.transparent: @@ -54,11 +67,15 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): else: retcode = await env.execute_main(args.sys_config_to_use, conf_name + '.click', running_time=32.0) + if args.latency: + hist_task.cancel() + await asyncio.sleep(0) + if args.transparent: return # Fetch results of throughput measurement and compute average. - # TODO: generalize mean calcuation + # FIXME: generalize mean calcuation thruput_records = thruput_reader.get_records() for n in range(env.get_num_nodes()): all_tput_recs.ix[(conf_name, io_batchsz, comp_batchsz, coproc_ppdepth, n, pktsz)] \ @@ -72,6 +89,13 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): all_tput_recs.ix[(conf_name, io_batchsz, comp_batchsz, coproc_ppdepth, n, pktsz)] \ /= per_node_cnt[n] + # TODO: Store latency histograms + if args.latency: + for r in reversed(lhreader.records): + if len(r[2]) > 10: + print(r) + break + ## Fetch results of cpu util measurement and compute average. #io_usr_avg = io_sys_avg = coproc_usr_avg = coproc_sys_avg = 0 #io_usr_avgs = []; io_sys_avgs = []; coproc_usr_avgs = []; coproc_sys_avgs = [] @@ -114,6 +138,9 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): parser.add_argument('-t', '--transparent', action='store_true', default=False, help='Pass-through the standard output instead of parsing it. No default timeout is applied.') parser.add_argument('--timeout', type=int, default=None, help='Set a forced timeout for transparent mode.') parser.add_argument('--combine-cpu-gpu', action='store_true', default=False, help='Run the same config for CPU-only and GPU-only to compare.') + parser.add_argument('-l', '--latency', action='store_true', default=False, help='Save the latency histogram.' + 'The packet generation rate is fixed to' + '3 Gbps (for IPsec) or 10 Gbps (otherwise).') parser.add_argument('-v', '--verbose', action='store_true', default=False) args = parser.parse_args() @@ -182,8 +209,8 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): # Sum over node_id while preserving other indexes pd.set_option('display.expand_frame_repr', False) pd.set_option('display.float_format', lambda f: '{:.2f}'.format(f)) - system_tput = all_tput_recs.sum(level=['conf','io_batchsz','comp_batchsz', - 'coproc_ppdepth','pktsz']) + system_tput = all_tput_recs.sum(level=['conf', 'io_batchsz', 'comp_batchsz', + 'coproc_ppdepth', 'pktsz']) now = datetime.now() dir_name = 'apptput.{:%Y-%m-%d.%H%M%S}'.format(now) base_path = os.path.join(os.path.expanduser('~/Dropbox/temp/plots/nba'), dir_name) @@ -195,11 +222,15 @@ async def do_experiment(loop, env, args, conds, thruput_reader, all_tput_recs): print('Throughput per system') print('=====================') print(system_tput) + print() + with open(os.path.join(base_path, 'version.txt'), 'w') as fout: + print(env.get_current_commit(short=False), file=fout) all_tput_recs.to_csv(os.path.join(base_filename + '.pernode.csv'), float_format='%.2f') system_tput.to_csv(os.path.join(base_filename + '.csv'), float_format='%.2f') #plot_thruput('apptput', all_tput_recs, args.element_config_to_use, # base_path='~/Dropbox/temp/plots/nba/', # combine_cpu_gpu=args.combine_cpu_gpu) + env.fix_ownership(base_path) print('all done.') sys.exit(0)