Skip to content

Agents and Plugins descriptions

Tadashi Maeno edited this page Jan 10, 2023 · 9 revisions

Agents

CredManager

The agent to maintain credentials like grid proxy and access key. It can manage multiple credentials. It is possible to specify a multi-line data to each parameter in the credmanager section in panda_harvester.cfg. Each line corresponds to one credential. For example, with

# module name
moduleName =
 pandaharvester.harvestercredmanager.grid_cred_manager
 pandaharvester.harvestercredmanager.no_voms_cred_manager
 pandaharvester.harvestercredmanager.no_voms_cred_manager

# class name
className =
 GridCredManager
 NoVomsCredManager
 NoVomsCredManager

CredManager maintains three credentials; one credential with the GridCredManager plugin and two credentials with the NoVomsCredManager plugin.


Cacher

The agent to cache data. It is possible to specify a multi-line parameter in the data parameter in the cacher section in panda_harvester.cfg. Each line is composed of the main key, the sub key, URI, and output file name which are concatenated with |, where sub key and output filename are optional.

[cacher]
data = 
 main_key_1|sub_key_2|URI_1[|output_file_name_1]
 main_key_2|sub_key_2|URI_2[|output_file_name_2]

Cacher periodically retrieves data from URI and store it in the database. Other agents and plugins can access data through DBProxy with the main key and the sub key as shown in this example. Currently three schemes are supported for URI, http, file and panda_cache. URI is a URL for http and a file URI for file, respectively. panda_cache scheme allows to retrieve a data pair from panda. URI for panda_cache is panda_cache:public_name#secret_name, where public_name is the name of public data like public key and user ID and secret_name is the name of secret data like private key and password. To use panda_cache, public and secret data need to be put in the cache directory on the panda server nodes, and the k flag needs to be set in the gridpref field for the user who is running the harvester instance. If output_file_name is specified, json dump of the data is written into the file in addition.

Note that plugins can access the cached information through the self.dbInterface member which is automatically installed by plugin_factory when plugins are instantiated. self.dbInterface has the get_cache(main_key, sub_key=None) for that.


Throttler

The agent throttles submission of workers based on the number of missed workers in a time window. The idea is to avoid bombing scheduling systems when submission is problematic. Each queue can define a throttler like

		"throttler": {
			"name": "SimpleThrottler",
			"module": "pandaharvester.harvesterthrottler.simple_throttler",
                        "logicType": "OR",
			"rulesForMissed": [{"level": "pq", "timeWindow": 10, "maxMissed": 10},
				               {"level": "ce", "timeWindow": 10, "maxMissed": 5}]
		},

The SimpleThrottler can have multiple rules in 'rulesForMissed'. Each rule counts the number of missed workers (nMissed) in last 'timeWindow' minutes at a 'level', where 'level' can be 'site', 'pq' (Panda Queue), or 'ce' (Computing Element), and is satisfied when
nMissed is larger than 'maxMissed'. 'logicType' should be set to 'OR' if submission should be throttled when one or more rules are satis. Or it should be set to "AND" if submission should be throttled when all rules are satis.


Watcher

The agent is watching panda-db_proxy.log, which is the busiest log in harvester log files, to take actions. Actions could be 'kill' to kill the instance and/or 'email' to send alarms to some recipients. Parameters for watcher are defined in the watcher section of panda_harvester.cfg. Actions are triggered when the latest log message is older than maxStalled seconds or it takes more than maxDuration seconds to dump nMessages messages. Each condition can be suppressed by setting 0 to the max* parameter.

To send email alarms, if you are allowed to send emails through the smtp daemon running on the same node where the harvester instance is launched it would be enough to have the following parameters in the watcher section of panda_harvester.cfg.

[watcher]
mailServer = localhost
mailPort = 25
mailUser =
mailPassword =
mailUseSSL = False

On the otherhand, if you have to logon to an email server to send emails the username and password should be encrypted as follows. First, you need to define a passphrase in the HARVESTER_WATCHER_PASSPHRASE env variable.

$ export HARVESTER_WATCHER_PASSPHRASE=<an_arbitrary_string_for_pass_phrase> 

The variable may be set in etc/sysconfig/panda-harvester or may be set just before the harvester instance is launched. Next, you can get an encrypted string for username or password by using encryptForWatcher.py

$ python lib/python*/site-packages/pandaharvester/harvestertest/encryptForWatcher.py <username or password>

