diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 6cdaf01d..d460cd92 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -17,6 +17,7 @@ import concurrent.futures import datetime +import json import os import time import traceback @@ -143,6 +144,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None, self.dependency_map_deleted = [] self.additional_task_parameters = {} + self.additional_task_parameters_per_site = {} def my_condition(self): if self.is_finished(): @@ -341,12 +343,27 @@ def set_agent_attributes(self, attrs, req_attributes=None): if not self.additional_task_parameters: self.additional_task_parameters = {} try: + self.agent_attributes['additional_task_parameters'] = json.loads(self.agent_attributes['additional_task_parameters']) for key, value in self.agent_attributes['additional_task_parameters'].items(): if key not in self.additional_task_parameters: self.additional_task_parameters[key] = value except Exception as ex: self.logger.warn(f"Failed to set additional_task_parameters: {ex}") + if 'additional_task_parameters_per_site' in self.agent_attributes and self.agent_attributes['additional_task_parameters_per_site']: + if not self.additional_task_parameters_per_site: + self.additional_task_parameters_per_site = {} + try: + self.agent_attributes['additional_task_parameters_per_site'] = json.loads(self.agent_attributes['additional_task_parameters_per_site']) + for site in self.agent_attributes['additional_task_parameters_per_site']: + if site not in self.additional_task_parameters_per_site: + self.additional_task_parameters_per_site[site] = {} + for key, value in self.agent_attributes['additional_task_parameters_per_site'][site].items(): + if key not in self.additional_task_parameters_per_site[site]: + self.additional_task_parameters_per_site[site][key] = value + except Exception as ex: + self.logger.warn(f"Failed to set additional_task_parameters_per_site: {ex}") + def depend_on(self, work): self.logger.debug("checking depending on") if self.dependency_tasks is None: @@ -776,14 +793,6 @@ def create_processing(self, input_output_maps=[]): task_param_map['reqID'] = self.get_request_id() - if self.additional_task_parameters: - try: - for key, value in self.additional_task_parameters.items(): - if key not in task_param_map: - task_param_map[key] = value - except Exception as ex: - self.logger.warn(f"failed to set task parameter map with additional_task_parameters: {ex}") - processing_metadata = {'task_param': task_param_map} proc = Processing(processing_metadata=processing_metadata) proc.workload_id = None @@ -816,6 +825,23 @@ def submit_panda_task(self, processing): self.logger.info(f"Task cloud was set to {task_param['cloud']}, which is different from {cloud}, reset it to {cloud}") task_param['cloud'] = cloud + if self.additional_task_parameters: + try: + for key, value in self.additional_task_parameters.items(): + if key not in task_param: + task_param[key] = value + except Exception as ex: + self.logger.warn(f"failed to set task parameter map with additional_task_parameters: {ex}") + if self.additional_task_parameters_per_site: + try: + for site in self.additional_task_parameters_per_site: + if (task_param['PandaSite'] and site in task_param['PandaSite']) or (task_param['site'] and site in task_param['site']): + for key, value in self.additional_task_parameters_per_site[site].items(): + if key not in task_param: + task_param[key] = value + except Exception as ex: + self.logger.warn(f"failed to set task parameter map with additional_task_parameters_per_site: {ex}") + if self.has_dependency(): parent_tid = None self.logger.info("parent_workload_id: %s" % self.parent_workload_id)