diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index c4d11978..72a11561 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -658,7 +658,9 @@ def poll_panda_task_status(self, processing): def get_processing_status_from_panda_status(self, task_status): if task_status in ['registered', 'defined', 'assigning']: processing_status = ProcessingStatus.Submitting - elif task_status in ['ready', 'pending', 'scouting', 'scouted', 'prepared', 'topreprocess', 'preprocessing']: + elif task_status in ['ready', 'scouting', 'scouted', 'prepared', 'topreprocess', 'preprocessing']: + processing_status = ProcessingStatus.Submitting + elif task_status in ['pending']: processing_status = ProcessingStatus.Submitted elif task_status in ['running', 'toretry', 'toincexec', 'throttled']: processing_status = ProcessingStatus.Running diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index 6a0594c1..e0ab29d9 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -274,7 +274,7 @@ END; CREATE OR REPLACE procedure update_contents_from_others(request_id_in NUMBER, transform_id_in NUMBER) AS BEGIN update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join - (select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1 and status != 0) t + (select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1) t on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; END; @@ -282,6 +282,12 @@ END; CREATE OR REPLACE procedure update_contents_to_others(request_id_in NUMBER, transform_id_in NUMBER) AS BEGIN update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join - (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != substatus) t + (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1) t on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; END; + + + +-- 2023.02.14 +drop index CONTENTS_REQ_TF_COLL_IDX +CREATE INDEX CONTENTS_REQ_TF_COLL_IDX ON CONTENTS (request_id, transform_id, workload_id, coll_id, status, substatus) LOCAL; diff --git a/main/lib/idds/agents/carrier/finisher.py b/main/lib/idds/agents/carrier/finisher.py index 036f7197..c89a89eb 100644 --- a/main/lib/idds/agents/carrier/finisher.py +++ b/main/lib/idds/agents/carrier/finisher.py @@ -32,6 +32,11 @@ class Finisher(Poller): def __init__(self, num_threads=1, poll_time_period=10, retries=3, retrieve_bulk_size=2, message_bulk_size=1000, **kwargs): + self.set_max_workers() + if hasattr(self, 'finisher_max_number_workers'): + self.max_number_workers = int(self.finisher_max_number_workers) + num_threads = self.max_number_workers + super(Finisher, self).__init__(num_threads=num_threads, name='Finisher', poll_time_period=poll_time_period, retries=retries, retrieve_bulk_size=retrieve_bulk_size, diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 5eede04f..d3645a2c 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import datetime import random @@ -36,6 +36,9 @@ class Poller(BaseAgent): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Poller', message_bulk_size=1000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers + super(Poller, self).__init__(num_threads=num_threads, name=name, **kwargs) self.config_section = Sections.Carrier self.poll_period = int(poll_period) @@ -184,6 +187,13 @@ def update_processing(self, processing, processing_model): processing['update_processing']['parameters']['locking'] = ProcessingLocking.Idle # self.logger.debug("wen: %s" % str(processing)) processing['update_processing']['parameters']['updated_at'] = datetime.datetime.utcnow() + # check update_processing status + if 'status' in processing['update_processing']['parameters']: + new_status = processing['update_processing']['parameters']['status'] + if new_status == ProcessingStatus.Submitting and processing_model['status'].value > ProcessingStatus.Submitting.value: + processing['update_processing']['parameters']['status'] = ProcessingStatus.Submitted + + self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters'])) retry = True retry_num = 0 @@ -372,6 +382,7 @@ def process_update_processing(self, event): event_content['has_updates'] = True if is_process_terminated(pr['substatus']): event_content['Terminated'] = True + self.logger.info(log_pre + "TriggerProcessingEvent(processing_id: %s)" % pr['processing_id']) event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'], content=event_content, counter=original_event._counter) self.event_bus.send(event) diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index 1d62cf4a..5a991f20 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -24,7 +24,7 @@ from idds.core import messages as core_messages, catalog as core_catalog from idds.agents.common.baseagent import BaseAgent # from idds.agents.common.eventbus.event import TerminatedProcessingEvent -from idds.agents.common.eventbus.event import TriggerProcessingEvent +from idds.agents.common.eventbus.event import MsgTriggerProcessingEvent from .utils import handle_messages_processing @@ -36,7 +36,7 @@ class Receiver(BaseAgent): Receiver works to receive workload management messages to update task/job status. """ - def __init__(self, num_threads=1, bulk_message_delay=5, bulk_message_size=2000, + def __init__(self, num_threads=1, bulk_message_delay=30, bulk_message_size=2000, random_delay=None, **kwargs): super(Receiver, self).__init__(num_threads=num_threads, name='Receiver', **kwargs) self.config_section = Sections.Carrier @@ -108,16 +108,17 @@ def run(self): for pr_id in update_processings: # self.logger.info(log_prefix + "TerminatedProcessingEvent(processing_id: %s)" % pr_id) # event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr_id) - self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id) - event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id) + self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id) + event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id) self.event_bus.send(event) for pr_id in terminated_processings: - self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id) - event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'}) + self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id) + event = MsgTriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id, content={'Terminated': True, 'source': 'Receiver'}) self.event_bus.send(event) time_delay = self.bulk_message_delay - (time.time() - time_start) + time_delay = self.bulk_message_delay if time_delay > 0: time.sleep(time_delay) except IDDSException as error: diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 3d79561c..f6d170a6 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -32,6 +32,9 @@ class Submitter(Poller): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Submitter', message_bulk_size=1000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers + super(Submitter, self).__init__(num_threads=num_threads, name=name, **kwargs) def get_new_processings(self): diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 47c50983..779241a5 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -34,19 +34,31 @@ class Trigger(Poller): def __init__(self, num_threads=1, poll_period=10, retries=3, retrieve_bulk_size=2, name='Trigger', message_bulk_size=1000, **kwargs): + self.set_max_workers() + if hasattr(self, 'trigger_max_number_workers'): + self.max_number_workers = int(self.trigger_max_number_workers) + + num_threads = self.max_number_workers * 2 super(Trigger, self).__init__(num_threads=num_threads, name=name, **kwargs) if hasattr(self, 'trigger_max_number_workers'): self.max_number_workers = int(self.trigger_max_number_workers) + self.number_msg_workers = 0 + + def is_ok_to_run_more_msg_processings(self): + if self.number_msg_workers >= self.max_number_workers: + return False + return True def get_trigger_processings(self): """ Get trigger processing """ try: + self.show_queue_size() if not self.is_ok_to_run_more_processings(): return [] - self.show_queue_size() + # self.show_queue_size() processing_status = [ProcessingStatus.ToTrigger, ProcessingStatus.Triggering] processings = core_processings.get_processings_by_status(status=processing_status, @@ -153,8 +165,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): 'update_contents': []} return ret - def process_trigger_processing(self, event): - self.number_workers += 1 + def process_trigger_processing_real(self, event): try: if event: original_event = event @@ -171,26 +182,39 @@ def process_trigger_processing(self, event): new_update_contents = ret.get('new_update_contents', None) ret['new_update_contents'] = None + ret_update_contents = ret.get('update_contents', None) self.update_processing(ret, pr) - if new_update_contents: + if new_update_contents or ret_update_contents: + self.logger.info(log_pre + "update_contents_to_others_by_dep_id") core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id']) + self.logger.info(log_pre + "update_contents_to_others_by_dep_id done") + # core_catalog.delete_contents_update(request_id=pr['request_id'], transform_id=pr['transform_id']) update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=pr['request_id']) self.logger.info(log_pre + "update_transforms: %s" % str(update_transforms)) for update_transform in update_transforms: if 'transform_id' in update_transform: update_transform_id = update_transform['transform_id'] - event = UpdateTransformEvent(publisher_id=self.id, - transform_id=update_transform_id, - content={'event': 'Trigger'}) - self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s" % update_transform_id) - self.event_bus.send(event) + if update_transform_id != pr['transform_id']: + event = UpdateTransformEvent(publisher_id=self.id, + transform_id=update_transform_id, + content={'event': 'Trigger'}) + self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s)" % update_transform_id) + self.event_bus.send(event) + else: + ret1 = self.handle_trigger_processing(pr) + new_update_contents = ret1.get('new_update_contents', None) + ret1['new_update_contents'] = None + self.update_processing(ret1, pr) + # pass if (('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating) or (event._content and 'Terminated' in event._content and event._content['Terminated'])): # noqa W503 self.logger.info(log_pre + "TerminatedProcessingEvent(processing_id: %s)" % pr['processing_id']) - event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'], content=event._content, + event = TerminatedProcessingEvent(publisher_id=self.id, + processing_id=pr['processing_id'], + content=event._content, counter=original_event._counter) self.event_bus.send(event) else: @@ -205,13 +229,26 @@ def process_trigger_processing(self, event): except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) + + def process_trigger_processing(self, event): + self.number_workers += 1 + self.process_trigger_processing_real(event) self.number_workers -= 1 + def process_msg_trigger_processing(self, event): + self.number_msg_workers += 1 + self.process_trigger_processing_real(event) + self.number_msg_workers -= 1 + def init_event_function_map(self): self.event_func_map = { EventType.TriggerProcessing: { 'pre_check': self.is_ok_to_run_more_processings, 'exec_func': self.process_trigger_processing + }, + EventType.MsgTriggerProcessing: { + 'pre_check': self.is_ok_to_run_more_msg_processings, + 'exec_func': self.process_msg_trigger_processing } } diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 975c852f..7188e99e 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -10,7 +10,7 @@ import json import logging - +import time from idds.common.constants import (ProcessingStatus, CollectionStatus, @@ -993,7 +993,10 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= # logger.debug(log_prefix + "delete_contents_update: %s" % str(ret_update_transforms)) pass + logger.debug(log_prefix + "update_contents_from_others_by_dep_id") core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) + logger.debug(log_prefix + "update_contents_from_others_by_dep_id done") + input_output_maps = get_input_output_maps(transform_id, work) logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2])) @@ -1025,7 +1028,8 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms # return processing['substatus'], content_updates, ret_msgs, {}, update_dep_contents_status_name, update_dep_contents_status, [], ret_update_transforms - return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms + # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms + return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms def get_content_status_from_panda_msg_status(status): @@ -1071,7 +1075,7 @@ def get_workload_id_transform_id_map(workload_id): return None request_ids = [] - if not workload_id_transform_id_map or workload_id not in workload_id_transform_id_map: + if not workload_id_transform_id_map or workload_id not in workload_id_transform_id_map or len(workload_id_transform_id_map[workload_id]) < 5: processing_status = [ProcessingStatus.New, ProcessingStatus.Submitting, ProcessingStatus.Submitted, ProcessingStatus.Running, ProcessingStatus.FinishedOnExec, @@ -1087,7 +1091,11 @@ def get_workload_id_transform_id_map(workload_id): processing = proc['processing_metadata']['processing'] work = processing.work if work.use_dependency_to_release_jobs(): - workload_id_transform_id_map[proc['workload_id']] = (proc['request_id'], proc['transform_id'], proc['processing_id']) + workload_id_transform_id_map[proc['workload_id']] = (proc['request_id'], + proc['transform_id'], + proc['processing_id'], + proc['status'].value, + proc['substatus'].value) if proc['request_id'] not in request_ids: request_ids.append(proc['request_id']) @@ -1154,6 +1162,29 @@ def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, in return content_id, to_update_jobid +def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''): + cache = get_redis_cache() + processed_pending_workload_id_map_key = "processed_pending_workload_id_map" + processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={}) + processed_pending_workload_id_map_time_key = "processed_pending_workload_id_map_time" + processed_pending_workload_id_map_time = cache.get(processed_pending_workload_id_map_time_key, default=None) + + workload_id = str(workload_id) + if workload_id in processed_pending_workload_id_map: + return False + + processed_pending_workload_id_map[workload_id] = time.time() + if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < time.time(): + cache.set(processed_pending_workload_id_map_time_key, int(time.time()), expire_seconds=86400) + + for workload_id in processed_pending_workload_id_map.keys(): + if processed_pending_workload_id_map[workload_id] + 86400 < time.time(): + del processed_pending_workload_id_map[workload_id] + + cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400) + return True + + def handle_messages_processing(messages, logger=None, log_prefix=''): logger = get_logger(logger) if not log_prefix: @@ -1175,20 +1206,26 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): continue logger.debug(log_prefix + "Received message: %s" % str(ori_msg)) + logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id)) if msg['msg_type'] in ['task_status']: workload_id = msg['taskid'] status = msg['status'] if status in ['pending']: # 'prepared' - req_id, tf_id, processing_id = ret_req_tf_pr_id - # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) - if processing_id not in update_processings: - update_processings.append(processing_id) + req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id + if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix): + # new_processings.append((req_id, tf_id, processing_id, workload_id, status)) + if processing_id not in update_processings: + update_processings.append(processing_id) + logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) + else: + logger.debug(log_prefix + "Processing %s is already processed, not add it to update processing" % (str(processing_id))) elif status in ['finished', 'done']: - req_id, tf_id, processing_id = ret_req_tf_pr_id + req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id # update_processings.append((processing_id, status)) if processing_id not in update_processings: terminated_processings.append(processing_id) + logger.debug(log_prefix + "Add to terminated processing: %s" % str(processing_id)) if msg['msg_type'] in ['job_status']: workload_id = msg['taskid'] @@ -1196,7 +1233,7 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): status = msg['status'] inputs = msg['inputs'] if inputs and status in ['finished']: - req_id, tf_id, processing_id = ret_req_tf_pr_id + req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id content_id, to_update_jobid = get_content_id_from_job_id(req_id, workload_id, tf_id, job_id, inputs) if content_id: if to_update_jobid: @@ -1212,6 +1249,7 @@ def handle_messages_processing(messages, logger=None, log_prefix=''): update_contents.append(u_content) if processing_id not in update_processings: update_processings.append(processing_id) + logger.debug(log_prefix + "Add to update processing: %s" % str(processing_id)) return update_processings, terminated_processings, update_contents, [] diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 8686b00d..9e085ba1 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import datetime import random @@ -42,6 +42,8 @@ class Clerk(BaseAgent): """ def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending_time=None, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs) self.poll_period = int(poll_period) self.retrieve_bulk_size = int(retrieve_bulk_size) diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py index 14e1f65f..9f2f99dc 100644 --- a/main/lib/idds/agents/common/baseagent.py +++ b/main/lib/idds/agents/common/baseagent.py @@ -66,6 +66,13 @@ def __init__(self, num_threads=1, name=None, logger=None, **kwargs): self.cache = get_redis_cache() + def set_max_workers(self): + self.number_workers = 0 + if not hasattr(self, 'max_number_workers') or not self.max_number_workers: + self.max_number_workers = 3 + else: + self.max_number_workers = int(self.max_number_workers) + def get_event_bus(self): self.event_bus diff --git a/main/lib/idds/agents/common/eventbus/event.py b/main/lib/idds/agents/common/eventbus/event.py index f133ff0e..5f2af27c 100644 --- a/main/lib/idds/agents/common/eventbus/event.py +++ b/main/lib/idds/agents/common/eventbus/event.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2022 +# - Wen Guan, , 2022 - 2023 import time import uuid @@ -46,6 +46,7 @@ class EventType(Enum): SyncProcessing = 34 TerminatedProcessing = 35 TriggerProcessing = 36 + MsgTriggerProcessing = 37 UpdateCommand = 40 @@ -278,3 +279,14 @@ def to_json(self): ret = super(TriggerProcessingEvent, self).to_json() ret['processing_id'] = self._processing_id return ret + + +class MsgTriggerProcessingEvent(Event): + def __init__(self, publisher_id, processing_id, content=None, counter=1): + super(MsgTriggerProcessingEvent, self).__init__(publisher_id, event_type=EventType.MsgTriggerProcessing, content=content, counter=counter) + self._processing_id = processing_id + + def to_json(self): + ret = super(MsgTriggerProcessingEvent, self).to_json() + ret['processing_id'] = self._processing_id + return ret diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index df295cc8..a140c9d8 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import copy import datetime @@ -40,6 +40,8 @@ class Transformer(BaseAgent): def __init__(self, num_threads=1, poll_period=1800, retries=3, retrieve_bulk_size=10, message_bulk_size=10000, **kwargs): + self.set_max_workers() + num_threads = self.max_number_workers super(Transformer, self).__init__(num_threads=num_threads, name='Transformer', **kwargs) self.config_section = Sections.Transformer self.poll_period = int(poll_period) diff --git a/main/lib/idds/core/catalog.py b/main/lib/idds/core/catalog.py index b96a2717..b468432b 100644 --- a/main/lib/idds/core/catalog.py +++ b/main/lib/idds/core/catalog.py @@ -622,7 +622,7 @@ def get_output_contents_by_request_id_status(request_id, name, content_status, l @read_session -def get_updated_transforms_by_content_status(request_id=None, transform_id=None): +def get_updated_transforms_by_content_status(request_id=None, transform_id=None, session=None): """ Get updated transform ids by content status @@ -631,7 +631,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None) :returns list """ - return orm_contents.get_updated_transforms_by_content_status(request_id=request_id, transform_id=transform_id) + return orm_contents.get_updated_transforms_by_content_status(request_id=request_id, transform_id=transform_id, session=session) @transactional_session diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index 979ed359..139fed9f 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -561,7 +561,7 @@ class Content(BASE, ModelBase): Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'), Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', 'name', 'status'), Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'), - Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'coll_id', 'status')) + Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id', 'status', 'substatus')) class Content_update(BASE, ModelBase): diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index 731c53b2..9a24290c 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -505,7 +505,7 @@ def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, sess :param transfomr_id: The transform id. """ try: - idds_proc = sqlalchemy.text("CALL %s.update_contents_from_others(:request_id, :transform_id)" % session.schema) + idds_proc = sqlalchemy.text("CALL %s.update_contents_to_others(:request_id, :transform_id)" % session.schema) session.execute(idds_proc, {"request_id": request_id, "transform_id": transform_id}) except Exception as ex: raise ex @@ -540,10 +540,10 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, models.Content.transform_id, models.Content.workload_id, models.Content.coll_id) - query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_ID_NAME_IDX)", 'oracle') + query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_REQ_TF_COLL_IDX)", 'oracle') if request_id: - query = query.filter(models.Content.request_id_id == request_id) + query = query.filter(models.Content.request_id == request_id) if transform_id: query = query.filter(models.Content.transform_id == transform_id) diff --git a/main/lib/idds/tests/core_tests_dep_id.py b/main/lib/idds/tests/core_tests_dep_id.py index c0a8dc45..7d44d84f 100644 --- a/main/lib/idds/tests/core_tests_dep_id.py +++ b/main/lib/idds/tests/core_tests_dep_id.py @@ -13,12 +13,19 @@ from idds.workflowv2.work import Work # noqa F401 from idds.orm import contents as orm_contents # noqa F401 +from idds.core import catalog as core_catalog # noqa F401 setup_logging(__name__) -request_id = 486 -transform_id = 3027 # 3028, 3029 +request_id = 3347 +transform_id = 26788 # 3028, 3029 -ret = orm_contents.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# ret = core_catalog.update_contents_to_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# print(ret) + +# ret = core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) +# print(ret) + +ret = core_catalog.get_updated_transforms_by_content_status(request_id=3350) print(ret) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index a2f48a33..a82f9196 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -30,9 +30,11 @@ """ jediTaskID = 10517 # 10607 -jediTaskID = 59725 +jediTaskID = 146329 ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) +# ret = Client.getTaskStatus(jediTaskID, verbose=False) +# print(ret) # ret = Client.getTaskStatus(jediTaskID, verbose=False) # print(ret) @@ -97,7 +99,9 @@ # task_ids = [i for i in range(142507, 142651)] # task_ids = [i for i in range(140349, 140954)] + [142268, 142651] # task_ids = [1851] + [i for i in range(4336, 4374)] + [i for i in range(133965, 136025)] -task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] +# task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] +task_ids = [i for i in range(144088, 144111)] + [144891, 144892] +# task_ids = [] for task_id in task_ids: print("Killing %s" % task_id) Client.killTask(task_id) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 44b62503..7b00702e 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -44,7 +44,12 @@ # task_queue = 'SLAC_TEST' # task_queue = 'DOMA_LSST_SLAC_TEST' task_queue = 'SLAC_Rubin' -# task_queue = 'CC-IN2P3_TEST' + +task_cloud = 'EU' +task_queue = 'CC-IN2P3_TEST' + +task_cloud = 'EU' +task_queue = 'LANCS_TEST' def randStr(chars=string.ascii_lowercase + string.digits, N=10): diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 69e7d923..9b9020c8 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus8s15.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus8s15.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus8s15.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus8s16.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus8s16.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus8s16.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus8s16.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus8s16.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus8s16.cern.ch:443/idds/monitor/null/null/false/false/true" }