Merge pull request #376 from HSF/dev
Dev
wguanicedew authored Dec 23, 2024
2 parents 0c0facc + 66a11c4 commit f6604b1
Showing 4 changed files with 161 additions and 28 deletions.
97 changes: 80 additions & 17 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2020 - 2023
# - Wen Guan, <[email protected]>, 2020 - 2024
# - Sergey Padolski, <[email protected]>, 2020


@@ -146,6 +146,8 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,
self.additional_task_parameters = {}
self.additional_task_parameters_per_site = {}

self.core_to_queues = {}

def my_condition(self):
if self.is_finished():
return True
@@ -364,6 +366,13 @@ def set_agent_attributes(self, attrs, req_attributes=None):
except Exception as ex:
self.logger.warn(f"Failed to set additional_task_parameters_per_site: {ex}")

if 'core_to_queues' in self.agent_attributes and self.agent_attributes['core_to_queues']:
try:
self.agent_attributes['core_to_queues'] = json.loads(self.agent_attributes['core_to_queues'])
self.core_to_queues = self.agent_attributes['core_to_queues']
except Exception as ex:
self.logger.warn(f"Failed to set core_to_queues: {ex}")
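
The core_to_queues agent attribute arrives as a JSON string and is parsed here into a dict keyed by core count or queue name. A minimal sketch of the expected shape, reusing the illustrative queue names from the comment in submit_panda_task below (the values are assumptions, not a required configuration):

    import json

    # Hypothetical agent-attribute value: a JSON string mapping core counts or
    # queue names to their queues and processing_type.
    raw = ('{"1": {"queues": ["Rubin", "Rubin_Extra_Himem"], "processing_type": ""}, '
           '"Rubin_Multi": {"queues": ["Rubin_Multi"], "processing_type": "Rubin_Multi"}, '
           '"Rubin_Merge": {"queues": ["Rubin_Merge"], "processing_type": "Rubin_Merge"}}')
    core_to_queues = json.loads(raw)
    # core_to_queues["Rubin_Merge"]["processing_type"] == "Rubin_Merge"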

def depend_on(self, work):
self.logger.debug("checking depending on")
if self.dependency_tasks is None:
@@ -765,9 +774,9 @@ def create_processing(self, input_output_maps=[]):
# task_param_map['ramUnit'] = 'MB'
task_param_map['ramUnit'] = 'MBPerCoreFixed'
if self.task_rss_retry_offset:
task_param_map['retryRamOffset'] = self.task_rss_retry_offset
task_param_map['retryRamOffset'] = self.task_rss_retry_offset / self.core_count if self.core_count else self.task_rss_retry_offset
if self.task_rss_retry_step:
task_param_map['retryRamStep'] = self.task_rss_retry_step
task_param_map['retryRamStep'] = self.task_rss_retry_step / self.core_count if self.core_count else self.task_rss_retry_step
if self.task_rss_max:
# todo: until PanDA supports it
# taskParamMap['maxRamCount'] = self.task_rss_max
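
Because ramUnit is now 'MBPerCoreFixed', the retry RAM offset and step are converted to per-core values before being handed to PanDA. An illustrative calculation with assumed numbers:

    # Assumed values, for illustration only.
    core_count = 2
    task_rss_retry_offset = 4000   # MB for the whole task
    task_rss_retry_step = 2000     # MB added per retry, whole task

    retry_ram_offset = task_rss_retry_offset / core_count if core_count else task_rss_retry_offset
    retry_ram_step = task_rss_retry_step / core_count if core_count else task_rss_retry_step
    # retry_ram_offset == 2000.0 and retry_ram_step == 1000.0, i.e. MB per core
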
@@ -842,6 +851,54 @@ def submit_panda_task(self, processing):
except Exception as ex:
self.logger.warn(f"failed to set task parameter map with additional_task_parameters_per_site: {ex}")

if self.core_to_queues:
try:
# core_to_queues = {"1": {"queues": ["Rubin", "Rubin_Extra_Himem"], "processing_type": ""},
# "Rubin_Multi": {"queues": ["Rubin_Multi"], "processing_type": "Rubin_Multi"},
# "Rubin_Merge": {"queues": ["Rubin_Merge"], "processing_type": "Rubin_Merge"},
# "any": "Rubin_Multi"}

if task_param['processingType']:
msg = f"processingType {task_param['processingType']} is already set, do nothing"
self.logger.debug(msg)
else:
num_cores = []
queue_processing_type = {}
for k in self.core_to_queues:
key = str(k)
if not key.isdigit():
num_cores.append(key)
if key not in ['any']:
queues = self.core_to_queues[k].get('queues', [])
processing_type = self.core_to_queues[k].get('processing_type', '')
for q in queues:
queue_processing_type[q] = processing_type

if str(task_param['coreCount']) in num_cores:
p_type = self.core_to_queues.get(str(task_param['coreCount']), {}).get('processing_type', None)
if p_type and not task_param['processingType']:
msg = f"processingType is not defined, set it to {p_type} based on coreCount {task_param['coreCount']}"
task_param['processingType'] = p_type
self.logger.warn(msg)
else:
if task_param['site']:
for q in queue_processing_type:
if task_param['site'] in q or q in task_param['site']:
p_type = queue_processing_type[q]
if p_type:
msg = f"processingType is not defined, set it to {p_type} based on site {task_param['site']}"
task_param['processingType'] = p_type
self.logger.debug(msg)
else:
site = 'any'
p_type = self.core_to_queues.get(site, {}).get('processing_type', None)
if p_type:
msg = f"processingType is not defined, set it to {p_type} based on site 'any'"
task_param['processingType'] = p_type
self.logger.debug(msg)
except Exception as ex:
self.logger.warn(f"failed to set task parameter map with core_to_queues: {ex}")
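
When processingType is unset and the core count does not select an entry, the task site is matched against the configured queue names by substring in either direction, and the matching entry's processing_type is applied. A small sketch of that matching step (the site and queue names are illustrative):

    # Illustrative queue -> processing_type mapping and a hypothetical task site.
    queue_processing_type = {"Rubin_Multi": "Rubin_Multi", "Rubin_Merge": "Rubin_Merge"}
    site = "SLAC_Rubin_Merge"

    for q in queue_processing_type:
        if site in q or q in site:
            # "Rubin_Merge" is a substring of "SLAC_Rubin_Merge", so its
            # processing_type "Rubin_Merge" would be applied to the task.
            print(q, "->", queue_processing_type[q])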

if self.has_dependency():
parent_tid = None
self.logger.info("parent_workload_id: %s" % self.parent_workload_id)
@@ -924,8 +981,8 @@ def poll_panda_task_status(self, processing):
if 'processing' in processing['processing_metadata']:
from pandaclient import Client

proc = processing['processing_metadata']['processing']
status, task_status = Client.getTaskStatus(proc.workload_id)
# proc = processing['processing_metadata']['processing']
status, task_status = Client.getTaskStatus(processing['workload_id'])
if status == 0:
return task_status
else:
@@ -1744,8 +1801,9 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=
from pandaclient import Client

if processing:
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# proc = processing['processing_metadata']['processing']
# task_id = proc.workload_id
task_id = processing['workload_id']
if task_id is None:
task_id = self.get_panda_task_id(processing)

@@ -1793,8 +1851,9 @@ def kill_processing(self, processing, log_prefix=''):
try:
if processing:
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# proc = processing['processing_metadata']['processing']
# task_id = proc.workload_id
task_id = processing['workload_id']
# task_id = processing['processing_metadata']['task_id']
# Client.killTask(task_id)
Client.finishTask(task_id, soft=False)
@@ -1808,8 +1867,9 @@ def kill_processing_force(self, processing, log_prefix=''):
try:
if processing:
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# proc = processing['processing_metadata']['processing']
# task_id = proc.workload_id
task_id = processing['workload_id']
# task_id = processing['processing_metadata']['task_id']
Client.killTask(task_id)
# Client.finishTask(task_id, soft=True)
@@ -1824,8 +1884,9 @@ def reactivate_processing(self, processing, log_prefix=''):
if processing:
from pandaclient import Client
# task_id = processing['processing_metadata']['task_id']
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# proc = processing['processing_metadata']['processing']
# task_id = proc.workload_id
task_id = processing['workload_id']

# Client.retryTask(task_id)
status, out = Client.retryTask(task_id, newParams={})
@@ -1841,8 +1902,9 @@ def abort_processing(self, processing, log_prefix=''):
try:
has_task = False
if processing:
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# proc = processing['processing_metadata']['processing']
# task_id = proc.workload_id
task_id = processing['workload_id']
if task_id:
has_task = True
self.kill_processing_force(processing, log_prefix=log_prefix)
@@ -1866,8 +1928,9 @@ def has_external_content_id(self):
def get_external_content_ids(self, processing, log_prefix=''):
if processing:
from pandaclient import Client
proc = processing['processing_metadata']['processing']
task_id = proc.workload_id
# proc = processing['processing_metadata']['processing']
# task_id = proc.workload_id
task_id = processing['workload_id']
status, output = Client.get_files_in_datasets(task_id, verbose=False)
if status == 0:
return output
26 changes: 25 additions & 1 deletion main/lib/idds/agents/clerk/clerk.py
@@ -485,6 +485,27 @@ def generate_transform(self, req, work, build=False, iworkflow=False):
if has_previous_conditions:
transform_status = TransformStatus.WaitForTrigger

site = req['site']
if not site:
try:
cloud = None
if hasattr(work, 'task_cloud') and work.task_cloud:
cloud = work.task_cloud

if hasattr(work, 'task_queue') and work.task_queue:
queue = work.task_queue
elif hasattr(work, 'queue') and work.queue:
queue = work.queue
else:
queue = None

task_site = None
if hasattr(work, 'task_site') and work.task_site:
task_site = work.task_site
site = f"{cloud},{task_site},{queue}"
except Exception:
pass
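
When the request does not specify a site, the clerk now derives a "<cloud>,<task_site>,<queue>" string from the work object so the transformer-side throttler can match it later. A sketch with assumed attribute values (missing parts are rendered literally by the f-string):

    # Assumed work attributes, for illustration only.
    cloud, task_site, queue = "US", None, "SLAC_Rubin_Merge"
    site = f"{cloud},{task_site},{queue}"
    # site == "US,None,SLAC_Rubin_Merge"; get_closest_site() in the transformer
    # splits this back on the commas into (cloud, site, queue).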

new_transform = {'request_id': req['request_id'],
'workload_id': req['workload_id'],
'transform_type': transform_type,
@@ -506,7 +527,7 @@ def generate_transform(self, req, work, build=False, iworkflow=False):
'triggered_conditions': triggered_conditions,
'untriggered_conditions': untriggered_conditions,
'loop_index': loop_index,
'site': req['site'],
'site': site,
'transform_metadata': {'internal_id': work.get_internal_id(),
'template_work_id': work.get_template_work_id(),
'sequence_id': work.get_sequence_id(),
@@ -674,6 +695,9 @@ def get_throttlers(self):
return throttlers

def whether_to_throttle(self, request):
# disable throttler in clerk. throttler will run in transformer
return False

try:
site = request['site']
if site is None:
64 changes: 54 additions & 10 deletions main/lib/idds/agents/transformer/transformer.py
@@ -233,12 +233,40 @@ def get_num_active_contents(self, site_name, active_transform_ids):
default_value = {'new': 0, 'activated': 0, 'processed': 0}
return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value)

def get_closest_site(self, task_site, throttler_sites):
try:
if ',' in task_site:
cloud, site, queue = task_site.split(",")
else:
# cloud = None
site = task_site
queue = None

# Sort by length (descending) and alphabetically
sorted_sites = sorted(throttler_sites, key=lambda x: (-len(x), x))
for s in sorted_sites:
if queue and queue.startswith(s):
return s
elif site and site.startswith(s):
return s
except Exception as ex:
self.logger.warn(f"Failed to find closest site for {task_site}: {ex}")
return None

def whether_to_throttle(self, transform):
try:
throttlers = self.get_throttlers()

site = transform['site']
if site is None:
site = 'Default'
throttlers = self.get_throttlers()
else:
throttler_sites = [site for site in throttlers]
site = self.get_closest_site(site, throttler_sites)
if site is None:
site = 'Default'
self.logger.info(f"throttler closest site for {transform['site']} is {site}")

num_transforms = self.get_num_active_transforms(site)
num_processings, active_transforms = self.get_num_active_processings(site)
num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms)
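
The throttler site is resolved by splitting the transform's "cloud,site,queue" string and taking the longest configured throttler site that is a prefix of the queue (or, failing that, of the site). A usage sketch with assumed names:

    # Hypothetical throttler sites and a task site string of the form "cloud,site,queue".
    throttler_sites = ["Default", "SLAC", "SLAC_Rubin"]
    task_site = "US,SLAC_TEST,SLAC_Rubin_Merge"

    cloud, site, queue = task_site.split(",")
    # Longest names first, so the most specific throttler entry wins.
    sorted_sites = sorted(throttler_sites, key=lambda x: (-len(x), x))
    closest = next((s for s in sorted_sites if queue.startswith(s) or site.startswith(s)), None)
    # closest == "SLAC_Rubin": the queue "SLAC_Rubin_Merge" starts with it.
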
@@ -543,17 +571,33 @@ def handle_new_transform_real(self, transform):
work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
work.set_work_name_to_coll_map(work_name_to_coll_map)

# create processing
new_processing_model = None
processing = work.get_processing(input_output_maps=[], without_creating=False)
self.logger.debug(log_pre + "work get_processing with creating: %s" % processing)
if processing and not processing.processing_id:
new_processing_model = self.generate_processing_model(transform)

proc_work = copy.deepcopy(work)
proc_work.clean_work()
processing.work = proc_work
new_processing_model['processing_metadata'] = {'processing': processing}
processing = work.get_processing(input_output_maps=[], without_creating=True)
self.logger.debug(log_pre + "work get_processing: %s" % processing)
processing_model = core_processings.get_processing(request_id=transform['request_id'], transform_id=transform['transform_id'])
if processing_model:
work.sync_processing(processing, processing_model)
proc = processing_model['processing_metadata']['processing']
work.sync_work_data(status=processing_model['status'], substatus=processing_model['substatus'],
work=proc.work, output_data=processing_model['output_metadata'], processing=proc)
# processing_metadata = processing_model['processing_metadata']
if processing_model['errors']:
work.set_terminated_msg(processing_model['errors'])
# work.set_processing_output_metadata(processing, processing_model['output_metadata'])
work.set_output_data(processing.output_data)
transform['workload_id'] = processing_model['workload_id']
else:
# create processing
processing = work.get_processing(input_output_maps=[], without_creating=False)
self.logger.debug(log_pre + "work get_processing with creating: %s" % processing)
if processing and not processing.processing_id:
new_processing_model = self.generate_processing_model(transform)

proc_work = copy.deepcopy(work)
proc_work.clean_work()
processing.work = proc_work
new_processing_model['processing_metadata'] = {'processing': processing}

transform_parameters = {'status': TransformStatus.Transforming,
'locking': TransformLocking.Idle,
2 changes: 2 additions & 0 deletions main/lib/idds/tests/test_domapanda.py
@@ -277,6 +277,8 @@ def setup_workflow():
task_name=taskN4.name, task_queue=task_queue3,
encode_command_line=True,
task_priority=981,
core_count=2,
task_rss=32000,
prodSourceLabel='managed',
task_log={"dataset": "PandaJob_#{pandaid}/",
"destination": "local",