diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index ee942c2..77aa296 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -18,7 +18,7 @@ jobs:
     strategy:
       matrix:
         os: [ ubuntu-latest ]
-        py: [ 3.8 ]
+        py: [ "3.10" ]
         CC: [ gcc ]
         CXX: [ g++ ]
 
@@ -34,7 +34,7 @@ jobs:
         uses: conda-incubator/setup-miniconda@v2
         with:
          activate-environment: stack
-          python-version: 3.8
+          python-version: "3.10"
           condarc-file: etc/.condarc
 
       - name: Install conda deps
@@ -50,7 +50,7 @@ jobs:
         run: |
           pip install -U --no-deps 'parsl[monitoring,workqueue] @ git+https://github.com/parsl/parsl@desc'
           pip install typeguard tblib paramiko dill pyzmq globus-sdk sqlalchemy_utils
-          conda install -c conda-forge ndcctools=7.3.4=py38h4630a5e_0 --no-deps
+          conda install -c conda-forge ndcctools=7.6.1=py310he2ed3e8_0 --no-deps
 
       - name: Clone the package and checkout the branch
         shell: bash -l {0}
diff --git a/conda_requirements.txt b/conda_requirements.txt
index 331ee9e..dd479f1 100644
--- a/conda_requirements.txt
+++ b/conda_requirements.txt
@@ -1,3 +1,3 @@
 # conda install --file conda_requirements should install all dependencies of gen3_workflow
-stackvana>=0.2021.40
+stackvana==0.2023.22
diff --git a/python/desc/gen3_workflow/config/parsl_configs.py b/python/desc/gen3_workflow/config/parsl_configs.py
index 1076394..cc321cb 100644
--- a/python/desc/gen3_workflow/config/parsl_configs.py
+++ b/python/desc/gen3_workflow/config/parsl_configs.py
@@ -92,7 +92,7 @@ def slurm_provider(nodes_per_block=1, constraint='knl', qos='regular',
 
 
 def pbspro_provider(nodes_per_block=1, qos='expert', parallelism=0,
-                    walltime='10:00:00', min_blocks=0, max_blocks=32,
+                    walltime='10:00:00', min_blocks=0, max_blocks=32, init_blocks=1,
                     worker_init=None, cpus_per_node=1,
                     scheduler_options=None, **unused_options):
     """Factory function to provide a PBSProProvider.
@@ -105,8 +105,8 @@ def pbspro_provider(nodes_per_block=1, qos='expert', parallelism=0,
                           queue=qos,
                           channel=LocalChannel(),
                           nodes_per_block=nodes_per_block,
-                          worker_init = worker_init,
-                          cpus_per_node = cpus_per_node,
+                          worker_init=worker_init,
+                          cpus_per_node=cpus_per_node,
                           init_blocks=init_blocks,
                           min_blocks=min_blocks,
                           max_blocks=max_blocks,
@@ -117,7 +117,8 @@ def pbspro_provider(nodes_per_block=1, qos='expert', parallelism=0,
 
 
 def set_config_options(retries, monitoring, workflow_name, checkpoint,
-                       monitoring_debug):
+                       monitoring_debug, monitoring_hub_port,
+                       monitoring_interval):
     """
     Package retries, monitoring, and checkpoint options for
     parsl.config.Config as a dict.
@@ -126,9 +127,9 @@ def set_config_options(retries, monitoring, workflow_name, checkpoint,
     if monitoring:
         config_options['monitoring'] \
             = MonitoringHub(hub_address=address_by_hostname(),
-                            hub_port=55055,
+                            hub_port=monitoring_hub_port,
                             monitoring_debug=monitoring_debug,
-                            resource_monitoring_interval=60,
+                            resource_monitoring_interval=monitoring_interval,
                             workflow_name=workflow_name)
     if checkpoint:
         config_options['checkpoint_mode'] = 'task_exit'
@@ -140,8 +141,11 @@ def set_config_options(retries, monitoring, workflow_name, checkpoint,
 def workqueue_config(provider=None, monitoring=False, workflow_name=None,
                      checkpoint=False, retries=1, worker_options="",
                      wq_max_retries=1, port=9000, monitoring_debug=False,
+                     monitoring_hub_port=None, monitoring_interval=60,
                      **unused_options):
-    """Load a parsl config for a WorkQueueExecutor and the supplied provider."""
+    """
+    Load a parsl config for a WorkQueueExecutor and the supplied provider.
+    """
     executors = [WorkQueueExecutor(label='work_queue', port=port,
                                    shared_fs=True, provider=provider,
                                    worker_options=worker_options,
@@ -150,7 +154,9 @@ def workqueue_config(provider=None, monitoring=False, workflow_name=None,
                  ThreadPoolExecutor(max_threads=1, label='submit-node')]
 
     config_options = set_config_options(retries, monitoring, workflow_name,
-                                        checkpoint, monitoring_debug)
+                                        checkpoint, monitoring_debug,
+                                        monitoring_hub_port,
+                                        monitoring_interval)
 
     config = parsl.config.Config(strategy='simple',
                                  garbage_collect=False,
@@ -164,13 +170,15 @@ def workqueue_config(provider=None, monitoring=False, workflow_name=None,
 def thread_pool_config(max_threads=1, monitoring=False, workflow_name=None,
                        checkpoint=False, retries=1,
                        labels=('submit-node', 'batch-small', 'batch-medium',
                                'batch-large'),
-                       monitoring_debug=False,
-                       **unused_options):
+                       monitoring_debug=False, monitoring_hub_port=None,
+                       monitoring_interval=60, **unused_options):
     """Load a parsl config using ThreadPoolExecutor."""
     executors = [ThreadPoolExecutor(max_threads=max_threads, label=label)
                  for label in labels]
     config_options = set_config_options(retries, monitoring, workflow_name,
-                                        checkpoint, monitoring_debug)
+                                        checkpoint, monitoring_debug,
+                                        monitoring_hub_port,
+                                        monitoring_interval)
     config = parsl.config.Config(executors=executors, **config_options)
     return parsl.load(config)
diff --git a/tests/cpBias_test/cpBias.yaml b/tests/cpBias_test/cpBias.yaml
index 5874cfb..90f56ab 100644
--- a/tests/cpBias_test/cpBias.yaml
+++ b/tests/cpBias_test/cpBias.yaml
@@ -1,6 +1,6 @@
 description: Modified cpBias pipeline to run with unit tests
 imports:
-  - location: "$CP_PIPE_DIR/pipelines/cpBias.yaml"
+  - location: "$CP_PIPE_DIR/pipelines/_ingredients/cpBias.yaml"
 tasks:
   isr:
     class: lsst.ip.isr.IsrTask