For example,

$ python lib/python*/site-packages/pandaharvester/harvestertest/encryptForWatcher.py atlpan
original : atlpan
encrypted: LifaYHRt5Q/KIfmlALNmvvRA2lcPDlAE3Cfw==
decrypted: atlpan

where you can find an encrypted string LifaYHRt5Q/KIfmlALNmvvRA2lcPDlAE3Cfw== for atlpan. Then you need to set the encrypted string to mailUser or mailPassword in the watcher section of panda_harvester.cfg. E.g.,

[watcher]
mailUser = LifaYHRt5Q/KIfmlALNmvvRA2lcPDlAE3Cfw==

Note that if you change the passphrase you need to get new encrypted strings again.


Sweeper

The agent to kill or cleanup workers through plugins. Each plugin provides concrete actions like sending the kill command to the batch system and deleting working directories from a shared file system. The kill action is triggered when harvester received commands from Panda or workers request to kill themselves through messenger. The cleanup action is triggered for each worker after the worker sits in a final status for a certain period. The period can be set for each status in the sweeper section in panda_harvester.cfg. There are four parameters; keepFinished for finished workers, keepFailed for failed workers, keepCancelled for cancelled workers, and keepMissed for missed workers. For example, keepFinished=24 means that leftovers of a worker such as working directory is kept for 24 hours after the worker goes to finished.


JobFetcher

The agent to fetch jobs. Each queue can set nQueueLimitJob or nQueueLimitJobRatio to define how many jobs are prefetched. nQueueLimitJob statically defines the maximum number of jobs in starting state (nStarting) for the queue, so that jobs are prefetched until the number of starting jobs exceeds or is equal to nQueueLimitJob. On the other hand, nStarting is dynamically defined and nQueueLimitJob is ignored if nQueueLimitJobRatio is used, which works together with nQueueLimitJobMax and nQueueLimitJobMin. nQueueLimitJobRatio is the target ratio of nStarting to the number of running jobs (nRunning) as a percentage. nQueueLimitJobRatio=100 if nStarting:nRunning=1:1. nStarting is not less than nQueueLimitJobMin which is 1 by default, and is not more than nQueueLimitJobMax if it is defined.


Plugins

Proxy Cache CredManager

This credential manager periodically retrieves a VOMS proxy from the panda server which caches proxies using the proxy cache mechanism. A no-VOMS proxy is used to access the panda server while a VOMS proxy is used for storage access etc.

The following procedures are required.

  1. Upload a proxy to myproxy.
$ myproxy-init -s myproxy.cern.ch -x -Z '/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=pandasv1/CN=663551/CN=Robot: ATLAS Panda Server1' -d -k panda -c 4383 -t 0
  1. Run the following command to get the DN and an extracted name from the proxy
$ python lib/python*/site-packages/pandaharvester/harvestertest/cleanDN.py [proxy_file]
  1. Submit a JIRA ticket to register the DN and extracted name.

Then

  1. The DN is registered in the ATLAS_PANDAMETA.users table. The "p" bit needs to be set in the gridpref field.
  2. The extracted name is added to ATLAS_PANDAMETA.cloudconfig.pilotowners if DN-based access (i.e. access with no-VOMS proxy) is required for all sites or ATLAS_PANDAMETA.schedconfig.DN for some sites if DN-based access is required for only those sites.

Once those procedures are done, following parameters need to be set in panda_harvester.cfg.

[credmanager]
# module name
moduleName = pandaharvester.harvestercredmanager.proxy_cache_cred_manager
# class name
className = ProxyCacheCredManager
# input cert file
certFile = <filename for no-VOMS proxy>
# output cert file
outCertFile = <filename for VOMS proxy>
# voms
voms = atlas:/atlas/Role=production

Note that no-VOMS proxy can have a very long lifetime like

$ voms-proxy-init --out <filename for no-VOMS proxy> -valid 596523:0

to avoid frequent renewal.


Messenger plugins

Shared File Messenger

This messenger propagates messages between Harvester and worker using files in a directory (access point) on a shared file system.

Filenames are defined in the payload_interaction section in harvester_config.cfg. In the following, blah stands for a filename defined in harvester_config.payload_interaction.blah.

Get jobs

