Skip to content

Commit

Permalink
Merge pull request #282 from HSF/dev
Browse files Browse the repository at this point in the history
use a different threadpool for new spawn threads
  • Loading branch information
wguanicedew authored Feb 23, 2024
2 parents 60a5c9c + 2a0df67 commit 95c717e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 3 deletions.
11 changes: 10 additions & 1 deletion main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=

self.show_queue_size_time = None

self.extra_executors = None

def get_extra_executors(self):
if self.enable_executors:
if self.extra_executors is None:
name = self.executor_name + "_Extra"
self.extra_executors = self.create_executors(name, max_workers=self.num_threads)
return self.extra_executors

def is_ok_to_run_more_processings(self):
if self.number_workers >= self.max_number_workers:
return False
Expand Down Expand Up @@ -297,7 +306,7 @@ def handle_update_processing(self, processing):
log_prefix = self.get_log_prefix(processing)
executors = None
if self.enable_executors:
executors = self.executors
executors = self.get_extra_executors()

ret_handle_update_processing = handle_update_processing(processing,
self.agent_attributes,
Expand Down
3 changes: 2 additions & 1 deletion main/lib/idds/agents/carrier/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def handle_new_processing(self, processing):
# work = transform['transform_metadata']['work']
executors = None
if self.enable_executors:
executors = self.executors
executors = self.get_extra_executors()

ret_new_processing = handle_new_processing(processing,
self.agent_attributes,
func_site_to_cloud=self.get_site_to_cloud,
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
log_prefix = self.get_log_prefix(processing)
executors = None
if self.enable_executors:
executors = self.executors
executors = self.get_extra_executors()

ret_trigger_processing = handle_trigger_processing(processing,
self.agent_attributes,
Expand Down
5 changes: 5 additions & 0 deletions main/lib/idds/agents/common/timerscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, num_threads, name=None, logger=None):
if self.num_threads < 1:
self.num_threads = 1
self.graceful_stop = threading.Event()
self.executor_name = name
self.executors = IDDSThreadPoolExecutor(max_workers=self.num_threads,
thread_name_prefix=name)

Expand All @@ -79,6 +80,10 @@ def set_logger(self, logger):
def stop(self, signum=None, frame=None):
self.graceful_stop.set()

def create_executors(self, name, max_workers=1):
executors = IDDSThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=name)
return executors

def create_task(self, task_func, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1):
return TimerTask(task_func, task_output_queue, task_args, task_kwargs, delay_time, priority, self.logger)

Expand Down

0 comments on commit 95c717e

Please sign in to comment.