Skip to content

Commit

Permalink
[SimWF] Use __global_init_task__ more consistently (#1518)
Browse files Browse the repository at this point in the history
* centralise function that creates the task

* apply also when using AnalysisQC CLI

Co-authored-by: Benedikt Volkel <[email protected]>
  • Loading branch information
benedikt-voelkel and Benedikt Volkel committed Apr 26, 2024
1 parent a3f8448 commit e3709fc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
8 changes: 6 additions & 2 deletions MC/analysis_testing/o2dpg_analysis_test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
o2dpg_workflow_utils = importlib.util.module_from_spec(spec)
sys.modules[module_name] = o2dpg_workflow_utils
spec.loader.exec_module(o2dpg_workflow_utils)
from o2dpg_workflow_utils import createTask, dump_workflow
from o2dpg_workflow_utils import createTask, dump_workflow, createGlobalInitTask

module_name = "o2dpg_analysis_test_utils"
spec = importlib.util.spec_from_file_location(module_name, join(O2DPG_ROOT, "MC", "analysis_testing", "o2dpg_analysis_test_utils.py"))
Expand Down Expand Up @@ -332,7 +332,9 @@ def run(args):
print("ERROR: QC upload was requested, however in that case a --pass-name and --period-name are required")
return 1

workflow = []
### setup global environment variables which are valid for all tasks, set as first task
global_env = {"ALICEO2_CCDB_CONDITION_NOT_AFTER": args.condition_not_after} if args.condition_not_after else None
workflow = [createGlobalInitTask(global_env)]
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args)
if args.with_qc_upload:
add_analysis_qc_upload_tasks(workflow, args.period_name, args.run_number, args.pass_name)
Expand Down Expand Up @@ -360,6 +362,8 @@ def main():
parser.add_argument("--timeout", type=int, default=None, help="Timeout for analysis tasks in seconds.")
parser.add_argument("--collision-system", dest="collision_system", help="Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp")
parser.add_argument("--add-common-args", dest="add_common_args", nargs="*", help="Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis")
parser.add_argument('--condition-not-after', dest="condition_not_after", type=int, help="only consider CCDB objects not created after this timestamp (for TimeMachine)", default=3385078236000)

parser.set_defaults(func=run)
args = parser.parse_args()
return(args.func(args))
Expand Down
18 changes: 3 additions & 15 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import sys
import importlib.util
import argparse
from os import environ, mkdir, getcwd
from os import environ, mkdir
from os.path import join, dirname, isdir, isabs
import random
import json
Expand Down Expand Up @@ -326,20 +326,8 @@ def extractVertexArgs(configKeyValuesStr, finalDiamondDict):
workflow['stages'] = []

### setup global environment variables which are valid for all tasks
globalenv = {}
if args.condition_not_after:
# this is for the time-machine CCDB mechanism
globalenv['ALICEO2_CCDB_CONDITION_NOT_AFTER'] = args.condition_not_after
# this is enforcing the use of local CCDB caching
if environ.get('ALICEO2_CCDB_LOCALCACHE') == None:
print ("ALICEO2_CCDB_LOCALCACHE not set; setting to default " + getcwd() + '/ccdb')
globalenv['ALICEO2_CCDB_LOCALCACHE'] = getcwd() + "/ccdb"
else:
# fixes the workflow to use and remember externally provided path
globalenv['ALICEO2_CCDB_LOCALCACHE'] = environ.get('ALICEO2_CCDB_LOCALCACHE')
globalenv['IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE'] = '${ALICEO2_CCDB_LOCALCACHE:+"ON"}'

globalinittask = createGlobalInitTask(globalenv)
global_env = {'ALICEO2_CCDB_CONDITION_NOT_AFTER': args.condition_not_after} if args.condition_not_after else None
globalinittask = createGlobalInitTask(global_env)
globalinittask['cmd'] = 'o2-ccdb-cleansemaphores -p ${ALICEO2_CCDB_LOCALCACHE}'
workflow['stages'].append(globalinittask)
####
Expand Down
33 changes: 30 additions & 3 deletions MC/bin/o2dpg_workflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

from os import environ, getcwd
from copy import deepcopy
import json

Expand Down Expand Up @@ -84,18 +85,44 @@ def createTask(name='', needs=[], tf=-1, cwd='./', lab=[], cpu=1, relative_cpu=N
'cwd' : cwd }


def createGlobalInitTask(keys_values=None, set_defaults=True):
    """Return the special ``__global_init_task__`` task.

    The executor recognises this task as the one whose environment section
    is to be applied globally to all tasks of a workflow.

    Args:
        keys_values: dict or None
            dictionary of environment variables and values to be globally
            applied to all tasks; if it shares keys with the defaults,
            keys_values takes precedence
        set_defaults: bool
            whether or not the default CCDB local-cache settings are added

    Returns:
        dict: task dictionary with 'cmd' set to 'NO-COMMAND' and 'env' set
            to the assembled global environment
    """

    # dictionary holding global environment to be passed to task
    env_dict = {}

    if set_defaults:
        local_cache = environ.get('ALICEO2_CCDB_LOCALCACHE')
        if local_cache is None:
            # nothing provided externally: fall back to a cache below the current directory
            local_cache = getcwd() + "/ccdb"
            print("ALICEO2_CCDB_LOCALCACHE not set; setting to default " + local_cache)
        # fixes the workflow to use and remember the (default or externally provided) path
        env_dict['ALICEO2_CCDB_LOCALCACHE'] = local_cache
        # left as a shell expression on purpose; it is expanded when the task environment is used
        env_dict['IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE'] = '${ALICEO2_CCDB_LOCALCACHE:+"ON"}'

    if keys_values:
        # keys_values takes priority in case of same keys;
        # dict.update instead of "|=" keeps this working on Python < 3.9
        env_dict.update(keys_values)

    t = createTask(name='__global_init_task__')
    t['cmd'] = 'NO-COMMAND'
    t['env'] = env_dict
    return t


def summary_workflow(workflow):
print("=== WORKFLOW SUMMARY ===\n")
print(f"-> There are {len(workflow)} tasks")
Expand Down

0 comments on commit e3709fc

Please sign in to comment.