The worker (payload) needs to put a file with jobRequestFile to get jobs. This file can be empty since Harvester only checks if the file exists. Currently the number of jobs per worker is statically defined in the cfg file. This will be improved to dynamically change the number of jobs, and thus jobRequestFile will a json dump of some job request parameters such as nJobs, nCores, wallTime, etc. Once Harvester finds jobRequestFile, it makes a file with jobSpecFile which is a json dump of a dictionary {pandaID_1:jobSpec.jobParams, pandaID_2: ...}, in order to feed jobs to the worker. A pool file catalog xmlPoolCatalogFile is created at the same time. If you want to see an example of jobSpec.jobParams this page will help. Harvester deletes jobRequestFile.

Update jobs

The worker can update job attributes like coreCount, startTime, endTime, maxPSS, etc, by putting a file workerAttributesFile. The file is a json dump of {attribute:value}. Attribute names are defined as arguments of the updateJob method for JobDispatcher in the PanDA server. Especially if the job status is different from worker status, {'jobStatus':'blah'} should be added. If the final job status is added in workerAttributesFile harvester assumes that the log file was correctly produced by the payload itself and thus harvester doesn't make a log file for the job. In other words, when the final job status is not added but the worker goes to the final status, this implies that the job was wrongly terminated and thus harvester makes a log file for the job if a log file is not included in eventStatusDumpJsonFile which is explained below. If the worker puts jobReportFile it is sent to PanDA as jobReport.json.

Get events

The worker needs to put a file with eventRequestFile to get events. The file is a json dump of a dictionary {PandaID: {'nRanges':???, 'pandaID':???, 'taskID':???, 'jobsetID':???}}. Harvester makes a file with eventRangesFile which is a json dump of a list [{'eventRangeID': ???, 'LFN': ???, 'lastEvent': ???, 'startEvent': ???, 'scope':???, 'GUID':???},], and then deletes eventRequestFile.

Update events and/or stage-out

The worker needs to put eventStatusDumpJsonFile to update events and/or stage-out. The file is a json dump of a dictionary {pandaID_1:[{'eventRangeID':???, 'eventStatus':???, 'path':???, 'type':???, 'chksum':???, 'fsize':???, 'guid':???}, ...], pandaID_2:[...], ...}. 'eventRangeID' and 'eventStatus' are required only to update events, while other file information such as 'path', 'type' 'fsize' and 'chksum' are required in addition if the event produced an output file. The value for 'eventStatus' is a string among 'running', 'finished', 'failed' or 'fatal'. 'type' is the type of the file and it can be 'output', 'es_output', 'zip_output', or 'log'. 'eventRangeID' should be removed unless the type is 'es_output' or 'zip_output'. 'zip_output' is for zipped event service files. A single zipped file can contain multiple event service files from multiple event ranges. In this case, the list of event range dictionaries will contain multiple elements which are identical except 'eventRangeID'. Stager plugins can upload files depending on the 'type'. 'chksum' is calculated using adler32 if omitted. If the output has an intrinsic guid (this is the case for POOL files) 'guid' needs to be set. eventStatusDumpJsonFile is deleted once Harvester updates the database.

Plugin parameters
Name Description
accessPoint A shared director for the worker to communicate with harvester. ${workerID} is replaced to the real worker identifier, so that one directory is assigned to each worker

Http Server Messenger

This messenger inherits from Shared File Messenger and runs HTTP front-end to communicate with workers.

The messenger converts objects (such as jobs, events, and requests) to files and puts them into a directory similarly as Shared File Messenger, but the directory doesn't have to be on a shared file system. The messenger has an HTTP frontend to receive requests from workers, e.g. getting or updating jobs or events. Workers send requests with POST and Content-Type: application/json, and get the following HTTP response code for each request.

HTTP code Description
200 OK
400 Request is corrupted
500 Internal server error
501 Method is not implemented
503 Previous request is not yet processed

An HTTP session is established for each request rather than sending messages over permanent socket connections, so that the harvester instance can be stopped and restarted while workers are running. The request body is a dictionary with three keys, methodName, workerID, and data. workerID is the identifier of the worker defined in the Harvester_Workers table and is given to the worker when it is being submitted. Here is the list of methodNames.

Method name Description
requestJobs Request jobs for the worker
getJobs Get jobs for the worker
requestEventRanges Request event ranges for a job in the worker
getEventRanges Request event ranges for a job in the worker
updateJobs Update jobs in the worker
uploadJobReport Upload jobReport for jobs in the worker
uploadEventOutputDump Upload event/output dump for jobs in the worker

