Skip to content

Commit

Permalink
[DO NOT MERGE] cProfile every dask worker
Browse files Browse the repository at this point in the history
  • Loading branch information
talumbau committed Feb 17, 2024
1 parent e7b564b commit 1b042b0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
29 changes: 27 additions & 2 deletions examples/run_og_usa.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import multiprocessing
from distributed import Client
from distributed import Client, LocalCluster
import os
import json
import time
Expand All @@ -12,10 +12,32 @@
from ogcore.utils import safe_read_pickle


def fetch_profiles(client):
workers = client.scheduler_info()["workers"]
profiles = client.run(
lambda dask_worker: dask_worker.profile.dump_stats(
"profile_worker_{address}.pstats".format(
address=dask_worker.address[-5:]
)
)
)
return profiles


def main():

cluster = LocalCluster(
n_workers=7,
threads_per_worker=13,
worker_dashboard_address=":0",
preload=[
"/usr/local/google/home/talumbau/src/OG-USA/examples/worker_setup.py"
],
)

# Define parameters to use for multiprocessing
num_workers = min(multiprocessing.cpu_count(), 7)
client = Client(n_workers=num_workers)
client = Client(cluster)
print("Number of workers = ", num_workers)

# Directories to save data
Expand Down Expand Up @@ -124,6 +146,9 @@ def main():
start_time = time.time()
runner(p2, time_path=True, client=client)
print("run time = ", time.time() - start_time)

fetch_profiles(client)

client.close()

"""
Expand Down
6 changes: 6 additions & 0 deletions examples/worker_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import cProfile

def dask_setup(worker):
print("dask setup HAPPENING")
worker.profile = cProfile.Profile()
worker.profile.enable()

0 comments on commit 1b042b0

Please sign in to comment.