Merge pull request #61 from LSSTDESC/u/jchiang/prune_intermediates
set port=0 for WorkQueueExecutor
jchiang87 authored Jun 9, 2024
2 parents dfb717b + 586360e commit e5ea2e1
Showing 4 changed files with 23 additions and 17 deletions.
2 changes: 1 addition & 1 deletion python/desc/gen3_workflow/config/parsl_configs.py
@@ -140,7 +140,7 @@ def set_config_options(retries, monitoring, workflow_name, checkpoint,

def workqueue_config(provider=None, monitoring=False, workflow_name=None,
checkpoint=False, retries=1, worker_options="",
wq_max_retries=1, port=9000, monitoring_debug=False,
wq_max_retries=1, port=0, monitoring_debug=False,
monitoring_hub_port=None, monitoring_interval=60,
coprocess=False,
**unused_options):
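
For context, a minimal usage sketch of the option changed above. The import path and the assumption that workqueue_config returns a parsl Config are not shown in this diff; with Work Queue, port=0 conventionally lets the manager bind to any free port rather than a fixed one, avoiding collisions when several workflows share a node.

import parsl
from desc.gen3_workflow.config.parsl_configs import workqueue_config  # assumed import path

# port=0 (now the default) asks the WorkQueueExecutor to pick an available port
# instead of a fixed one such as 9000.
config = workqueue_config(monitoring=False, retries=1, port=0)
parsl.load(config)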
9 changes: 5 additions & 4 deletions python/desc/gen3_workflow/parsl_service.py
@@ -496,7 +496,7 @@ def get_jobs(self, task_type, status='pending', query=None):
my_query += f' and status == "{status}"'
if query is not None:
my_query = ' and '.join((my_query, query))
return list(self.df.query(my_query)['job_name'])
return sorted(self.df.query(my_query)['job_name'])

def status(self, use_logs=False):
"""Print a summary of the workflow status."""
@@ -617,7 +617,7 @@ def restore(config_file, parsl_config=None, use_dfk=True):

return ParslGraph(generic_workflow, config, do_init=False, dfk=dfk)

def run(self, jobs=None, block=False):
def run(self, jobs=None, block=False, shutdown=True):
"""
Run the encapsulated workflow by requesting the futures of
the requested jobs or of those at the endpoints of the DAG.
@@ -638,8 +638,9 @@ def run(self, jobs=None, block=False):
# before the futures resolve.
_ = [future.exception() for future in futures]
self.finalize()
# Shutdown and dispose of the DataFlowKernel.
self.shutdown()
if shutdown:
# Shutdown and dispose of the DataFlowKernel.
self.shutdown()

def shutdown(self):
"""Shutdown and dispose of the Parsl DataFlowKernel. This will stop
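
A hedged sketch of the new shutdown keyword: run to completion but keep the DataFlowKernel alive for further inspection, then dispose of it explicitly. The import path, the assumption that restore() is exposed as ParslGraph.restore, and the pickle filename are illustrative only.

from desc.gen3_workflow import ParslGraph  # assumed import path

graph = ParslGraph.restore('parsl_graph_config.pickle')  # illustrative filename
graph.run(block=True, shutdown=False)  # wait on the futures, but leave the DFK running
graph.status()                         # e.g., summarize job states before tearing down
graph.shutdown()                       # dispose of the DataFlowKernel explicitly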
4 changes: 2 additions & 2 deletions python/desc/gen3_workflow/resource_estimator/OverlapFinder.py
@@ -13,7 +13,7 @@
import lsst.log
from lsst.afw.cameraGeom import DetectorType
from lsst.obs.base.utils import createInitialSkyWcsFromBoresight
from lsst.obs.lsst import LsstCamMapper
from lsst.obs.lsst import LsstCam

lsst.log.setLevel('', lsst.log.ERROR)

@@ -22,7 +22,7 @@
'unique_tuples']


LSSTCAM = LsstCamMapper().camera
LSSTCAM = LsstCam.getCamera()

class SkyMapPolygons:

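
The Gen2-era LsstCamMapper is no longer available in obs_lsst; LsstCam.getCamera() returns the camera geometry object directly. A small illustrative sketch of the equivalent access pattern (the detector filtering below is not part of this file):

from lsst.afw.cameraGeom import DetectorType
from lsst.obs.lsst import LsstCam

LSSTCAM = LsstCam.getCamera()
# Illustrative only: list the science detectors in the camera geometry.
science_dets = [det.getName() for det in LSSTCAM
                if det.getType() == DetectorType.SCIENCE]
print(len(science_dets), "science detectors")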
@@ -7,7 +7,7 @@
import json
import numpy as np
import pandas as pd
from lsst.daf.butler import Butler, DimensionUniverse
from lsst.daf.butler import Butler
from lsst.pipe.base.graph import QuantumGraph


@@ -98,8 +98,8 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
Dataframe with the number of visits for each band-tract-patch
combination.
task_counts : dict
Dictionary of number of instances per task. Only counts for 'isr',
'makeWarp', and 'assembleCoadd' are used.
Dictionary of number of instances per task. Only counts for 'isr'
and 'makeWarp' are used.
pipetask_funcs : dict
Dictionary of functions, keyed by task type. Each function should
return a tuple (cpu_time in hours, memory usage in GB) taking
@@ -119,7 +119,8 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,

# sensor-visits:
num_ccd_visits = task_counts['isr']
for task_name in 'isr characterizeImage calibrate'.split():
for task_name in ('isr', 'characterizeImage', 'calibrate',
'writeSourceTable', 'transformSourceTable'):
if verbose:
print("processing", task_name)
pt_data['pipetask'].append(task_name)
@@ -142,7 +143,10 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
pt_data['avg_GB'].append(mem_GB)

for task_name in ('assembleCoadd', 'detection', 'measure',
'forcedPhotCoadd'):
'forcedPhotCoadd', 'selectGoodSeeingVisits',
'templateGen', 'selectDeepCoaddVisits',
'healSparsePropertyMaps',
):
if verbose:
print("processing", task_name, end=' ')
pt_data['pipetask'].append(task_name)
@@ -158,10 +162,11 @@ def tabulate_pipetask_resources(coadd_df, task_counts, pipetask_funcs,
pt_data['cpu_hours'].append(cpu_hours_total)
pt_data['max_GB'].append(np.max(memory))
pt_data['avg_GB'].append(np.mean(memory))
print(time.time() - t0)
if verbose:
print(time.time() - t0)

for task_name in ('mergeCoaddDetections', 'deblend',
'mergeCoaddMeasurements'):
'mergeMeasurements'):
try:
cpu_hours, mem_GB = pipetask_funcs[task_name]()
except KeyError:
@@ -198,7 +203,7 @@ def tabulate_data_product_sizes(qgraph_file, repo, collection):
by dataset type with tuple of (mean file size (GB), std file sizes (GB),
number of files in examples).
"""
qgraph = QuantumGraph.loadUri(qgraph_file, DimensionUniverse())
qgraph = QuantumGraph.loadUri(qgraph_file)

butler = Butler(repo, collections=[collection])
registry = butler.registry
@@ -223,8 +228,8 @@ def tabulate_data_product_sizes(qgraph_file, repo, collection):
return data


def total_node_hours(pt_df, cpu_factor=8, cores_per_node=68,
memory_per_node=96, memory_min=10):
def total_node_hours(pt_df, cpu_factor=1, cores_per_node=128,
memory_per_node=512, memory_min=10):
"""
Estimate the total number of node hours to do an image processing
run.
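
A hedged end-to-end sketch of the estimator entry points touched above. The module path, repo, collection, and qgraph filename are placeholders, and the columns of pt_df are inferred from the pt_data dictionary built in tabulate_pipetask_resources; only the function signatures come from the code shown here.

import pandas as pd
from desc.gen3_workflow.resource_estimator import (tabulate_data_product_sizes,
                                                   total_node_hours)  # assumed import path

# QuantumGraph.loadUri no longer needs an explicit DimensionUniverse.
sizes = tabulate_data_product_sizes('my_pipeline.qgraph', '/path/to/repo',
                                    'u/user/step1_run')

# Minimal pt_df with the columns assembled above (values are made up).
pt_df = pd.DataFrame({'pipetask': ['isr', 'makeWarp'],
                      'cpu_hours': [120., 340.],
                      'max_GB': [2.1, 6.4],
                      'avg_GB': [1.8, 5.0]})

# The new defaults describe a 128-core, 512 GB node (e.g., a Perlmutter CPU node)
# rather than the previous 68-core, 96 GB KNL-style node.
node_hours = total_node_hours(pt_df, cpu_factor=1, cores_per_node=128,
                              memory_per_node=512)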
