From c1a8ba7232d06fa7171d4ce659890085d9cc65f9 Mon Sep 17 00:00:00 2001 From: Benedikt Volkel Date: Fri, 1 Mar 2024 17:16:55 +0100 Subject: [PATCH] [SimWF] Use __global_init_task__ more consistently * centralise function that creates the task * apply also when using AnalysisQC CLI --- .../o2dpg_analysis_test_workflow.py | 8 +++-- MC/bin/o2dpg_sim_workflow.py | 18 ++-------- MC/bin/o2dpg_workflow_utils.py | 33 +++++++++++++++++-- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/MC/analysis_testing/o2dpg_analysis_test_workflow.py b/MC/analysis_testing/o2dpg_analysis_test_workflow.py index c50ed6999..c1931911b 100755 --- a/MC/analysis_testing/o2dpg_analysis_test_workflow.py +++ b/MC/analysis_testing/o2dpg_analysis_test_workflow.py @@ -80,7 +80,7 @@ o2dpg_workflow_utils = importlib.util.module_from_spec(spec) sys.modules[module_name] = o2dpg_workflow_utils spec.loader.exec_module(o2dpg_workflow_utils) -from o2dpg_workflow_utils import createTask, dump_workflow +from o2dpg_workflow_utils import createTask, dump_workflow, createGlobalInitTask module_name = "o2dpg_analysis_test_utils" spec = importlib.util.spec_from_file_location(module_name, join(O2DPG_ROOT, "MC", "analysis_testing", "o2dpg_analysis_test_utils.py")) @@ -321,7 +321,9 @@ def run(args): print("ERROR: QC upload was requested, however in that case a --pass-name and --period-name are required") return 1 - workflow = [] + ### setup global environment variables which are valid for all tasks, set as first task + global_env = {"ALICEO2_CCDB_CONDITION_NOT_AFTER": args.condition_not_after} if args.condition_not_after else None + workflow = [createGlobalInitTask(global_env)] add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args) if args.with_qc_upload: add_analysis_qc_upload_tasks(workflow, args.period_name, args.run_number, args.pass_name) @@ -349,6 +351,8 @@ def main(): parser.add_argument("--timeout", type=int, default=None, help="Timeout for analysis tasks in seconds.") parser.add_argument("--collision-system", dest="collision_system", help="Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp") parser.add_argument("--add-common-args", dest="add_common_args", nargs="*", help="Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis") + parser.add_argument('--condition-not-after', dest="condition_not_after", type=int, help="only consider CCDB objects not created after this timestamp (for TimeMachine)", default=3385078236000) + parser.set_defaults(func=run) args = parser.parse_args() return(args.func(args)) diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index d0812d42c..31d2ad06a 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -20,7 +20,7 @@ import sys import importlib.util import argparse -from os import environ, mkdir, getcwd +from os import environ, mkdir from os.path import join, dirname, isdir, isabs import random import json @@ -326,20 +326,8 @@ def extractVertexArgs(configKeyValuesStr, finalDiamondDict): workflow['stages'] = [] ### setup global environment variables which are valid for all tasks -globalenv = {} -if args.condition_not_after: - # this is for the time-machine CCDB mechanism - globalenv['ALICEO2_CCDB_CONDITION_NOT_AFTER'] = args.condition_not_after - # this is enforcing the use of local CCDB caching - if environ.get('ALICEO2_CCDB_LOCALCACHE') == None: - print ("ALICEO2_CCDB_LOCALCACHE not set; setting to default " + getcwd() + '/ccdb') - globalenv['ALICEO2_CCDB_LOCALCACHE'] = getcwd() + "/ccdb" - else: - # fixes the workflow to use and remember externally provided path - globalenv['ALICEO2_CCDB_LOCALCACHE'] = environ.get('ALICEO2_CCDB_LOCALCACHE') - globalenv['IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE'] = '${ALICEO2_CCDB_LOCALCACHE:+"ON"}' - -globalinittask = createGlobalInitTask(globalenv) +global_env = {'ALICEO2_CCDB_CONDITION_NOT_AFTER': args.condition_not_after} if args.condition_not_after else None +globalinittask = createGlobalInitTask(global_env) globalinittask['cmd'] = 'o2-ccdb-cleansemaphores -p ${ALICEO2_CCDB_LOCALCACHE}' workflow['stages'].append(globalinittask) #### diff --git a/MC/bin/o2dpg_workflow_utils.py b/MC/bin/o2dpg_workflow_utils.py index 748129de2..18fd600c9 100755 --- a/MC/bin/o2dpg_workflow_utils.py +++ b/MC/bin/o2dpg_workflow_utils.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from os import environ, getcwd from copy import deepcopy import json @@ -84,18 +85,44 @@ def createTask(name='', needs=[], tf=-1, cwd='./', lab=[], cpu=1, relative_cpu=N 'cwd' : cwd } -def createGlobalInitTask(envdict): +def createGlobalInitTask(keys_values=None, set_defaults=True): """Returns a special task that is recognized by the executor as a task whose environment section is to be globally applied to all tasks of a workflow. - envdict: dictionary of environment variables and values to be globally applied to all tasks + Args: + keys_values: dict or None + dictionary of environment variables and values to be globally applied to all tasks + if sharing keys with defaults, keys_values takes precedence + set_defaults: bool + whether or not some default values will be added + + Returns: + dict: task dictionary """ + + # dictionary holding global environment to be passed to task + env_dict = {} + + if set_defaults: + if environ.get('ALICEO2_CCDB_LOCALCACHE') is None: + print ("ALICEO2_CCDB_LOCALCACHE not set; setting to default " + getcwd() + '/ccdb') + env_dict['ALICEO2_CCDB_LOCALCACHE'] = getcwd() + "/ccdb" + else: + # fixes the workflow to use and remember externally provided path + env_dict['ALICEO2_CCDB_LOCALCACHE'] = environ.get('ALICEO2_CCDB_LOCALCACHE') + env_dict['IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE'] = '${ALICEO2_CCDB_LOCALCACHE:+"ON"}' + + if keys_values: + # keys_values takes priority in case of same keys + env_dict |= keys_values + t = createTask(name = '__global_init_task__') t['cmd'] = 'NO-COMMAND' - t['env'] = envdict + t['env'] = env_dict return t + def summary_workflow(workflow): print("=== WORKFLOW SUMMARY ===\n") print(f"-> There are {len(workflow)} tasks")