diff --git a/hera_opm/mf_tools.py b/hera_opm/mf_tools.py index 9245e21..bdc875e 100644 --- a/hera_opm/mf_tools.py +++ b/hera_opm/mf_tools.py @@ -694,6 +694,10 @@ def build_makeflow_from_config( build_lstbin_makeflow_from_config( config_file, mf_name=mf_name, work_dir=work_dir, **kwargs ) + elif makeflow_type == "lstbin-notebook": + build_lstbin_notebook_makeflow_from_config( + config_file, mf_name=mf_name, work_dir=work_dir, **kwargs + ) else: raise ValueError( "unknown makeflow_type {} specified; must be 'analysis' or 'lstbin'".format( @@ -703,6 +707,20 @@ def build_makeflow_from_config( return +def _get_timeout(config): + timeout = get_config_entry(config, "Options", "timeout", required=False) + if timeout is not None: + # check that the `timeout' command exists on the system + try: + subprocess.check_output(["timeout", "--help"]) + except OSError: # pragma: no cover + warnings.warn( + 'A value for the "timeout" option was specified,' + " but the `timeout' command does not appear to be" + " installed. Please install or remove the option" + " from the config file" + ) + return timeout def build_analysis_makeflow_from_config( obsids, config_file, mf_name=None, work_dir=None @@ -801,19 +819,7 @@ def build_analysis_makeflow_from_config( source_script = get_config_entry(config, "Options", "source_script", required=False) mail_user = get_config_entry(config, "Options", "mail_user", required=False) batch_system = get_config_entry(config, "Options", "batch_system", required=False) - timeout = get_config_entry(config, "Options", "timeout", required=False) - if timeout is not None: - # check that the `timeout' command exists on the system - try: - subprocess.check_output(["timeout", "--help"]) - except OSError: # pragma: no cover - warnings.warn( - 'A value for the "timeout" option was specified,' - " but the `timeout' command does not appear to be" - " installed. Please install or remove the option" - " from the config file" - ) - timeout = timeout + timeout = _get_timeout(config) # open file for writing cf = os.path.basename(config_file) @@ -1493,18 +1499,7 @@ def build_lstbin_makeflow_from_config( conda_env = get_config_entry(config, "Options", "conda_env", required=False) source_script = get_config_entry(config, "Options", "source_script", required=False) batch_system = get_config_entry(config, "Options", "batch_system", required=False) - timeout = get_config_entry(config, "Options", "timeout", required=False) - if timeout is not None: - # check that the `timeout' command exists on the system - try: - subprocess.check_output(["timeout", "--help"]) - except OSError: # pragma: no cover - warnings.warn( - 'A value for the "timeout" option was specified,' - " but the `timeout' command does not appear to be" - " installed. Please install or remove the option" - " from the config file" - ) + timeout = _get_timeout(config) # open file for writing if mf_name is not None: @@ -1555,20 +1550,17 @@ def build_lstbin_makeflow_from_config( else: outdir = Path(get_config_entry(config, "LSTBIN_OPTS", "outdir")) - try: - nfiles = make_lstbin_config_file(config, outdir) - except ImportError: - datafiles = get_lstbin_datafiles(config, parent_dir) + datafiles = get_lstbin_datafiles(config, parent_dir) - print("Searching for files in the following globs: ") - for df in datafiles: - print(" " + df.strip("'").strip('"')) + print("Searching for files in the following globs: ") + for df in datafiles: + print(" " + df.strip("'").strip('"')) - # pre-process files to determine the number of output files - _datafiles = [sorted(glob.glob(df.strip("'").strip('"'))) for df in datafiles] - _datafiles = [df for df in _datafiles if len(df) > 0] + # pre-process files to determine the number of output files + _datafiles = [sorted(glob.glob(df.strip("'").strip('"'))) for df in datafiles] + _datafiles = [df for df in _datafiles if len(df) > 0] - nfiles = _legacy_make_lstbin_config_file(config, outdir) + nfiles = _legacy_make_lstbin_config_file(config, outdir) if not parallelize: nfiles = 1 @@ -1645,6 +1637,161 @@ def build_lstbin_makeflow_from_config( return +def build_lstbin_notebook_makeflow_from_config( + config_file: str | Path, + mf_name: str | None=None, + work_dir: str | Path | None=None, + **kwargs +) -> None: + """Construct a notebook-based LST-binning makeflow file from input data and a config_file. + + This is used from H6C+ with hera_cal 4+. + + Parameters + ---------- + config_file : str + Full path to config file containing options. + mf_name : str + The name of makeflow file. Defaults to ".mf" if not + specified. + work_dir : str or Path, optional + The directory in which to write the makeflow file and wrapper files. + If not specified, the parent directory of the config file will be used. + """ + config_file = Path(config_file) + # read in config file + config = toml.load(config_file) + cf = config_file.name + + if mf_name is None: + mf_name = config_file.with_suffix(".mf").name + + work_dir = Path(work_dir or config_file.parent) + + makeflowfile = work_dir / mf_name + + # get LSTBIN arguments + lstbin_args = get_config_entry(config, "LSTBIN", "args", required=False) + + # set output_file_select to None + config["LSTBIN_OPTS"]["output_file_select"] = str("None") + config['LSTBIN_OPTS']['thisfile'] = str(config_file.absolute()) + + # get general options + path_to_do_scripts = Path(get_config_entry(config, "Options", "path_to_do_scripts")) + conda_env = get_config_entry(config, "Options", "conda_env", required=False) + source_script = get_config_entry(config, "Options", "source_script", required=False) + batch_system = get_config_entry(config, "Options", "batch_system", required=False) + timeout = _get_timeout(config) + + # determine whether or not to parallelize + parallelize = get_config_entry(config, "LSTBIN_OPTS", "parallelize", required=True) + + actions = get_config_entry(config, "WorkFlow", "actions", required=True) + if len(actions) > 1: + raise ValueError("This function only supports a single action in the workflow.") + if len(actions) == 0: + raise ValueError("No actions found in the workflow.") + action = actions[0] + + # define command + command = path_to_do_scripts / f"do_{action}.sh" + + # add resource information + base_mem = get_config_entry(config, "Options", "base_mem", required=True) + base_cpu = get_config_entry(config, "Options", "base_cpu", required=False) + mail_user = get_config_entry(config, "Options", "mail_user", required=False) + default_queue = get_config_entry( + config, "Options", "default_queue", required=False + ) + if default_queue is None: + default_queue = "hera" + batch_options = process_batch_options( + base_mem, base_cpu, mail_user, default_queue, batch_system + ) + + outdir = Path(get_config_entry(config, "LSTBIN_OPTS", "outdir")) + + # The new way in H6C+ (notebook interface) + nfiles = make_lstbin_config_file(config, outdir) + + if not parallelize: + nfiles = 1 + + source_script_line = f"source {source_script}" if source_script else "" + conda_env_line = f"conda activate {conda_env}" if conda_env else "" + cmd = f"{command} {{args}}" + cmdline = f"timeout {timeout} {cmdline}" if timeout is not None else cmd + + wrapper_template = f"""#!/bin/bash +{source_script_line} +{conda_env_line} +date +cd {work_dir} +{cmdline} +if [ $? -eq 0 ]; then + cd {work_dir} + touch {outfile} +else + mv {logfile} {logfile.parent / f"{logfile.name}.error"} +fi +date + """ + + # write makeflow file + with open(makeflowfile, "w") as fl: + # add comment at top of file listing date of creation and config file name + dt = time.strftime("%H:%M:%S on %d %B %Y") + fl.write( + f"""# makeflow file generated from config file {config_file.name} +# created at {dt} +export BATCH_OPTIONS = {batch_options} +""") + + # loop over output files + for output_file_index in range(nfiles): + # if parallize, update output_file_select + if parallelize: + config["LSTBIN_OPTS"]["output_file_select"] = str(output_file_index) + + # make outfile list + outfile = Path(f"{output_file_index:04}.LSTBIN.out") + + # get args list for lst-binning step + args = [ + str(get_config_entry(config, "LSTBIN_OPTS", a, required=True)) + for a in lstbin_args + ] + # turn into string + args = " ".join(args) + + # make logfile name + # logfile will capture stdout and stderr + logfile = work_dir / outfile.with_suffix(".log").name + + # make a small wrapper script that will run the actual command + # can't embed if; then statements in makeflow script + wrapper_script = work_dir / f"wrapper_{outfile.with_suffix('.sh').name}" + + with open(wrapper_script, "w") as f2: + f2.write(wrapper_template.format(args=args)) + + # make file executable + os.chmod(wrapper_script, 0o755) + + # first line lists target file to make (dummy output file), and requirements + # second line is "build rule", which runs the shell script and makes the output file + lines = f"{outfile}: {command}\n\t{wrapper_script} > {logfile} 2>&1\n" + fl.write(lines) + + # Write the toml config to the output directory. + shutil.copy2(config_file, outdir / "lstbin-config.toml") + + # Also write the conda_env export to the LSTbin dir + if conda_env is not None: + os.system( + f"conda env export -n {conda_env} --file {outdir}/environment.yaml" + ) def clean_wrapper_scripts(work_dir): """Clean up wrapper scripts from work directory.