diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index 7e018485f..bf131753e 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -34,7 +34,7 @@ sys.path.append(join(dirname(__file__), '.', 'o2dpg_workflow_utils')) -from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector +from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector, compute_n_workers from o2dpg_qc_finalization_workflow import include_all_QC_finalization from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg @@ -90,7 +90,10 @@ parser.add_argument('--production-offset',help='Offset determining bunch-crossing ' + ' range within a (GRID) production. This number sets first orbit to ' + 'Offset x Number of TimeFrames x OrbitsPerTimeframe (up for further sophistication)', default=0) -parser.add_argument('-j',help='number of workers (if applicable)', default=8, type=int) +parser.add_argument('-j', '--n-workers', dest='n_workers', help='number of workers (if applicable)', default=8, type=int) +parser.add_argument('--force-n-workers', dest='force_n_workers', action='store_true', help='by default, number of workers is re-computed ' + 'for given interaction rate if --pregenCollContext is set; ' + 'pass this to avoid that') parser.add_argument('-mod',help='Active modules (deprecated)', default='--skipModules ZDC') parser.add_argument('--with-ZDC', action='store_true', help='Enable ZDC in workflow') parser.add_argument('-seed',help='random seed number', default=None) @@ -313,7 +316,7 @@ def extractVertexArgs(configKeyValuesStr, finalDiamondDict): args.timestamp = args.sor NTIMEFRAMES=int(args.tf) -NWORKERS=args.j +NWORKERS=args.n_workers MODULES = "--skipModules ZDC" if not isActive("ZDC") else "" SIMENGINE=args.e BFIELD=args.field @@ -638,6 +641,9 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True): QEDdigiargs=' --simPrefixQED qed_' + str(tf) + ' --qed-x-section-ratio ' + str(QEDXSecExpected/PbPbXSec) workflow['stages'].append(QED_task) + # recompute the number of workers to increase CPU efficiency + NWORKERS_TF = compute_n_workers(INTRATE, COLTYPE) if (args.pregenCollContext and not args.force_n_workers) else NWORKERS + # produce the signal configuration SGN_CONFIG_task=createTask(name='gensgnconf_'+str(tf), tf=tf, cwd=timeframeworkdir) SGN_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"' @@ -695,9 +701,9 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True): else: signalneeds = signalneeds + [ BKG_HEADER_task['name'] ] sgnmem = 6000 if COLTYPE == 'PbPb' else 4000 - SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS, mem=str(sgnmem)) + SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS_TF, mem=str(sgnmem)) SGNtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' --seed ' + str(TFSEED) \ - + ' --field ccdb -j ' + str(NWORKERS) + ' -g ' + str(GENERATOR) \ + + ' --field ccdb -j ' + str(NWORKERS_TF) + ' -g ' + str(GENERATOR) \ + ' ' + str(TRIGGER) + ' ' + str(CONFKEY) + ' ' + str(INIFILE) \ + ' -o ' + signalprefix + ' ' + embeddinto \ + ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) \ @@ -846,10 +852,10 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): tpcdigimem = 12000 if havePbPb else 9000 TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem=str(tpcdigimem)) + tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem=str(tpcdigimem)) TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . ;')[doembedding] TPCDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ - + ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS) \ + + ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS_TF) \ + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini --early-forward-policy always --forceSelectedDets ' \ + putConfigValuesNew(["TPCGasParam","TPCGEMParam","TPCEleParam","TPCITCorr","TPCDetParam"], localCF={"DigiParams.maxOrbitsToDigitize" : str(orbitsPerTF), "DigiParams.seed" : str(TFSEED)}) @@ -864,11 +870,11 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): if usebkgcache: trddigineeds += [ BKG_HITDOWNLOADER_TASKS['TRD']['name'] ] TRDDigitask=createTask(name='trddigi_'+str(tf), needs=trddigineeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000') + tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem='8000') TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding] TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ + ' --onlyDet TRD --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' \ - + putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets" + + putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS_TF, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets" TRDDigitask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] workflow['stages'].append(TRDDigitask) @@ -958,7 +964,7 @@ def getDigiTaskName(det): taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + '_' + str(tf) tpcclustertasks.append(taskname) tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000') - digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS) + ' | ' + digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS_TF) + ' | ' tpcclussect['cmd'] = (digitmergerstr,'')[args.no_tpc_digitchunking] + ' ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type ' + ('digitizer','digits')[args.no_tpc_digitchunking] + ' --output-type clusters,send-clusters-per-sector --tpc-native-cluster-writer \" --outfile tpc-native-clusters-part'+ str((int)(s/sectorpertask)) + '.root\" --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) + ('',' --disable-mc')[args.no_mc_labels] tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "16000000000" } tpcclussect['semaphore'] = "tpctriggers.root" @@ -970,8 +976,8 @@ def getDigiTaskName(det): workflow['stages'].append(TPCCLUSMERGEtask) tpcreconeeds.append(TPCCLUSMERGEtask['name']) else: - tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000') - tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS) + tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='2000') + tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS_TF) tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam","TPCCorrMap"],{"GPU_proc.ompThreads" : 1}) + ('',' --disable-mc')[args.no_mc_labels] workflow['stages'].append(tpcclus) tpcreconeeds.append(tpcclus['name']) @@ -979,7 +985,7 @@ def getDigiTaskName(det): tpc_corr_scaling_options = anchorConfig.get('tpc-corr-scaling','') TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000') TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' \ - + putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS}) + ('',' --disable-mc')[args.no_mc_labels] \ + + putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS_TF}) + ('',' --disable-mc')[args.no_mc_labels] \ + tpc_corr_scaling_options workflow['stages'].append(TPCRECOtask) @@ -1142,7 +1148,7 @@ def getDigiTaskName(det): pvfinder_matching_sources = anchorConfig.get('', {}).get('vertex-track-matching-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP') pvfinderneeds = [TRDTRACKINGtask2['name'], FT0RECOtask['name'], FV0RECOtask['name'], EMCRECOtask['name'], PHSRECOtask['name'], CPVRECOtask['name'], FDDRECOtask['name'], ZDCRECOtask['name'], HMPMATCHtask['name'], HMPMATCHtask['name'], ITSTPCMATCHtask['name'], TOFTPCMATCHERtask['name'], MFTMCHMATCHtask['name'], MCHMIDMATCHtask['name']] - PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000') + PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='4000') PVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-primary-vertexing-workflow ' \ + getDPL_global_options() + putConfigValuesNew(['ITSAlpideParam','MFTAlpideParam', 'pvertexer', 'TPCGasParam', 'TPCCorrMap', 'ft0tag'], {"NameConf.mDirMatLUT" : ".."}) PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] @@ -1300,7 +1306,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): needs=[PHSRECOtask['name']], readerCommand='o2-phos-reco-workflow --input-type cells --output-type clusters --disable-mc --disable-root-output', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/phs-cells-clusters-task.json') - + ### MID if isActive('MID'): addQCPerTF(taskName='MIDTaskQC', diff --git a/MC/bin/o2dpg_workflow_utils.py b/MC/bin/o2dpg_workflow_utils.py index 18fd600c9..b28333674 100755 --- a/MC/bin/o2dpg_workflow_utils.py +++ b/MC/bin/o2dpg_workflow_utils.py @@ -24,6 +24,28 @@ def deactivate_detector(det): def isActive(det): return det not in INACTIVE_DETECTORS and ("all" in ACTIVE_DETECTORS or det in ACTIVE_DETECTORS) +def compute_n_workers(interaction_rate, collision_system, n_workers_user=8, n_workers_min=1, interaction_rate_linear_below=300000): + """ + Compute number of workers + + n_workers = m * IR + b + + based on + https://indico.cern.ch/event/1395900/contributions/5868567/attachments/2823967/4932440/20240320_slides_cpu_eff.pdf, slide 3 + + Assume n_workers_in=8 to be ideal for pp IR > interaction_rate_linear_below + + Start with 1 worker at IR=0 + Go linearly until interaction_rate_linear_below + """ + if collision_system == "PbPb" or interaction_rate >= interaction_rate_linear_below: + return n_workers_user + + n_workers_min = max(1, n_workers_min) + m = (n_workers_user - n_workers_min) / interaction_rate_linear_below + # at least 1 worker + return max(1, round(m * interaction_rate + n_workers_min)) + def relativeCPU(n_rel, n_workers): # compute number of CPUs from a given number of workers # n_workers and a fraction n_rel