diff --git a/examples/run_og_usa.py b/examples/run_og_usa.py index 97d21d7e..f0040c21 100644 --- a/examples/run_og_usa.py +++ b/examples/run_og_usa.py @@ -1,5 +1,5 @@ import multiprocessing -from distributed import Client +from distributed import Client, LocalCluster import os import json import time @@ -12,10 +12,32 @@ from ogcore.utils import safe_read_pickle +def fetch_profiles(client): + workers = client.scheduler_info()["workers"] + profiles = client.run( + lambda dask_worker: dask_worker.profile.dump_stats( + "profile_worker_{address}.pstats".format( + address=dask_worker.address[-5:] + ) + ) + ) + return profiles + + def main(): + + cluster = LocalCluster( + n_workers=7, + threads_per_worker=13, + worker_dashboard_address=":0", + preload=[ + "/usr/local/google/home/talumbau/src/OG-USA/examples/worker_setup.py" + ], + ) + # Define parameters to use for multiprocessing num_workers = min(multiprocessing.cpu_count(), 7) - client = Client(n_workers=num_workers) + client = Client(cluster) print("Number of workers = ", num_workers) # Directories to save data @@ -124,6 +146,9 @@ def main(): start_time = time.time() runner(p2, time_path=True, client=client) print("run time = ", time.time() - start_time) + + fetch_profiles(client) + client.close() """ diff --git a/examples/worker_setup.py b/examples/worker_setup.py new file mode 100644 index 00000000..93f8bf1b --- /dev/null +++ b/examples/worker_setup.py @@ -0,0 +1,6 @@ +import cProfile + +def dask_setup(worker): + print("dask setup HAPPENING") + worker.profile = cProfile.Profile() + worker.profile.enable()