Skip to content

Commit

Permalink
Add back --direct option to run reprocessing directly on local node
Browse files Browse the repository at this point in the history
  • Loading branch information
takluyver committed Jul 15, 2024
1 parent 2e9d345 commit 3c3d8d5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
21 changes: 17 additions & 4 deletions damnit/backend/extraction_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def sbatch_cmd(self, req: ExtractionRequest):
'--wrap', shlex.join(req.python_cmd())
]

def execute(self, req: ExtractionRequest):
def execute_in_slurm(self, req: ExtractionRequest):
"""Run an extraction job in srun with live output"""
log_path = process_log_path(req.run, req.proposal, self.context_dir)
log.info("Processing output will be written to %s",
Expand All @@ -160,6 +160,17 @@ def execute(self, req: ExtractionRequest):
self.srun_cmd(req), stdout=pipe, stderr=subprocess.STDOUT, check=True
)

def execute_direct(self, req: ExtractionRequest):
"""Run an extraction job as a subprocess on this node, without Slurm

Unlike execute_in_slurm(), the command from req.python_cmd() is run
as-is rather than being wrapped in srun. Raises
subprocess.CalledProcessError if it exits with a non-zero status.
"""
log_path = process_log_path(req.run, req.proposal, self.context_dir)
log.info("Processing output will be written to %s",
log_path.relative_to(self.context_dir.absolute()))

# Duplicate output to the log file
with tee(log_path) as pipe:
# stderr is merged into stdout so both streams go through the same pipe;
# check=True turns a failing exit status into an exception.
subprocess.run(
req.python_cmd(), stdout=pipe, stderr=subprocess.STDOUT, check=True
)

def srun_cmd(self, req: ExtractionRequest):
return [
'srun', *self._resource_opts(req.cluster),
Expand Down Expand Up @@ -197,7 +208,7 @@ def _slurm_cluster_opts(self):
return opts


def reprocess(runs, proposal=None, match=(), mock=False, watch=False):
def reprocess(runs, proposal=None, match=(), mock=False, watch=False, direct=False):
"""Called by the 'amore-proto reprocess' subcommand"""
submitter = ExtractionSubmitter(Path.cwd())
if proposal is None:
Expand Down Expand Up @@ -257,7 +268,9 @@ def reprocess(runs, proposal=None, match=(), mock=False, watch=False):

for prop, run in props_runs:
req = ExtractionRequest(run, prop, RunData.ALL, match=match, mock=mock)
if watch:
submitter.execute(req)
if direct:
submitter.execute_direct(req)
elif watch:
submitter.execute_in_slurm(req)
else:
submitter.submit(req)
8 changes: 7 additions & 1 deletion damnit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ def main(argv=None):
'--watch', action='store_true',
help="Run jobs one-by-one with live output in the terminal"
)
reprocess_ap.add_argument(
'--direct', action='store_true',
help="Run processing in subprocesses on this node, instead of via Slurm"
)
reprocess_ap.add_argument(
'run', nargs='+',
help="Run number, e.g. 96. Multiple runs can be specified at once, "
Expand Down Expand Up @@ -220,7 +224,9 @@ def main(argv=None):
logging.getLogger('kafka').setLevel(logging.WARNING)

from .backend.extraction_control import reprocess
reprocess(args.run, args.proposal, args.match, args.mock, args.watch)
reprocess(
args.run, args.proposal, args.match, args.mock, args.watch, args.direct
)

elif args.subcmd == 'read-context':
from .backend.extract_data import Extractor
Expand Down

0 comments on commit 3c3d8d5

Please sign in to comment.