Workflows
Harvester supports various workflows with push/pull, heartbeat suppression, job late-binding, and one-to-many mapping between jobs and workers. Workflows can be configured per queue. This page explains how each workflow works.
In the pull workflow, workers are submitted to scheduling systems without jobs. Each worker pulls a job directly from PanDA once it gets compute resources, and workers send heartbeats directly to PanDA.
In the push workflow, jobs are prefetched and bound to workers before the workers are submitted to scheduling systems. Each worker has one job, and Harvester sends heartbeats for the job on behalf of the worker.
In a push variant without heartbeat suppression, jobs are likewise prefetched and bound to workers before submission, but the workers then send heartbeats by themselves.
With job late-binding, jobs are prefetched while workers are submitted to scheduling systems asynchronously. Harvester gives one job to each worker once the worker gets compute resources. Typically this workflow is used for HPCs with long batch-queue waiting times and jumbo jobs.
This workflow, also known as the multi-worker workflow, binds each prefetched job to multiple workers. Harvester collects and combines information from all workers to send heartbeats for the job. Typically this workflow is used for HPCs with backfill mode and jumbo jobs. Note that nWorkersPerJob is defined in panda_queueconfig.json like
"workerMaker": {
"name": "SimpleWorkerMaker",
"module": "pandaharvester.harvesterworkermaker.simple_worker_maker",
"nWorkersPerJob": 10
},
This workflow is similar to the one above, but the job is given to the workers only once they get compute resources. Typically it is used for spiky opportunistic resources with jumbo jobs.
This workflow, also known as the multi-job workflow, gives multiple prefetched jobs to a single worker. Harvester converts the worker's information to job attributes and sends them to PanDA in heartbeats. This workflow works with shared_file_messenger and simple_worker_maker as follows:
- nJobsPerWorker is defined in panda_queueconfig.json like

  ```json
  "workerMaker": {
      "name": "SimpleWorkerMaker",
      "module": "pandaharvester.harvesterworkermaker.simple_worker_maker",
      "nJobsPerWorker": 100
  },
  ```
- If the number of prepared jobs (nPreparedJobs) is larger than nJobsPerWorker, N = int(nPreparedJobs/nJobsPerWorker)*nJobsPerWorker jobs are taken from the DB and M = N/nJobsPerWorker workers are generated. For example, with nPreparedJobs = 250 and nJobsPerWorker = 100, N = 200 jobs are taken and M = 2 workers are generated. All jobs in each worker come from the same task by default. If the queue has `"allowJobMixture": true` in queue_config, one worker is generated for jobs from multiple tasks.
- Each worker makes a directory as an access point and makes a sub directory under the access point for each job. The sub directory path is `$ACCESS_POINT/$PandaID`, where `$PandaID` is the PandaID of the job.
- The JobSpec file, PoolFileCatalog, and symlinks to input files for each job are placed in the corresponding sub directory.
- The list of PandaIDs is dumped to a JSON file, `pandaIDsFile`, in the access point. The filename is defined in panda_harvester.cfg. The MPI payload should read the file so that the Nth rank gets the Nth PandaID and changes its run directory to `$ACCESS_POINT/$PandaID` (see the sketch after this list).
- The monitor scans all sub directories in the access point to look for `workerAttributesFile`, `eventStatusDumpJsonFile`, and `jobReportFile` as described in this section.
- Stage-out is triggered for each rank when the rank produces `eventStatusDumpJsonFile`.
- Each rank should report jobStatus in `workerAttributesFile`, which can differ from the final status of the worker. It is good practice to always set the final status of the worker to 'failed', so that 'finished' is not reported for jobs whose ranks did not set jobStatus correctly.
- Jobs go to finished individually when each rank reports the final jobStatus in `workerAttributesFile`, or altogether when the worker is done.
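As an illustration of how a payload can consume these files, here is a minimal rank-side sketch in Python using mpi4py. The concrete file names (pandaIDs.json, worker_attributes.json), the JSON layouts, and the way the access point is passed to the payload are assumptions made for this example; the actual names are defined in panda_harvester.cfg.

```python
# Minimal sketch of the rank-side logic of an MPI payload in the multi-job
# workflow. File names, JSON layouts, and the ACCESS_POINT environment
# variable are assumptions of this sketch, not harvester defaults.
import json
import os

from mpi4py import MPI

# the worker access point prepared by harvester; how it is passed to the
# payload (here an environment variable) is an assumption
access_point = os.environ["ACCESS_POINT"]

rank = MPI.COMM_WORLD.Get_rank()

# the Nth rank takes the Nth PandaID from the dumped list (pandaIDsFile);
# a flat-list layout is assumed here
with open(os.path.join(access_point, "pandaIDs.json")) as f:
    panda_ids = json.load(f)
panda_id = panda_ids[rank]

# switch to the job's sub directory $ACCESS_POINT/$PandaID and run the payload
run_dir = os.path.join(access_point, str(panda_id))
os.chdir(run_dir)

try:
    # ... execute the actual payload for this PandaID here ...
    job_status = "finished"
except Exception:
    job_status = "failed"

# report the per-job status so that the monitor can pick it up from
# workerAttributesFile; the file name and attribute key are assumptions
with open(os.path.join(run_dir, "worker_attributes.json"), "w") as f:
    json.dump({"jobStatus": job_status}, f)
```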
A new worker maker could be added if nJobsPerWorker needs to be defined dynamically, e.g. for backfill.
This workflow works in combination with other workflows like OneToOne and ManyToOne. In the pull model, jobs are dispatched from PanDA once CPUs become available. With job late-binding and the push model, on the other hand, Harvester prefetches jobs first, i.e. jobs are dispatched from PanDA before CPUs become available, and Harvester then sends the jobs to the CPUs once they become available. Typically this is useful for HPCs, to fill nodes that became idle because their original jobs finished earlier than the length of the batch slot. It can still be relevant for huge jobs if those jobs are shared with other resources. Note that with this workflow Harvester submits vanilla payloads to the batch system; the payload needs the capability to switch its runtime environment and run a new job on the CPU.
The idea is to dynamically change the number of jobs per worker based on real-time information, such as the number of free slots in the batch system at that time. For this, a worker maker needs to return a proper number from the `get_num_jobs_per_worker()` method, as dummy_dynamic_worker_maker does. Harvester then picks up jobs accordingly, and makes and submits workers. It is probably good to set the queue config parameter `maxNewWorkersPerCycle` to 1, so that all jobs picked up by Harvester are given to a single worker.
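A minimal sketch of such a worker maker is shown below. The base class, the method signature, and the command used to query free slots are assumptions; check dummy_dynamic_worker_maker in pandaharvester.harvesterworkermaker for the actual plugin interface.

```python
# Sketch of a worker maker that decides the number of jobs per worker at run
# time from the number of free slots in the batch system. The base class,
# method signature, and the slot-query command are assumptions of this
# sketch; see dummy_dynamic_worker_maker for the real interface.
import subprocess

from pandaharvester.harvesterworkermaker.base_worker_maker import BaseWorkerMaker


class BackfillWorkerMaker(BaseWorkerMaker):
    # called by harvester to decide how many jobs to bundle into one worker
    def get_num_jobs_per_worker(self, n_workers):
        try:
            # hypothetical site-specific command that prints the number of
            # currently idle slots in the batch system
            out = subprocess.check_output(["show_free_slots"], encoding="utf-8")
            n_free_slots = int(out.strip())
        except Exception:
            n_free_slots = 0
        # take at least one job so that the worker is never empty
        return max(n_free_slots, 1)
```

Combined with maxNewWorkersPerCycle set to 1 as described above, all jobs picked up in a cycle then go to a single worker.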
The PanDA brokerage assigns jobs to PQs to keep (nAssignedJobs + nActivatedJobs) ≃ (nRunningJobs + nStartingJobs) × 2, where nRunningJobs = max(nRunningJobs, 20). This doesn't work well for spiky resources where nRunningJobs fluctuates strongly. To address the issue, each PQ can define the number of CPUs (slots) using the PanDA API:
```
$ curl --capath $X509_CERT_DIR --cacert $X509_USER_PROXY --cert $X509_USER_PROXY https://pandaserver.cern.ch:25443/server/panda/setNumSlotsForWP?pandaQueueName=XYZ\&numSlots=N(\&gshare=blah)(\&resourceType=blah)(\&validPeriod=M)
```
The brokerage internally uses N as nRunningJobs = max(nRunningJobs, N/nCorePerJob), so that the PQ can get jobs from a task with nCorePerJob cores per job even if not many jobs are running. When N=0, N is dynamically converted to nStartingJobs, so that the PQ keeps getting jobs as long as jobs are being prefetched. gshare and/or resourceType can optionally be specified when setting the number of slots. Set N=-1 to disable this mechanism. validPeriod should be specified if the slots are available only for M days.
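For completeness, the same call can be made from Python with requests. This is just a sketch mirroring the curl options above (the grid proxy is used as both client certificate and key); the queue name and slot count are placeholders.

```python
# Sketch of the setNumSlotsForWP call from Python, mirroring the curl command
# above; the queue name and slot count are placeholder values.
import os

import requests

proxy = os.environ["X509_USER_PROXY"]

params = {
    "pandaQueueName": "XYZ",  # placeholder PQ name
    "numSlots": 1000,         # N; 0 = follow nStartingJobs, -1 = disable
    # optional parameters, as in the curl example:
    # "gshare": "blah", "resourceType": "blah", "validPeriod": 7,
}

response = requests.get(
    "https://pandaserver.cern.ch:25443/server/panda/setNumSlotsForWP",
    params=params,
    cert=(proxy, proxy),                           # proxy as cert and key
    verify=os.environ.get("X509_CERT_DIR", True),  # CA directory if available
)
print(response.status_code, response.text)
```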