Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SimWF] Recompute number of workers used in TFs #1543

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

sys.path.append(join(dirname(__file__), '.', 'o2dpg_workflow_utils'))

from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector
from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector, compute_n_workers
from o2dpg_qc_finalization_workflow import include_all_QC_finalization
from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg

Expand Down Expand Up @@ -90,7 +90,10 @@
parser.add_argument('--production-offset',help='Offset determining bunch-crossing '
+ ' range within a (GRID) production. This number sets first orbit to '
+ 'Offset x Number of TimeFrames x OrbitsPerTimeframe (up for further sophistication)', default=0)
parser.add_argument('-j',help='number of workers (if applicable)', default=8, type=int)
parser.add_argument('-j', '--n-workers', dest='n_workers', help='number of workers (if applicable)', default=8, type=int)
parser.add_argument('--force-n-workers', dest='force_n_workers', action='store_true', help='by default, number of workers is re-computed '
'for given interaction rate if --pregenCollContext is set; '
'pass this to avoid that')
parser.add_argument('-mod',help='Active modules (deprecated)', default='--skipModules ZDC')
parser.add_argument('--with-ZDC', action='store_true', help='Enable ZDC in workflow')
parser.add_argument('-seed',help='random seed number', default=None)
Expand Down Expand Up @@ -313,7 +316,7 @@ def extractVertexArgs(configKeyValuesStr, finalDiamondDict):
args.timestamp = args.sor

NTIMEFRAMES=int(args.tf)
NWORKERS=args.j
NWORKERS=args.n_workers
MODULES = "--skipModules ZDC" if not isActive("ZDC") else ""
SIMENGINE=args.e
BFIELD=args.field
Expand Down Expand Up @@ -638,6 +641,9 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
QEDdigiargs=' --simPrefixQED qed_' + str(tf) + ' --qed-x-section-ratio ' + str(QEDXSecExpected/PbPbXSec)
workflow['stages'].append(QED_task)

# Recompute the number of workers to increase CPU efficiency.
# The user's -j value (NWORKERS) is passed on as the upper bound; otherwise
# compute_n_workers falls back to its own default of 8 and an explicit -j
# request would be silently ignored whenever the recomputation is active.
NWORKERS_TF = compute_n_workers(INTRATE, COLTYPE, n_workers_user=NWORKERS) if (args.pregenCollContext and not args.force_n_workers) else NWORKERS

# produce the signal configuration
SGN_CONFIG_task=createTask(name='gensgnconf_'+str(tf), tf=tf, cwd=timeframeworkdir)
SGN_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"'
Expand Down Expand Up @@ -695,9 +701,9 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
else:
signalneeds = signalneeds + [ BKG_HEADER_task['name'] ]
sgnmem = 6000 if COLTYPE == 'PbPb' else 4000
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS, mem=str(sgnmem))
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS_TF, mem=str(sgnmem))
SGNtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' --seed ' + str(TFSEED) \
+ ' --field ccdb -j ' + str(NWORKERS) + ' -g ' + str(GENERATOR) \
+ ' --field ccdb -j ' + str(NWORKERS_TF) + ' -g ' + str(GENERATOR) \
+ ' ' + str(TRIGGER) + ' ' + str(CONFKEY) + ' ' + str(INIFILE) \
+ ' -o ' + signalprefix + ' ' + embeddinto \
+ ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) \
Expand Down Expand Up @@ -846,10 +852,10 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}):

tpcdigimem = 12000 if havePbPb else 9000
TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds,
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem=str(tpcdigimem))
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem=str(tpcdigimem))
TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . ;')[doembedding]
TPCDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \
+ ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS) \
+ ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS_TF) \
+ ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini --early-forward-policy always --forceSelectedDets ' \
+ putConfigValuesNew(["TPCGasParam","TPCGEMParam","TPCEleParam","TPCITCorr","TPCDetParam"],
localCF={"DigiParams.maxOrbitsToDigitize" : str(orbitsPerTF), "DigiParams.seed" : str(TFSEED)})
Expand All @@ -864,11 +870,11 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}):
if usebkgcache:
trddigineeds += [ BKG_HITDOWNLOADER_TASKS['TRD']['name'] ]
TRDDigitask=createTask(name='trddigi_'+str(tf), needs=trddigineeds,
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000')
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem='8000')
TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding]
TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \
+ ' --onlyDet TRD --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' \
+ putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets"
+ putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS_TF, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets"
TRDDigitask['cmd'] += ('',' --disable-mc')[args.no_mc_labels]
workflow['stages'].append(TRDDigitask)

