
Commit

Merge pull request #56 from LSSTDESC/u/jchiang/w_2023_32_qbb_changes
w_2023_32 qbb changes
jchiang87 authored Aug 29, 2023
2 parents 92eb4c9 + 39051a9 commit 926d1b7
Showing 8 changed files with 40 additions and 87 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
@@ -18,7 +18,7 @@ jobs:
     strategy:
       matrix:
         os: [ ubuntu-latest ]
-        py: [ "3.10" ]
+        py: [ "3.11" ]
         CC: [ gcc ]
         CXX: [ g++ ]

@@ -34,7 +34,7 @@ jobs:
         uses: conda-incubator/setup-miniconda@v2
         with:
           activate-environment: stack
-          python-version: "3.10"
+          python-version: "3.11"
           condarc-file: etc/.condarc

@@ -50,7 +50,7 @@
         run: |
           pip install -U --no-deps 'parsl[monitoring,workqueue] @ git+https://github.com/parsl/parsl@desc'
           pip install typeguard tblib paramiko dill pyzmq globus-sdk sqlalchemy_utils
-          conda install -c conda-forge ndcctools=7.6.1=py310he2ed3e8_0 --no-deps
+          conda install -c conda-forge ndcctools=7.6.1=py311h689c632_0 --no-deps
       - name: Clone the package and checkout the branch
         shell: bash -l {0}
2 changes: 1 addition & 1 deletion conda_requirements.txt
@@ -1,3 +1,3 @@
 # conda install --file conda_requirements should install all dependencies of gen3_workflow

-stackvana==0.2023.22
+stackvana>=0.2023.32
1 change: 0 additions & 1 deletion python/desc/gen3_workflow/create_process_dag.py
@@ -10,7 +10,6 @@ def process(self, jobs):
         for job in jobs:
             for prereq in job.prereqs:
                 if prereq.gwf_job is None:
-                    # skip jobs to copy exec butler files
                     continue
                 self.edges.add(f'{prereq.gwf_job.label}->{job.gwf_job.label}')
                 if prereq.gwf_job.label in task_types:
3 changes: 0 additions & 3 deletions python/desc/gen3_workflow/etc/bps_drp_baseline.yaml
@@ -1,9 +1,6 @@
 project: DESC
 campaign: DC2

-executionButler:
-  whenCreate: "PREPARE"
-
 pipelineYaml: "${GEN3_WORKFLOW_DIR}/pipelines/DRP.yaml"

 commandPrepend: "time"
34 changes: 17 additions & 17 deletions python/desc/gen3_workflow/lazy_cl_handling.py
@@ -11,36 +11,36 @@
 import re


-__all__ = ['fix_env_var_syntax', 'get_input_file_paths', 'insert_file_paths']
+__all__ = ['fix_env_var_syntax', 'get_input_file_paths', 'insert_file_paths',
+           'resolve_env_vars']


-def fix_env_var_syntax(oldstr):
-    """Replace '<ENV: env_var>' with '${env_var}' througout the input string."""
+def resolve_env_vars(oldstr):
+    """
+    Replace '<ENV: env_var>' with `os.environ[env_var]` throughout the
+    input string.
+    """
     newstr = oldstr
     for key in re.findall(r"<ENV:([^>]+)>", oldstr):
-        newstr = newstr.replace(rf"<ENV:{key}>", "${%s}" % key)
+        newstr = newstr.replace(rf"<ENV:{key}>", "%s" % os.environ[key])
     return newstr


-def exec_butler_tmp_dir(exec_butler_dir, job_name, tmp_dirname):
-    """Construct the job-specific path for the non-shared copy of the
-    execution butler repo."""
-    return os.path.join(os.path.dirname(exec_butler_dir), tmp_dirname,
-                        job_name)
+def fix_env_var_syntax(oldstr):
+    """
+    Replace '<ENV: env_var>' with '${env_var}' throughout the input string.
+    """
+    newstr = oldstr
+    for key in re.findall(r"<ENV:([^>]+)>", oldstr):
+        newstr = newstr.replace(rf"<ENV:{key}>", "${%s}" % key)
+    return newstr


 def get_input_file_paths(generic_workflow, job_name, tmp_dirname='tmp_repos'):
     """Return a dictionary of file paths, keyed by input name."""
     file_paths = dict()
     for item in generic_workflow.get_job_inputs(job_name):
-        if (item.name == 'butlerConfig' and not item.job_shared and
-                job_name != 'pipetaskInit'):  # pipetaskInit needs special handling
-            exec_butler_dir = os.path.dirname(item.src_uri) \
-                if item.src_uri.endswith('butler.yaml') else item.src_uri
-            file_paths[item.name] \
-                = exec_butler_tmp_dir(exec_butler_dir, job_name, tmp_dirname)
-        else:
-            file_paths[item.name] = item.src_uri
+        file_paths[item.name] = resolve_env_vars(item.src_uri)
     return file_paths
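Note: a self-contained sketch of how the new resolve_env_vars helper behaves; the environment variable name and paths below are illustrative, not taken from the repo.

import os
import re

def resolve_env_vars(oldstr):
    """Replace '<ENV:env_var>' with os.environ[env_var] throughout."""
    newstr = oldstr
    for key in re.findall(r"<ENV:([^>]+)>", oldstr):
        newstr = newstr.replace(rf"<ENV:{key}>", "%s" % os.environ[key])
    return newstr

os.environ['SUBMIT_DIR'] = '/tmp/submit'   # illustrative value
print(resolve_env_vars('<ENV:SUBMIT_DIR>/run.qgraph'))
# prints: /tmp/submit/run.qgraph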
78 changes: 17 additions & 61 deletions python/desc/gen3_workflow/parsl_service.py
@@ -234,38 +234,8 @@ def command_line(self):
         pipetask_cmd \
             = self.parent_graph.evaluate_command_line(pipetask_cmd, self.gwf_job)

-        exec_butler_dir = self.config['executionButlerDir']
-        if not os.path.isdir(exec_butler_dir):
-            # We're not using the execution butler so omit the file
-            # staging parts.
-            return (pipetask_cmd +
-                    ' && >&2 echo success || (>&2 echo failure; false)')
-
-        # Command line for the execution butler including lines to
-        # stage and un-stage the copies of the registry and
-        # butler.yaml file.
-        target_dir = os.path.join(os.path.dirname(exec_butler_dir),
-                                  self.parent_graph.tmp_dirname,
-                                  self.gwf_job.name)
-        my_command = f"""
-        if [[ ! -d {target_dir} ]];
-        then
-            mkdir {target_dir}
-        fi
-        cp {exec_butler_dir}/* {target_dir}/
-        {pipetask_cmd}
-        retcode=$?
-        rm -rf {target_dir}
-        if [[ $retcode != "0" ]];
-        then
-            >&2 echo failure
-            false
-        else
-            >&2 echo success
-            true
-        fi
-        """
-        return my_command
+        return (pipetask_cmd +
+                ' && >&2 echo success || (>&2 echo failure; false)')

     def add_dependency(self, dependency):
         """
@@ -424,6 +394,7 @@ def __init__(self, generic_workflow, config, do_init=True, dfk=None,
         self.dfk = dfk
         self.tmp_dirname = 'tmp_repos'
         self._ingest()
+        self._qgraph_file = None
         self._qgraph = None
         self.monitoring_db = monitoring_db
@@ -440,13 +411,6 @@ def _ingest(self):
             if job_name == 'pipetaskInit':
                 continue

-            # If using the execution butler, create a temp directory
-            # to contain copies of the exec butler repo.
-            exec_butler_dir = self.config['executionButlerDir']
-            if os.path.isdir(exec_butler_dir):
-                os.makedirs(os.path.join(os.path.dirname(exec_butler_dir),
-                                         self.tmp_dirname), exist_ok=True)
-
             task_name = get_task_name(job_name, self.config)
             if task_name not in self._task_list:
                 self._task_list.append(task_name)
@@ -508,13 +472,19 @@ def int_cast(value):
             data['status'].append(job.status)
         self.df = pd.DataFrame(data=data)

+    @property
+    def qgraph_file(self):
+        if self._qgraph_file is None:
+            self._qgraph_file = glob.glob(os.path.join(
+                self.config['submitPath'], '*.qgraph'))[0]
+        return self._qgraph_file
+
     @property
     def qgraph(self):
         """The QuantumGraph associated with the current bps job."""
         if self._qgraph is None:
-            qgraph_file = glob.glob(os.path.join(self.config['submitPath'],
-                                                 '*.qgraph'))[0]
-            self._qgraph = QuantumGraph.loadUri(qgraph_file, DimensionUniverse())
+            self._qgraph = QuantumGraph.loadUri(self.qgraph_file,
+                                                DimensionUniverse())
         return self._qgraph

     def get_jobs(self, task_type, status='pending', query=None):
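Note: the new qgraph_file property caches the result of a one-time glob of the submit directory; a self-contained sketch of the pattern (the SubmitDir class here is hypothetical):

import glob
import os

class SubmitDir:
    # Hypothetical stand-in for ParslGraph, showing only the caching pattern.
    def __init__(self, submit_path):
        self.submit_path = submit_path
        self._qgraph_file = None

    @property
    def qgraph_file(self):
        # Glob only on first access; later accesses reuse the cached path.
        if self._qgraph_file is None:
            self._qgraph_file = glob.glob(
                os.path.join(self.submit_path, '*.qgraph'))[0]
        return self._qgraph_file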
@@ -668,14 +638,7 @@ def run(self, jobs=None, block=False):
             # non-interactive python process that would otherwise end
             # before the futures resolve.
             _ = [future.exception() for future in futures]
-            if self.config['executionButler']['whenCreate'] != 'NEVER':
-                # Since we're using the execution butler and running
-                # non-interactively, run self.finalize()
-                # to transfer datasets to the destination butler.
-                self.finalize()
-                # Clean up any remaining temporary copies of the execution
-                # butler repos.
-                self.clean_up_exec_butler_files()
+            self.finalize()
         # Shutdown and dispose of the DataFlowKernel.
         self.shutdown()
@@ -691,22 +654,15 @@ def shutdown(self):
         parsl.DataFlowKernelLoader.clear()

     def finalize(self):
-        """Run final job to transfer datasets from the execution butler to
-        the destination repo butler."""
+        """Run final job to transfer datasets from the quantum-backed
+        butler to the destination repo butler."""
         log_file = os.path.join(self.config['submitPath'], 'logging',
                                 'final_merge_job.log')
         command = (f"(bash {self.config['submitPath']}/final_job.bash "
-                   f"{self.config['butlerConfig']} "
-                   f"{self.config['executionButlerTemplate']}) >& {log_file}")
+                   f"{self.qgraph_file} "
+                   f"{self.config['butlerConfig']}) >& {log_file}")
         subprocess.check_call(command, shell=True, executable='/bin/bash')

-    def clean_up_exec_butler_files(self):
-        """Clean up the copies of the execution butler."""
-        temp_root = os.path.dirname(self.config['executionButlerTemplate'])
-        temp_repo_dir = os.path.join(temp_root, 'tmp_repos')
-        if os.path.isdir(temp_repo_dir):
-            shutil.rmtree(temp_repo_dir)
-

 class ParslService(BaseWmsService):
     """Parsl-based implementation for the WMS interface."""
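Note: finalize() now hands final_job.bash the quantum graph file followed by the destination butler config, instead of the butler config and an execution-butler template; a sketch of the command string it assembles, with all paths illustrative:

import os

submit_path = '/tmp/submit'                # stand-in for config['submitPath']
qgraph_file = f'{submit_path}/run.qgraph'  # stand-in for self.qgraph_file
butler_config = '/tmp/repo'                # stand-in for config['butlerConfig']
log_file = os.path.join(submit_path, 'logging', 'final_merge_job.log')
command = (f"(bash {submit_path}/final_job.bash "
           f"{qgraph_file} "
           f"{butler_config}) >& {log_file}")
print(command)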
2 changes: 1 addition & 1 deletion tests/cpBias_test/bps_cpBias.yaml
@@ -5,7 +5,7 @@ pipelineYaml: "${PWD}/cpBias.yaml"

 payload:
   payloadName: bot_13035_R22_S11_cpBias
-  butlerConfig: test_repo
+  butlerConfig: ${PWD}/test_repo
   inCollection: LSSTCam/raw/all,LSSTCam/calib
   dataQuery: "instrument='LSSTCam'"
   outputRun: "u/lsst/{payloadName}/test_run"
1 change: 1 addition & 0 deletions tests/test_bps_restart.py
@@ -33,6 +33,7 @@ def test_bps_restart(self):
         """Test the bps restart command for the Parsl plugin."""
         # Run bps restart --id <workflow_name>.
         os.chdir(self.tmp_dir)
+        os.environ['PWD'] = self.tmp_dir
         os.environ['BPS_WMS_SERVICE_CLASS'] = 'desc.gen3_workflow.ParslService'
         workflow_name = 'u/lsst/bot_13035_R22_S11_cpBias/test_run'
         command = f'bps restart --id {workflow_name}'
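Note: os.chdir() changes the process working directory but does not update the PWD environment variable, which the test's bps yaml now expands via ${PWD}; setting it explicitly keeps the two consistent. A minimal illustration:

import os

os.chdir('/tmp')
print(os.environ.get('PWD'))  # may still report the previous directory
os.environ['PWD'] = '/tmp'    # now ${PWD} expansion matches os.getcwd()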
