Skip to content

Commit

Permalink
Merge pull request #135 from wguanicedew/dev
Browse files Browse the repository at this point in the history
optimize
  • Loading branch information
wguanicedew authored Feb 21, 2023
2 parents 96274d8 + 340a2a6 commit 7d13adf
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 18 deletions.
5 changes: 5 additions & 0 deletions main/lib/idds/agents/carrier/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class Finisher(Poller):

def __init__(self, num_threads=1, poll_time_period=10, retries=3, retrieve_bulk_size=2,
message_bulk_size=1000, **kwargs):
self.set_max_workers()
if hasattr(self, 'finisher_max_number_workers'):
self.max_number_workers = int(self.finisher_max_number_workers)
num_threads = self.max_number_workers

super(Finisher, self).__init__(num_threads=num_threads, name='Finisher',
poll_time_period=poll_time_period, retries=retries,
retrieve_bulk_size=retrieve_bulk_size,
Expand Down
12 changes: 11 additions & 1 deletion main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2022
# - Wen Guan, <[email protected]>, 2019 - 2023

import datetime
import random
Expand Down Expand Up @@ -36,6 +36,9 @@ class Poller(BaseAgent):

def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2,
name='Poller', message_bulk_size=1000, **kwargs):
self.set_max_workers()
num_threads = self.max_number_workers

super(Poller, self).__init__(num_threads=num_threads, name=name, **kwargs)
self.config_section = Sections.Carrier
self.poll_period = int(poll_period)
Expand Down Expand Up @@ -184,6 +187,13 @@ def update_processing(self, processing, processing_model):
processing['update_processing']['parameters']['locking'] = ProcessingLocking.Idle
# self.logger.debug("wen: %s" % str(processing))
processing['update_processing']['parameters']['updated_at'] = datetime.datetime.utcnow()
# check update_processing status
if 'status' in processing['update_processing']['parameters']:
new_status = processing['update_processing']['parameters']['status']
if new_status == ProcessingStatus.Submitting and processing_model['status'].value > ProcessingStatus.Submitting.value:
processing['update_processing']['parameters']['status'] = ProcessingStatus.Submitted

self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters']))

retry = True
retry_num = 0
Expand Down
13 changes: 7 additions & 6 deletions main/lib/idds/agents/carrier/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from idds.core import messages as core_messages, catalog as core_catalog
from idds.agents.common.baseagent import BaseAgent
# from idds.agents.common.eventbus.event import TerminatedProcessingEvent
from idds.agents.common.eventbus.event import TriggerProcessingEvent
from idds.agents.common.eventbus.event import MsgTriggerProcessingEvent

from .utils import handle_messages_processing

Expand All @@ -36,7 +36,7 @@ class Receiver(BaseAgent):
Receiver works to receive workload management messages to update task/job status.
"""

def __init__(self, num_threads=1, bulk_message_delay=5, bulk_message_size=2000,
def __init__(self, num_threads=1, bulk_message_delay=30, bulk_message_size=2000,
random_delay=None, **kwargs):
super(Receiver, self).__init__(num_threads=num_threads, name='Receiver', **kwargs)
self.config_section = Sections.Carrier
Expand Down Expand Up @@ -108,16 +108,17 @@ def run(self):
for pr_id in update_processings:
# self.logger.info(log_prefix + "TerminatedProcessingEvent(processing_id: %s)" % pr_id)
# event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr_id)
self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id)
self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id)
event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id)
self.event_bus.send(event)

for pr_id in terminated_processings:
self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'})
self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id)
event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'})
self.event_bus.send(event)

time_delay = self.bulk_message_delay - (time.time() - time_start)
time_delay = self.bulk_message_delay
if time_delay > 0:
time.sleep(time_delay)
except IDDSException as error:
Expand Down
3 changes: 3 additions & 0 deletions main/lib/idds/agents/carrier/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class Submitter(Poller):

def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2,
name='Submitter', message_bulk_size=1000, **kwargs):
self.set_max_workers()
num_threads = self.max_number_workers

super(Submitter, self).__init__(num_threads=num_threads, name=name, **kwargs)

def get_new_processings(self):
Expand Down
27 changes: 25 additions & 2 deletions main/lib/idds/agents/carrier/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,21 @@ class Trigger(Poller):

def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2,
             name='Trigger', message_bulk_size=1000, **kwargs):
    # Initialize the shared worker bookkeeping (number_workers, default
    # max_number_workers) before reading any trigger-specific override.
    self.set_max_workers()
    if hasattr(self, 'trigger_max_number_workers'):
        self.max_number_workers = int(self.trigger_max_number_workers)

    # Double the thread pool: trigger events and msg-trigger events are
    # throttled separately (number_workers vs number_msg_workers), each
    # capped at max_number_workers.
    num_threads = self.max_number_workers * 2
    super(Trigger, self).__init__(num_threads=num_threads, name=name, **kwargs)

    # NOTE(review): trigger_max_number_workers is re-checked here —
    # presumably the base __init__ populates config attributes from
    # kwargs, so this attribute may only exist after the super() call;
    # confirm against BaseAgent before simplifying.
    if hasattr(self, 'trigger_max_number_workers'):
        self.max_number_workers = int(self.trigger_max_number_workers)
    # separate in-flight counter for message-triggered processings
    self.number_msg_workers = 0

def is_ok_to_run_more_msg_processings(self):
    """Return True while message-trigger capacity remains.

    Used as the pre-check for MsgTriggerProcessing events: allows a new
    handler only when fewer than max_number_workers are already running.
    """
    return self.number_msg_workers < self.max_number_workers

def get_trigger_processings(self):
"""
Expand Down Expand Up @@ -154,8 +165,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
'update_contents': []}
return ret

