Skip to content

Commit

Permalink
Basic implementation of event pooling in O2DPG workflow (#1760)
Browse files Browse the repository at this point in the history
* Allow to generate events for event-pool usage (no vertex applied + kinematic merging)
* Example script demonstrating simple event pool creation and reading events from pool

https://its.cern.ch/jira/browse/O2-5216
  • Loading branch information
jackal1-66 authored Oct 31, 2024
1 parent f26dfd2 commit 15049b0
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 23 deletions.
67 changes: 44 additions & 23 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import importlib.util
import argparse
from os import environ, mkdir
from os.path import join, dirname, isdir, isabs
from os.path import join, dirname, isdir, isabs, isfile
import random
import json
import itertools
Expand Down Expand Up @@ -61,6 +61,7 @@
parser.add_argument('-ini',help='generator init parameters file (full paths required), for example: ${O2DPG_ROOT}/MC/config/PWGHF/ini/GeneratorHF.ini', default='')
parser.add_argument('-confKey',help='generator or trigger configuration key values, for example: "GeneratorPythia8.config=pythia8.cfg;A.x=y"', default='')
parser.add_argument('--readoutDets',help='comma separated string of detectors readout (does not modify material budget - only hit creation)', default='all')
parser.add_argument('--make-evtpool', help='Generate workflow for event pool creation.', action='store_true')

parser.add_argument('-interactionRate',help='Interaction rate, used in digitization', default=-1)
parser.add_argument('-bcPatternFile',help='Bunch crossing pattern file, used in digitization (a file name or "ccdb")', default='')
Expand Down Expand Up @@ -699,9 +700,14 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
# possible generator)

workflow['stages'].append(SGN_CONFIG_task)

# default flags for extkinO2 signal simulation (no transport)
extkinO2Config = ''
if GENERATOR == 'extkinO2':
extkinO2Config = ';GeneratorFromO2Kine.randomize=true;GeneratorFromO2Kine.rngseed=' + str(TFSEED)

# determine final conf key for signal simulation
CONFKEY = constructConfigKeyArg(create_geant_config(args, args.confKey))
CONFKEY = constructConfigKeyArg(create_geant_config(args, args.confKey + extkinO2Config))
# -----------------
# transport signals
# -----------------
Expand Down Expand Up @@ -741,7 +747,11 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
# determine the skip number
cmd = 'export HEPMCEVENTSKIP=$(${O2DPG_ROOT}/UTILS/ReadHepMCEventSkip.sh ../HepMCEventSkip.json ' + str(tf) + ');'
SGNGENtask['cmd'] = cmd
SGNGENtask['cmd'] +='${O2_ROOT}/bin/o2-sim --noGeant -j 1 --field ccdb --vertexMode kCCDB' \

# No vertexing for event pool generation
vtxmode = 'kNoVertex' if args.make_evtpool else 'kCCDB'

SGNGENtask['cmd'] +='${O2_ROOT}/bin/o2-sim --noGeant -j 1 --field ccdb --vertexMode ' + vtxmode \
+ ' --run ' + str(args.run) + ' ' + str(CONFKEY) + str(TRIGGER) \
+ ' -g ' + str(GENERATOR) + ' ' + str(INIFILE) + ' -o genevents ' + embeddinto \
+ ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] \
Expand All @@ -754,6 +764,11 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
if sep_event_mode == True:
workflow['stages'].append(SGNGENtask)
signalneeds = signalneeds + [SGNGENtask['name']]
if args.make_evtpool:
continue

# GeneratorFromO2Kine parameters are needed only before the transport
CONFKEY = re.sub(r'GeneratorFromO2Kine.*?;', '', CONFKEY)

sgnmem = 6000 if COLTYPE == 'PbPb' else 4000
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"],
Expand Down Expand Up @@ -1520,26 +1535,32 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
TFcleanup['cmd'] += 'rm *cluster*.root'
workflow['stages'].append(TFcleanup)

# AOD merging as one global final step
aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1')
AOD_merge_task['cmd'] = ' set -e ; [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; '
AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; '
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
# produce MonaLisa event stat file
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
workflow['stages'].append(AOD_merge_task)

job_merging = False
if includeFullQC:
workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag, conditionDB=args.conditionDB, qcdbHost=args.qcdbHost))


if includeAnalysis:
# include analyses and potentially final QC upload tasks
add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True, collision_system=COLTYPE)
if QUALITYCONTROL_ROOT:
add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC")
if not args.make_evtpool:
# AOD merging as one global final step
aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1')
AOD_merge_task['cmd'] = ' set -e ; [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; '
AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; '
AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root'
# produce MonaLisa event stat file
AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py'
workflow['stages'].append(AOD_merge_task)

