diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 40d6be3f..a2ab47e2 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -97,6 +97,15 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries= self.show_queue_size_time = None + self.extra_executors = None + + def get_extra_executors(self): + if self.enable_executors: + if self.extra_executors is None: + name = self.executor_name + "_Extra" + self.extra_executors = self.create_executors(name, max_workers=self.num_threads) + return self.extra_executors + def is_ok_to_run_more_processings(self): if self.number_workers >= self.max_number_workers: return False @@ -297,7 +306,7 @@ def handle_update_processing(self, processing): log_prefix = self.get_log_prefix(processing) executors = None if self.enable_executors: - executors = self.executors + executors = self.get_extra_executors() ret_handle_update_processing = handle_update_processing(processing, self.agent_attributes, diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 4a21e3c1..83fde5cf 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -113,7 +113,8 @@ def handle_new_processing(self, processing): # work = transform['transform_metadata']['work'] executors = None if self.enable_executors: - executors = self.executors + executors = self.get_extra_executors() + ret_new_processing = handle_new_processing(processing, self.agent_attributes, func_site_to_cloud=self.get_site_to_cloud, diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 0f19c551..6adbd37f 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -104,7 +104,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): log_prefix = self.get_log_prefix(processing) executors = None if self.enable_executors: - executors = self.executors + executors = self.get_extra_executors() ret_trigger_processing = handle_trigger_processing(processing, self.agent_attributes, diff --git a/main/lib/idds/agents/common/timerscheduler.py b/main/lib/idds/agents/common/timerscheduler.py index b1fb654a..8bbd0cef 100644 --- a/main/lib/idds/agents/common/timerscheduler.py +++ b/main/lib/idds/agents/common/timerscheduler.py @@ -61,6 +61,7 @@ def __init__(self, num_threads, name=None, logger=None): if self.num_threads < 1: self.num_threads = 1 self.graceful_stop = threading.Event() + self.executor_name = name self.executors = IDDSThreadPoolExecutor(max_workers=self.num_threads, thread_name_prefix=name) @@ -79,6 +80,10 @@ def set_logger(self, logger): def stop(self, signum=None, frame=None): self.graceful_stop.set() + def create_executors(self, name, max_workers=1): + executors = IDDSThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=name) + return executors + def create_task(self, task_func, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1): return TimerTask(task_func, task_output_queue, task_args, task_kwargs, delay_time, priority, self.logger)