From bfe181eaf6968c8e996c12d17f7d75eb5d3be079 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 27 Apr 2022 13:34:38 -0700 Subject: [PATCH 1/4] ray: progress towards adding a ray benchmark --- ray/task_bench.py | 67 +++++++++++++++++++++++++ ray/task_bench_core.py | 108 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 ray/task_bench.py create mode 100644 ray/task_bench_core.py diff --git a/ray/task_bench.py b/ray/task_bench.py new file mode 100644 index 00000000..8710e452 --- /dev/null +++ b/ray/task_bench.py @@ -0,0 +1,67 @@ +import task_bench_core as core + +import ray +import sys +import time + +def execute_task_graph(graph): + graph_array = core.encode_task_graph(graph) + + if graph.scratch_bytes_per_task > 0: + scratch = [ + core.init_scratch_delayed(graph.scratch_bytes_per_task) + for _ in range(graph.max_width) + ] + else: + scratch = [None for _ in range(graph.max_width)] + + outputs = [] + last_row = None + for timestep in range(0, graph.timesteps): + offset = core.c.task_graph_offset_at_timestep(graph, timestep) + width = core.c.task_graph_width_at_timestep(graph, timestep) + row = [] + for point in range(0, offset): + row.append(None) + for point in range(offset, offset + width): + inputs = [] + for dep in core.task_graph_dependencies(graph, timestep, point): + inputs.append(last_row[dep]) + output, scratch[point] = core.execute_point_delayed( + graph_array, timestep, point, scratch[point], *inputs) + row.append(output) + outputs.append(output) + for point in range(offset + width, graph.max_width): + row.append(None) + assert len(row) == graph.max_width + last_row = row + return outputs + +def execute_task_bench(): + app = core.app_create(sys.argv) + task_graphs = core.app_task_graphs(app) + start_time = time.perf_counter() + results = [] + for task_graph in task_graphs: + results.extend(execute_task_graph(task_graph)) + core.join(*results).compute() + total_time = time.perf_counter() - start_time + 
core.c.app_report_timing(app, total_time) + + +@ray.remote +def task1(): + print("task1") + +@ray.remote +def task2(id): + print("task2") + +if __name__ == "__main__": + # TODO (rohany): Not sure what to do here for multi-node. + ray.init() + + # id1s = [task1.remote() for _ in range(10)] + # id2s = [task2.remote(id) for id in id1s] + # done = ray.get(id2s) + execute_task_bench() diff --git a/ray/task_bench_core.py b/ray/task_bench_core.py new file mode 100644 index 00000000..1db54c48 --- /dev/null +++ b/ray/task_bench_core.py @@ -0,0 +1,108 @@ +import cffi +import ray +import numpy as np +import os +import subprocess + +root_dir = os.path.dirname(os.path.dirname(__file__)) +core_header = subprocess.check_output( + [ + "gcc", "-P", "-E", # "-D", "__attribute__(x)=", "-E", "-P", + os.path.join(root_dir, "core/core_c.h") + ]).decode("utf-8") +ffi = cffi.FFI() +ffi.cdef(core_header) +print(core_header) +c = ffi.dlopen(os.path.join(root_dir, "core", "libcore.so")) + +def app_create(args): + c_args = [] + c_argv = ffi.new("char *[]", len(args) + 1) + for i, arg in enumerate(args): + c_args.append(ffi.new("char []", arg.encode('utf-8'))) + c_argv[i] = c_args[-1] + c_argv[len(args)] = ffi.NULL + app = c.app_create(len(args), c_argv) + c.app_display(app) + return app + +def app_create(args): + c_args = [] + c_argv = ffi.new("char *[]", len(args) + 1) + for i, arg in enumerate(args): + c_args.append(ffi.new("char []", arg.encode('utf-8'))) + c_argv[i] = c_args[-1] + c_argv[len(args)] = ffi.NULL + + app = c.app_create(len(args), c_argv) + c.app_display(app) + return app + +def encode_task_graph(graph): + return np.frombuffer( + ffi.buffer(ffi.addressof(graph), ffi.sizeof(graph)), dtype=np.ubyte) + +def app_task_graphs(app): + result = [] + graphs = c.app_task_graphs(app) + for i in range(c.task_graph_list_num_task_graphs(graphs)): + result.append(c.task_graph_list_task_graph(graphs, i)) + return result + +def decode_task_graph(graph_array): + return ffi.cast("task_graph_t 
*", graph_array.ctypes.data)[0] + +def task_graph_dependencies(graph, timestep, point): + last_offset = c.task_graph_offset_at_timestep(graph, timestep - 1) + last_width = c.task_graph_width_at_timestep(graph, timestep - 1) + + if timestep == 0: + last_offset, last_width = 0, 0 + + dset = c.task_graph_dependence_set_at_timestep(graph, timestep) + ilist = c.task_graph_dependencies(graph, dset, point) + for i in range(0, c.interval_list_num_intervals(ilist)): + interval = c.interval_list_interval(ilist, i) + for dep in range(interval.start, interval.end + 1): + if last_offset <= dep < last_offset + last_width: + yield dep + +def execute_point_impl(graph_array, timestep, point, scratch, *inputs): + graph = decode_task_graph(graph_array) + + input_ptrs = ffi.new( + "char *[]", [ffi.cast("char *", i.ctypes.data) for i in inputs]) + input_sizes = ffi.new("size_t []", [i.shape[0] for i in inputs]) + + output = np.empty(graph.output_bytes_per_task, dtype=np.ubyte) + output_ptr = ffi.cast("char *", output.ctypes.data) + + if scratch is not None: + scratch_ptr = ffi.cast("char *", scratch.ctypes.data) + scratch_size = scratch.shape[0] + else: + scratch_ptr = ffi.NULL + scratch_size = 0 + + c.task_graph_execute_point_scratch( + graph, timestep, point, output_ptr, output.shape[0], input_ptrs, + input_sizes, len(inputs), scratch_ptr, scratch_size) + + return output + +@ray.remote +def execute_point_scratch(graph_array, timestep, point, scratch, *inputs): + return execute_point_impl( + graph_array, timestep, point, scratch, *inputs), scratch + +@ray.remote +def execute_point_no_scratch(graph_array, timestep, point, *inputs): + return execute_point_impl(graph_array, timestep, point, None, *inputs) + +def execute_point_delayed(graph_array, timestep, point, scratch, *inputs): + if scratch is not None: + return execute_point_scratch( + graph_array, timestep, point, scratch, *inputs).remote() + else: + return execute_point_no_scratch( + graph_array, timestep, point, 
*inputs).remote(), None From 264af24a75ba1203e826c4a94c21ec34a6cf50f2 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 27 Apr 2022 14:59:45 -0700 Subject: [PATCH 2/4] ray: updates to ray task bench implementation --- core/core_c.h | 2 ++ ray/task_bench.py | 15 +-------------- ray/task_bench_core.py | 26 +++++++------------------- 3 files changed, 10 insertions(+), 33 deletions(-) diff --git a/core/core_c.h b/core/core_c.h index a35601c7..929b4218 100644 --- a/core/core_c.h +++ b/core/core_c.h @@ -16,9 +16,11 @@ #ifndef CORE_C_H #define CORE_C_H +#ifndef TASK_BENCH_PYTHON_CFFI #include #include #include +#endif #ifdef __cplusplus extern "C" { diff --git a/ray/task_bench.py b/ray/task_bench.py index 8710e452..d5caef0a 100644 --- a/ray/task_bench.py +++ b/ray/task_bench.py @@ -44,24 +44,11 @@ def execute_task_bench(): results = [] for task_graph in task_graphs: results.extend(execute_task_graph(task_graph)) - core.join(*results).compute() + ray.get(results) total_time = time.perf_counter() - start_time core.c.app_report_timing(app, total_time) - -@ray.remote -def task1(): - print("task1") - -@ray.remote -def task2(id): - print("task2") - if __name__ == "__main__": # TODO (rohany): Not sure what to do here for multi-node. 
ray.init() - - # id1s = [task1.remote() for _ in range(10)] - # id2s = [task2.remote(id) for id in id1s] - # done = ray.get(id2s) execute_task_bench() diff --git a/ray/task_bench_core.py b/ray/task_bench_core.py index 1db54c48..15d60517 100644 --- a/ray/task_bench_core.py +++ b/ray/task_bench_core.py @@ -7,12 +7,13 @@ root_dir = os.path.dirname(os.path.dirname(__file__)) core_header = subprocess.check_output( [ - "gcc", "-P", "-E", # "-D", "__attribute__(x)=", "-E", "-P", + "gcc", "-D", "__attribute__(x)=", + "-D", "TASK_BENCH_PYTHON_CFFI", + "-E", "-P", os.path.join(root_dir, "core/core_c.h") ]).decode("utf-8") ffi = cffi.FFI() ffi.cdef(core_header) -print(core_header) c = ffi.dlopen(os.path.join(root_dir, "core", "libcore.so")) def app_create(args): @@ -26,18 +27,6 @@ def app_create(args): c.app_display(app) return app -def app_create(args): - c_args = [] - c_argv = ffi.new("char *[]", len(args) + 1) - for i, arg in enumerate(args): - c_args.append(ffi.new("char []", arg.encode('utf-8'))) - c_argv[i] = c_args[-1] - c_argv[len(args)] = ffi.NULL - - app = c.app_create(len(args), c_argv) - c.app_display(app) - return app - def encode_task_graph(graph): return np.frombuffer( ffi.buffer(ffi.addressof(graph), ffi.sizeof(graph)), dtype=np.ubyte) @@ -87,7 +76,6 @@ def execute_point_impl(graph_array, timestep, point, scratch, *inputs): c.task_graph_execute_point_scratch( graph, timestep, point, output_ptr, output.shape[0], input_ptrs, input_sizes, len(inputs), scratch_ptr, scratch_size) - return output @ray.remote @@ -101,8 +89,8 @@ def execute_point_no_scratch(graph_array, timestep, point, *inputs): def execute_point_delayed(graph_array, timestep, point, scratch, *inputs): if scratch is not None: - return execute_point_scratch( - graph_array, timestep, point, scratch, *inputs).remote() + return execute_point_scratch.remote( + graph_array, timestep, point, scratch, *inputs) else: - return execute_point_no_scratch( - graph_array, timestep, point, *inputs).remote(), None + 
return execute_point_no_scratch.remote( + graph_array, timestep, point, *inputs), None From 2c7e9515a421c5fc55cd8c5fa7532ca49c3e53c2 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Wed, 27 Apr 2022 16:50:43 -0700 Subject: [PATCH 3/4] *: updates to core_c.h for Python CFFI compat --- core/core_c.h | 3 +++ dask/task_bench_core.py | 5 +++-- ray/task_bench.py | 6 ++---- ray/task_bench_core.py | 5 ++++- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/core/core_c.h b/core/core_c.h index 929b4218..c6d50f60 100644 --- a/core/core_c.h +++ b/core/core_c.h @@ -16,6 +16,9 @@ #ifndef CORE_C_H #define CORE_C_H +// Guard these includes behind a special define so that the Python +// CFFI parser does not get confused once these headers are expanded +// by the preprocessor. #ifndef TASK_BENCH_PYTHON_CFFI #include #include diff --git a/dask/task_bench_core.py b/dask/task_bench_core.py index 512297c2..90f746da 100644 --- a/dask/task_bench_core.py +++ b/dask/task_bench_core.py @@ -33,12 +33,13 @@ root_dir = os.path.dirname(os.path.dirname(__file__)) core_header = subprocess.check_output( [ - "gcc", "-D", "__attribute__(x)=", "-E", "-P", + "gcc", "-D", "__attribute__(x)=", + "-D", "TASK_BENCH_PYTHON_CFFI", + "-E", "-P", os.path.join(root_dir, "core/core_c.h") ]).decode("utf-8") ffi = cffi.FFI() ffi.cdef(core_header) -c = ffi.dlopen("libcore.so") def init_client(): diff --git a/ray/task_bench.py b/ray/task_bench.py index d5caef0a..77d90003 100644 --- a/ray/task_bench.py +++ b/ray/task_bench.py @@ -1,7 +1,6 @@ -import task_bench_core as core - import ray import sys +import task_bench_core as core import time def execute_task_graph(graph): @@ -49,6 +48,5 @@ core.c.app_report_timing(app, total_time) if __name__ == "__main__": - # TODO (rohany): Not sure what to do here for multi-node. 
- ray.init() + ray.init(address="auto") execute_task_bench() diff --git a/ray/task_bench_core.py b/ray/task_bench_core.py index 15d60517..6a0166b3 100644 --- a/ray/task_bench_core.py +++ b/ray/task_bench_core.py @@ -4,6 +4,9 @@ import os import subprocess +# Similarly to the FFI loading in dask/task_bench_core.py, we load +# the CFFI handles in its own module to avoid introspection from +# cloudpickle. root_dir = os.path.dirname(os.path.dirname(__file__)) core_header = subprocess.check_output( [ @@ -14,7 +17,7 @@ ]).decode("utf-8") ffi = cffi.FFI() ffi.cdef(core_header) -c = ffi.dlopen(os.path.join(root_dir, "core", "libcore.so")) +c = ffi.dlopen("libcore.so") def app_create(args): c_args = [] From 4b137782ab0f38473cf971fefa7b3699f17b6a87 Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Fri, 29 Apr 2022 13:14:43 -0700 Subject: [PATCH 4/4] experiments: add script to run ray on sapling --- experiments/sapling_metg_compute/metg_ray.sh | 45 ++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 experiments/sapling_metg_compute/metg_ray.sh diff --git a/experiments/sapling_metg_compute/metg_ray.sh b/experiments/sapling_metg_compute/metg_ray.sh new file mode 100644 index 00000000..5114f065 --- /dev/null +++ b/experiments/sapling_metg_compute/metg_ray.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +cores=40 +root_dir="$(dirname "$(dirname "$PWD")")" + +export LD_LIBRARY_PATH="$root_dir"/core:"$LD_LIBRARY_PATH" +export PYTHONPATH="$root_dir"/ray:"$PYTHONPATH" +SCHEDULER_PORT=1234 + +function launch { + python3 "$root_dir"/ray/task_bench.py "${@:2}" -skip-graph-validation +} + +function repeat { + local -n result=$1 + local n=$2 + result=() + for i in $(seq 1 $n); do + result+=("${@:3}") + if (( i < n )); then + result+=("-and") + fi + done +} + +function sweep { + for s in 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18; do + for rep in 0 1 2 3 4; do + if [[ $rep -le $s ]]; then + ray start --head --port $SCHEDULER_PORT --num-cpus $cores > /dev/null + local args + 
repeat args $3 -kernel compute_bound -iter $(( 1 << (30-s) )) -type $4 -radix ${RADIX:-5} -steps ${STEPS:-1000} -width $(( $2 * cores )) + $1 $2 "${args[@]}" + ray stop > /dev/null + fi + done + done +} + +n=1 +for g in ${NGRAPHS:-1}; do + for t in ${PATTERN:-stencil_1d}; do + sweep launch $n $g $t > ray_ngraphs_${g}_type_${t}_nodes_${n}.log + done +done