Skip to content

Commit

Permalink
Merge pull request #55 from LSSTDESC/u/jchiang/monitoring_hub_configs
Browse files Browse the repository at this point in the history
monitoring hub configs
  • Loading branch information
jchiang87 authored Aug 15, 2023
2 parents 70e5b92 + a4ace87 commit 92eb4c9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest ]
py: [ 3.8 ]
py: [ "3.10" ]
CC: [ gcc ]
CXX: [ g++ ]

Expand All @@ -34,7 +34,7 @@ jobs:
uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: stack
python-version: 3.8
python-version: "3.10"
condarc-file: etc/.condarc

- name: Install conda deps
Expand All @@ -50,7 +50,7 @@ jobs:
run: |
pip install -U --no-deps 'parsl[monitoring,workqueue] @ git+https://github.com/parsl/parsl@desc'
pip install typeguard tblib paramiko dill pyzmq globus-sdk sqlalchemy_utils
conda install -c conda-forge ndcctools=7.3.4=py38h4630a5e_0 --no-deps
conda install -c conda-forge ndcctools=7.6.1=py310he2ed3e8_0 --no-deps
- name: Clone the package and checkout the branch
shell: bash -l {0}
Expand Down
2 changes: 1 addition & 1 deletion conda_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# conda install --file conda_requirements should install all dependencies of gen3_workflow

stackvana>=0.2021.40
stackvana==0.2023.22
30 changes: 19 additions & 11 deletions python/desc/gen3_workflow/config/parsl_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def slurm_provider(nodes_per_block=1, constraint='knl', qos='regular',


def pbspro_provider(nodes_per_block=1, qos='expert', parallelism=0,
walltime='10:00:00', min_blocks=0, max_blocks=32,
walltime='10:00:00', min_blocks=0, max_blocks=32,
init_blocks=1, worker_init=None, cpus_per_node=1,
scheduler_options=None, **unused_options):
"""Factory function to provide a PBSProProvider.
Expand All @@ -105,8 +105,8 @@ def pbspro_provider(nodes_per_block=1, qos='expert', parallelism=0,
queue=qos,
channel=LocalChannel(),
nodes_per_block=nodes_per_block,
worker_init = worker_init,
cpus_per_node = cpus_per_node,
worker_init=worker_init,
cpus_per_node=cpus_per_node,
init_blocks=init_blocks,
min_blocks=min_blocks,
max_blocks=max_blocks,
Expand All @@ -117,7 +117,8 @@ def pbspro_provider(nodes_per_block=1, qos='expert', parallelism=0,


def set_config_options(retries, monitoring, workflow_name, checkpoint,
monitoring_debug):
monitoring_debug, monitoring_hub_port,
monitoring_interval):
"""
Package retries, monitoring, and checkpoint options for
parsl.config.Config as a dict.
Expand All @@ -126,9 +127,9 @@ def set_config_options(retries, monitoring, workflow_name, checkpoint,
if monitoring:
config_options['monitoring'] \
= MonitoringHub(hub_address=address_by_hostname(),
hub_port=55055,
hub_port=monitoring_hub_port,
monitoring_debug=monitoring_debug,
resource_monitoring_interval=60,
resource_monitoring_interval=monitoring_interval,
workflow_name=workflow_name)
if checkpoint:
config_options['checkpoint_mode'] = 'task_exit'
Expand All @@ -140,8 +141,11 @@ def set_config_options(retries, monitoring, workflow_name, checkpoint,
def workqueue_config(provider=None, monitoring=False, workflow_name=None,
checkpoint=False, retries=1, worker_options="",
wq_max_retries=1, port=9000, monitoring_debug=False,
monitoring_hub_port=None, monitoring_interval=60,
**unused_options):
"""Load a parsl config for a WorkQueueExecutor and the supplied provider."""
"""
Load a parsl config for a WorkQueueExecutor and the supplied provider.
"""
executors = [WorkQueueExecutor(label='work_queue', port=port,
shared_fs=True, provider=provider,
worker_options=worker_options,
Expand All @@ -150,7 +154,9 @@ def workqueue_config(provider=None, monitoring=False, workflow_name=None,
ThreadPoolExecutor(max_threads=1, label='submit-node')]

config_options = set_config_options(retries, monitoring, workflow_name,
checkpoint, monitoring_debug)
checkpoint, monitoring_debug,
monitoring_hub_port,
monitoring_interval)

config = parsl.config.Config(strategy='simple',
garbage_collect=False,
Expand All @@ -164,13 +170,15 @@ def thread_pool_config(max_threads=1, monitoring=False, workflow_name=None,
checkpoint=False, retries=1,
labels=('submit-node', 'batch-small',
'batch-medium', 'batch-large'),
monitoring_debug=False,
**unused_options):
monitoring_debug=False, monitoring_hub_port=None,
monitoring_interval=60, **unused_options):
"""Load a parsl config using ThreadPoolExecutor."""
executors = [ThreadPoolExecutor(max_threads=max_threads, label=label)
for label in labels]
config_options = set_config_options(retries, monitoring, workflow_name,
checkpoint, monitoring_debug)
checkpoint, monitoring_debug,
monitoring_hub_port,
monitoring_interval)
config = parsl.config.Config(executors=executors, **config_options)
return parsl.load(config)

Expand Down
2 changes: 1 addition & 1 deletion tests/cpBias_test/cpBias.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description: Modified cpBias pipeline to run with unit tests
imports:
location: "$CP_PIPE_DIR/pipelines/cpBias.yaml"
location: "$CP_PIPE_DIR/pipelines/_ingredients/cpBias.yaml"
tasks:
isr:
class: lsst.ip.isr.IsrTask
Expand Down

0 comments on commit 92eb4c9

Please sign in to comment.