Skip to content

Commit

Permalink
feat: new notebook-based lstbin workflow creator
Browse files Browse the repository at this point in the history
  • Loading branch information
steven-murray committed Mar 7, 2024
1 parent 5cdc9ab commit 088a0d4
Showing 1 changed file with 183 additions and 36 deletions.
219 changes: 183 additions & 36 deletions hera_opm/mf_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,10 @@ def build_makeflow_from_config(
build_lstbin_makeflow_from_config(
config_file, mf_name=mf_name, work_dir=work_dir, **kwargs
)
elif makeflow_type == "lstbin-notebook":
build_lstbin_notebook_makeflow_from_config(
config_file, mf_name=mf_name, work_dir=work_dir, **kwargs
)
else:
raise ValueError(
"unknown makeflow_type {} specified; must be 'analysis' or 'lstbin'".format(
Expand All @@ -703,6 +707,20 @@ def build_makeflow_from_config(

return

def _get_timeout(config):
timeout = get_config_entry(config, "Options", "timeout", required=False)
if timeout is not None:
# check that the `timeout' command exists on the system
try:
subprocess.check_output(["timeout", "--help"])
except OSError: # pragma: no cover
warnings.warn(
'A value for the "timeout" option was specified,'
" but the `timeout' command does not appear to be"
" installed. Please install or remove the option"
" from the config file"
)
return timeout

def build_analysis_makeflow_from_config(
obsids, config_file, mf_name=None, work_dir=None
Expand Down Expand Up @@ -801,19 +819,7 @@ def build_analysis_makeflow_from_config(
source_script = get_config_entry(config, "Options", "source_script", required=False)
mail_user = get_config_entry(config, "Options", "mail_user", required=False)
batch_system = get_config_entry(config, "Options", "batch_system", required=False)
timeout = get_config_entry(config, "Options", "timeout", required=False)
if timeout is not None:
# check that the `timeout' command exists on the system
try:
subprocess.check_output(["timeout", "--help"])
except OSError: # pragma: no cover
warnings.warn(
'A value for the "timeout" option was specified,'
" but the `timeout' command does not appear to be"
" installed. Please install or remove the option"
" from the config file"
)
timeout = timeout
timeout = _get_timeout(config)

# open file for writing
cf = os.path.basename(config_file)
Expand Down Expand Up @@ -1493,18 +1499,7 @@ def build_lstbin_makeflow_from_config(
conda_env = get_config_entry(config, "Options", "conda_env", required=False)
source_script = get_config_entry(config, "Options", "source_script", required=False)
batch_system = get_config_entry(config, "Options", "batch_system", required=False)
timeout = get_config_entry(config, "Options", "timeout", required=False)
if timeout is not None:
# check that the `timeout' command exists on the system
try:
subprocess.check_output(["timeout", "--help"])
except OSError: # pragma: no cover
warnings.warn(
'A value for the "timeout" option was specified,'
" but the `timeout' command does not appear to be"
" installed. Please install or remove the option"
" from the config file"
)
timeout = _get_timeout(config)

# open file for writing
if mf_name is not None:
Expand Down Expand Up @@ -1555,20 +1550,17 @@ def build_lstbin_makeflow_from_config(
else:
outdir = Path(get_config_entry(config, "LSTBIN_OPTS", "outdir"))

try:
nfiles = make_lstbin_config_file(config, outdir)
except ImportError:
datafiles = get_lstbin_datafiles(config, parent_dir)
datafiles = get_lstbin_datafiles(config, parent_dir)

print("Searching for files in the following globs: ")
for df in datafiles:
print(" " + df.strip("'").strip('"'))
print("Searching for files in the following globs: ")
for df in datafiles:
print(" " + df.strip("'").strip('"'))

# pre-process files to determine the number of output files
_datafiles = [sorted(glob.glob(df.strip("'").strip('"'))) for df in datafiles]
_datafiles = [df for df in _datafiles if len(df) > 0]
# pre-process files to determine the number of output files
_datafiles = [sorted(glob.glob(df.strip("'").strip('"'))) for df in datafiles]
_datafiles = [df for df in _datafiles if len(df) > 0]

nfiles = _legacy_make_lstbin_config_file(config, outdir)
nfiles = _legacy_make_lstbin_config_file(config, outdir)

if not parallelize:
nfiles = 1
Expand Down Expand Up @@ -1645,6 +1637,161 @@ def build_lstbin_makeflow_from_config(

return

def build_lstbin_notebook_makeflow_from_config(
config_file: str | Path,
mf_name: str | None=None,
work_dir: str | Path | None=None,
**kwargs
) -> None:
"""Construct a notebook-based LST-binning makeflow file from input data and a config_file.
This is used from H6C+ with hera_cal 4+.
Parameters
----------
config_file : str
Full path to config file containing options.
mf_name : str
The name of makeflow file. Defaults to "<config_file_basename>.mf" if not
specified.
work_dir : str or Path, optional
The directory in which to write the makeflow file and wrapper files.
If not specified, the parent directory of the config file will be used.
"""
config_file = Path(config_file)
# read in config file
config = toml.load(config_file)
cf = config_file.name

if mf_name is None:
mf_name = config_file.with_suffix(".mf").name

work_dir = Path(work_dir or config_file.parent)

makeflowfile = work_dir / mf_name

# get LSTBIN arguments
lstbin_args = get_config_entry(config, "LSTBIN", "args", required=False)

# set output_file_select to None
config["LSTBIN_OPTS"]["output_file_select"] = str("None")
config['LSTBIN_OPTS']['thisfile'] = str(config_file.absolute())

# get general options
path_to_do_scripts = Path(get_config_entry(config, "Options", "path_to_do_scripts"))
conda_env = get_config_entry(config, "Options", "conda_env", required=False)
source_script = get_config_entry(config, "Options", "source_script", required=False)
batch_system = get_config_entry(config, "Options", "batch_system", required=False)
timeout = _get_timeout(config)

# determine whether or not to parallelize
parallelize = get_config_entry(config, "LSTBIN_OPTS", "parallelize", required=True)

actions = get_config_entry(config, "WorkFlow", "actions", required=True)
if len(actions) > 1:
raise ValueError("This function only supports a single action in the workflow.")
if len(actions) == 0:
raise ValueError("No actions found in the workflow.")
action = actions[0]

# define command
command = path_to_do_scripts / f"do_{action}.sh"

# add resource information
base_mem = get_config_entry(config, "Options", "base_mem", required=True)
base_cpu = get_config_entry(config, "Options", "base_cpu", required=False)
mail_user = get_config_entry(config, "Options", "mail_user", required=False)
default_queue = get_config_entry(
config, "Options", "default_queue", required=False
)
if default_queue is None:
default_queue = "hera"
batch_options = process_batch_options(
base_mem, base_cpu, mail_user, default_queue, batch_system
)

outdir = Path(get_config_entry(config, "LSTBIN_OPTS", "outdir"))

# The new way in H6C+ (notebook interface)
nfiles = make_lstbin_config_file(config, outdir)

if not parallelize:
nfiles = 1

source_script_line = f"source {source_script}" if source_script else ""
conda_env_line = f"conda activate {conda_env}" if conda_env else ""
cmd = f"{command} {{args}}"
cmdline = f"timeout {timeout} {cmdline}" if timeout is not None else cmd

wrapper_template = f"""#!/bin/bash
{source_script_line}
{conda_env_line}
date
cd {work_dir}
{cmdline}
if [ $? -eq 0 ]; then
cd {work_dir}
touch {outfile}
else
mv {logfile} {logfile.parent / f"{logfile.name}.error"}
fi
date
"""

# write makeflow file
with open(makeflowfile, "w") as fl:
# add comment at top of file listing date of creation and config file name
dt = time.strftime("%H:%M:%S on %d %B %Y")
fl.write(
f"""# makeflow file generated from config file {config_file.name}
# created at {dt}
export BATCH_OPTIONS = {batch_options}
""")

# loop over output files
for output_file_index in range(nfiles):
# if parallize, update output_file_select
if parallelize:
config["LSTBIN_OPTS"]["output_file_select"] = str(output_file_index)

# make outfile list
outfile = Path(f"{output_file_index:04}.LSTBIN.out")

# get args list for lst-binning step
args = [
str(get_config_entry(config, "LSTBIN_OPTS", a, required=True))
for a in lstbin_args
]
# turn into string
args = " ".join(args)

# make logfile name
# logfile will capture stdout and stderr
logfile = work_dir / outfile.with_suffix(".log").name

# make a small wrapper script that will run the actual command
# can't embed if; then statements in makeflow script
wrapper_script = work_dir / f"wrapper_{outfile.with_suffix('.sh').name}"

with open(wrapper_script, "w") as f2:
f2.write(wrapper_template.format(args=args))

# make file executable
os.chmod(wrapper_script, 0o755)

# first line lists target file to make (dummy output file), and requirements
# second line is "build rule", which runs the shell script and makes the output file
lines = f"{outfile}: {command}\n\t{wrapper_script} > {logfile} 2>&1\n"
fl.write(lines)

# Write the toml config to the output directory.
shutil.copy2(config_file, outdir / "lstbin-config.toml")

# Also write the conda_env export to the LSTbin dir
if conda_env is not None:
os.system(
f"conda env export -n {conda_env} --file {outdir}/environment.yaml"
)

def clean_wrapper_scripts(work_dir):
"""Clean up wrapper scripts from work directory.
Expand Down

0 comments on commit 088a0d4

Please sign in to comment.