-
Notifications
You must be signed in to change notification settings - Fork 16
Plugin API specifications
Most of plugin methods take instances of JobSpec class or WorkSpec class as input. The dictionary of job specifications given by the PanDA server is available in JobSpec.jobParams. Each worker communicates with Harvester through the access point which is specified in WorkSpec.getAccessPoint(). Some plugins need to access FileSpec objects for their actions like staging in or out. Generally one plugin instance is instantiated for each agent action, e.g., one Submitter and one WorkerMaker instances for a single submission cycle.
Plugins can be configured per panda queue in $PANDA_HOME/etc/panda/panda_queueconfig.json. Here is an example of panda_queueconfig.json. For example,
"BNL_HARVESTER_TEST": {
...
"preparator":{
"name":"DummyPreparator",
"module":"pandaharvester.harvesterpreparator.dummy_preparator"
where name and module define the class name and the module name for the plugin, respectively. Internally “from [module] import [name]” is invoked. Each plugin should inherit from pandaharvester.harvestercore.plugin_base.PluginBase so that it can have arbitrary configuration parameters as long as their parameter names are not the same as name or module.For example,
"preparator":{
"name":"XyzPreparator",
"module":"pandaharvester.harvesterpreparator.xyz_preparator",
"host":"abc.cern.ch",
"port":123,
Then “host” and “port” can be used in the plugin as instance variables, i.e. self.host and self.port.
Specifications of plugins and their methods are explained in the following sections.
def trigger_preparation(self, jobspec)
Trigger stage-in procedure synchronously or asynchronously for the job.
If the return code of this method is True, the job goes to the next step. If it is False,
preparator immediately gives up the job. If it is None, the job is retried after triggerInterval
seconds which is defined in the preparator section of panda_harvester.cfg.
Input file attributes are available through jobspec.get_input_file_attributes(skip_ready=True)
which gives a dictionary. The key of the dictionary is LFN of the input file
and the value is a dictionary of file attributes. The attribute names are
fsize, guid, checksum, scope, dataset, attemptNr, and endpoint. attemptNr shows how many times
the file was tried so far. Grouping information such as transferID can be set to input files using
jobspec.set_group_to_files(id_map)
where id_map is
{groupID:'lfns':[lfn1, ...], 'status':status}
, and groupID and status are arbitrary strings.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True: success, False: fatal error, None: temporary error |
error dialog | string | Error dialog if any |
def resolve_input_paths(self, jobspec)
Set input file paths to jobspec.get_input_file_attributes[LFN]['path'] for the job. New input file attributes need to be set to jobspec using jobspec.set_input_file_paths() after setting the file paths.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def check_status(self, jobspec)
Check status of stage-in procedure.
If the return code of this method is True, the job goes to the next step. If it is False,
preparator immediately gives up the job. If it is None, the job is retried after checkInterval
seconds which is defined in the preparator section of panda_harvester.cfg.
If preparation is done synchronously in trigger_preparation
this method should always return True. Status of file group can be updated using
jobspec.update_group_status_in_files(group_id, group_status)
if necessary.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True: transfer success, False: fatal transfer failure, None: on-going or temporary failure |
error dialog | string | Error dialog if any |
def trigger_stage_out(self, jobspec)
Trigger stage-out procedure for the job.
Output files are available through jobspec.get_outfile_specs(skip_done=False)
which gives
a list of FileSpecs not yet done.
If this method returns None,
stager retries the job after triggerInterval
seconds which is defined in the stager section of panda_harvester.cfg. This means that if stage-out is permanently failed for a file, this method should
returns False to give up further reattempts. fileSpec.attemptNr
is incremented every time the method is invoked, so that plugins can give up based on a large attemptNr. Note that fileSpec.status should be set to 'finished' only when the file is actually transferred to the destination but not when transfer requests are successfully received by 3rd party service. For the latter case, fileSpec.status should be untouched, so that the file will be checked in the check_status
method as described in the next section.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True: success, False: fatal failure, None: temporary failure |
error dialog | string | Error dialog if any |
def check_status(self, jobspec)
Check status of stage-out procedure. If staging-out is done synchronously in trigger_stage_out this method should always return True. Output files are available through jobspec.get_outfile_specs(skip_done=False) which gives a list of FileSpecs not yet done.
FileSpec.status needs to be set to 'finished' if stage-out was successful for a file, or to 'failed' if it failed. If this method returns None, stager retries the job after checkInterval
seconds. If it returns False, stager immediately gives up the job and the job goes to failed.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True: transfer success, False: fatal transfer failure, None: on-going or temporary failure |
error dialog | string | Error dialog if any |
def zip_output(self, jobspec)
Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of associated files to be zipped. The path of each associated file is available in associated file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and FileSpec.chksum need to be set.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def async_zip_output(self, jobspec)
Zip output files asynchronously. Asynchronous zipping is enabled when usePostZipping
is set to True in the zipper section in panda_harvester.cfg. This method is followed by post_zip_output(), which is typically
useful to trigger an asynchronous zipping mechanism such as batch job.
This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs for associated files to be zipped. The path of each associated file is available in associated file's FileSpec.path.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def post_zip_output(self, jobspec)
This method is executed after async_zip_output(), to do post-processing for zipping. Once zip files are made, this method need to look over jobspec.outFiles to set their FileSpec.path, FileSpec.fsize, and FileSpec.chksum.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def check_workers(self, workspec_list)
Check status of workers. This method takes a list of WorkSpecs as input argument and returns a list of worker's statuses. Nth element in the return list corresponds to the status of Nth WorkSpec in the given list. Worker's status is one of WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled, WorkSpec.ST_running, WorkSpec.ST_submitted.
Args:
name | type | description |
---|---|---|
workspec_list | [WorkSpec,] | A list of Work specifications instances |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
status list | [string,] | a list of worker's statuses |
def submit_workers(self, workspec_list)
Submit workers to a scheduling system like batch systems and computing elements. This method takes a list of WorkSpecs as input argument, and returns a list of tuples. Each tuple is composed of a return code and a dialog message. Nth tuple in the returned list corresponds to submission status and dialog message for Nth worker in the given WorkSpec list. A unique identifier is set to WorkSpec.batchID when submission is successful, so that they can be identified in the scheduling system. It would be useful to set other attributes like queueName (batch queue name), computingElement (CE's host name), and nodeID (identifier of the node where the worker is running).
Args:
name | type | description |
---|---|---|
workspec_list | [WorkSpec,] | A list of Work specifications instances |
Return:
name | type | description |
---|---|---|
status and dialog message list | [(bool, string),] | A list of tuples. Each tuple is composed of submission status (True for success, False otherwise) and dialog message |
Submitter plugins can tell bigpandamon the URLs of batch log files, which are put by scheduling systems like batch systems to local directories where http is running, or submitter plugins can let harvester instances upload those files to a web server (essentially the panda server), so that they are reachable by using web browsers through bigpandamon. In either case, all submitter plugins need to do is to set log file URLs or paths to WorkSpec using the set_log_file
method :
def set_log_file(self, log_type, stream)
where log_type
is one of 'batch_log', 'stdout', and 'stderr'. stream
is a URL with 'http' or 'https' if the file is available on a web location, or is a local file path if harvester uploads the file. For the latter, panda_harvester.cfg needs to define something like
# base URL for write access to log cache server
pandaCacheURL_W = https://aipanda011.cern.ch:25443/server/panda
# base URL for read access to log cache server
pandaCacheURL_R = https://aipanda011.cern.ch:25443/cache
def kill_worker(self, workspec)
Kill a worker in a scheduling system like batch systems and computing elements. This method takes a WorkSpec as input argument, and returns a tuple of a return code and a dialog message.
Args:
name | type | description |
---|---|---|
workspec | WorkSpec | Work specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def sweep_worker(self, workspec)
Perform cleanup procedures for a worker, such as deletion of work directory. This method takes a WorkSpec as input argument, and returns a tuple of a return code and a dialog message. The list of JobSpecs associated to the worker is available in workspec.get_jobspec_list(). The list of input and output FileSpecs, which are not used by any active jobs and thus can safely be deleted, is available in JobSpec.get_files_to_delete().
Args:
name | type | description |
---|---|---|
workspec | WorkSpec | Work specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def make_worker(self, jobspec_list, queue_config, resource_type)
Make a single worker for a set of jobs. This method takes a list of JobSpecs, queue configuration, and a string of resource type as input arguments, and returns a WorkSpec if successful or None otherwise. When None is returned corresponding job is flagged as failed, or it is retried if the WorkerMaker sets skipOnFail=True.
Args:
name | type | description |
---|---|---|
jobspec_list | [JorkSpec,] | a list of job specification instances |
queue_config | QueueConfig | queue configuration instance |
resource_type | string | resource type |
Return:
name | type | description |
---|---|---|
work_spec | WorkSpec | a work specification if successful or None otherwise |
def get_num_jobs_per_worker(self, n_workers)
Get the number of jobs per worker.
Args:
name | type | description |
---|---|---|
n_workers | int | the number of workers to be generated at most |
Return:
name | type | description |
---|---|---|
n_jobs_per_worker | int | the number of jobs per worker |
def get_num_workers_per_job(self, n_workers)
Get the number of jobs per worker.
Args:
name | type | description |
---|---|---|
n_workers | int | the number of workers to be generated at most |
Return:
name | type | description |
---|---|---|
n_workers_per_job | int | the number of workers per job |
def is_resource_ready(self)
Check if the resource is ready. This method is called only when the WorkerMaker sets dynamicSizing=True.
Args:
name | type | description |
---|
Return:
name | type | description |
---|---|---|
is_ready | bool | True if ready, or False otherwise |
Communicator takes care of communication with workload management system (WMS). It can inherit from a base class, to that it is enough ti implement only methods useful for the WMS and/or use-case.
def get_jobs(self, site_name, node_name, prod_source_label, computing_element, n_jobs, additional_criteria)
This method is used to get jobs from WMS based on matchmaking with various parameters. Jobs are described as dictionaries and they are converted to JobSpecs in the JobFetcher agent.
Args:
name | type | description |
---|---|---|
site_name | string | The site name where computing resources are assigned |
node_name | string | The name of the node which retrieves the jobs |
prod_source_label | string | The label of jobs, such as production and analysis |
computing_element | string | The computing element to which jobs are sent |
n_jobs | int | The number of jobs to be got |
additional_criteria | dict | Additional criteria for job selection |
Return: |
name | type | description |
---|---|---|
job_dict_list | [dict,] | A list of job specification dictionaries |
error dialog | string | Error dialog if any |
def update_jobs(self, jobspec_list, id)
This methods updates job information in WMS, and returns a list of dictionaries. Each dictionary is composed of StatusCode:int and ErrorDiag:string. The StatusCode is 0 if the job is successfully updated in WMS.
Args:
name | type | description |
---|---|---|
jobspec_list [JorkSpec,] | A list of job specification instances | |
id | string | the process identifier |
Return:
name | type | description |
---|---|---|
return_list | [dict,] | A list of return dictionaries. Each dictionary is {'StatusCode':int, 'ErrorDiag':string} |
def get_event_ranges(self, data_map, scattered)
This method is used to get events from WMS for jobs with fine-grained bookkeeping. The scattered flag is set to True to get events randomly, which is useful to avoid concurrent access to the same file from many ranks, for example.
Args:
name | type | description |
---|---|---|
data_map | {jobID:data,} | A dictionary to request events. The key of the dictionary is the identifier of a job and the value is a dictionary of request parameters for the job |
scattered | bool | True to get events randomly |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
return_list | [dict,] | A list of return dictionaries. Each dictionary is composed of StatusCode:int and ErrorDiag:string |
def update_event_ranges(self, event_ranges, tmp_log)
This method is used to update events in WMS. It takes a logging instance to feed messages to the caller's log.
Args:
name | type | description |
---|---|---|
event_ranges | {jobID:data,} | A dictionary to event data. The key of the dictionary is the identifier of a job and the value is a list of event dictionaries. |
tmp_log | logging instance | A logging instance of the caller |
Return:
name | type | description |
---|---|---|
return_map | dict | A dictionary composed of StatusCode:int and ErrorDiag:string. The StatusCode is 0 if successful |
def get_commands(self, n_commands)
This method is used to get commands from WMS. Each command is described in a dictionary which is composed of command_id:int, unique identifier of the command, command:string, the command string, params:string, the parameter string, ack_requested:char, Y if the command requires an acknowledge, and creation_date:datetime, creation time of the command.
Args:
name | type | description |
---|---|---|
n_commands | int | The max number of commands |
Return:
name | type | description |
---|---|---|
command_list | [dict,] | A list of command dictionaries |
def ack_commands(self, command_ids)
This method sends acknowledge of commands to WMS.
Args:
name | type | description |
---|---|---|
command_ids | [int,] | A list of command IDs |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
def update_workers(self, workspec_list)
This method updates workers in WMS.
Args:
name | type | description |
---|---|---|
workspec_list | [WorkSpec,] | A list of Work specifications instances |
Return:
name | type | description |
---|---|---|
return code list | [bool,] | A list of return codes. Nth return code corresponds to Nth WorkSpec. True for success, False otherwise |
error dialog | string | Error dialog if update request is failed |
def is_alive(self, data)
This method sends heartbeat message of the harvester instance to WMS. Heartbeat message is a dictionary which is composed of startTime:datetime, start time of the instance, sw_version:string, software version number of the instance, and commit_stamp:string, commit timestamp of the instance.
Args:
name | type | description |
---|---|---|
data | dict | Heartbeat message |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
def update_worker_stats(self, site_name, stats)
This method sends statistics of workers at a site to WMS. The statistics is described as a dictionary of worker_status:n_workers.
Args:
name | type | description |
---|---|---|
site_name | string | The site name |
stats | dict | statistics of workers |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
def check_jobs(self, jobspec_list)
This method checks if jobs are still active in WMS.
Args:
name | type | description |
---|---|---|
jobspec_list | [JorkSpec,] | A list of job specification instances |
Return:
name | type | description |
---|---|---|
return list | [dict,] | A list of dictionaries. Each dictionary is composed of StatusCode:int, 0 if the job is active, ErrorDiag:string, dialog message if any |
send_dialog_messages(self, dialog_list)
This method sends dialog messages of the harvester instance to WMS.
Args:
name | type | description |
---|---|---|
dialog_list | [DiagSpec,] | A list of dialog instances |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
Communicator plugin is configured in the communicator
section in etc/panda/panda_harvester.cfg like
[communicator]
# module name of Communicator
moduleName = pandaharvester.harvestercommunicator.your_communicator
# class name of Communicator
className = XyzCommunicator
# number of connections
nConnections = 5
where you can define module and class names of the plugin and the number of connections to WMS.
Getting started |
---|
Installation and configuration |
Testing and running |
Debugging |
Work with Middleware |
Admin FAQ |
Development guides |
---|
Development workflow |
Tagging |
Production & commissioning |
---|
Scale up submission |
Condor experiences |
Commissioning on the grid |
Production servers |
Service monitoring |
Auto Queue Configuration with CRIC |
SSH+RPC middleware setup |
Kubernetes section |
---|
Kubernetes setup |
X509 credentials |
AWS setup |
GKE setup |
CERN setup |
CVMFS installation |
Generic service accounts |
Advanced payloads |
---|
Horovod integration |