Use Slurm array jobs to limit concurrent extraction jobs (#335)
* Use Slurm array jobs to limit concurrent extraction jobs

* Use temporary scripts to launch Slurm array jobs

* When there are >1000 runs to reprocess, start the second batch only after the first batch finishes

* Allow overriding the limit on simultaneous jobs
takluyver authored Sep 18, 2024
1 parent c36f975 commit 2548d5e
Showing 3 changed files with 90 additions and 13 deletions.
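
The throttling mechanism behind this change is Slurm's job-array syntax: --array=0-N%limit submits N+1 tasks but lets at most limit of them run simultaneously. A minimal sketch of that idea, separate from the code in this commit (the task script name and the counts here are made up):

import shlex

n_tasks = 1000        # hypothetical number of runs to process
limit_running = 30    # at most 30 array tasks execute at once
cmd = [
    'sbatch', '--parsable',
    '--array', f'0-{n_tasks - 1}%{limit_running}',  # '%' sets Slurm's concurrency throttle
    '--wrap', shlex.join(['bash', 'process-one-run.sh']),  # stand-in for the real per-run command
]
print(shlex.join(cmd))  # pass to subprocess.run(..., check=True) on a Slurm cluster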
93 changes: 83 additions & 10 deletions damnit/backend/extraction_control.py
@@ -12,6 +12,7 @@
from ctypes import CDLL
from dataclasses import dataclass
from pathlib import Path
from secrets import token_hex
from threading import Thread

from extra_data.read_machinery import find_proposal
@@ -74,6 +75,16 @@ def proposal_runs(proposal):
raw_dir = Path(find_proposal(proposal_name)) / "raw"
return set(int(p.stem[1:]) for p in raw_dir.glob("*"))

def batches(l, n):
start = 0
while True:
end = start + n
batch = l[start:end]
if not batch:
return
yield batch
start = end
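
For reference, a quick illustration of what the new batches() helper yields (the values are made up):

# Successive slices of at most n items; the generator stops at the first empty slice.
list(batches(list(range(7)), 3))   # -> [[0, 1, 2], [3, 4, 5], [6]]
list(batches([], 3))               # -> []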


@dataclass
class ExtractionRequest:
@@ -143,17 +154,78 @@ def sbatch_cmd(self, req: ExtractionRequest):
log.info("Processing output will be written to %s",
log_path.relative_to(self.context_dir.absolute()))

if req.run == -1:
job_name = f"p{req.proposal}-damnit"
else:
# We put the run number first so that it's visible in
# squeue's default 11-character column for the JobName.
job_name = f"r{req.run}-p{req.proposal}-damnit"

return [
'sbatch', '--parsable',
*self._resource_opts(req.cluster),
'-o', log_path,
'--open-mode=append',
# Note: we put the run number first so that it's visible in
# squeue's default 11-character column for the JobName.
'--job-name', f"r{req.run}-p{req.proposal}-damnit",
'--job-name', job_name,
'--wrap', shlex.join(req.python_cmd())
]

def submit_multi(self, reqs: list[ExtractionRequest], limit_running=30):
"""Submit multiple requests using Slurm job arrays.
"""
out = []

assert len({r.cluster for r in reqs}) <= 1 # Don't mix cluster/non-cluster

# Array jobs are limited to 1001 in Slurm config (MaxArraySize)
for req_group in batches(reqs, 1000):
grpid = token_hex(8) # random unique string
scripts_dir = self.context_dir / '.tmp'
scripts_dir.mkdir(exist_ok=True)
if scripts_dir.stat().st_uid == os.getuid():
scripts_dir.chmod(0o777)

for i, req in enumerate(req_group):
script_file = scripts_dir / f'launch-{grpid}-{i}.sh'
log_path = process_log_path(req.run, req.proposal, self.context_dir)
script_file.write_text(
'rm "$0"\n' # Script cleans itself up
f'{shlex.join(req.python_cmd())} >>"{log_path}" 2>&1'
)
script_file.chmod(0o777)

script_expr = f".tmp/launch-{grpid}-$SLURM_ARRAY_TASK_ID.sh"
cmd = self.sbatch_array_cmd(script_expr, req_group, limit_running)
if out:
# 1 batch at a time, to simplify limiting concurrent jobs
prev_job = out[-1][0]
cmd.append(f"--dependency=afterany:{prev_job}")
res = subprocess.run(
cmd, stdout=subprocess.PIPE, text=True, check=True, cwd=self.context_dir,
)
job_id, _, cluster = res.stdout.partition(';')
job_id = job_id.strip()
cluster = cluster.strip() or 'maxwell'
log.info("Launched Slurm (%s) job array %s (%d runs) to run context file",
cluster, job_id, len(req_group))
out.append((job_id, cluster))

return out
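
A hedged sketch of how submit_multi() might be driven (proposal and run numbers are placeholders, and match/mock are assumed to keep their defaults on ExtractionRequest):

# One request per run; submit_multi() splits them into array jobs of up to 1000
# tasks and chains batches with --dependency=afterany so batches run one at a time.
submitter = ExtractionSubmitter(Path.cwd())
reqs = [ExtractionRequest(run, 1234, RunData.ALL) for run in range(1, 51)]
for job_id, cluster in submitter.submit_multi(reqs, limit_running=30):
    print(f"Submitted array job {job_id} on {cluster}")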

def sbatch_array_cmd(self, script_expr, reqs, limit_running=30):
"""Make the sbatch command for an array job"""
req = reqs[0] # This should never be called with an empty list
return [
'sbatch', '--parsable',
*self._resource_opts(req.cluster),
# Slurm doesn't know the run number, so we redirect inside the job
'-o', '/dev/null',
'--open-mode=append',
'--job-name', f"p{req.proposal}-damnit",
'--array', f"0-{len(reqs)-1}%{limit_running}",
'--wrap', f'exec {script_expr}'
]
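
For concreteness, with 1000 requests on a hypothetical proposal 1234 and the default limit, the command built above comes out roughly as follows (the options from _resource_opts() are elided):

['sbatch', '--parsable',
 '-o', '/dev/null', '--open-mode=append',
 '--job-name', 'p1234-damnit',
 '--array', '0-999%30',
 '--wrap', 'exec .tmp/launch-<grpid>-$SLURM_ARRAY_TASK_ID.sh']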

def execute_in_slurm(self, req: ExtractionRequest):
"""Run an extraction job in srun with live output"""
log_path = process_log_path(req.run, req.proposal, self.context_dir)
@@ -216,7 +288,7 @@ def _slurm_cluster_opts(self):
return opts


def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=False):
def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=False, limit_running=30):
"""Called by the 'amore-proto reprocess' subcommand"""
submitter = ExtractionSubmitter(Path.cwd())
if proposal is None:
@@ -243,6 +315,7 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
else:
unavailable_runs.append((proposal, run))

props_runs.sort()
print(f"Reprocessing {len(props_runs)} runs already recorded, skipping {len(unavailable_runs)}...")
else:
try:
@@ -274,11 +347,11 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=Fal
for req in reqs[1:]:
req.update_vars = False

for prop, run in props_runs:
req = ExtractionRequest(run, prop, RunData.ALL, match=match, mock=mock)
if direct:
if direct:
for req in reqs:
submitter.execute_direct(req)
elif watch:
elif watch:
for req in reqs:
submitter.execute_in_slurm(req)
else:
submitter.submit(req)
else:
submitter.submit_multi(reqs, limit_running=limit_running)
7 changes: 6 additions & 1 deletion damnit/cli.py
@@ -109,6 +109,10 @@ def main(argv=None):
'--direct', action='store_true',
help="Run processing in subprocesses on this node, instead of via Slurm"
)
reprocess_ap.add_argument(
'--concurrent-jobs', type=int, default=30,
help="The maximum number of jobs that will run at once (default 30)"
)
reprocess_ap.add_argument(
'run', nargs='+',
help="Run number, e.g. 96. Multiple runs can be specified at once, "
@@ -226,7 +230,8 @@ def main(argv=None):

from .backend.extraction_control import reprocess
reprocess(
args.run, args.proposal, args.match, args.mock, args.watch, args.direct
args.run, args.proposal, args.match, args.mock, args.watch, args.direct,
limit_running=args.concurrent_jobs,
)

elif args.subcmd == 'read-context':
3 changes: 1 addition & 2 deletions damnit/gui/main_window.py
@@ -928,8 +928,7 @@ def process_runs(self):

try:
reqs = dlg.extraction_requests()
for req in reqs:
submitter.submit(req)
submitter.submit_multi(reqs)
except Exception as e:
log.error("Error launching processing", exc_info=True)
self.show_status_message(f"Error launching processing: {e}",