Expand Down Expand Up @@ -958,7 +964,7 @@ def getDigiTaskName(det):
taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + '_' + str(tf)
tpcclustertasks.append(taskname)
tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000')
digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS) + ' | '
digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS_TF) + ' | '
tpcclussect['cmd'] = (digitmergerstr,'')[args.no_tpc_digitchunking] + ' ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type ' + ('digitizer','digits')[args.no_tpc_digitchunking] + ' --output-type clusters,send-clusters-per-sector --tpc-native-cluster-writer \" --outfile tpc-native-clusters-part'+ str((int)(s/sectorpertask)) + '.root\" --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) + ('',' --disable-mc')[args.no_mc_labels]
tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "16000000000" }
tpcclussect['semaphore'] = "tpctriggers.root"
Expand All @@ -970,16 +976,16 @@ def getDigiTaskName(det):
workflow['stages'].append(TPCCLUSMERGEtask)
tpcreconeeds.append(TPCCLUSMERGEtask['name'])
else:
tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000')
tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS)
tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='2000')
tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS_TF)
tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam","TPCCorrMap"],{"GPU_proc.ompThreads" : 1}) + ('',' --disable-mc')[args.no_mc_labels]
workflow['stages'].append(tpcclus)
tpcreconeeds.append(tpcclus['name'])

tpc_corr_scaling_options = anchorConfig.get('tpc-corr-scaling','')
TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000')
TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' \
+ putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS}) + ('',' --disable-mc')[args.no_mc_labels] \
+ putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS_TF}) + ('',' --disable-mc')[args.no_mc_labels] \
+ tpc_corr_scaling_options
workflow['stages'].append(TPCRECOtask)

Expand Down Expand Up @@ -1142,7 +1148,7 @@ def getDigiTaskName(det):
pvfinder_matching_sources = anchorConfig.get('', {}).get('vertex-track-matching-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP')
pvfinderneeds = [TRDTRACKINGtask2['name'], FT0RECOtask['name'], FV0RECOtask['name'], EMCRECOtask['name'], PHSRECOtask['name'], CPVRECOtask['name'], FDDRECOtask['name'], ZDCRECOtask['name'], HMPMATCHtask['name'], HMPMATCHtask['name'], ITSTPCMATCHtask['name'], TOFTPCMATCHERtask['name'], MFTMCHMATCHtask['name'], MCHMIDMATCHtask['name']]

PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000')
PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='4000')
PVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-primary-vertexing-workflow ' \
+ getDPL_global_options() + putConfigValuesNew(['ITSAlpideParam','MFTAlpideParam', 'pvertexer', 'TPCGasParam', 'TPCCorrMap', 'ft0tag'], {"NameConf.mDirMatLUT" : ".."})
PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices]
Expand Down Expand Up @@ -1300,7 +1306,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
needs=[PHSRECOtask['name']],
readerCommand='o2-phos-reco-workflow --input-type cells --output-type clusters --disable-mc --disable-root-output',
configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/phs-cells-clusters-task.json')

### MID
if isActive('MID'):
addQCPerTF(taskName='MIDTaskQC',
Expand Down
22 changes: 22 additions & 0 deletions MC/bin/o2dpg_workflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ def deactivate_detector(det):
def isActive(det):
    """
    Tell whether detector `det` is active.

    A detector listed in INACTIVE_DETECTORS is never active. Otherwise it is
    active when ACTIVE_DETECTORS contains either "all" or the detector itself.
    """
    # explicit deactivation always wins
    if det in INACTIVE_DETECTORS:
        return False
    return "all" in ACTIVE_DETECTORS or det in ACTIVE_DETECTORS

def compute_n_workers(interaction_rate, collision_system, n_workers_user=8, n_workers_min=1, interaction_rate_linear_below=300000):
    """
    Compute the number of workers for a given interaction rate (IR)

    n_workers = m * IR + b

    based on
    https://indico.cern.ch/event/1395900/contributions/5868567/attachments/2823967/4932440/20240320_slides_cpu_eff.pdf, slide 3

    n_workers_user is assumed to be ideal for IR >= interaction_rate_linear_below
    and is always used as-is for PbPb. Below that rate the number of workers
    grows linearly, starting from n_workers_min at IR=0.

    Args:
        interaction_rate: interaction rate the workflow is configured for
        collision_system: collision system name; "PbPb" disables the scaling
        n_workers_user: number of workers requested by the user (default 8)
        n_workers_min: number of workers at IR=0; values below 1 are raised to 1
        interaction_rate_linear_below: IR from which n_workers_user is used unchanged

    Returns:
        The number of workers to use. On the linear branch this is an int >= 1;
        otherwise n_workers_user is returned unchanged.
    """
    # A non-positive threshold leaves no range for the linear scaling (and would
    # mean a division by zero or a meaningless slope), so keep the user's value.
    if interaction_rate_linear_below <= 0:
        return n_workers_user

    if collision_system == "PbPb" or interaction_rate >= interaction_rate_linear_below:
        return n_workers_user

    n_workers_min = max(1, n_workers_min)
    # slope of the straight line through (0, n_workers_min) and (interaction_rate_linear_below, n_workers_user)
    m = (n_workers_user - n_workers_min) / interaction_rate_linear_below
    # at least 1 worker
    return max(1, round(m * interaction_rate + n_workers_min))

def relativeCPU(n_rel, n_workers):
# compute number of CPUs from a given number of workers
# n_workers and a fraction n_rel
Expand Down