def process_trigger_processing(self, event):
self.number_workers += 1
def process_trigger_processing_real(self, event):
try:
if event:
original_event = event
Expand Down Expand Up @@ -219,13 +229,26 @@ def process_trigger_processing(self, event):
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())

def process_trigger_processing(self, event):
    """Handle a TriggerProcessing event, tracking it in number_workers.

    The counter is decremented in a finally block so that an unexpected
    exception escaping the real handler cannot leak a worker slot and
    permanently starve the is_ok_to_run_more_processings pre-check.
    """
    self.number_workers += 1
    try:
        self.process_trigger_processing_real(event)
    finally:
        self.number_workers -= 1

def process_msg_trigger_processing(self, event):
    """Handle a MsgTriggerProcessing event, tracking it in number_msg_workers.

    Same real handler as process_trigger_processing, but accounted against
    the separate message-trigger counter; the finally block guarantees the
    slot is released even if the handler raises.
    """
    self.number_msg_workers += 1
    try:
        self.process_trigger_processing_real(event)
    finally:
        self.number_msg_workers -= 1

def init_event_function_map(self):
self.event_func_map = {
EventType.TriggerProcessing: {
'pre_check': self.is_ok_to_run_more_processings,
'exec_func': self.process_trigger_processing
},
EventType.MsgTriggerProcessing: {
'pre_check': self.is_ok_to_run_more_msg_processings,
'exec_func': self.process_msg_trigger_processing
}
}

Expand Down
29 changes: 26 additions & 3 deletions main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import json
import logging

import time

from idds.common.constants import (ProcessingStatus,
CollectionStatus,
Expand Down Expand Up @@ -1162,6 +1162,29 @@ def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, in
return content_id, to_update_jobid


def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''):
    """Return True if this pending workload_id has not been handled recently.

    A redis-backed map records when each workload_id was last accepted; a
    repeat within the retention window is rejected so duplicate 'pending'
    messages do not re-trigger the same processing. Roughly once per day
    (gated by a separate timestamp key) stale entries are purged from the map.

    :param workload_id: workload identifier (coerced to str for the cache key).
    :param logger: unused here; kept for interface consistency with callers.
    :param log_prefix: unused here; kept for interface consistency with callers.
    :returns: True when the caller should process this workload_id, False otherwise.
    """
    cache = get_redis_cache()
    processed_pending_workload_id_map_key = "processed_pending_workload_id_map"
    processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={})
    processed_pending_workload_id_map_time_key = "processed_pending_workload_id_map_time"
    processed_pending_workload_id_map_time = cache.get(processed_pending_workload_id_map_time_key, default=None)

    workload_id = str(workload_id)
    if workload_id in processed_pending_workload_id_map:
        # already accepted within the retention window -> skip duplicate
        return False

    now = time.time()
    processed_pending_workload_id_map[workload_id] = now
    if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < now:
        cache.set(processed_pending_workload_id_map_time_key, int(now), expire_seconds=86400)

        # Daily cleanup. Iterate over a snapshot (list) of the keys: deleting
        # from a dict while iterating its live view raises RuntimeError in
        # Python 3. Also use a distinct loop variable so the function's
        # workload_id argument is not clobbered.
        for wid in list(processed_pending_workload_id_map.keys()):
            if processed_pending_workload_id_map[wid] + 86400 < now:
                del processed_pending_workload_id_map[wid]

    cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400)
    return True


