From 89fee898e8f6a1d2b649d28f0001e4a41456ef7c Mon Sep 17 00:00:00 2001
From: James Chiang
Date: Mon, 14 Aug 2023 14:06:19 -0700
Subject: [PATCH 1/6] switch over to quantum-backed butler

---
 .../gen3_workflow/etc/bps_drp_baseline.yaml |  3 ---
 python/desc/gen3_workflow/parsl_service.py  | 17 ++++++++++++-----
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/python/desc/gen3_workflow/etc/bps_drp_baseline.yaml b/python/desc/gen3_workflow/etc/bps_drp_baseline.yaml
index fb08544..286ea16 100644
--- a/python/desc/gen3_workflow/etc/bps_drp_baseline.yaml
+++ b/python/desc/gen3_workflow/etc/bps_drp_baseline.yaml
@@ -1,9 +1,6 @@
 project: DESC
 campaign: DC2
 
-executionButler:
-  whenCreate: "PREPARE"
-
 pipelineYaml: "${GEN3_WORKFLOW_DIR}/pipelines/DRP.yaml"
 
 commandPrepend: "time"
diff --git a/python/desc/gen3_workflow/parsl_service.py b/python/desc/gen3_workflow/parsl_service.py
index e885996..8b9453a 100644
--- a/python/desc/gen3_workflow/parsl_service.py
+++ b/python/desc/gen3_workflow/parsl_service.py
@@ -424,6 +424,7 @@ def __init__(self, generic_workflow, config, do_init=True, dfk=None,
         self.dfk = dfk
         self.tmp_dirname = 'tmp_repos'
         self._ingest()
+        self._qgraph_file = None
         self._qgraph = None
         self.monitoring_db = monitoring_db
@@ -508,13 +509,19 @@ def int_cast(value):
             data['status'].append(job.status)
         self.df = pd.DataFrame(data=data)
 
+    @property
+    def qgraph_file(self):
+        if self._qgraph_file is None:
+            self._qgraph_file = glob.glob(os.path.join(
+                self.config['submitPath'], '*.qgraph'))[0]
+        return self._qgraph_file
+
     @property
     def qgraph(self):
         """The QuantumGraph associated with the current bps job."""
         if self._qgraph is None:
-            qgraph_file = glob.glob(os.path.join(self.config['submitPath'],
-                                                 '*.qgraph'))[0]
-            self._qgraph = QuantumGraph.loadUri(qgraph_file, DimensionUniverse())
+            self._qgraph = QuantumGraph.loadUri(self.qgraph_file,
+                                                DimensionUniverse())
         return self._qgraph
 
     def get_jobs(self, task_type, status='pending', query=None):
@@ -696,8 +703,8 @@ def finalize(self):
         log_file = os.path.join(self.config['submitPath'], 'logging',
                                 'final_merge_job.log')
         command = (f"(bash {self.config['submitPath']}/final_job.bash "
-                   f"{self.config['butlerConfig']} "
-                   f"{self.config['executionButlerTemplate']}) >& {log_file}")
+                   f"{self.qgraph_file} "
+                   f"{self.config['butlerConfig']}) >& {log_file}")
         subprocess.check_call(command, shell=True, executable='/bin/bash')
 
     def clean_up_exec_butler_files(self):

From cfbdd581c4192b23214e832e2043312a68741c9c Mon Sep 17 00:00:00 2001
From: James Chiang
Date: Tue, 15 Aug 2023 15:56:17 -0700
Subject: [PATCH 2/6] update stackvana and ndcctools to use recent python 3.11 compatible versions

---
 .github/workflows/ci.yaml | 6 +++---
 conda_requirements.txt    | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 77aa296..bc07610 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -18,7 +18,7 @@ jobs:
     strategy:
       matrix:
         os: [ ubuntu-latest ]
-        py: [ "3.10" ]
+        py: [ "3.11" ]
         CC: [ gcc ]
         CXX: [ g++ ]
@@ -34,7 +34,7 @@ jobs:
         uses: conda-incubator/setup-miniconda@v2
         with:
           activate-environment: stack
-          python-version: "3.10"
+          python-version: "3.11"
           condarc-file: etc/.condarc
 
       - name: Install conda deps
@@ -50,7 +50,7 @@ jobs:
         run: |
           pip install -U --no-deps 'parsl[monitoring,workqueue] @ git+https://github.com/parsl/parsl@desc'
          pip install typeguard tblib paramiko dill pyzmq globus-sdk sqlalchemy_utils
-          conda install -c conda-forge ndcctools=7.6.1=py310he2ed3e8_0 --no-deps
+          conda install -c conda-forge ndcctools=7.6.1=py311h689c632_0 --no-deps
 
       - name: Clone the package and checkout the branch
         shell: bash -l {0}
diff --git a/conda_requirements.txt b/conda_requirements.txt
index dd479f1..6dd78c2 100644
--- a/conda_requirements.txt
+++ b/conda_requirements.txt
@@ -1,3 +1,3 @@
 # conda install --file conda_requirements should install all dependencies of gen3_workflow
-stackvana==0.2023.22
+stackvana>=0.2023.32

From 226ca0d8ce4674c224c6b76ee189113bd399d8c7 Mon Sep 17 00:00:00 2001
From: James Chiang
Date: Tue, 29 Aug 2023 11:00:53 -0700
Subject: [PATCH 3/6] remove execution butler support in favor of the quantum-backed butler

---
 python/desc/gen3_workflow/lazy_cl_handling.py | 20 +++++-
 python/desc/gen3_workflow/parsl_service.py    | 61 ++-----------------
 2 files changed, 22 insertions(+), 59 deletions(-)

diff --git a/python/desc/gen3_workflow/lazy_cl_handling.py b/python/desc/gen3_workflow/lazy_cl_handling.py
index af7ca85..a2bd90a 100644
--- a/python/desc/gen3_workflow/lazy_cl_handling.py
+++ b/python/desc/gen3_workflow/lazy_cl_handling.py
@@ -11,11 +11,25 @@
 import re
 
 
-__all__ = ['fix_env_var_syntax', 'get_input_file_paths', 'insert_file_paths']
+__all__ = ['fix_env_var_syntax', 'get_input_file_paths', 'insert_file_paths',
+           'resolve_env_vars']
+
+
+def resolve_env_vars(oldstr):
+    """
+    Replace '<ENV:env_var>' with `os.environ[env_var]` throughout the
+    input string.
+    """
+    newstr = oldstr
+    for key in re.findall(r"<ENV:([^>]+)>", oldstr):
+        newstr = newstr.replace(rf"<ENV:{key}>", "%s" % os.environ[key])
+    return newstr
 
 
 def fix_env_var_syntax(oldstr):
-    """Replace '<ENV:env_var>' with '${env_var}' througout the input string."""
+    """
+    Replace '<ENV:env_var>' with '${env_var}' throughout the input string.
+    """
     newstr = oldstr
     for key in re.findall(r"<ENV:([^>]+)>", oldstr):
         newstr = newstr.replace(rf"<ENV:{key}>", "${%s}" % key)
@@ -40,7 +54,7 @@ def get_input_file_paths(generic_workflow, job_name, tmp_dirname='tmp_repos'):
             file_paths[item.name] \
                 = exec_butler_tmp_dir(exec_butler_dir, job_name, tmp_dirname)
         else:
-            file_paths[item.name] = item.src_uri
+            file_paths[item.name] = resolve_env_vars(item.src_uri)
     return file_paths
diff --git a/python/desc/gen3_workflow/parsl_service.py b/python/desc/gen3_workflow/parsl_service.py
index 8b9453a..0e1ba35 100644
--- a/python/desc/gen3_workflow/parsl_service.py
+++ b/python/desc/gen3_workflow/parsl_service.py
@@ -234,38 +234,8 @@ def command_line(self):
         pipetask_cmd \
             = self.parent_graph.evaluate_command_line(pipetask_cmd,
                                                       self.gwf_job)
-        exec_butler_dir = self.config['executionButlerDir']
-        if not os.path.isdir(exec_butler_dir):
-            # We're not using the execution butler so omit the file
-            # staging parts.
-            return (pipetask_cmd +
-                    ' && >&2 echo success || (>&2 echo failure; false)')
-
-        # Command line for the execution butler including lines to
-        # stage and un-stage the copies of the registry and
-        # butler.yaml file.
-        target_dir = os.path.join(os.path.dirname(exec_butler_dir),
-                                  self.parent_graph.tmp_dirname,
-                                  self.gwf_job.name)
-        my_command = f"""
-if [[ ! -d {target_dir} ]];
-then
-    mkdir {target_dir}
-fi
-cp {exec_butler_dir}/* {target_dir}/
-{pipetask_cmd}
-retcode=$?
-rm -rf {target_dir}
-if [[ $retcode != "0" ]];
-then
-    >&2 echo failure
-    false
-else
-    >&2 echo success
-    true
-fi
-"""
-        return my_command
+        return (pipetask_cmd +
+                ' && >&2 echo success || (>&2 echo failure; false)')
 
     def add_dependency(self, dependency):
         """
@@ -441,13 +411,6 @@ def _ingest(self):
             if job_name == 'pipetaskInit':
                 continue
 
-            # If using the execution butler, create a temp directory
-            # to contain copies of the exec butler repo.
-            exec_butler_dir = self.config['executionButlerDir']
-            if os.path.isdir(exec_butler_dir):
-                os.makedirs(os.path.join(os.path.dirname(exec_butler_dir),
-                                         self.tmp_dirname), exist_ok=True)
-
             task_name = get_task_name(job_name, self.config)
             if task_name not in self._task_list:
                 self._task_list.append(task_name)
@@ -675,14 +638,7 @@ def run(self, jobs=None, block=False):
            # non-interactive python process that would otherwise end
            # before the futures resolve.
            _ = [future.exception() for future in futures]
-            if self.config['executionButler']['whenCreate'] != 'NEVER':
-                # Since we're using the execution butler and running
-                # non-interactively, run self.finalize()
-                # to transfer datasets to the destination butler.
-                self.finalize()
-                # Clean up any remaining temporary copies of the execution
-                # butler repos.
-                self.clean_up_exec_butler_files()
+            self.finalize()
 
         # Shutdown and dispose of the DataFlowKernel.
         self.shutdown()
@@ -698,8 +654,8 @@ def shutdown(self):
         parsl.DataFlowKernelLoader.clear()
 
     def finalize(self):
-        """Run final job to transfer datasets from the execution butler to
-        the destination repo butler."""
+        """Run final job to transfer datasets from the quantum-backed
+        butler to the destination repo butler."""
         log_file = os.path.join(self.config['submitPath'], 'logging',
                                 'final_merge_job.log')
         command = (f"(bash {self.config['submitPath']}/final_job.bash "
@@ -707,13 +663,6 @@ def finalize(self):
                    f"{self.qgraph_file} "
                    f"{self.config['butlerConfig']}) >& {log_file}")
         subprocess.check_call(command, shell=True, executable='/bin/bash')
-
-    def clean_up_exec_butler_files(self):
-        """Clean up the copies of the execution butler."""
-        temp_root = os.path.dirname(self.config['executionButlerTemplate'])
-        temp_repo_dir = os.path.join(temp_root, 'tmp_repos')
-        if os.path.isdir(temp_repo_dir):
-            shutil.rmtree(temp_repo_dir)
 
 class ParslService(BaseWmsService):
     """Parsl-based implementation for the WMS interface."""

From 71894452bf088e8493c8428f7d5ed2ec4535419d Mon Sep 17 00:00:00 2001
From: James Chiang
Date: Tue, 29 Aug 2023 14:01:52 -0700
Subject: [PATCH 4/6] add /home/jchiang/dev/gen3_workflow to butlerConfig path

---
 tests/cpBias_test/bps_cpBias.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/cpBias_test/bps_cpBias.yaml b/tests/cpBias_test/bps_cpBias.yaml
index c7cbc8f..9590b66 100644
--- a/tests/cpBias_test/bps_cpBias.yaml
+++ b/tests/cpBias_test/bps_cpBias.yaml
@@ -5,7 +5,7 @@ pipelineYaml: "${PWD}/cpBias.yaml"
 
 payload:
   payloadName: bot_13035_R22_S11_cpBias
-  butlerConfig: test_repo
+  butlerConfig: ${PWD}/test_repo
   inCollection: LSSTCam/raw/all,LSSTCam/calib
   dataQuery: "instrument='LSSTCam'"
   outputRun: "u/lsst/{payloadName}/test_run"

From 7b47fb0e4608eb27a8cf99966c3d634f6b58bad7 Mon Sep 17 00:00:00 2001
From: James Chiang
Date: Tue, 29 Aug 2023 14:02:51 -0700
Subject: [PATCH 5/6] set PWD env var in python since it's used to find the path to the test repo by expanding butlerConfig

---
 tests/test_bps_restart.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/test_bps_restart.py b/tests/test_bps_restart.py
index 98efa1b..c6234b2 100644
--- a/tests/test_bps_restart.py
+++ b/tests/test_bps_restart.py
@@ -33,6 +33,7 @@ def test_bps_restart(self):
         """Test the bps restart command for the Parsl plugin."""
         # Run bps restart --id <workflow_name>.
         os.chdir(self.tmp_dir)
+        os.environ['PWD'] = self.tmp_dir
         os.environ['BPS_WMS_SERVICE_CLASS'] = 'desc.gen3_workflow.ParslService'
         workflow_name = 'u/lsst/bot_13035_R22_S11_cpBias/test_run'
         command = f'bps restart --id {workflow_name}'

From 39051a96a7e79477f0cdf67d99dc8ebf09ec06be Mon Sep 17 00:00:00 2001
From: James Chiang
Date: Tue, 29 Aug 2023 14:27:25 -0700
Subject: [PATCH 6/6] remove exec butler specific code

---
 python/desc/gen3_workflow/create_process_dag.py |  1 -
 python/desc/gen3_workflow/lazy_cl_handling.py   | 16 +---------------
 2 files changed, 1 insertion(+), 16 deletions(-)

diff --git a/python/desc/gen3_workflow/create_process_dag.py b/python/desc/gen3_workflow/create_process_dag.py
index caa819c..4700374 100644
--- a/python/desc/gen3_workflow/create_process_dag.py
+++ b/python/desc/gen3_workflow/create_process_dag.py
@@ -10,7 +10,6 @@ def process(self, jobs):
         for job in jobs:
             for prereq in job.prereqs:
                 if prereq.gwf_job is None:
-                    # skip jobs to copy exec butler files
                     continue
                 self.edges.add(f'{prereq.gwf_job.label}->{job.gwf_job.label}')
                 if prereq.gwf_job.label in task_types:
diff --git a/python/desc/gen3_workflow/lazy_cl_handling.py b/python/desc/gen3_workflow/lazy_cl_handling.py
index a2bd90a..c7f96ed 100644
--- a/python/desc/gen3_workflow/lazy_cl_handling.py
+++ b/python/desc/gen3_workflow/lazy_cl_handling.py
@@ -36,25 +36,11 @@ def fix_env_var_syntax(oldstr):
     return newstr
 
 
-def exec_butler_tmp_dir(exec_butler_dir, job_name, tmp_dirname):
-    """Construct the job-specific path for the non-shared copy of the
-    execution butler repo."""
-    return os.path.join(os.path.dirname(exec_butler_dir), tmp_dirname,
-                        job_name)
-
-
 def get_input_file_paths(generic_workflow, job_name, tmp_dirname='tmp_repos'):
     """Return a dictionary of file paths, keyed by input name."""
     file_paths = dict()
     for item in generic_workflow.get_job_inputs(job_name):
-        if (item.name == 'butlerConfig' and not item.job_shared and
-                job_name != 'pipetaskInit'):  # pipetaskInit needs special handling
-            exec_butler_dir = os.path.dirname(item.src_uri) \
-                if item.src_uri.endswith('butler.yaml') else item.src_uri
-            file_paths[item.name] \
-                = exec_butler_tmp_dir(exec_butler_dir, job_name, tmp_dirname)
-        else:
-            file_paths[item.name] = resolve_env_vars(item.src_uri)
+        file_paths[item.name] = resolve_env_vars(item.src_uri)
     return file_paths
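
Usage note (not part of the patch series above): after patches 3 and 6, every job input path is passed through resolve_env_vars(), which expands BPS-style '<ENV:var>' tokens using os.environ. A minimal, self-contained sketch of that behavior follows; the environment variable and path in the example are hypothetical and chosen only for illustration.

import os
import re

def resolve_env_vars(oldstr):
    """Replace each '<ENV:var>' token with the value of os.environ['var']."""
    newstr = oldstr
    for key in re.findall(r"<ENV:([^>]+)>", oldstr):
        newstr = newstr.replace(rf"<ENV:{key}>", "%s" % os.environ[key])
    return newstr

# Hypothetical example: a generic-workflow src_uri containing an <ENV:...> token.
os.environ['SUBMIT_DIR'] = '/tmp/submit'   # assumed value, for illustration only
src_uri = '<ENV:SUBMIT_DIR>/u/lsst/test_run/qgraph_file.qgraph'
print(resolve_env_vars(src_uri))           # -> /tmp/submit/u/lsst/test_run/qgraph_file.qgraph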