
Plugin API specifications

Tadashi Maeno edited this page Nov 24, 2017 · 40 revisions

General notes

Most plugin methods take instances of the JobSpec or WorkSpec class as input. The dictionary of job specifications given by the PanDA server is available in JobSpec.jobParams. Each worker communicates with Harvester through the access point, which is returned by WorkSpec.getAccessPoint(). Some plugins need to access FileSpec objects for actions such as staging files in or out.

Plugins can be configured per PanDA queue in $PANDA_HOME/etc/panda/panda_queueconfig.json. For example,

"BNL_HARVESTER_TEST": {
    ...
    "preparator":{
        "name":"DummyPreparator",
        "module":"pandaharvester.harvesterpreparator.dummy_preparator"

where name and module define the class name and the module name of the plugin, respectively. Internally, “from [module] import [name]” is invoked. Each plugin should inherit from pandaharvester.harvestercore.plugin_base.PluginBase so that it can take arbitrary configuration parameters, as long as the parameter names are not name or module. For example,

"preparator":{
    "name":"XyzPreparator",
    "module":"pandaharvester.harvesterpreparator.xyz_preparator",
    "host":"abc.cern.ch",
    "port":123,

Then “host” and “port” can be used in the plugin as instance variables, i.e. self.host and self.port.
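The mechanism can be illustrated with a minimal sketch. The PluginBase class below is a hypothetical stand-in written for this example, not the real pandaharvester.harvestercore.plugin_base.PluginBase; it only assumes that configuration parameters are copied onto the instance, which is the behaviour described above.

```python
class PluginBase:
    """Hypothetical stand-in for the real PluginBase class."""

    def __init__(self, **kwargs):
        # Copy every configuration parameter onto the instance.
        for key, value in kwargs.items():
            setattr(self, key, value)


class XyzPreparator(PluginBase):
    def endpoint(self):
        # "host" and "port" come straight from the queue configuration.
        return "{0}:{1}".format(self.host, self.port)


# The "preparator" block from the queue configuration, minus name and module.
config = {"host": "abc.cern.ch", "port": 123}
plugin = XyzPreparator(**config)
print(plugin.endpoint())
```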

Specifications of plugins and their methods are explained in the following sections.

Stage-in

Example

Methods

def trigger_preparation(self, jobspec)

Trigger the stage-in procedure for the job, either synchronously or asynchronously. If the return code of this method is True, the job goes to the next step. If it is False, the preparator immediately gives up on the job. If it is None, the job is retried after triggerInterval seconds, which is defined in the preparator section of panda_harvester.cfg.

Input file attributes are available through jobspec.get_input_file_attributes(skip_ready=True), which returns a dictionary. Each key is the LFN of an input file and each value is a dictionary of file attributes. The attribute names are fsize, guid, checksum, scope, dataset, attemptNr, and endpoint. attemptNr shows how many times the file has been tried so far. Grouping information such as a transferID can be attached to input files using jobspec.set_group_to_files(id_map), where id_map is {groupID: {'lfns': [lfn1, ...], 'status': status}}, and groupID and status are arbitrary strings.

Args:

name type description
jobspec JobSpec Job specifications

Return:

name type description
return code bool True: success, False: fatal error, None: temporary error
error dialog string Error dialog if any
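A minimal sketch of an asynchronous trigger_preparation may help to show the three return codes. FakeJobSpec is a hypothetical stand-in for JobSpec, and max_attempts and submit_transfer are names invented for this example; a real plugin would call an actual transfer service.

```python
class FakeJobSpec:
    """Hypothetical stand-in for JobSpec with only the two calls used here."""

    def __init__(self, in_files):
        self._in_files = in_files  # {LFN: attribute dictionary}
        self.groups = {}

    def get_input_file_attributes(self, skip_ready=False):
        # The stand-in ignores skip_ready and returns every file.
        return self._in_files

    def set_group_to_files(self, id_map):
        self.groups.update(id_map)


class SketchPreparator:
    max_attempts = 3  # hypothetical configuration parameter

    def submit_transfer(self, lfns):
        # Placeholder for a request to a real transfer service.
        return "transfer-001"

    def trigger_preparation(self, jobspec):
        in_files = jobspec.get_input_file_attributes(skip_ready=True)
        if not in_files:
            return True, ""  # nothing to stage in
        if any(attrs["attemptNr"] >= self.max_attempts for attrs in in_files.values()):
            return False, "input files exceeded the attempt limit"
        try:
            transfer_id = self.submit_transfer(sorted(in_files))
        except Exception as exc:
            # Temporary problem: ask the preparator to retry later.
            return None, "temporary error: {0}".format(exc)
        # Remember which files belong to the transfer request.
        jobspec.set_group_to_files(
            {transfer_id: {"lfns": sorted(in_files), "status": "submitted"}}
        )
        return True, ""


job = FakeJobSpec({"data.root": {"attemptNr": 0, "scope": "user.test"}})
print(SketchPreparator().trigger_preparation(job))
```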

def resolve_input_paths(self, jobspec)

Set the path of each input file in jobspec.get_input_file_attributes()[LFN]['path'] for the job. After setting the file paths, the updated input file attributes need to be passed back to the jobspec using jobspec.set_input_file_paths().

Args:

name type description
jobspec JobSpec Job specifications

Return:

name type description
return code bool True for success, False otherwise
error dialog string Error dialog if any
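As a rough illustration, the sketch below fills in a path for every input file and hands the result back to the jobspec. FakeJobSpec is a hypothetical stand-in, base_path is an invented configuration parameter, and the sketch assumes that set_input_file_paths accepts the updated attribute dictionary as its argument.

```python
import posixpath


class FakeJobSpec:
    """Hypothetical stand-in for JobSpec."""

    def __init__(self, in_files):
        self._in_files = in_files  # {LFN: attribute dictionary}
        self.saved = None

    def get_input_file_attributes(self, skip_ready=False):
        return self._in_files

    def set_input_file_paths(self, in_files):
        # Assumption: the updated attribute dictionary is passed as the argument.
        self.saved = in_files


class SketchPreparator:
    base_path = "/data/harvester/input"  # hypothetical configuration parameter

    def resolve_input_paths(self, jobspec):
        in_files = jobspec.get_input_file_attributes()
        for lfn, attrs in in_files.items():
            # Place each file under <base_path>/<scope>/<LFN>.
            attrs["path"] = posixpath.join(self.base_path, attrs["scope"], lfn)
        jobspec.set_input_file_paths(in_files)
        return True, ""


job = FakeJobSpec({"data.root": {"scope": "user.test"}})
result = SketchPreparator().resolve_input_paths(job)
```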

def check_status(self, jobspec)

Check the status of the stage-in procedure. If the return code of this method is True, the job goes to the next step. If it is False, the preparator immediately gives up on the job. If it is None, the job is retried after checkInterval seconds, which is defined in the preparator section of panda_harvester.cfg. If preparation is done synchronously in trigger_preparation, this method should always return True. The status of a file group can be updated using jobspec.update_group_status_in_files(group_id, group_status) if necessary.

Args:

name type description
jobspec JobSpec Job specifications

Return:

name type description
return code bool True: transfer success, False: fatal transfer failure, None: on-going or temporary failure
error dialog string Error dialog if any
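The polling logic could look roughly like the following sketch. FakeJobSpec, its group_status attribute, and the service_states dictionary (standing in for replies from a transfer service) are all hypothetical; only update_group_status_in_files is taken from the description above.

```python
class FakeJobSpec:
    """Hypothetical stand-in for JobSpec that tracks group statuses."""

    def __init__(self, group_ids):
        self.group_status = {gid: "submitted" for gid in group_ids}

    def update_group_status_in_files(self, group_id, group_status):
        self.group_status[group_id] = group_status


class SketchPreparator:
    def __init__(self, service_states):
        # Hypothetical: what the transfer service reports per group ID.
        self.service_states = service_states

    def check_status(self, jobspec):
        still_running = False
        for group_id in list(jobspec.group_status):
            state = self.service_states.get(group_id, "running")
            if state == "failed":
                jobspec.update_group_status_in_files(group_id, "failed")
                return False, "transfer {0} failed".format(group_id)
            if state == "done":
                jobspec.update_group_status_in_files(group_id, "done")
            else:
                still_running = True
        if still_running:
            # Checked again after checkInterval seconds.
            return None, "transfers still running"
        return True, ""


job = FakeJobSpec(["t1", "t2"])
first = SketchPreparator({"t1": "done"}).check_status(job)
second = SketchPreparator({"t1": "done", "t2": "done"}).check_status(job)
```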

Stage-out

Example

Methods

def trigger_stage_out(self, jobspec)

Trigger the stage-out procedure for the job. Output files are available through jobspec.get_outfile_specs(skip_done=False), which gives a list of FileSpecs not yet done. If this method returns None, the stager retries the job after triggerInterval seconds, which is defined in the stager section of panda_harvester.cfg. This means that if stage-out has permanently failed for a file, this method should return False to prevent further attempts. fileSpec.attemptNr is incremented every time the method is invoked, so plugins can give up once attemptNr becomes large. Note that fileSpec.status should be set to 'finished' only when the file has actually been transferred to the destination, not when a transfer request has merely been accepted by a third-party service. In the latter case, fileSpec.status should be left untouched, so that the file is checked in the check_status method described in the next section.

Args:

name type description
jobspec JobSpec Job specifications

Return:

name type description
return code bool True: success, False: fatal failure, None: temporary failure
error dialog string Error dialog if any
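The sketch below shows one way a synchronous stager might apply these rules, using a plain file copy in place of a real transfer. FakeFileSpec and FakeJobSpec are hypothetical stand-ins, and dest_dir, max_attempts, and the initial status value "defined" are assumptions made for the example.

```python
import os
import shutil
import tempfile


class FakeFileSpec:
    """Hypothetical stand-in for FileSpec."""

    def __init__(self, lfn, path, attemptNr=1):
        self.lfn = lfn
        self.path = path
        self.attemptNr = attemptNr
        self.status = "defined"  # placeholder initial value


class FakeJobSpec:
    """Hypothetical stand-in for JobSpec."""

    def __init__(self, out_files):
        self._out_files = out_files

    def get_outfile_specs(self, skip_done=False):
        return self._out_files


class SketchStager:
    max_attempts = 3  # hypothetical configuration parameter

    def __init__(self, dest_dir):
        self.dest_dir = dest_dir

    def trigger_stage_out(self, jobspec):
        for file_spec in jobspec.get_outfile_specs(skip_done=False):
            if file_spec.status == "finished":
                continue
            try:
                shutil.copy(file_spec.path, os.path.join(self.dest_dir, file_spec.lfn))
            except OSError as exc:
                if file_spec.attemptNr >= self.max_attempts:
                    return False, "giving up on {0}: {1}".format(file_spec.lfn, exc)
                return None, "will retry {0}: {1}".format(file_spec.lfn, exc)
            # The copy has completed, so the file is really at the destination.
            file_spec.status = "finished"
        return True, ""


src_dir, dest_dir = tempfile.mkdtemp(), tempfile.mkdtemp()
good = FakeFileSpec("out.txt", os.path.join(src_dir, "out.txt"))
with open(good.path, "w") as handle:
    handle.write("payload")
missing = FakeFileSpec("lost.txt", os.path.join(src_dir, "lost.txt"), attemptNr=3)
ok_result = SketchStager(dest_dir).trigger_stage_out(FakeJobSpec([good]))
bad_result = SketchStager(dest_dir).trigger_stage_out(FakeJobSpec([missing]))
```

An asynchronous plugin would instead submit the transfer request here, leave the status untouched, and let check_status decide when the file is finished.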

def check_status(self, jobspec)

Check the status of the stage-out procedure. If stage-out is done synchronously in trigger_stage_out, this method should always return True. Output files are available through jobspec.get_outfile_specs(skip_done=False), which gives a list of FileSpecs not yet done. FileSpec.status needs to be set to 'finished' if stage-out was successful for a file, or to 'failed' if it failed. If this method returns None, the stager retries the job after checkInterval seconds. If it returns False, the stager immediately gives up on the job and the job goes to the failed state.

Args:

name type description
jobspec JobSpec Job specifications

Return:

name type description
return code bool True: transfer success, False: fatal transfer failure, None: on-going or temporary failure
error dialog string Error dialog if any

def zip_output(self, jobspec)

Zip output files. This method loops over jobspec.outFiles to make a zip file for each outFileSpec from FileSpec.associatedFiles, which is a list of toZipFileSpec objects to be zipped. The file path is available in toZipFileSpec. Once the zip files are made, their toZipFileSpec.path and toZipFileSpec.fsize need to be set.

Args:

name type description
jobspec JobSpec Job specifications

Return:

name type description
return code bool True for success, False otherwise
error dialog string Error dialog if any

Monitor

Example

def check_workers(self, workspec_list)

Check the status of workers. This method takes a list of WorkSpecs as its input argument and returns a return code together with a list of worker statuses. The Nth element in the returned list corresponds to the status of the Nth WorkSpec in the given list. A worker's status is one of WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled, WorkSpec.ST_running, WorkSpec.ST_submitted.

Args:

name type description
workspec_list [WorkSpec,] A list of Work specifications instances

Return:

name type description
return code bool True for success, False otherwise
status list [string,] a list of worker's statuses
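A minimal sketch of the mapping from batch-system states to worker statuses follows. The WorkSpec class here is a stand-in whose constant values are placeholders, and the batch state names and the batch_states dictionary are invented for the example; treating an unknown worker as failed is likewise only one possible choice.

```python
class WorkSpec:
    """Stand-in for the real WorkSpec; the constant values are placeholders."""

    ST_submitted = "submitted"
    ST_running = "running"
    ST_finished = "finished"
    ST_failed = "failed"
    ST_cancelled = "cancelled"

    def __init__(self, batchID):
        self.batchID = batchID


class SketchMonitor:
    # Hypothetical batch-system states mapped to worker statuses.
    state_map = {
        "PENDING": WorkSpec.ST_submitted,
        "RUNNING": WorkSpec.ST_running,
        "DONE": WorkSpec.ST_finished,
        "FAILED": WorkSpec.ST_failed,
        "CANCELLED": WorkSpec.ST_cancelled,
    }

    def __init__(self, batch_states):
        # Hypothetical: what the batch system reports per batch ID.
        self.batch_states = batch_states

    def check_workers(self, workspec_list):
        status_list = []
        for workspec in workspec_list:
            batch_state = self.batch_states.get(workspec.batchID)
            # Workers unknown to the batch system are treated as failed here.
            status_list.append(self.state_map.get(batch_state, WorkSpec.ST_failed))
        return True, status_list


workers = [WorkSpec("101"), WorkSpec("102"), WorkSpec("103")]
monitor = SketchMonitor({"101": "RUNNING", "102": "DONE"})
ret_code, statuses = monitor.check_workers(workers)
```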

Submitter

Example

def submit_workers(self, workspec_list)

Submit workers to a scheduling system such as a batch system or a computing element. This method takes a list of WorkSpecs as its input argument and returns a list of tuples. Each tuple is composed of a return code and a dialog message. The Nth tuple in the returned list corresponds to the submission status and dialog message for the Nth worker in the given WorkSpec list. A unique identifier is set to WorkSpec.batchID when submission is successful, so that the worker can be identified in the scheduling system. It is also useful to set other attributes such as queueName (batch queue name), computingElement (CE's host name), and nodeID (identifier of the node where the worker is running).

Args:

name type description
workspec_list [WorkSpec,] A list of Work specifications instances

Return:

name type description
status and dialog message list [(bool, string),] A list of tuples. Each tuple is composed of submission status (True for success, False otherwise) and dialog message
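The per-worker bookkeeping can be sketched as follows. FakeWorkSpec, its label attribute, batch_queue, and submit_to_batch are hypothetical names introduced for the example; a real plugin would replace submit_to_batch with a call to the scheduling system.

```python
import itertools


class FakeWorkSpec:
    """Hypothetical stand-in for WorkSpec."""

    def __init__(self, label):
        self.label = label  # attribute used only by this sketch
        self.batchID = None
        self.queueName = None


class SketchSubmitter:
    batch_queue = "short"  # hypothetical configuration parameter

    def __init__(self):
        self._counter = itertools.count(1000)

    def submit_to_batch(self, workspec):
        # Placeholder for a real submission; rejects one label to show a failure.
        if workspec.label == "broken":
            raise RuntimeError("submission rejected")
        return str(next(self._counter))

    def submit_workers(self, workspec_list):
        results = []
        for workspec in workspec_list:
            try:
                batch_id = self.submit_to_batch(workspec)
            except RuntimeError as exc:
                results.append((False, str(exc)))
                continue
            # Record the identifiers only for successful submissions.
            workspec.batchID = batch_id
            workspec.queueName = self.batch_queue
            results.append((True, ""))
        return results


workers = [FakeWorkSpec("a"), FakeWorkSpec("broken"), FakeWorkSpec("b")]
results = SketchSubmitter().submit_workers(workers)
```

Keeping one tuple per worker, in input order, lets a single failed submission be reported without affecting the others.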

Exposing batch logs

Submitter plugins can put batch log files in local directories served by an HTTP server, or can let Harvester upload those files to a web server, so that they are reachable from web browsers through bigpandamon. WorkSpec provides the set_log_file method to attach a log file to the worker:

def set_log_file(self, log_type, stream)

where log_type is one of 'batch_log', 'stdout', and 'stderr'. stream is a URL starting with 'http' or 'https' if the plugin itself puts the file at a web location, or a local file path if Harvester uploads the file. In the latter case, panda_harvester.cfg needs to define

# base URL for write access to log cache server
pandaCacheURL_W = https://aipanda011.cern.ch:25443/server/panda

# base URL for read access to log cache server
pandaCacheURL_R = https://aipanda011.cern.ch:25443/cache
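The two ways of calling set_log_file can be sketched as below. FakeWorkSpec is a hypothetical stand-in that simply stores the values, and register_logs, the file names, and the example URL are invented for illustration.

```python
import posixpath


class FakeWorkSpec:
    """Hypothetical stand-in for WorkSpec that only records log files."""

    def __init__(self):
        self.log_files = {}

    def set_log_file(self, log_type, stream):
        self.log_files[log_type] = stream


def register_logs(workspec, log_dir, base_url=None):
    """Attach the three log files, either as URLs or as local paths."""
    names = (("batch_log", "batch.log"), ("stdout", "stdout.txt"), ("stderr", "stderr.txt"))
    for log_type, file_name in names:
        if base_url is not None:
            # The plugin already publishes the files on a web server.
            stream = "{0}/{1}".format(base_url.rstrip("/"), file_name)
        else:
            # A local path: Harvester is expected to upload the file.
            stream = posixpath.join(log_dir, file_name)
        workspec.set_log_file(log_type, stream)


web_worker = FakeWorkSpec()
register_logs(web_worker, "/var/log/w1", base_url="https://logs.example.org/w1/")
local_worker = FakeWorkSpec()
register_logs(local_worker, "/var/log/w2")
```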

Sweeper

Example

def kill_worker(self, workspec)

Kill a worker in a scheduling system like batch systems and computing elements. This method takes a WorkSpec as input argument, and returns a tuple of a return code and a dialog message.

Args:

name type description
workspec WorkSpec Work specifications

Return:

name type description
return code bool True for success, False otherwise
error dialog string Error dialog if any

def sweep_worker(self, workspec)

Perform cleanup procedures for a worker, such as deletion of work directory. This method takes a WorkSpec as input argument, and returns a tuple of a return code and a dialog message.

Args:

name type description
workspec WorkSpec Work specifications

Return:

name type description
return code bool True for success, False otherwise
error dialog string Error dialog if any
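Both sweeper methods can be sketched together. FakeWorkSpec, its work_dir attribute, and cancel_in_batch are hypothetical; the sketch assumes the work directory is a local path that the plugin is allowed to delete.

```python
import os
import shutil
import tempfile


class FakeWorkSpec:
    """Hypothetical stand-in for WorkSpec."""

    def __init__(self, batchID, work_dir):
        self.batchID = batchID
        self.work_dir = work_dir  # hypothetical attribute holding the work directory


class SketchSweeper:
    def cancel_in_batch(self, batch_id):
        # Placeholder for a real cancel command sent to the scheduling system.
        return batch_id is not None

    def kill_worker(self, workspec):
        if self.cancel_in_batch(workspec.batchID):
            return True, ""
        return False, "worker has no batchID to cancel"

    def sweep_worker(self, workspec):
        try:
            shutil.rmtree(workspec.work_dir)
        except FileNotFoundError:
            # The directory is already gone, so there is nothing left to clean.
            return True, ""
        except OSError as exc:
            return False, "failed to remove {0}: {1}".format(workspec.work_dir, exc)
        return True, ""


worker = FakeWorkSpec("1000", tempfile.mkdtemp())
sweeper = SketchSweeper()
kill_result = sweeper.kill_worker(worker)
sweep_result = sweeper.sweep_worker(worker)
second_sweep = sweeper.sweep_worker(worker)
```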