job_merging = False
if includeFullQC:
workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag, conditionDB=args.conditionDB, qcdbHost=args.qcdbHost))

if includeAnalysis:
# include analyses and potentially final QC upload tasks
add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True, collision_system=COLTYPE)
if QUALITYCONTROL_ROOT:
add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC")
else:
wfneeds=['sgngen_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)]
tfpool=['tf' + str(tf) + '/genevents_Kine.root' for tf in range(1, NTIMEFRAMES + 1)]
POOL_merge_task = createTask(name='poolmerge', needs=wfneeds, lab=["POOL"], mem='2000', cpu='1')
POOL_merge_task['cmd'] = '${O2DPG_ROOT}/UTILS/root_merger.py -o evtpool.root -i ' + ','.join(tfpool)
workflow['stages'].append(POOL_merge_task)

# adjust for alternate (RECO) software environments
adjust_RECO_environment(workflow, args.alternative_reco_software)
Expand Down
55 changes: 55 additions & 0 deletions MC/run/examples/event_pool.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env bash

# Example on how to produce an event pool and how to feed it
# to the O2DPG simulation workflow

# make sure O2DPG + O2 is loaded
[ ! "${O2DPG_ROOT}" ] && echo "Error: This needs O2DPG loaded" && exit 1
[ ! "${O2_ROOT}" ] && echo "Error: This needs O2 loaded" && exit 1

# Parse arguments
MAKE=false
INPUT=""

help() {
echo "Usage: $0 [--make] [-i|--input <input_file>]"
echo " --make: Create the event pool"
echo " -i|--input: Input event pool file to be used in the simulation workflow. Alien paths are supported."
echo " A full path must be provided (use of environment variables allowed), otherwise generation will fail."
echo " -h|--help: Display this help message"
exit 0
}

while [[ "$#" -gt 0 ]]; do
case $1 in
--make) MAKE=true ;;
-i|--input) INPUT="$2"; shift ;;
-h|--help) help ;;
*) echo "Unknown operation requested: $1"; help ;;
esac
shift
done

if $MAKE; then
echo "Started generation of event pool"
# Workflow creation. All the parameters are used as examples
# No transport will be executed. The workflow will stop at the event generation and will conclude with the merging of all the
# kinematic root files of the timeframes in a file called evtpool.root in the current working directory
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 14000 -col pp -gen pythia8 -proc cdiff -tf 2 -ns 5000 --make-evtpool -seed 546 -interactionRate 500000 -productionTag "evtpoolcreation" -o evtpool
# Workflow runner
${O2DPG_ROOT}/MC/bin/o2dpg_workflow_runner.py -f evtpool.json -tt pool
elif [[ -n "$INPUT" ]]; then
echo "Input file provided: $INPUT"
if [[ -f "$INPUT" && -s "$INPUT" ]] || [[ "$INPUT" == alien://* ]]; then
# Workflow creation. Phi Rotation is set manually, while the event randomisation of the pool is set by default
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 14000 -confKey "GeneratorFromO2Kine.randomphi=true;GeneratorFromO2Kine.fileName=$INPUT" -gen extkinO2 -tf 2 -ns 10 -e TGeant4 -j 4 -interactionRate 500000 -seed 546 -productionTag "evtpooltest"
# Workflow runner. The rerun option is set in case you will run directly the script in the same folder (no need to manually delete files)
${O2DPG_ROOT}/MC/bin/o2dpg_workflow_runner.py -f workflow.json -tt aod --rerun-from grpcreate
else
echo "Error: File does not exist or is empty: $INPUT"
exit 1
fi
else
echo "Usage: $0 [--make] [-i|--input <input_file>]"
exit 1
fi
41 changes: 41 additions & 0 deletions UTILS/root_merger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python3

# Simple ROOT files merger

from ROOT import TFile, TFileMerger
import sys
import os
import argparse

output_file = ''
input_files = []
# defining command line options

parser = argparse.ArgumentParser(description='Simple ROOT files merger',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('-o','--output', help='Output ROOT filename', required=True)
parser.add_argument('-i','--input', help='Input ROOT files to be merged, separated by a comma', required=True)

args = parser.parse_args()

output_file = args.output
input_files = args.input.split(',')

merger = TFileMerger(False)
merger.OutputFile(output_file)

for input_file in input_files:
if os.path.exists(input_file):
merger.AddFile(input_file)
else:
print(f"Fatal: {input_file} does not exist.")
sys.exit(1)

if not merger.Merge():
print("Error: Merging failed.")
sys.exit(2)
else:
print(f"Successfully merged files into {output_file}")

sys.exit(0)

0 comments on commit 15049b0

Please sign in to comment.