Skip to content

Commit

Permalink
add options to support additional task parameters per site
Browse files Browse the repository at this point in the history
  • Loading branch information
wguanicedew committed Dec 11, 2024
1 parent c2f8cb2 commit c837999
Showing 1 changed file with 34 additions and 8 deletions.
42 changes: 34 additions & 8 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import concurrent.futures
import datetime
import json
import os
import time
import traceback
Expand Down Expand Up @@ -143,6 +144,7 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,
self.dependency_map_deleted = []

self.additional_task_parameters = {}
self.additional_task_parameters_per_site = {}

def my_condition(self):
if self.is_finished():
Expand Down Expand Up @@ -341,12 +343,27 @@ def set_agent_attributes(self, attrs, req_attributes=None):
if not self.additional_task_parameters:
self.additional_task_parameters = {}
try:
self.agent_attributes['additional_task_parameters'] = json.loads(self.agent_attributes['additional_task_parameters'])
for key, value in self.agent_attributes['additional_task_parameters'].items():
if key not in self.additional_task_parameters:
self.additional_task_parameters[key] = value
except Exception as ex:
self.logger.warn(f"Failed to set additional_task_parameters: {ex}")

if 'additional_task_parameters_per_site' in self.agent_attributes and self.agent_attributes['additional_task_parameters_per_site']:
if not self.additional_task_parameters_per_site:
self.additional_task_parameters_per_site = {}
try:
self.agent_attributes['additional_task_parameters_per_site'] = json.loads(self.agent_attributes['additional_task_parameters_per_site'])
for site in self.agent_attributes['additional_task_parameters_per_site']:
if site not in self.additional_task_parameters_per_site:
self.additional_task_parameters_per_site[site] = {}
for key, value in self.agent_attributes['additional_task_parameters_per_site'][site].items():
if key not in self.additional_task_parameters_per_site[site]:
self.additional_task_parameters_per_site[site][key] = value
except Exception as ex:
self.logger.warn(f"Failed to set additional_task_parameters_per_site: {ex}")

def depend_on(self, work):
self.logger.debug("checking depending on")
if self.dependency_tasks is None:
Expand Down Expand Up @@ -776,14 +793,6 @@ def create_processing(self, input_output_maps=[]):

task_param_map['reqID'] = self.get_request_id()

if self.additional_task_parameters:
try:
for key, value in self.additional_task_parameters.items():
if key not in task_param_map:
task_param_map[key] = value
except Exception as ex:
self.logger.warn(f"failed to set task parameter map with additional_task_parameters: {ex}")

processing_metadata = {'task_param': task_param_map}
proc = Processing(processing_metadata=processing_metadata)
proc.workload_id = None
Expand Down Expand Up @@ -816,6 +825,23 @@ def submit_panda_task(self, processing):
self.logger.info(f"Task cloud was set to {task_param['cloud']}, which is different from {cloud}, reset it to {cloud}")
task_param['cloud'] = cloud

if self.additional_task_parameters:
try:
for key, value in self.additional_task_parameters.items():
if key not in task_param:
task_param[key] = value
except Exception as ex:
self.logger.warn(f"failed to set task parameter map with additional_task_parameters: {ex}")
if self.additional_task_parameters_per_site:
try:
for site in self.additional_task_parameters_per_site:
if (task_param['PandaSite'] and site in task_param['PandaSite']) or (task_param['site'] and site in task_param['site']):
for key, value in self.additional_task_parameters_per_site[site].items():
if key not in task_param:
task_param[key] = value
except Exception as ex:
self.logger.warn(f"failed to set task parameter map with additional_task_parameters_per_site: {ex}")

if self.has_dependency():
parent_tid = None
self.logger.info("parent_workload_id: %s" % self.parent_workload_id)
Expand Down

0 comments on commit c837999

Please sign in to comment.