diff --git a/main/lib/idds/agents/carrier/finisher.py b/main/lib/idds/agents/carrier/finisher.py index 036f7197..c89a89eb 100644 --- a/main/lib/idds/agents/carrier/finisher.py +++ b/main/lib/idds/agents/carrier/finisher.py @@ -32,6 +32,11 @@ class Finisher(Poller): def __init__(self, num_threads=1, poll_time_period=10, retries=3, retrieve_bulk_size=2, message_bulk_size=1000, **kwargs): + self.set_max_workers() + if hasattr(self, 'finisher_max_number_workers'): + self.max_number_workers = int(self.finisher_max_number_workers) + num_threads = self.max_number_workers + super(Finisher, self).__init__(num_threads=num_threads, name='Finisher', poll_time_period=poll_time_period, retries=retries, retrieve_bulk_size=retrieve_bulk_size, diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 7661ee47..d3645a2c 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import datetime import random @@ -36,6 +36,9 @@ class Poller(BaseAgent): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Poller', message_bulk_size=1000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers + super(Poller, self).__init__(num_threads=num_threads, name=name, **kwargs) self.config_section = Sections.Carrier self.poll_period = int(poll_period) @@ -184,6 +187,13 @@ def update_processing(self, processing, processing_model): processing['update_processing']['parameters']['locking'] = ProcessingLocking.Idle # self.logger.debug("wen: %s" % str(processing)) processing['update_processing']['parameters']['updated_at'] = datetime.datetime.utcnow() + # check update_processing status + if 'status' in processing['update_processing']['parameters']: + new_status = processing['update_processing']['parameters']['status'] + if new_status == 
ProcessingStatus.Submitting and processing_model['status'].value > ProcessingStatus.Submitting.value: + processing['update_processing']['parameters']['status'] = ProcessingStatus.Submitted + + self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters'])) retry = True retry_num = 0 diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index 1d62cf4a..5a991f20 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -24,7 +24,7 @@ from idds.core import messages as core_messages, catalog as core_catalog from idds.agents.common.baseagent import BaseAgent # from idds.agents.common.eventbus.event import TerminatedProcessingEvent -from idds.agents.common.eventbus.event import TriggerProcessingEvent +from idds.agents.common.eventbus.event import MsgTriggerProcessingEvent from .utils import handle_messages_processing @@ -36,7 +36,7 @@ class Receiver(BaseAgent): Receiver works to receive workload management messages to update task/job status. 
""" - def __init__(self, num_threads=1, bulk_message_delay=5, bulk_message_size=2000, + def __init__(self, num_threads=1, bulk_message_delay=30, bulk_message_size=2000, random_delay=None, **kwargs): super(Receiver, self).__init__(num_threads=num_threads, name='Receiver', **kwargs) self.config_section = Sections.Carrier @@ -108,16 +108,17 @@ def run(self): for pr_id in update_processings: # self.logger.info(log_prefix + "TerminatedProcessingEvent(processing_id: %s)" % pr_id) # event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr_id) - self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id) - event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id) + self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id) + event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id) self.event_bus.send(event) for pr_id in terminated_processings: - self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id) - event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'}) + self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id) + event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'}) self.event_bus.send(event) time_delay = self.bulk_message_delay - (time.time() - time_start) + time_delay = self.bulk_message_delay if time_delay > 0: time.sleep(time_delay) except IDDSException as error: diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 3d79561c..f6d170a6 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -32,6 +32,9 @@ class Submitter(Poller): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Submitter', message_bulk_size=1000, **kwargs): + 
self.set_max_workers() + num_threads = self.max_number_workers + super(Submitter, self).__init__(num_threads=num_threads, name=name, **kwargs) def get_new_processings(self): diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 7baa3379..779241a5 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -34,10 +34,21 @@ class Trigger(Poller): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Trigger', message_bulk_size=1000, **kwargs): + self.set_max_workers() + if hasattr(self, 'trigger_max_number_workers'): + self.max_number_workers = int(self.trigger_max_number_workers) + + num_threads = self.max_number_workers * 2 super(Trigger, self).__init__(num_threads=num_threads, name=name, **kwargs) if hasattr(self, 'trigger_max_number_workers'): self.max_number_workers = int(self.trigger_max_number_workers) + self.number_msg_workers = 0 + + def is_ok_to_run_more_msg_processings(self): + if self.number_msg_workers >= self.max_number_workers: + return False + return True def get_trigger_processings(self): """ @@ -154,8 +165,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): 'update_contents': []} return ret - def process_trigger_processing(self, event): - self.number_workers += 1 + def process_trigger_processing_real(self, event): try: if event: original_event = event @@ -219,13 +229,26 @@ def process_trigger_processing(self, event): except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) + + def process_trigger_processing(self, event): + self.number_workers += 1 + self.process_trigger_processing_real(event) self.number_workers -= 1 + def process_msg_trigger_processing(self, event): + self.number_msg_workers += 1 + self.process_trigger_processing_real(event) + self.number_msg_workers -= 1 + def init_event_function_map(self): self.event_func_map = { EventType.TriggerProcessing: { 
'pre_check': self.is_ok_to_run_more_processings, 'exec_func': self.process_trigger_processing + }, + EventType.MsgTriggerProcessing: { + 'pre_check': self.is_ok_to_run_more_msg_processings, + 'exec_func': self.process_msg_trigger_processing } } diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 5481b3af..7188e99e 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -10,7 +10,7 @@ import json import logging - +import time from idds.common.constants import (ProcessingStatus, CollectionStatus, @@ -1162,6 +1162,29 @@ def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, in return content_id, to_update_jobid +def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''): + cache = get_redis_cache() + processed_pending_workload_id_map_key = "processed_pending_workload_id_map" + processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={}) + processed_pending_workload_id_map_time_key = "processed_pending_workload_id_map_time" + processed_pending_workload_id_map_time = cache.get(processed_pending_workload_id_map_time_key, default=None) + + workload_id = str(workload_id) + if workload_id in processed_pending_workload_id_map: + return False + + processed_pending_workload_id_map[workload_id] = time.time() + if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < time.time(): + cache.set(processed_pending_workload_id_map_time_key, int(time.time()), expire_seconds=86400) + + for w_id in list(processed_pending_workload_id_map.keys()): + if processed_pending_workload_id_map[w_id] + 86400 < time.time(): + del processed_pending_workload_id_map[w_id] + + cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400) + return True + + def handle_messages_processing(messages, logger=None, log_prefix=''): logger =
get_logger(logger) if not log_prefix: @@ -1190,13 +1213,13 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): status = msg['status'] if status in ['pending']: # 'prepared' req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id - if r_status < ProcessingStatus.Submitted.value: + if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix): # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) if processing_id not in update_processings: update_processings.append(processing_id) logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) else: - logger.debug(log_prefix + "Processing %s is already in status(%s), not add it to update processing" % (str(processing_id), r_status)) + logger.debug(log_prefix + "Processing %s is already processed, not add it to update processing" % (str(processing_id))) elif status in ['finished', 'done']: req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id # update_processings.append((processing_id, status)) diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 8686b00d..9e085ba1 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import datetime import random @@ -42,6 +42,8 @@ class Clerk(BaseAgent): """ def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending_time=None, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs) self.poll_period = int(poll_period) self.retrieve_bulk_size = int(retrieve_bulk_size) diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py index 14e1f65f..9f2f99dc 100644 --- a/main/lib/idds/agents/common/baseagent.py 
+++ b/main/lib/idds/agents/common/baseagent.py @@ -66,6 +66,13 @@ def __init__(self, num_threads=1, name=None, logger=None, **kwargs): self.cache = get_redis_cache() + def set_max_workers(self): + self.number_workers = 0 + if not hasattr(self, 'max_number_workers') or not self.max_number_workers: + self.max_number_workers = 3 + else: + self.max_number_workers = int(self.max_number_workers) + def get_event_bus(self): self.event_bus diff --git a/main/lib/idds/agents/common/eventbus/event.py b/main/lib/idds/agents/common/eventbus/event.py index f133ff0e..5f2af27c 100644 --- a/main/lib/idds/agents/common/eventbus/event.py +++ b/main/lib/idds/agents/common/eventbus/event.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2022 +# - Wen Guan, , 2022 - 2023 import time import uuid @@ -46,6 +46,7 @@ class EventType(Enum): SyncProcessing = 34 TerminatedProcessing = 35 TriggerProcessing = 36 + MsgTriggerProcessing = 37 UpdateCommand = 40 @@ -278,3 +279,14 @@ def to_json(self): ret = super(TriggerProcessingEvent, self).to_json() ret['processing_id'] = self._processing_id return ret + + +class MsgTriggerProcessingEvent(Event): + def __init__(self, publisher_id, processing_id, content=None, counter=1): + super(MsgTriggerProcessingEvent, self).__init__(publisher_id, event_type=EventType.MsgTriggerProcessing, content=content, counter=counter) + self._processing_id = processing_id + + def to_json(self): + ret = super(MsgTriggerProcessingEvent, self).to_json() + ret['processing_id'] = self._processing_id + return ret diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index df295cc8..a140c9d8 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import copy import datetime @@ -40,6 +40,8 
@@ class Transformer(BaseAgent): def __init__(self, num_threads=1, poll_period=1800, retries=3, retrieve_bulk_size=10, message_bulk_size=10000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers super(Transformer, self).__init__(num_threads=num_threads, name='Transformer', **kwargs) self.config_section = Sections.Transformer self.poll_period = int(poll_period) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 2f4508d3..a82f9196 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -33,8 +33,8 @@ jediTaskID = 146329 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) -ret = Client.getTaskStatus(jediTaskID, verbose=False) -print(ret) +# ret = Client.getTaskStatus(jediTaskID, verbose=False) +# print(ret) # ret = Client.getTaskStatus(jediTaskID, verbose=False) # print(ret) @@ -100,7 +100,8 @@ # task_ids = [i for i in range(140349, 140954)] + [142268, 142651] # task_ids = [1851] + [i for i in range(4336, 4374)] + [i for i in range(133965, 136025)] # task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] -task_ids = [] +task_ids = [i for i in range(144088, 144111)] + [144891, 144892] +# task_ids = [] for task_id in task_ids: print("Killing %s" % task_id) Client.killTask(task_id)