Merge pull request #130 from gjoseph92/run-on-coiled
[WIP] Nightly benchmarks runnable against Coiled
quasiben authored May 5, 2021
2 parents 74d4eb7 + 7493dfa commit 444185c
Showing 10 changed files with 269 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
dask-worker-space/
.vscode/
56 changes: 56 additions & 0 deletions coiled-job.py
@@ -0,0 +1,56 @@
"""
Run the benchmark script, as well as the cluster, within Coiled.
Currently we do this through a Jupyter notebook, since I'm not sure of the right way
to securely get a GitHub API token into a Coiled Job. So for now, you create the notebook
configs (which use the same software env as the cluster, including the correct dask config baked in),
go to https://cloud.coiled.io/gjoseph92/notebooks to launch the notebook(s), then in a terminal::
$ chmod +x run-coiled-benchmark-job.sh
$ bash # the Jupyter terminal currently opens in `sh`; only bash has our conda env set
$ ./run-coiled-benchmark-job.sh <github-api-token>
This will run the benchmark and upload all the outputs to a gist.
Apparently the jobs don't stop (Jupyter doesn't terminate?) even via the "shut down" menu option, so I'm
just doing `[coiled.stop_job(j) for j in coiled.list_jobs()]` to stop all the notebooks once they're done.
Clearly all this will be more reasonable as a plain Job, once we figure out the secrets situation.
"""

import coiled

NIGHTLY_JOB = "scheduler-benchmark"
V230_JOB = "scheduler-benchmark-230"

def create_notebook(v230=False):
coiled.create_job_configuration(
name=(V230_JOB if v230 else NIGHTLY_JOB) + "-nb",
software="gjoseph92/scheduler-benchmark-230" if v230 else "gjoseph92/scheduler-benchmark",
cpu=1,
memory="4 GiB",
command=["jupyter", "lab", "--allow-root", "--ip=0.0.0.0", "--no-browser"],
# TODO why does this start in `sh` and not `bash`?
ports=[8888],
files=["nightly-benchmark/nightly-run.py", "nightly-benchmark/assertions.py", "run-coiled-benchmark-job.sh"],
)

def create_job_config(v230=False):
coiled.create_job_configuration(
name=V230_JOB if v230 else NIGHTLY_JOB,
software="gjoseph92/scheduler-benchmark-230" if v230 else "gjoseph92/scheduler-benchmark",
cpu=1,
memory="4 GiB",
command=["python", "nightly-run.py", "coiled"],
# TODO how to securely add a github api token so we can run the script instead, and upload the gist?
# then this will actually work as a fire-and-forget job.
files=["nightly-benchmark/nightly-run.py", "nightly-benchmark/assertions.py", "run-coiled-benchmark-job.sh"],
)

def run_job(v230=False):
coiled.start_job(V230_JOB if v230 else NIGHTLY_JOB)

if __name__ == "__main__":
# TODO also create job configs, once we can get secrets into coiled jobs.
create_notebook(v230=False)
create_notebook(v230=True)
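
Following the workflow in the module docstring, a minimal usage sketch (not part of this diff; it assumes the functions above are in scope and that you are already logged in to Coiled). The cleanup loop is the stop-all one-liner quoted in the docstring:

import coiled

# Create (or refresh) the notebook job configurations for both software environments,
# then launch them from https://cloud.coiled.io/gjoseph92/notebooks.
create_notebook(v230=False)
create_notebook(v230=True)

# Once the benchmark runs have uploaded their gists, stop any lingering notebook jobs,
# since the Jupyter "shut down" menu option doesn't seem to terminate them.
for job in coiled.list_jobs():
    coiled.stop_job(job)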
3 changes: 3 additions & 0 deletions dask-230.yaml
@@ -0,0 +1,3 @@
benchmark:
software: "gjoseph92/scheduler-benchmark-230"
checks: false
11 changes: 4 additions & 7 deletions dask.yaml
@@ -4,12 +4,9 @@ optimization:

distributed:
version: 2
worker:
profile:
interval: 10s # Time between statistical profiling queries
cycle: 1000s # Time between starting new profile
low-level: false # Whether or not to include low-level functions
admin:
event-loop: uvloop
system-monitor:
interval: 1h

benchmark:
software: "gjoseph92/scheduler-benchmark"
checks: true
25 changes: 25 additions & 0 deletions environment-230.yml
@@ -0,0 +1,25 @@
name: perf-dev-230
channels:
- conda-forge
dependencies:
- python=3.8.8
- coiled=0.0.37
- dask=2.30.0
- distributed=2.30.0
- pip=21.0.1
- compilers=1.1.3
- ipython=7.21.0
- jupyterlab=3.0.11
- jupyter-server-proxy=3.0.2
- viztracer=0.12.2
- bokeh=2.3.0
- graphviz=2.40.1
- matplotlib=3.3.4
- seaborn=0.11.1
- numpy=1.20.1
- pandas=1.2.3
- xarray=0.17.0
- pip:
- gprof2dot==2021.2.21
variables:
DASK_CONFIG: dask-230.yaml
27 changes: 27 additions & 0 deletions environment.yml
@@ -0,0 +1,27 @@
name: perf-dev
channels:
- conda-forge
dependencies:
- python=3.8.8
- coiled=0.0.37
- pip=21.0.1
- compilers=1.1.3
- Cython=0.29.22
- uvloop=0.15.2
- ipython=7.21.0
- jupyterlab=3.0.11
- jupyter-server-proxy=3.0.2
- viztracer=0.12.2
- bokeh=2.3.0
- graphviz=2.40.1
- matplotlib=3.3.4
- seaborn=0.11.1
- numpy=1.20.1
- pandas=1.2.3
- xarray=0.17.0
- pip:
- gprof2dot==2021.2.21
- git+https://github.com/dask/distributed.git@main --install-option="--with-cython=profile"
- git+https://github.com/dask/dask.git@main
variables:
DASK_CONFIG: dask.yaml
47 changes: 47 additions & 0 deletions make-coiled-env.sh
@@ -0,0 +1,47 @@
#!/usr/bin/env bash

# Use a post-build script to insert the dask config into the Coiled software environment.

# HACK: Coiled offers no easy way to add auxiliary data files---or a dask config---to software environments,
# so we generate a post-build shell script that embeds the contents of `dask.yaml` and writes
# them out when executed.
OUT_CONFIG_PATH="~/.config/dask/dask.yaml"

if [[ $1 == "v230" ]]; then
YAML_CONTENTS=$(<dask-230.yaml)
ENV_NAME="scheduler-benchmark-230"
ENV_YML="environment-230.yml"
else
YAML_CONTENTS=$(<dask.yaml)
ENV_NAME="scheduler-benchmark"
ENV_YML="environment.yml"
fi

POSTBUILD_SCRIPT="postbuild.sh"
cat > $POSTBUILD_SCRIPT <<EOF
#!/usr/bin/env sh
set -x
OUT_CONFIG_PATH=$OUT_CONFIG_PATH
# ^ NOTE: no quotes, so ~ expands (https://stackoverflow.com/a/32277036)
mkdir -p \$(dirname \$OUT_CONFIG_PATH)
cat > \$OUT_CONFIG_PATH <<INNER_EOF
$YAML_CONTENTS
INNER_EOF
echo "export DASK_CONFIG=\$OUT_CONFIG_PATH" >> ~/.bashrc
echo "Wrote dask config to \$OUT_CONFIG_PATH:"
cat \$OUT_CONFIG_PATH
wget -q https://github.com/cli/cli/releases/download/v1.7.0/gh_1.7.0_linux_amd64.tar.gz
tar xzf gh_1.7.0_linux_amd64.tar.gz
mv gh_1.7.0_linux_amd64/bin/gh /usr/local/bin
rm gh_1.7.0_linux_amd64.tar.gz
rm -rf gh_1.7.0_linux_amd64
echo "Installed GitHub CLI"
EOF

coiled env create -n $ENV_NAME --conda $ENV_YML --post-build $POSTBUILD_SCRIPT
rm $POSTBUILD_SCRIPT
48 changes: 48 additions & 0 deletions nightly-benchmark/assertions.py
@@ -0,0 +1,48 @@
import yaml

import dask
import distributed

def check_scheduler_is_cythonized(client: distributed.Client):
# TODO is there a less hacky way to do this?
path = client.run_on_scheduler(lambda: distributed.scheduler.__file__)
if not path.endswith(".so"):
client.shutdown()
raise RuntimeError(
f"Scheduler is not Cythonized!\n{path}"
)

def assert_config_is_superset_of(config: dict, target: dict, context: str = ""):
for k, v in target.items():
try:
check_v = config[k]
except KeyError:
msg = f"Config missing expected key {k!r}"
if context:
msg = f"{msg} : {context}"
raise ValueError(msg) from None
else:
if isinstance(v, dict):
assert_config_is_superset_of(check_v, v, context=f"{context}.{k}")
else:
if check_v != v:
msg = f"Config mismatch: expected {v!r}, found {check_v!r}"
if context:
msg = f"{msg} : {context}"
raise ValueError(msg)


def check_config(client: distributed.Client):
local_config = dask.config.collect(["dask.yaml"], env={})
scheduler_config = client.run_on_scheduler(lambda: dask.config.config)

try:
assert_config_is_superset_of(scheduler_config, local_config, context="scheduler")
for worker_id, worker_config in client.run(lambda: dask.config.config).items():
assert_config_is_superset_of(worker_config, local_config, context=worker_id)
except ValueError:
client.shutdown()
raise
else:
scheduler_config.pop("coiled", None) # has a token in it, don't want it logged
print(yaml.dump(scheduler_config))
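
A standalone illustration of the recursive superset check above (not part of this diff); extra keys in the checked config are allowed, and the context string records where a mismatch occurred:

from assertions import assert_config_is_superset_of

expected = {"distributed": {"admin": {"event-loop": "uvloop"}}}

# Passes: the checked config contains everything in `expected`; extra keys are ignored.
assert_config_is_superset_of(
    {"distributed": {"admin": {"event-loop": "uvloop", "system-monitor": {"interval": "1h"}}}},
    expected,
    context="scheduler",
)

# Raises ValueError: "Config mismatch: expected 'uvloop', found 'tornado' : scheduler.distributed.admin"
assert_config_is_superset_of(
    {"distributed": {"admin": {"event-loop": "tornado"}}},
    expected,
    context="scheduler",
)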
60 changes: 45 additions & 15 deletions nightly-benchmark/nightly-run.py
@@ -14,16 +14,15 @@
import matplotlib.pyplot as plt
import seaborn as sns

import assertions

sns.set(font_scale=1.5, style="whitegrid")


today = datetime.now().strftime("%Y%m%d")


def main():
client = Client(n_workers=10, threads_per_worker=1)
print(client)

def main(client: Client, filename_suffix: str = ""):
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-31",
@@ -35,32 +34,32 @@ def main():
wait(df)
iterations = 10

with performance_report(filename=f"{today}-simple-scheduler.html"):
with performance_report(filename=f"{today}-simple-scheduler{filename_suffix}.html"):
simple = []
# print('start simple: ', flush=True)
for i in range(iterations):
start = time.time()
start = time.perf_counter()
z = df.x + 1 + 2 - df.y
z.sum().compute()
stop = time.time()
stop = time.perf_counter()
simple.append(stop - start)
simple = np.array(simple)

df2 = None
with performance_report(filename=f"{today}-shuffle-scheduler.html"):
with performance_report(filename=f"{today}-shuffle-scheduler{filename_suffix}.html"):
shuffle_t = []
# print('start shuffle: ', flush=True)
for i in range(iterations):
client.cancel(df2)
start = time.time()
start = time.perf_counter()
# shuffle(df, "id", shuffle="tasks")
df2 = df.set_index("id").persist()
wait(df2)
stop = time.time()
stop = time.perf_counter()
shuffle_t.append(stop - start)
shuffle_t = np.array(shuffle_t)

with performance_report(filename=f"{today}-rand-access-scheduler.html"):
with performance_report(filename=f"{today}-rand-access-scheduler{filename_suffix}.html"):
rand_access = []
for i in range(iterations):
start = time.time()
@@ -74,7 +73,7 @@ def main():
clim = da.groupby('day').mean(dim='time')
anom = da.groupby('day') - clim
anom_mean = anom.mean(dim='time')
with performance_report(filename=f"{today}-anom-mean-scheduler.html"):
with performance_report(filename=f"{today}-anom-mean-scheduler{filename_suffix}.html"):
anom_mean_t = []
for i in range(iterations):
start = time.time()
@@ -88,13 +87,44 @@ def main():
anom_mean=anom_mean_t)

if __name__ == "__main__":
data = main()
import sys

if sys.argv[-1] == "coiled":
import coiled
software = dask.config.get("benchmark.software", "gjoseph92/scheduler-benchmark")
print(f"Using software environment {software!r} for cluster.")
start = time.perf_counter()
cluster = coiled.Cluster(
n_workers=10,
worker_memory="54 GiB",
worker_cpu=1,
# ^ NOTE: Coiled VM backend required to get these resources
worker_options={"nthreads": 1},
scheduler_cpu=1,
scheduler_memory="8 GiB",
software=software,
shutdown_on_close=True,
)
elapsed = time.perf_counter() - start
print(f"Created Coiled cluster in {elapsed / 60:.1f} min")
client = Client(cluster)
filename_suffix = "-coiled"
else:
client = Client(n_workers=10, threads_per_worker=1)
filename_suffix = ""

print(client)
print(f"Distributed Version: {distributed.__version__}")
if dask.config.get("benchmark.checks", False):
assertions.check_scheduler_is_cythonized(client)
assertions.check_config(client)
data = main(client, filename_suffix=filename_suffix)
client.shutdown()

today = datetime.now().strftime("%Y%m%d")

bench_data_name = "benchmark-historic-runs.csv"
bench_image = f"{today}-benchmark-history.png"
bench_data_name = f"benchmark-historic-runs{filename_suffix}.csv"
bench_image = f"{today}-benchmark-history{filename_suffix}.png"
if os.path.exists("/etc/dgx-release"):
bench_data_name = "dgx-" + bench_data_name
bench_image = "dgx-" + bench_image
13 changes: 13 additions & 0 deletions run-coiled-benchmark-job.sh
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

if ! gh auth status 2> /dev/null; then
if [ -z "$1" ]; then
echo "Give your GitHub API token as a command-line argument"
exit 1
fi
echo "$1" | gh auth login --with-token
fi

python nightly-run.py coiled > out.txt

gh gist create *.html *.png out.txt