Each method is implemented to replace the corresponding file-based interaction. For example, with Shared File Messenger the worker makes a json dump file of a dictionary as described above to update jobs, i.e., the worker does something like

data = {key:value}
json.dump(data, file)

With Http server messenger, the worker needs to do something like

form = {'methodName': 'updateJobs', 'workerID': XYZ, 'data': {key:value}}
req = urllib2.Request('http://edge_node_ip:25080', json.dumps(form), {'Content-Type': 'application/json'})
res = urllib2.urlopen(req)

Following parameters need to be set in panda_harvester.cfg.

[frontend]
# port number
portNumber = 25080
# number of threads
nThreads = 10
# verbose
verbose = False

Note : The messenger is implemented on top of python's SimpleHTTPServer to be lightweight. Although there is a thread pool to concurrently process multiple requests it will not so scale as other production web servers like Apache. There is another messenger based on Apache which is good for some usages which require more scalability and allow more resource consumption for the harvester instance.


Preparator plugins

RseDirect Preparator

The workflow for RseDirectPreparator is as follows. First panda makes a rule to transfer files to an RSE which is associated to the resource. Once files are transferred to the RSE, job status is changed to activated from assigned. Then Harvester fetches the job and constructs input file paths that point to pfns in the storage. Those paths are given to the job. This means that the job directly read input files from the storage without any data motion by Harvester. Parameters for the plugin are as follows:

Name Description
basePath The base storage path to construct full input file paths using LFN and rucio convention

Go Preparator

This plugin transfers input files using Globus Online. Typically, input files are transferred to a remote RSE/Globus dual endpoint first, and then the plugin copies the input files to a local storage using Globus Online. Parameters for the plugin are as follows:

Name Description
basePath The base storage path to where Globus Online transfer input files
Globus_srcPath The base storage path from where Globus Online transfer input files
srcEndpoint The source (remote) Globus endpoint name
dstEndpoint The destination (local) Globus endpoint name

GlobusBulk Preparator

This plugin copies input files using Globus Online with bulk transfers. Parameters for the plugin are the same as that for Go Preparator.


PilotMover Preparator

This plugin copies input files from a RSE using Pilot2.0 Data API. Typically, input files are transferred to a remote RSE first, and then the plugin copies the input files to a local storage where rucio cannot directly write into. Parameters for the plugin are as follows:

Name Description
basePath The base storage path to where PilotMover transfers input files

Stager plugins

RseDirect Stager

In the workflow for RseDirectStager, workers directly upload output files to RSE and thus there is no data motion in Harvester unless workers die and harvester has to upload log files. Parameters for the plugin are as follows:

Name Description
zipDir The directory name where zipped log files are created. ${SRCDIR} should be used if zipped log files are created in the same directory as original log files

RucioStagerHPC Stager

This plugin copies output files to a RSE using Pilot2.0 Data API. Parameters for the plugin are as follows:

Name Description
zipDir The directory name where zipped log files are created. ${SRCDIR} should be used if zipped log files are created in the same directory as original log files
dstRSE_Out The name of RSE for output files
dstRSE_Log The name of RSE for log files

Rucio Stager

This plugin copies output files to a remote RSE from a local RSE using replication rules in rucio. Parameters for the plugin are as follows:

Name Description
zipDir The directory name where zipped log files are created. ${SRCDIR} should be used if zipped log files are created in the same directory as original log files
srcRSE The name of local RSE
dstRSE_Out The name of remote RSE for output files
dstRSE_Log The name of remote RSE for log files
dstRSE_ES The name of remote RSE for pre-merged event service files
srcBasePath The base storage path from where rucio transfers output files
objStoreID_ES The ID of the object store where pre-merged event service files are uploaded

Globus Stager

This plugin transfers output files using Globus Online.
Parameters for the plugin are as follows:

Name Description
basePath The base storage path from where Globus Online transfer output files
Globus_dstPath The base storage path to where Globus Online transfer output files
srcEndpoint The source (local) Globus endpoint name
dstEndpoint The destination (remote) Globus endpoint name

GlobusBulk Stager

This plugin transfers output files using Globus Online with bulk transfers. Parameters for the plugin are the same as that for Globus Stager.


FTS Stager

