diff --git a/python/desc/gen3_workflow/config/parsl_configs.py b/python/desc/gen3_workflow/config/parsl_configs.py
index 5632d6e..1394faa 100644
--- a/python/desc/gen3_workflow/config/parsl_configs.py
+++ b/python/desc/gen3_workflow/config/parsl_configs.py
@@ -140,7 +140,7 @@ def set_config_options(retries, monitoring, workflow_name, checkpoint,
 
 
 def workqueue_config(provider=None, monitoring=False, workflow_name=None,
                      checkpoint=False, retries=1, worker_options="",
-                     wq_max_retries=1, port=9000, monitoring_debug=False,
+                     wq_max_retries=1, port=0, monitoring_debug=False,
                      monitoring_hub_port=None, monitoring_interval=60,
                      coprocess=False, **unused_options):
diff --git a/python/desc/gen3_workflow/parsl_service.py b/python/desc/gen3_workflow/parsl_service.py
index 206be9c..7f628b2 100644
--- a/python/desc/gen3_workflow/parsl_service.py
+++ b/python/desc/gen3_workflow/parsl_service.py
@@ -496,7 +496,7 @@ def get_jobs(self, task_type, status='pending', query=None):
         my_query += f' and status == "{status}"'
         if query is not None:
             my_query = ' and '.join((my_query, query))
-        return list(self.df.query(my_query)['job_name'])
+        return sorted(self.df.query(my_query)['job_name'])
 
     def status(self, use_logs=False):
         """Print a summary of the workflow status."""
@@ -617,7 +617,7 @@ def restore(config_file, parsl_config=None, use_dfk=True):
         return ParslGraph(generic_workflow, config, do_init=False,
                           dfk=dfk)
 
-    def run(self, jobs=None, block=False):
+    def run(self, jobs=None, block=False, shutdown=True):
         """
         Run the encapsulated workflow by requesting the futures
         of the requested jobs or of those at the endpoints of the DAG.
@@ -638,8 +638,9 @@ def run(self, jobs=None, block=False):
             # before the futures resolve.
             _ = [future.exception() for future in futures]
             self.finalize()
-            # Shutdown and dispose of the DataFlowKernel.
-            self.shutdown()
+            if shutdown:
+                # Shutdown and dispose of the DataFlowKernel.
+                self.shutdown()
 
     def shutdown(self):
         """Shutdown and dispose of the Parsl DataFlowKernel.  This will stop
diff --git a/python/desc/gen3_workflow/resource_estimator/OverlapFinder.py b/python/desc/gen3_workflow/resource_estimator/OverlapFinder.py
index 55ffaa9..385daa3 100644
--- a/python/desc/gen3_workflow/resource_estimator/OverlapFinder.py
+++ b/python/desc/gen3_workflow/resource_estimator/OverlapFinder.py
@@ -13,7 +13,7 @@ import lsst.log
 from lsst.afw.cameraGeom import DetectorType
 from lsst.obs.base.utils import createInitialSkyWcsFromBoresight
-from lsst.obs.lsst import LsstCamMapper
+from lsst.obs.lsst import LsstCam
 
 lsst.log.setLevel('', lsst.log.ERROR)
 
@@ -22,7 +22,7 @@
            'unique_tuples']
 
-LSSTCAM = LsstCamMapper().camera
+LSSTCAM = LsstCam.getCamera()
 
 
 class SkyMapPolygons:
diff --git a/python/desc/gen3_workflow/resource_estimator/tabulate_pipetask_resources.py b/python/desc/gen3_workflow/resource_estimator/tabulate_pipetask_resources.py
index a9640ff..c7d8db1 100644
--- a/python/desc/gen3_workflow/resource_estimator/tabulate_pipetask_resources.py
+++ b/python/desc/gen3_workflow/resource_estimator/tabulate_pipetask_resources.py
@@ -7,7 +7,7 @@ import json
 import numpy as np
 import pandas as pd
-from lsst.daf.butler import Butler, DimensionUniverse
+from lsst.daf.butler import Butler
 from lsst.pipe.base.graph import QuantumGraph
 
 
@@ -98,8 +98,8 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
         Dataframe with the number of visits for each band-tract-patch
         combination.
     task_counts : dict
-        Dictionary of number of instances per task.  Only counts for 'isr',
-        'makeWarp', and 'assembleCoadd' are used.
+        Dictionary of number of instances per task.  Only counts for 'isr'
+        and 'makeWarp' are used.
     pipetask_funcs : dict
         Dictionary of functions, keyed by task type.  Each function should
         return a tuple (cpu_time in hours, memory usage in GB) taking
@@ -119,7 +119,8 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
 
     # sensor-visits:
     num_ccd_visits = task_counts['isr']
-    for task_name in 'isr characterizeImage calibrate'.split():
+    for task_name in ('isr', 'characterizeImage', 'calibrate',
+                      'writeSourceTable', 'transformSourceTable'):
         if verbose:
             print("processing", task_name)
         pt_data['pipetask'].append(task_name)
@@ -142,7 +143,10 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
         pt_data['avg_GB'].append(mem_GB)
 
     for task_name in ('assembleCoadd', 'detection', 'measure',
-                      'forcedPhotCoadd'):
+                      'forcedPhotCoadd', 'selectGoodSeeingVisits',
+                      'templateGen', 'selectDeepCoaddVisits',
+                      'healSparsePropertyMaps',
+                      ):
         if verbose:
             print("processing", task_name, end=' ')
         pt_data['pipetask'].append(task_name)
@@ -158,10 +162,11 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
         pt_data['cpu_hours'].append(cpu_hours_total)
         pt_data['max_GB'].append(np.max(memory))
         pt_data['avg_GB'].append(np.mean(memory))
-        print(time.time() - t0)
+        if verbose:
+            print(time.time() - t0)
 
     for task_name in ('mergeCoaddDetections', 'deblend',
-                      'mergeCoaddMeasurements'):
+                      'mergeMeasurements'):
         try:
             cpu_hours, mem_GB = pipetask_funcs[task_name]()
         except KeyError:
@@ -198,7 +203,7 @@ def tabulate_data_product_sizes(qgraph_file, repo, collection):
         by dataset type with tuple of (mean file size (GB),
         std file sizes (GB), number of files in examples).
     """
-    qgraph = QuantumGraph.loadUri(qgraph_file, DimensionUniverse())
+    qgraph = QuantumGraph.loadUri(qgraph_file)
     butler = Butler(repo, collections=[collection])
     registry = butler.registry
 
@@ -223,8 +228,8 @@ def tabulate_data_product_sizes(qgraph_file, repo, collection):
     return data
 
 
-def total_node_hours(pt_df, cpu_factor=8, cores_per_node=68,
-                     memory_per_node=96, memory_min=10):
+def total_node_hours(pt_df, cpu_factor=1, cores_per_node=128,
+                     memory_per_node=512, memory_min=10):
     """
     Estimate the total number of node hours to do an image processing
    run.
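
A minimal usage sketch of the new `shutdown` keyword on `ParslGraph.run`, combined with the now-sorted output of `get_jobs`. The module path and the config-file name are assumptions based on the file layout, not taken from this patch:

```python
# Sketch only: the import path and config-file name are assumed.
from desc.gen3_workflow.parsl_service import ParslGraph

graph = ParslGraph.restore('parsl_graph_config.pickle')  # hypothetical file

# Run the pending 'isr' jobs, but keep the Parsl DataFlowKernel alive
# so that more jobs can be submitted in the same session.
graph.run(jobs=graph.get_jobs('isr'), block=True, shutdown=False)

# Final pass: the default shutdown=True disposes of the DataFlowKernel,
# preserving the previous behavior of run().
graph.run(block=True)
```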
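The `pipetask_funcs` argument to `tabulate_pipetask_resources` can be sketched as below, following its docstring and the zero-argument calls made for the per-tract merge tasks; the cost numbers are placeholders, not measured values:

```python
# Illustrative pipetask_funcs entries for the merge tasks, which the
# code above calls with no arguments: pipetask_funcs[task_name]().
# All numbers below are placeholders, not measured resource data.
def merge_coadd_detections_cost():
    return 0.05, 2.0   # (cpu_time in hours, memory usage in GB)

def deblend_cost():
    return 1.5, 8.0    # placeholder values

pipetask_funcs = {
    'mergeCoaddDetections': merge_coadd_detections_cost,
    'deblend': deblend_cost,
    # 'mergeMeasurements': ..., one callable per task type
}
```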