diff --git a/exp_scripts/sluice/part1_comparison/extract_cdf_per_setting.py b/exp_scripts/sluice/part1_comparison/extract_cdf_per_setting.py index 07fd84c6..cf4785dd 100644 --- a/exp_scripts/sluice/part1_comparison/extract_cdf_per_setting.py +++ b/exp_scripts/sluice/part1_comparison/extract_cdf_per_setting.py @@ -2,6 +2,8 @@ import sys import numpy as np import matplotlib +from matplotlib.lines import Line2D + matplotlib.use('Agg') import matplotlib.pyplot as plt import os @@ -895,8 +897,8 @@ def plot_success_rate_bar(success_rate_per_label, output_dir, workload_name: str plt.savefig(os.path.join(output_dir, 'success_rate_bar_' + str(workload_name) + '.png'), bbox_inches='tight') plt.close(fig) -def plot_latency_curves(latency_curves, latency_limit, output_dir, start_time, exp_length, workload_name: str): - fig, ax = plt.subplots(figsize=(12, 6)) +def plot_latency_curves(latency_curves, latency_limit, output_dir, start_time, exp_length, workload_name: str, ax, ylabel_flag): + #fig, ax = plt.subplots(figsize=(12, 6)) legend_elements = [] from matplotlib.lines import Line2D for label, latency_curve in latency_curves.items(): @@ -909,8 +911,7 @@ def plot_latency_curves(latency_curves, latency_limit, output_dir, start_time, e marker_indices = np.arange(0, len(latency_curve[0]), 50) # Every 100 points = 10 seconds marker_x = [latency_curve[0][index] for index in marker_indices] marker_y = [latency_curve[1][index] for index in marker_indices] - plt.scatter(marker_x, marker_y, s=64, color=COLOR_MAP[label], marker=MARKER_MAP[label][0], label=label, zorder=10) - legend_elements.append(Line2D([0], [0], linestyle=MARKER_MAP[label][1:], markersize=6, marker=MARKER_MAP[label][0], color=COLOR_MAP[label], label=label)) #, markersize=8)) + ax.scatter(marker_x, marker_y, s=64, color=COLOR_MAP[label], marker=MARKER_MAP[label][0], label=label, zorder=10) ax.plot([0, 10000000], [latency_limit, latency_limit], color='red', linestyle='--') ax.set_ylim(0, 5000) @@ -923,25 +924,27 @@ def plot_latency_curves(latency_curves, latency_limit, output_dir, start_time, e (exp_length / 5) * 1000)]) ax.tick_params(axis='x', labelsize=FONT_SIZE) ax.tick_params(axis='y', labelsize=FONT_SIZE) - ax.set_ylabel('Latency (ms)', fontsize=FONT_SIZE) + if ylabel_flag: + ax.set_ylabel('Latency (ms)', fontsize=FONT_SIZE) ax.set_xlabel('Time (minute)', fontsize=FONT_SIZE) - ax.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, 1.25), ncol=2, fontsize=FONT_SIZE) + ax.set_title(workload_name, fontsize=FONT_SIZE) + #ax.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, 1.25), ncol=2, fontsize=FONT_SIZE) #ax.legend(loc='upper center', bbox_to_anchor=(0.5, 1.25), ncol=4) ax.grid(True) - # Save the plot - if not os.path.exists(output_dir): - os.makedirs(output_dir) + # # Save the plot + # if not os.path.exists(output_dir): + # os.makedirs(output_dir) + # + # if output_pdf_flag: + # plt.savefig(os.path.join(output_dir, 'latency_curves_' + str(workload_name) + '.pdf'), bbox_inches='tight') + # else: + # plt.savefig(os.path.join(output_dir, 'latency_curves_' + str(workload_name) + '.png'), bbox_inches='tight') + # plt.close(fig) - if output_pdf_flag: - plt.savefig(os.path.join(output_dir, 'latency_curves_' + str(workload_name) + '.pdf'), bbox_inches='tight') - else: - plt.savefig(os.path.join(output_dir, 'latency_curves_' + str(workload_name) + '.png'), bbox_inches='tight') - plt.close(fig) +def plot_parallelism_curves(parallelism_curve, arrival_curve, output_dir, start_time, exp_length, workload_name: str, axs, y1label_flag, y2label_flag): + #fig, axs = plt.subplots(1, 1, figsize=(12, 6), layout='constrained') -def plot_parallelism_curves(parallelism_curve, arrival_curve, output_dir, start_time, exp_length, workload_name: str): - fig, axs = plt.subplots(1, 1, figsize=(12, 6), layout='constrained') - # fig.tight_layout(rect=[0.02, 0, 0.953, 1]) ax1 = axs ax2 = ax1.twinx() ax2.plot(arrival_curve[0], arrival_curve[1], color='red', linestyle='-', label="Arrival Rate") @@ -979,8 +982,10 @@ def plot_parallelism_curves(parallelism_curve, arrival_curve, output_dir, start_ ax1.tick_params(axis='x', labelsize=FONT_SIZE) ax1.tick_params(axis='y', labelsize=FONT_SIZE) ax2.tick_params(axis='y', labelsize=FONT_SIZE) - ax1.set_ylabel('# of Slots', fontsize=FONT_SIZE) - ax2.set_ylabel('Arrival Rate (tps)', fontsize=FONT_SIZE) + if y1label_flag: + ax1.set_ylabel('# of Slots', fontsize=FONT_SIZE) + if y2label_flag: + ax2.set_ylabel('Arrival Rate (tps)', fontsize=FONT_SIZE) axs.set_xlabel('Time (minute)', fontsize=FONT_SIZE) axs.set_xlim((start_time) * 1000, (start_time + exp_length) * 1000) axs.set_xticks(np.arange((start_time) * 1000, (start_time + exp_length) * 1000 + (exp_length / 5) * 1000, @@ -990,20 +995,20 @@ def plot_parallelism_curves(parallelism_curve, arrival_curve, output_dir, start_ (exp_length / 5) * 1000)]) lines1, labels1 = ax1.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels() - axs.legend(lines1 + lines2, labels1 + labels2, loc='upper center', bbox_to_anchor=(0.5, 1.4), ncol=2, fontsize=FONT_SIZE) + #axs.legend(lines1 + lines2, labels1 + labels2, loc='upper center', bbox_to_anchor=(0.5, 1.4), ncol=2, fontsize=FONT_SIZE) #axs.legend(loc='upper center', bbox_to_anchor=(0.5, 1.25), ncol=4) axs.grid(True) - # Save the plot - if not os.path.exists(output_dir): - os.makedirs(output_dir) - - if output_pdf_flag: - plt.savefig(os.path.join(output_dir, 'parallelism_curves_' + str(workload_name) + '.pdf'), bbox_inches='tight') - else: - plt.savefig(os.path.join(output_dir, 'parallelism_curves_' + str(workload_name) + '.png'), bbox_inches='tight') - plt.close(fig) + # # Save the plot + # if not os.path.exists(output_dir): + # os.makedirs(output_dir) + # + # if output_pdf_flag: + # plt.savefig(os.path.join(output_dir, 'parallelism_curves_' + str(workload_name) + '.pdf'), bbox_inches='tight') + # else: + # plt.savefig(os.path.join(output_dir, 'parallelism_curves_' + str(workload_name) + '.png'), bbox_inches='tight') + # plt.close(fig) output_pdf_flag = True @@ -1019,7 +1024,8 @@ def plot_parallelism_curves(parallelism_curve, arrival_curve, output_dir, start_ "StreamSwitch": "s-", "Sluice": "o-", } -FONT_SIZE = 30 + +FONT_SIZE = 20 def main(): raw_dir = "/Users/swrrt/Workplace/BacklogDelayPaper/experiments/raw/" @@ -1041,21 +1047,26 @@ def main(): "StreamSwitch": "tweet-streamswitch-streamswitch-5-60-1350-90-1700-1-19-3333-9-500-1-50-1-50-1250-2000-100-true-0.1-1", "Sluice": "tweet-streamsluice-streamsluice-5-60-1350-90-3400-1-19-3333-9-500-1-50-1-50-1250-2000-100-true-0.1-1", }, - # "Stock-Analysis_30min":{ - # "Static": "stock-ds2-ds2-5-8-60-1350-90-1000-20-1-200-4-3333-1-200-1-500-1-7-5000-3000-100-0.1-false-false-1", - # #"Static-Adequate": "stock-ds2-ds2-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-false-false-1", - # "DS2": "stock-ds2-ds2-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-true-false-1", - # "StreamSwitch": "stock-streamswitch-streamswitch-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-true-false-1", - # "Sluice": "stock-streamsluice-streamsluice-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-true-true-1", - # }, - # "Linear-Road_30min": { - # "Static": "lr-ds2-ds2-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-14-3333-2000-0.1-100-1-25-0.0-false-2500-0.8-2", - # #"Static-Adequate": "lr-ds2-ds2-5-8-60-1380-150-1300-10-1-50-4-1000-1-50-20-3333-2000-0.1-100-1-25-0.0-false-2500-0.8-2", - # "DS2": "lr-ds2-ds2-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-27-3333-2000-0.1-100-1-25-0.0-true-2500-0.8-2", - # "StreamSwitch": "lr-streamswitch-streamswitch-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-27-3333-2000-0.1-100-1-25-0.0-true-2500-0.8-2", - # "Sluice": "lr-streamsluice-streamsluice-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-27-3333-2000-0.1-100-1-25-0.0-true-1000-0.8-2", - # }, + "Stock-Analysis_30min":{ + "Static": "stock-ds2-ds2-5-8-60-1350-90-1000-20-1-200-4-3333-1-200-1-500-1-7-5000-3000-100-0.1-false-false-1", + #"Static-Adequate": "stock-ds2-ds2-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-false-false-1", + "DS2": "stock-ds2-ds2-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-true-false-1", + "StreamSwitch": "stock-streamswitch-streamswitch-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-true-false-1", + "Sluice": "stock-streamsluice-streamsluice-5-8-60-1350-90-1000-20-1-200-11-3333-1-200-2-500-1-15-5000-3000-100-0.1-true-true-1", + }, + "Linear-Road_30min": { + "Static": "lr-ds2-ds2-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-14-3333-2000-0.1-100-1-25-0.0-false-2500-0.8-2", + #"Static-Adequate": "lr-ds2-ds2-5-8-60-1380-150-1300-10-1-50-4-1000-1-50-20-3333-2000-0.1-100-1-25-0.0-false-2500-0.8-2", + "DS2": "lr-ds2-ds2-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-27-3333-2000-0.1-100-1-25-0.0-true-2500-0.8-2", + "StreamSwitch": "lr-streamswitch-streamswitch-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-27-3333-2000-0.1-100-1-25-0.0-true-2500-0.8-2", + "Sluice": "lr-streamsluice-streamsluice-5-8-60-1380-150-1300-10-1-50-3-1000-1-50-27-3333-2000-0.1-100-1-25-0.0-true-1000-0.8-2", + }, } + + fig, axs = plt.subplots(2, 3, figsize=(24, 10), layout='constrained') + index = 0 + legend_elements = [] + for workload_name, exps_per_label in exps_per_label_per_setting.items(): latency_bar_this_workload = 0 latency_per_label = {} @@ -1102,19 +1113,36 @@ def main(): avg_parallelism_per_label[label] = avg_parallelism max_parallelism_per_label[label] = max_parallelism min_parallelism_per_label[label] = min_parallelism + if index == 0: + legend_elements.append( + Line2D([0], [0], linestyle=MARKER_MAP[label][1:], markersize=6, marker=MARKER_MAP[label][0], + color=COLOR_MAP[label], label=label)) # , markersize=8)) # plot_latency_cdf(latency_per_label, latency_bar_this_workload, overall_output_dir, workload_name) # plot_average_latency(latency_per_label, overall_output_dir, workload_name) # plot_success_rate_bar(success_rate_per_label, overall_output_dir, workload_name) # plot_avg_parallelism_bar(avg_parallelism_per_label, overall_output_dir, workload_name) - plot_latency_curves(latency_curves, latency_bar_this_workload, overall_output_dir, start_time, exp_length, workload_name) + plot_latency_curves(latency_curves, latency_bar_this_workload, overall_output_dir, start_time, exp_length, workload_name, axs[0][index], index == 0) plot_parallelism_curves(parallelism_curves, arrival_curve, overall_output_dir, start_time, exp_length, - workload_name) + workload_name, axs[1][index], index == 0, index == 2) #plot_parallism_curves(parallelism_curves, arrival_curve, overall_output_dir, workload_name) print("success rate: " + str(success_rate_per_label)) print("mean parallelism:" + str(avg_parallelism_per_label)) print("min parallelism:" + str(min_parallelism_per_label)) print("max parallelism:" + str(max_parallelism_per_label)) print("Latency: " + str(calculate_latency_stats(latency_per_label))) + index += 1 + legend_elements.append( + Line2D([0], [0], linestyle="-", + color="r", label="Arrival Rate")) + legend_elements.append( + Line2D([0], [0], linestyle="--", + color="r", label="Latency Limit")) + fig.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, 1.1), ncol=6, markerscale=5) + + if output_pdf_flag: + plt.savefig(overall_output_dir + "all_in_one.pdf", bbox_inches='tight') + else: + plt.savefig(overall_output_dir + "all_in_one.png", bbox_inches='tight') if __name__ == "__main__": diff --git a/streamsluice_scripts/scripts/run_part1and2_three_workloads.sh b/streamsluice_scripts/scripts/run_part1and2_three_workloads.sh index 57f0403b..4aad3b8f 100755 --- a/streamsluice_scripts/scripts/run_part1and2_three_workloads.sh +++ b/streamsluice_scripts/scripts/run_part1and2_three_workloads.sh @@ -2,7 +2,7 @@ # List of your script files scripts=( - "linear-road.sh" + # "linear-road.sh" "stock-analysis.sh" "twitter-alert.sh" # Add more scripts as needed diff --git a/streamsluice_scripts/scripts/stock-analysis.sh b/streamsluice_scripts/scripts/stock-analysis.sh index 5743d9cf..de775453 100755 --- a/streamsluice_scripts/scripts/stock-analysis.sh +++ b/streamsluice_scripts/scripts/stock-analysis.sh @@ -168,8 +168,8 @@ run_stock_test(){ autotune=true repeat=1 for scaling_decision_option in 1; do # 2 0 - for autotuner_increase_bar_alpha in 0.1; do # 0.1 0.2 0.4 - for L in 2500 3500 4000; do # 1500 2000 3000 + for autotuner_increase_bar_alpha in 0.2; do # 0.1 0.2 0.4 + for L in 3000; do # 1500 2000 3000 whether_type="streamsluice" how_type="streamsluice" scalein_type="streamsluice" @@ -196,8 +196,8 @@ run_stock_test(){ how_type="ds2" scalein_type="ds2" migration_interval=2500 - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt P1=1 P2=1 P3=11 @@ -211,58 +211,58 @@ run_stock_test(){ how_type="ds2" scalein_type="ds2" migration_interval=2500 - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt is_treat=true whether_type="ds2" how_type="ds2" scalein_type="ds2" migration_interval=2500 - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt whether_type="streamswitch" how_type="streamswitch" scalein_type="streamswitch" migration_interval=1000 - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt - - # Part 3 System sensitivity - autotune=true - is_treat=true - migration_interval=1000 - whether_type="streamsluice" - how_type="streamsluice" - scalein_type="streamsluice" - printf "Part_3\n" >> stock_result.txt - printf "Epoch Length\n" >> stock_result.txt - for epoch in 25 50 200 500 1000 2000; do - for L in 3000; do - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt - done - done - epoch=100 - - printf "Tuning window Length\n" >> stock_result.txt - for autotune_interval in 30 120 240 480; do # - for L in 3000; do - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt - done - done - autotune_interval=60 +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt - printf "Alpha\n" >> stock_result.txt - for autotuner_increase_bar_alpha in 0.2 0.4 0.8 1.0; do - for L in 3000; do - run_one_exp - printf "${EXP_NAME}\n" >> stock_result.txt - done - done - autotuner_increase_bar_alpha=0.1 +# # Part 3 System sensitivity +# autotune=true +# is_treat=true +# migration_interval=1000 +# whether_type="streamsluice" +# how_type="streamsluice" +# scalein_type="streamsluice" +# printf "Part_3\n" >> stock_result.txt +# printf "Epoch Length\n" >> stock_result.txt +# for epoch in 25 50 200 500 1000 2000; do +# for L in 3000; do +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt +# done +# done +# epoch=100 +# +# printf "Tuning window Length\n" >> stock_result.txt +# for autotune_interval in 30 120 240 480; do # +# for L in 3000; do +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt +# done +# done +# autotune_interval=60 +# +# printf "Alpha\n" >> stock_result.txt +# for autotuner_increase_bar_alpha in 0.2 0.4 0.8 1.0; do +# for L in 3000; do +# run_one_exp +# printf "${EXP_NAME}\n" >> stock_result.txt +# done +# done +# autotuner_increase_bar_alpha=0.1 } run_stock_test \ No newline at end of file diff --git a/streamsluice_scripts/scripts/twitter-alert.sh b/streamsluice_scripts/scripts/twitter-alert.sh index 52d12731..d52ad4c5 100755 --- a/streamsluice_scripts/scripts/twitter-alert.sh +++ b/streamsluice_scripts/scripts/twitter-alert.sh @@ -164,7 +164,7 @@ run_stock_test(){ is_treat=true autotune=true for repeat in 2; do #1 - for autotuner_increase_bar_alpha in 0.1; do # 0.1 0.2 0.4 + for autotuner_increase_bar_alpha in 0.2; do # 0.1 0.2 0.4 for L in 2000; do # 500 1000 1500 2000 2500 3000 3500 whether_type="streamsluice" how_type="streamsluice"