This plugin transfers output files using FTS.
Parameters for the plugin are as follows:

Name Description
basePath The base storage path from where Globus Online transfer output files
ftsServer FTS server name
ftsLookupTimeout The interval to check transfer status in second
ca_cert The certificate of the FTS server
srcEndpointOut The base path of the local storage for pre-merged event service files
dstEndpointOut The base path of the remote storage for pre-merged event service files
srcEndpointLog The base path of the local storage for log files
dstEndpointLog The base path of the remote storage for log files
srcEndpointES The base path of the local storage for output files
dstEndpointES The base path of the remote storage for output files

Submitter plugins

Slurm Submitter

This plugins submits workers to SLURM batch systems. Parameters for the plugin are as follows:

Name Description
templateFile The template file name to generate batch scripts. Placeholders in the template file like ${blah} is replaced to actual values when being submitted
nCore The number of cores for one worker
nCorePerNode The number of cores per node

HTCondor Submitter

This plugins submits workers with HTCondor. Parameters for the plugin are as follows:

Name Description
templateFile The path of template of SDF (submit description file of HTCondor). Placeholders in the template file like {blah} (python str.format placeholder) is replaced to actual values when being submitted. (default: None)
logDir The log directory for HTCondor log files. (mandatory)
nProcesses The number of threads for each bulk submission. (default: 1)
nCorePerNode The number of cores per node. If omitted, queue or job setting is used (default: None)
logBaseURL The base URL to access HTCondor log files remotely.
x509UserProxy The path of x509 proxy file to use. It will be used to substitute for the placeholder {x509UserProxy} in template SDF. (default: OS env $X509_USER_PROXY)
useAtlasGridCE The boolean about whether to use ATLAS Grid CEs. If true, CE information will be automatically obtained from AGIS, and substitute for the placeholders in SDF template about CE {ceXXX}. Used in Harvester GRID commissioning. (default: false)
CEtemplateDir The path of the directory containing SDF template files for different flavors of CEs (Now arc-ce.sdf, cream-ce.sdf, htcondor-ce.sdf). Only works if templateFile is omitted AND useAtlasGridCE is true. If working, harvester chooses the SDF template of corresponding filename under the directory for condor submission. (default: None)

Extractor and Auxiliary preparator plugins

There are two types of special plugins to handle auxiliary files of jobs, which are not normal input files, such as sandbox files for analysis, container images, and transformations. Extractor plugins are used in JobFetcher to extract information about auxiliary files from jobs. For example, AnalysisExtractor parses job attributes and parameters to get URLs of sandbox file, container image, and transformation. Auxiliary preparator plugins are used in Preparator in addition to primary preparator plugins. Auxiliary preparator plugins take care of only auxiliary files while primary preparator plugins take care of normal input files. It is possible to use the same plugins for normal input and auxiliary files if they can be transferred using the same protocol and/or machinery. Extractor and auxiliary preparator plugins need to be defined in queue_config.json with extractor and aux_preparator keys for each queue. For example,

                "aux_preparator": {
                        "name": "AnalysisAuxPreparator",
                        "module": "pandaharvester.harvesterpreparator.analysis_aux_preparator",
                        "localBasePath":"/data/aux/rucio"
                },
                "extractor": {
                        "name": "AnalysisExtractor",
                        "module": "pandaharvester.harvesterextractor.analysis_extractor"
                }

which uses AnalysisExtractor to extract information and AnalysisAuxPreparator to transfer auxiliary files. It uses python requests to fetch files with HTTP or HTTPS URLs, docker save or singularity build to fetch docker files depending on the containerRuntime attribute of the plugin in panda_queueconfig.json. It is possible for AnalysisAuxPreparator to use external commands for specific protocols. For example, the following configration uses command1 to trigger preparation and command2 to check status for auxiliary files with some_protocol://

                "aux_preparator": {
                        "name": "AnalysisAuxPreparator",
                        "module": "pandaharvester.harvesterpreparator.analysis_aux_preparator",
                        "externalCommand": {"some_protocol": {"trigger": {"args":["command1","-i","{src}","-o","{dst}"]}, "check": {"args":["command2","{id}","xyz"]}}},
                        ...

where {src},{dst}, and {id} are replaced with the source URL, the destination path, and the identifer retuned by the trigger command, respectively, when commands are acutally executed. If preparation is synchronously done the "check" field can be ommit.

Clone this wiki locally