def handle_messages_processing(messages, logger=None, log_prefix=''):
logger = get_logger(logger)
if not log_prefix:
Expand Down Expand Up @@ -1190,13 +1213,13 @@ def handle_messages_processing(messages, logger=None, log_prefix=''):
status = msg['status']
if status in ['pending']: # 'prepared'
req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
if r_status < ProcessingStatus.Submitted.value:
if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix):
# new_processings.append((req_id, tf_id, processing_id, workload_id, status))
if processing_id not in update_processings:
update_processings.append(processing_id)
logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id))
else:
logger.debug(log_prefix + "Processing %s is already in status(%s), not add it to update processing" % (str(processing_id), r_status))
logger.debug(log_prefix + "Processing %s is already processed, not add it to update processing" % (str(processing_id)))
elif status in ['finished', 'done']:
req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
# update_processings.append((processing_id, status))
Expand Down
4 changes: 3 additions & 1 deletion main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2022
# - Wen Guan, <[email protected]>, 2019 - 2023

import datetime
import random
Expand Down Expand Up @@ -42,6 +42,8 @@ class Clerk(BaseAgent):
"""

def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending_time=None, **kwargs):
self.set_max_workers()
num_threads = self.max_number_workers
super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs)
self.poll_period = int(poll_period)
self.retrieve_bulk_size = int(retrieve_bulk_size)
Expand Down
7 changes: 7 additions & 0 deletions main/lib/idds/agents/common/baseagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ def __init__(self, num_threads=1, name=None, logger=None, **kwargs):

self.cache = get_redis_cache()

def set_max_workers(self):
    """Reset the in-flight worker counter and normalise max_number_workers.

    Falls back to 3 when max_number_workers is missing or falsy; otherwise
    coerces the configured value (possibly a string from config) to int.
    """
    self.number_workers = 0
    configured = getattr(self, 'max_number_workers', None)
    self.max_number_workers = int(configured) if configured else 3

def get_event_bus(self):
self.event_bus

Expand Down
14 changes: 13 additions & 1 deletion main/lib/idds/agents/common/eventbus/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2022
# - Wen Guan, <[email protected]>, 2022 - 2023

import time
import uuid
Expand Down Expand Up @@ -46,6 +46,7 @@ class EventType(Enum):
SyncProcessing = 34
TerminatedProcessing = 35
TriggerProcessing = 36
MsgTriggerProcessing = 37

UpdateCommand = 40

Expand Down Expand Up @@ -278,3 +279,14 @@ def to_json(self):
ret = super(TriggerProcessingEvent, self).to_json()
ret['processing_id'] = self._processing_id
return ret


class MsgTriggerProcessingEvent(Event):
    """Trigger-processing event originating from workload-management messages.

    Sent by the Receiver agent so the Trigger agent can throttle these
    separately from ordinary TriggerProcessing events.
    """

    def __init__(self, publisher_id, processing_id, content=None, counter=1):
        super(MsgTriggerProcessingEvent, self).__init__(publisher_id,
                                                        event_type=EventType.MsgTriggerProcessing,
                                                        content=content,
                                                        counter=counter)
        # identifier of the processing this event refers to
        self._processing_id = processing_id

    def to_json(self):
        """Serialize the event, extending the base dict with processing_id."""
        data = super(MsgTriggerProcessingEvent, self).to_json()
        data['processing_id'] = self._processing_id
        return data
4 changes: 3 additions & 1 deletion main/lib/idds/agents/transformer/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2022
# - Wen Guan, <[email protected]>, 2019 - 2023

import copy
import datetime
Expand Down Expand Up @@ -40,6 +40,8 @@ class Transformer(BaseAgent):

def __init__(self, num_threads=1, poll_period=1800, retries=3, retrieve_bulk_size=10,
message_bulk_size=10000, **kwargs):
self.set_max_workers()
num_threads = self.max_number_workers
super(Transformer, self).__init__(num_threads=num_threads, name='Transformer', **kwargs)
self.config_section = Sections.Transformer
self.poll_period = int(poll_period)
Expand Down
7 changes: 4 additions & 3 deletions main/lib/idds/tests/panda_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
jediTaskID = 146329
ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False)
print(ret)
ret = Client.getTaskStatus(jediTaskID, verbose=False)
print(ret)
# ret = Client.getTaskStatus(jediTaskID, verbose=False)
# print(ret)

# ret = Client.getTaskStatus(jediTaskID, verbose=False)
# print(ret)
Expand Down Expand Up @@ -100,7 +100,8 @@
# task_ids = [i for i in range(140349, 140954)] + [142268, 142651]
# task_ids = [1851] + [i for i in range(4336, 4374)] + [i for i in range(133965, 136025)]
# task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)]
task_ids = []
task_ids = [i for i in range(144088, 144111)] + [144891, 144892]
# task_ids = []
for task_id in task_ids:
print("Killing %s" % task_id)
Client.killTask(task_id)
Expand Down

0 comments on commit 7d13adf

Please sign in to comment.