diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index a3fc3f8d..83578eda 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -57,6 +57,8 @@ def __init__(self, num_threads=1, max_number_workers=8, poll_period=10, retrieve self.poll_period = int(poll_period) self.retrieve_bulk_size = int(retrieve_bulk_size) self.config_section = Sections.Clerk + self.start_at = time.time() + if pending_time: self.pending_time = float(pending_time) else: @@ -152,9 +154,24 @@ def get_new_requests(self): self.show_queue_size() + if time.time() < self.start_at + 3600: + if BaseAgent.poll_new_min_request_id_times % 30 == 0: + # get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes. + min_request_id = BaseAgent.min_request_id - 1000 + else: + min_request_id = BaseAgent.min_request_id + else: + if BaseAgent.poll_new_min_request_id_times % 180 == 0: + # get_new_requests is called every 10 seconds. 180 * 10 = 300 seconds, which is 30 minutes. + min_request_id = BaseAgent.min_request_id - 1000 + else: + min_request_id = BaseAgent.min_request_id + + BaseAgent.poll_new_min_request_id_times += 1 + req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling] reqs_new = core_requests.get_requests_by_status_type(status=req_status, locking=True, - not_lock=True, min_request_id=BaseAgent.min_request_id, + not_lock=True, min_request_id=min_request_id, new_poll=True, only_return_id=True, bulk_size=self.retrieve_bulk_size) @@ -164,6 +181,7 @@ def get_new_requests(self): events = [] for req_id in reqs_new: + BaseAgent.min_request_id_cache[req_id] = time.time() if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id: BaseAgent.min_request_id = req_id core_requests.set_min_request_id(BaseAgent.min_request_id) @@ -192,6 +210,21 @@ def get_running_requests(self): self.show_queue_size() + if time.time() < self.start_at + 3600: + if BaseAgent.poll_running_min_request_id_times % 30 == 0: + # get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes. + min_request_id = BaseAgent.min_request_id - 1000 + else: + min_request_id = BaseAgent.min_request_id + else: + if BaseAgent.poll_running_min_request_id_times % 180 == 0: + # get_new_requests is called every 10 seconds. 180 * 10 = 1800 seconds, which is 30 minutes. + min_request_id = BaseAgent.min_request_id - 1000 + else: + min_request_id = BaseAgent.min_request_id + + BaseAgent.poll_running_min_request_id_times += 1 + req_status = [RequestStatus.Transforming, RequestStatus.ToCancel, RequestStatus.Cancelling, RequestStatus.ToSuspend, RequestStatus.Suspending, RequestStatus.ToExpire, RequestStatus.Expiring, @@ -199,7 +232,7 @@ def get_running_requests(self): RequestStatus.ToResume, RequestStatus.Resuming, RequestStatus.Building] reqs = core_requests.get_requests_by_status_type(status=req_status, time_period=None, - min_request_id=BaseAgent.min_request_id, + min_request_id=min_request_id, locking=True, bulk_size=self.retrieve_bulk_size, not_lock=True, update_poll=True, only_return_id=True) @@ -209,6 +242,7 @@ def get_running_requests(self): events = [] for req_id in reqs: + BaseAgent.min_request_id_cache[req_id] = time.time() if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id: BaseAgent.min_request_id = req_id core_requests.set_min_request_id(BaseAgent.min_request_id) @@ -261,6 +295,7 @@ def get_operation_requests(self): if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id: BaseAgent.min_request_id = request_id + BaseAgent.min_request_id_cache[request_id] = time.time() core_requests.set_min_request_id(BaseAgent.min_request_id) event = None @@ -295,6 +330,32 @@ def get_operation_requests(self): self.logger.error(traceback.format_exc()) return [] + def clean_min_request_id(self): + try: + if BaseAgent.checking_min_request_id_times <= 0: + old_min_request_id = core_requests.get_min_request_id() + self.logger.info("old_min_request_id: %s" % old_min_request_id) + if not old_min_request_id: + min_request_id = 0 + else: + min_request_id = old_min_request_id - 1000 + BaseAgent.min_request_id = min_request_id + else: + for req_id in BaseAgent.min_request_id_cache: + time_stamp = BaseAgent.min_request_id_cache[req_id] + if time_stamp < time.time() - 12 * 3600: # older than 12 hours + del BaseAgent.min_request_id_cache[req_id] + + if BaseAgent.min_request_id_cache: + min_request_id = min(list(BaseAgent.min_request_id_cache.keys())) + BaseAgent.min_request_id = min_request_id + core_requests.set_min_request_id(BaseAgent.min_request_id) + + BaseAgent.checking_min_request_id_times += 1 + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + def get_request(self, request_id, status=None, locking=False): try: return core_requests.get_request_by_id_status(request_id=request_id, status=status, locking=locking) @@ -1311,6 +1372,8 @@ def run(self): self.add_task(task) task = self.create_task(task_func=self.get_operation_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1) self.add_task(task) + task = self.create_task(task_func=self.clean_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=3600, priority=1) + self.add_task(task) task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1) self.add_task(task) diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py index aa6a9ae3..7b017514 100644 --- a/main/lib/idds/agents/common/baseagent.py +++ b/main/lib/idds/agents/common/baseagent.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2023 +# - Wen Guan, , 2019 - 2024 import os import socket @@ -37,6 +37,10 @@ class BaseAgent(TimerScheduler, PluginBase): """ min_request_id = None + min_request_id_cache = {} + checking_min_request_id_times = 0 + poll_new_min_request_id_times = 0 + poll_running_min_request_id_times = 0 def __init__(self, num_threads=1, name=None, logger=None, **kwargs): super(BaseAgent, self).__init__(num_threads, name=name)