diff --git a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py index 44802943..9ffd452b 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlaspandawork.py @@ -179,6 +179,11 @@ def parse_task_parameters(self, task_parameters): if jobP['param_type'] == 'input': input_c = jobP['dataset'] scope, name = extract_scope_atlas(input_c, scopes=[]) + if len(name) > 255: + if "consolidate" in jobP: + scope, name = extract_scope_atlas(jobP['consolidate'], scopes=[]) + else: + name = name[:250] input_coll = {'scope': scope, 'name': name} self.set_primary_input_collection(input_coll) if jobP['param_type'] == 'output': diff --git a/doma/lib/idds/doma/workflow/domapandawork.py b/doma/lib/idds/doma/workflow/domapandawork.py index e53b2425..0a5bf035 100644 --- a/doma/lib/idds/doma/workflow/domapandawork.py +++ b/doma/lib/idds/doma/workflow/domapandawork.py @@ -856,8 +856,8 @@ def get_job_maps(self, input_output_maps): if 'panda_id' in content['content_metadata']: finished_jobs.append(content['content_metadata']['panda_id']) elif content['substatus'] in [ContentStatus.Failed, ContentStatus.FinalFailed, - ContentStatus.Lost, ContentStatus.Deleted, - ContentStatus.Missing]: + ContentStatus.Lost, ContentStatus.Deleted, + ContentStatus.Missing]: if 'panda_id' in content['content_metadata']: failed_jobs.append(content['content_metadata']['panda_id']) for content in inputs: diff --git a/main/etc/sql/oracle_11.sql b/main/etc/sql/oracle_11.sql index a6909d13..eab9b32e 100644 --- a/main/etc/sql/oracle_11.sql +++ b/main/etc/sql/oracle_11.sql @@ -147,6 +147,7 @@ CREATE TABLE TRANSFORMS started_at DATE, finished_at DATE, expired_at DATE, + name VARCHAR2(255), transform_metadata CLOB, running_metadata CLOB, CONSTRAINT TRANSFORMS_PK PRIMARY KEY (transform_id) diff --git a/main/etc/sql/oracle_19.sql b/main/etc/sql/oracle_19.sql index 5b55d0f2..a3b0ede7 100644 --- a/main/etc/sql/oracle_19.sql +++ b/main/etc/sql/oracle_19.sql @@ -127,6 +127,7 @@ CREATE TABLE TRANSFORMS started_at DATE, finished_at DATE, expired_at DATE, + name VARCHAR2(255), transform_metadata CLOB constraint TRANSFORM_METADATA_ENSURE_JSON CHECK(transform_metadata IS JSON(LAX)), running_metadata CLOB, CONSTRAINT TRANSFORMS_PK PRIMARY KEY (transform_id) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index 276513b1..91c1afe9 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -108,3 +108,7 @@ CREATE INDEX COMMANDS_TYPE_ST_IDX ON COMMANDS (cmd_type, status, destination, re CREATE INDEX COMMANDS_TYPE_ST_TF_IDX ON COMMANDS (cmd_type, status, destination, transform_id); CREATE INDEX COMMANDS_TYPE_ST_PR_IDX ON COMMANDS (cmd_type, status, destination, processing_id); +-- 2022.11.03 +alter table transforms add name VARCHAR2(255); +alter table collections add failed_files NUMBER(10); +alter table collections add missing_files NUMBER(10); diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index fa1de5a8..c4b5f4a1 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -1160,7 +1160,8 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou for content in inputs + outputs + logs: if content['coll_id'] not in coll_status: - coll_status[content['coll_id']] = {'total_files': 0, 'processed_files': 0, 'processing_files': 0, 'bytes': 0} + coll_status[content['coll_id']] = {'total_files': 0, 'processed_files': 0, 'processing_files': 0, 'bytes': 0, + 'new_files': 0, 'failed_files': 0, 'missing_files': 0} coll_status[content['coll_id']]['total_files'] += 1 if content['status'] in [ContentStatus.Available, ContentStatus.Mapped, @@ -1168,6 +1169,12 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]: coll_status[content['coll_id']]['processed_files'] += 1 coll_status[content['coll_id']]['bytes'] += content['bytes'] + elif content['status'] in [ContentStatus.New]: + coll_status[content['coll_id']]['new_files'] += 1 + elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]: + coll_status[content['coll_id']]['failed_files'] += 1 + elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]: + coll_status[content['coll_id']]['missing_files'] += 1 else: coll_status[content['coll_id']]['processing_files'] += 1 @@ -1188,15 +1195,24 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou coll.processed_files = coll_status[coll.coll_id]['processed_files'] coll.processing_files = coll_status[coll.coll_id]['processing_files'] coll.bytes = coll_status[coll.coll_id]['bytes'] + coll.new_files = coll_status[coll.coll_id]['new_files'] + coll.failed_files = coll_status[coll.coll_id]['failed_files'] + coll.missing_files = coll_status[coll.coll_id]['missing_files'] else: coll.total_files = 0 coll.processed_files = 0 coll.processing_files = 0 + coll.new_files = 0 + coll.failed_files = 0 + coll.missing_files = 0 u_coll = {'coll_id': coll.coll_id, 'total_files': coll.total_files, 'processed_files': coll.processed_files, 'processing_files': coll.processing_files, + 'new_files': coll.new_files, + 'failed_files': coll.failed_files, + 'missing_files': coll.missing_files, 'bytes': coll.bytes} if terminate: if force_close_collection or close_collection and all_updates_flushed or coll.status == CollectionStatus.Closed: @@ -1342,5 +1358,5 @@ def handle_resume_processing(processing, agent_attributes, logger=None, log_pref input_output_maps = get_input_output_maps(transform_id, work) update_contents = reactive_contents(request_id, transform_id, workload_id, work, input_output_maps) - processing['status'] = ProcessingStatus.Processing + processing['status'] = ProcessingStatus.Running return processing, update_collections, update_contents diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 181db816..632da2c8 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -289,6 +289,7 @@ def generate_transform(self, req, work): 'priority': req['priority'], 'status': TransformStatus.New, 'retries': 0, + 'name': work.get_work_name(), 'new_poll_period': self.new_poll_period, 'update_poll_period': self.update_poll_period, 'max_new_retries': req['max_new_retries'] if req['max_new_retries'] is not None else self.max_new_retries, diff --git a/main/lib/idds/agents/common/eventbus/localeventbusbackend.py b/main/lib/idds/agents/common/eventbus/localeventbusbackend.py index 5a276a6a..aa1a5c42 100644 --- a/main/lib/idds/agents/common/eventbus/localeventbusbackend.py +++ b/main/lib/idds/agents/common/eventbus/localeventbusbackend.py @@ -72,7 +72,7 @@ def get(self, event_type, wait=0): def execute(self): while not self.graceful_stop.is_set(): try: - self.graceful_stop.wait(1) + self.graceful_stop.wait(0.1) except Exception as error: self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc())) diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py index 28949b6f..48132de5 100644 --- a/main/lib/idds/agents/common/plugins/messaging.py +++ b/main/lib/idds/agents/common/plugins/messaging.py @@ -60,6 +60,7 @@ def __init__(self, name="MessagingSender", **kwargs): self.graceful_stop = threading.Event() self.request_queue = None self.output_queue = None + self.response_queue = None if not hasattr(self, 'brokers'): raise Exception('brokers is required but not defined.') @@ -72,7 +73,7 @@ def __init__(self, name="MessagingSender", **kwargs): if not hasattr(self, 'destination'): raise Exception('destination is required but not defined.') if not hasattr(self, 'broker_timeout'): - self.broker_timeout = 10 + self.broker_timeout = 60 else: self.broker_timeout = int(self.broker_timeout) @@ -87,7 +88,10 @@ def set_request_queue(self, request_queue): def set_output_queue(self, output_queue): self.output_queue = output_queue - def connect_to_messaging_brokers(self): + def set_response_queue(self, response_queue): + self.response_queue = response_queue + + def connect_to_messaging_brokers(self, sender=True): broker_addresses = [] for b in self.brokers: try: @@ -105,18 +109,47 @@ def connect_to_messaging_brokers(self): self.logger.info("Resolved broker addresses: %s" % broker_addresses) + timeout = None + if sender: + timeout = self.broker_timeout + + conns = [] for broker, port in broker_addresses: conn = stomp.Connection12(host_and_ports=[(broker, port)], vhost=self.vhost, keepalive=True, - timeout=self.broker_timeout) - self.conns.append(conn) + heartbeats=(60000, 60000), # one minute + timeout=timeout) + conns.append(conn) + return conns - def send_message(self, msg): + def disconnect(self, conns): + for conn in conns: + try: + conn.disconnect() + except Exception: + pass + + def get_connection(self): + try: + conn = random.sample(self.conns, 1)[0] + if not conn.is_connected(): + # conn.start() + conn.connect(self.username, self.password, wait=True) + return conn + except Exception as error: + self.logger.error("Failed to connect to message broker(will re-resolve brokers): %s" % str(error)) + + self.disconnect(self.conns) + + self.conns = self.connect_to_messaging_brokers(sender=True) conn = random.sample(self.conns, 1)[0] if not conn.is_connected(): - # conn.start() conn.connect(self.username, self.password, wait=True) + return conn + + def send_message(self, msg): + conn = self.get_connection() self.logger.info("Sending message to message broker: %s" % msg['msg_id']) self.logger.debug("Sending message to message broker: %s" % json.dumps(msg['msg_content'])) @@ -128,8 +161,8 @@ def send_message(self, msg): 'vo': 'atlas', 'msg_type': str(msg['msg_type']).lower()}) - def run(self): - self.connect_to_messaging_brokers() + def execute_send(self): + self.conns = self.connect_to_messaging_brokers(sender=True) while not self.graceful_stop.is_set(): try: @@ -137,12 +170,18 @@ def run(self): msg = self.request_queue.get(False) if msg: self.send_message(msg) - self.output_queue.put(msg) + self.response_queue.put(msg) else: - time.sleep(1) + time.sleep(0.1) except Exception as error: self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc())) + def run(self): + try: + self.execute_send() + except Exception as error: + self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc())) + def __call__(self): self.run() @@ -151,6 +190,7 @@ class MessagingReceiver(MessagingSender): def __init__(self, name="MessagingReceiver", **kwargs): super(MessagingReceiver, self).__init__(name=name, **kwargs) self.listener = None + self.receiver_conns = [] def get_listener(self, broker): if self.listener is None: @@ -158,58 +198,100 @@ def get_listener(self, broker): return self.listener def subscribe(self): - self.conns = [] - - broker_addresses = [] - for b in self.brokers: - try: - if ":" in b: - b, port = b.split(":") - else: - port = self.port - - addrinfos = socket.getaddrinfo(b, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) - for addrinfo in addrinfos: - b_addr = addrinfo[4][0] - broker_addresses.append((b_addr, port)) - except socket.gaierror as error: - self.logger.error('Cannot resolve hostname %s: %s' % (b, str(error))) + self.receiver_conns = self.connect_to_messaging_brokers() - self.logger.info("Resolved broker addresses: %s" % broker_addresses) - - for broker, port in broker_addresses: - conn = stomp.Connection12(host_and_ports=[(broker, port)], - vhost=self.vhost, - keepalive=True) + for conn in self.receiver_conns: + self.logger.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0]) conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0])) conn.connect(self.username, self.password, wait=True) conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto') - self.conns.append(conn) + + def execute_subscribe(self): + self.subscribe() while not self.graceful_stop.is_set(): + has_failed_connection = False try: - for conn in self.conns: + for conn in self.receiver_conns: if not conn.is_connected(): - self.logger.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0]) conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0])) # conn.start() conn.connect(self.username, self.password, wait=True) conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto') - time.sleep(1) + time.sleep(0.1) except Exception as error: self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) + has_failed_connection = True + + if has_failed_connection: + # re-subscribe + self.disconnect(self.receiver_conns) + self.subscribe() self.logger.info('receiver graceful stop requested') - for conn in self.conns: + self.disconnect(self.receiver_conns) + + def run(self): + try: + self.execute_subscribe() + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) + + def __call__(self): + self.run() + + +class MessagingMessager(MessagingReceiver): + def __init__(self, name="MessagingMessager", **kwargs): + super(MessagingMessager, self).__init__(name=name, **kwargs) + + def execute_send_subscribe(self): + self.conns = self.connect_to_messaging_brokers(sender=True) + self.subscribe() + + while not self.graceful_stop.is_set(): + # send + while True: + try: + if not self.request_queue.empty(): + msg = self.request_queue.get(False) + if msg: + self.send_message(msg) + if self.response_queue: + self.response_queue.put(msg) + else: + break + except Exception as error: + self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc())) + + # subscribe + has_failed_connection = False try: - conn.disconnect() - except Exception: - pass + for conn in self.receiver_conns: + if not conn.is_connected(): + conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0])) + # conn.start() + conn.connect(self.username, self.password, wait=True) + conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto') + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) + has_failed_connection = True + + if has_failed_connection: + # re-subscribe + self.disconnect(self.receiver_conns) + self.subscribe() + + time.sleep(0.1) + + self.logger.info('receiver graceful stop requested') + self.disconnect(self.conns) + self.disconnect(self.receiver_conns) def run(self): try: - self.subscribe() + self.execute_send_subscribe() except Exception as error: self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index 58787ae5..bc72f185 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -101,7 +101,7 @@ def start_notifier(self): self.logger.info("Starting notifier: %s" % self.notifier) self.notifier.set_request_queue(self.message_queue) - self.notifier.set_output_queue(self.output_message_queue) + self.notifier.set_response_queue(self.output_message_queue) self.notifier.start() def stop_notifier(self): diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py index a84937c1..01087741 100644 --- a/main/lib/idds/core/requests.py +++ b/main/lib/idds/core/requests.py @@ -209,7 +209,7 @@ def generate_collection(transform, collection, relation_type=CollectionRelationT 'workload_id': transform['workload_id'], 'coll_type': coll_type, 'scope': collection.scope, - 'name': collection.name, + 'name': collection.name[:254], 'relation_type': relation_type, 'bytes': coll_metadata['bytes'] if 'bytes' in coll_metadata else 0, 'total_files': coll_metadata['total_files'] if 'total_files' in coll_metadata else 0, diff --git a/main/lib/idds/core/transforms.py b/main/lib/idds/core/transforms.py index 9587a162..638551f8 100644 --- a/main/lib/idds/core/transforms.py +++ b/main/lib/idds/core/transforms.py @@ -29,7 +29,7 @@ @transactional_session -def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, +def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, name=None, status=TransformStatus.New, substatus=TransformStatus.New, locking=TransformLocking.Idle, new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, transform_metadata=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, @@ -56,7 +56,7 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p transform_id = orm_transforms.add_transform(request_id=request_id, workload_id=workload_id, transform_type=transform_type, transform_tag=transform_tag, priority=priority, status=status, substatus=substatus, - locking=locking, retries=retries, + locking=locking, retries=retries, name=name, new_poll_period=new_poll_period, update_poll_period=update_poll_period, new_retries=new_retries, update_retries=update_retries, diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index fe951b41..472d00d2 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -282,6 +282,7 @@ class Transform(BASE, ModelBase): max_update_retries = Column(Integer(), default=0) new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1)) update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10)) + name = Column(String(NAME_LENGTH)) errors = Column(JSONString(1024)) _transform_metadata = Column('transform_metadata', JSON()) _running_metadata = Column('running_metadata', JSON()) @@ -469,6 +470,8 @@ class Collection(BASE, ModelBase): new_files = Column(Integer()) processed_files = Column(Integer()) processing_files = Column(Integer()) + failed_files = Column(Integer()) + missing_files = Column(Integer()) processing_id = Column(Integer()) retries = Column(Integer(), default=0) created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow) diff --git a/main/lib/idds/orm/requests.py b/main/lib/idds/orm/requests.py index 8b092a8a..c87ef135 100644 --- a/main/lib/idds/orm/requests.py +++ b/main/lib/idds/orm/requests.py @@ -339,6 +339,9 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta models.Collection.bytes.label("input_coll_bytes"), models.Collection.total_files.label("input_total_files"), models.Collection.processed_files.label("input_processed_files"), + models.Collection.new_files.label("input_new_files"), + models.Collection.failed_files.label("input_failed_files"), + models.Collection.missing_files.label("input_missing_files"), models.Collection.processing_files.label("input_processing_files")).filter(models.Collection.relation_type == 0) subquery1 = subquery1.subquery() @@ -349,6 +352,9 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta models.Collection.bytes.label("output_coll_bytes"), models.Collection.total_files.label("output_total_files"), models.Collection.processed_files.label("output_processed_files"), + models.Collection.new_files.label("output_new_files"), + models.Collection.failed_files.label("output_failed_files"), + models.Collection.missing_files.label("output_missing_files"), models.Collection.processing_files.label("output_processing_files")).filter(models.Collection.relation_type == 1) subquery2 = subquery2.subquery() if True: @@ -383,11 +389,17 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta subquery1.c.input_total_files, subquery1.c.input_processed_files, subquery1.c.input_processing_files, + subquery1.c.input_new_files, + subquery1.c.input_failed_files, + subquery1.c.input_missing_files, subquery2.c.output_coll_scope, subquery2.c.output_coll_name, subquery2.c.output_coll_status, subquery2.c.output_coll_bytes, subquery2.c.output_total_files, subquery2.c.output_processed_files, - subquery2.c.output_processing_files) + subquery2.c.output_processing_files, + subquery2.c.output_new_files, + subquery2.c.output_failed_files, + subquery2.c.output_missing_files) if request_id: query = query.filter(models.Request.request_id == request_id) @@ -492,6 +504,9 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta models.Collection.bytes.label("input_coll_bytes"), models.Collection.total_files.label("input_total_files"), models.Collection.processed_files.label("input_processed_files"), + models.Collection.new_files.label("input_new_files"), + models.Collection.failed_files.label("input_failed_files"), + models.Collection.missing_files.label("input_missing_files"), models.Collection.processing_files.label("input_processing_files")).filter(models.Collection.relation_type == 0) subquery1 = subquery1.subquery() @@ -502,6 +517,9 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta models.Collection.bytes.label("output_coll_bytes"), models.Collection.total_files.label("output_total_files"), models.Collection.processed_files.label("output_processed_files"), + models.Collection.new_files.label("output_new_files"), + models.Collection.failed_files.label("output_failed_files"), + models.Collection.missing_files.label("output_missing_files"), models.Collection.processing_files.label("output_processing_files")).filter(models.Collection.relation_type == 1) subquery2 = subquery2.subquery() @@ -529,6 +547,7 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta models.Request._processing_metadata.label('processing_metadata'), models.Transform.transform_id, models.Transform.transform_type, + models.Transform.name.label("transform_name"), models.Transform.workload_id.label("transform_workload_id"), models.Transform.status.label("transform_status"), models.Transform.created_at.label("transform_created_at"), @@ -539,11 +558,17 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta subquery1.c.input_total_files, subquery1.c.input_processed_files, subquery1.c.input_processing_files, + subquery1.c.input_new_files, + subquery1.c.input_failed_files, + subquery1.c.input_missing_files, subquery2.c.output_coll_scope, subquery2.c.output_coll_name, subquery2.c.output_coll_status, subquery2.c.output_coll_bytes, subquery2.c.output_total_files, subquery2.c.output_processed_files, - subquery2.c.output_processing_files) + subquery2.c.output_processing_files, + subquery2.c.output_new_files, + subquery2.c.output_failed_files, + subquery2.c.output_missing_files,) else: query = session.query(models.Request.request_id, models.Request.scope, @@ -566,6 +591,7 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta models.Request.errors, models.Transform.transform_id, models.Transform.transform_type, + models.Transform.name.label("transform_name"), models.Transform.workload_id.label("transform_workload_id"), models.Transform.status.label("transform_status"), models.Transform.created_at.label("transform_created_at"), @@ -576,11 +602,17 @@ def get_requests(request_id=None, workload_id=None, with_detail=False, with_meta subquery1.c.input_total_files, subquery1.c.input_processed_files, subquery1.c.input_processing_files, + subquery1.c.input_new_files, + subquery1.c.input_failed_files, + subquery1.c.input_missing_files, subquery2.c.output_coll_scope, subquery2.c.output_coll_name, subquery2.c.output_coll_status, subquery2.c.output_coll_bytes, subquery2.c.output_total_files, subquery2.c.output_processed_files, - subquery2.c.output_processing_files) + subquery2.c.output_processing_files, + subquery2.c.output_new_files, + subquery2.c.output_failed_files, + subquery2.c.output_missing_files,) if request_id: query = query.filter(models.Request.request_id == request_id) diff --git a/main/lib/idds/orm/transforms.py b/main/lib/idds/orm/transforms.py index f2e259aa..2f61d4df 100644 --- a/main/lib/idds/orm/transforms.py +++ b/main/lib/idds/orm/transforms.py @@ -27,7 +27,7 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None, - priority=0, status=TransformStatus.New, + priority=0, status=TransformStatus.New, name=None, substatus=TransformStatus.New, locking=TransformLocking.Idle, new_poll_period=1, update_poll_period=10, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, @@ -49,7 +49,7 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None :returns: transform. """ new_transform = models.Transform(request_id=request_id, workload_id=workload_id, transform_type=transform_type, - transform_tag=transform_tag, priority=priority, + transform_tag=transform_tag, priority=priority, name=name, status=status, substatus=substatus, locking=locking, retries=retries, expired_at=expired_at, new_retries=new_retries, update_retries=update_retries, @@ -65,7 +65,7 @@ def create_transform(request_id, workload_id, transform_type, transform_tag=None @transactional_session -def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, +def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, name=None, status=TransformStatus.New, substatus=TransformStatus.New, locking=TransformLocking.Idle, new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, @@ -91,7 +91,7 @@ def add_transform(request_id, workload_id, transform_type, transform_tag=None, p """ try: new_transform = create_transform(request_id=request_id, workload_id=workload_id, transform_type=transform_type, - transform_tag=transform_tag, priority=priority, + transform_tag=transform_tag, priority=priority, name=name, status=status, substatus=substatus, locking=locking, retries=retries, expired_at=expired_at, new_poll_period=new_poll_period, diff --git a/main/lib/idds/rest/v1/requests.py b/main/lib/idds/rest/v1/requests.py index 26e54802..84b8fe24 100644 --- a/main/lib/idds/rest/v1/requests.py +++ b/main/lib/idds/rest/v1/requests.py @@ -243,20 +243,20 @@ def put(self, request_id, workload_id=None, task_id=None): reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) if not reqs: - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': 'No match requests'})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': 'No match requests'})]) matched_transform_id = None if task_id: for req in reqs: if str(req['processing_workload_id']) == str(task_id): matched_transform_id = req['transform_id'] if matched_transform_id: - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': 'No match tasks'})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': 'No match tasks'})]) for req in reqs: if req['username'] and req['username'] != username and not authenticate_is_super_user(username): msg = "User %s has no permission to update request %s" % (username, req['request_id']) # raise exceptions.AuthenticationNoPermission(msg) - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': msg})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': msg})]) except exceptions.AuthenticationNoPermission as error: return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) except Exception as error: @@ -283,7 +283,7 @@ def put(self, request_id, workload_id=None, task_id=None): print(format_exc()) return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(0, {'status': 0, 'message': 'Command registered successfully'})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(0, {'status': 0, 'message': 'Command registered successfully'})]) class RequestRetry(IDDSController): @@ -308,13 +308,13 @@ def put(self, request_id, workload_id=None): username = self.get_username() reqs = get_requests(request_id=request_id, workload_id=workload_id, with_request=True) if not reqs: - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': 'No match requests'})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': 'No match requests'})]) for req in reqs: if req['username'] and req['username'] != username and not authenticate_is_super_user(username): msg = "User %s has no permission to update request %s" % (username, req['request_id']) # raise exceptions.AuthenticationNoPermission(msg) - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(-1, {'status': -1, 'message': msg})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(-1, {'status': -1, 'message': msg})]) except exceptions.AuthenticationNoPermission as error: return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=error.__class__.__name__, exc_msg=error) except Exception as error: @@ -336,7 +336,7 @@ def put(self, request_id, workload_id=None): print(format_exc()) return self.generate_http_response(HTTP_STATUS_CODE.InternalError, exc_cls=exceptions.CoreException.__name__, exc_msg=error) - return self.generate_http_response(HTTP_STATUS_CODE.OK, data=(0, {'status': 0, 'message': 'Command registered successfully'})) + return self.generate_http_response(HTTP_STATUS_CODE.OK, data=[(0, {'status': 0, 'message': 'Command registered successfully'})]) """---------------------- diff --git a/main/lib/idds/tests/core_tests.py b/main/lib/idds/tests/core_tests.py index 71070660..9696faf2 100644 --- a/main/lib/idds/tests/core_tests.py +++ b/main/lib/idds/tests/core_tests.py @@ -5,7 +5,7 @@ from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 from idds.core.requests import get_requests # noqa F401 from idds.core.messages import retrieve_messages # noqa F401 -from idds.core.transforms import get_transforms # noqa F401 +from idds.core.transforms import get_transforms, get_transform # noqa F401 from idds.core.workprogress import get_workprogresses # noqa F401 from idds.core.processings import get_processings # noqa F401 from idds.core import transforms as core_transforms # noqa F401 @@ -111,16 +111,32 @@ def print_workflow(workflow, layers=0): prefix = " " * layers * 4 for run in workflow.runs: print(prefix + "run: " + str(run) + ", has_loop_condition: " + str(workflow.runs[run].has_loop_condition())) - if workflow.runs[run].has_loop_condition(): - print(prefix + " Loop condition: %s" % json_dumps(workflow.runs[run].loop_condition, sort_keys=True, indent=4)) + # if workflow.runs[run].has_loop_condition(): + # print(prefix + " Loop condition: %s" % json_dumps(workflow.runs[run].loop_condition, sort_keys=True, indent=4)) for work_id in workflow.runs[run].works: print(prefix + " " + str(work_id) + " " + str(type(workflow.runs[run].works[work_id]))) if type(workflow.runs[run].works[work_id]) in [Workflow]: print(prefix + " parent_num_run: " + workflow.runs[run].works[work_id].parent_num_run + ", num_run: " + str(workflow.runs[run].works[work_id].num_run)) print_workflow(workflow.runs[run].works[work_id], layers=layers + 1) + # print(prefix + " is_terminated: " + str(workflow.runs[run].works[work_id].is_terminated())) + # print(prefix + " is_finished: " + str(workflow.runs[run].works[work_id].is_finished())) # elif type(workflow.runs[run].works[work_id]) in [Work]: else: + work = workflow.runs[run].works[work_id] + tf = get_transform(transform_id=work.get_work_id()) + if tf: + transform_work = tf['transform_metadata']['work'] + # print(json_dumps(transform_work, sort_keys=True, indent=4)) + work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id']) + + print(prefix + " or: " + str(work.or_custom_conditions) + " and: " + str(work.and_custom_conditions)) + print(prefix + " output: " + str(work.output_data)) print(prefix + " " + workflow.runs[run].works[work_id].task_name + ", num_run: " + str(workflow.runs[run].works[work_id].num_run)) + # print(prefix + " is_terminated: " + str(workflow.runs[run].works[work_id].is_terminated())) + # print(prefix + " is_finished: " + str(workflow.runs[run].works[work_id].is_finished())) + if workflow.runs[run].has_loop_condition(): + print(prefix + " Loop condition status: %s" % workflow.runs[run].get_loop_condition_status()) + print(prefix + " Loop condition: %s" % json_dumps(workflow.runs[run].loop_condition, sort_keys=True, indent=4)) def print_workflow_template(workflow, layers=0): @@ -151,7 +167,10 @@ def print_workflow_template(workflow, layers=0): # reqs = get_requests(request_id=372678, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=373602, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=376086, with_request=True, with_detail=False, with_metadata=True) -reqs = get_requests(request_id=380474, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=380474, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=381520, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=28182323, with_request=True, with_detail=False, with_metadata=True) +reqs = get_requests(request_id=385554, with_request=True, with_detail=False, with_metadata=True) for req in reqs: # print(req['request_id']) # print(req) @@ -180,13 +199,20 @@ def print_workflow_template(workflow, layers=0): print("workflow") print_workflow(workflow) - print("workflow template") - print_workflow_template(workflow) + new_works = workflow.get_new_works() + print('new_works:' + str(new_works)) + # print("workflow template") + # print_workflow_template(workflow) # workflow.sync_works() -# sys.exit(0) +sys.exit(0) + +reqs = get_requests(request_id=28182323, with_request=False, with_detail=True, with_metadata=False) +for req in reqs: + print(json_dumps(req, sort_keys=True, indent=4)) +# sys.exit(0) """ # reqs = get_requests() @@ -203,13 +229,13 @@ def print_workflow_template(workflow, layers=0): """ -tfs = get_transforms(request_id=380474) +tfs = get_transforms(request_id=2818) # tfs = get_transforms(transform_id=350723) for tf in tfs: # print(tf) # print(tf['transform_metadata']['work'].to_dict()) # print(tf) - # print(json_dumps(tf, sort_keys=True, indent=4)) + print(json_dumps(tf, sort_keys=True, indent=4)) print(tf['request_id'], tf['workload_id']) print(tf['transform_metadata']['work_name']) print(tf['transform_metadata']['work'].num_run) diff --git a/main/lib/idds/tests/fix_trasnform_name.py b/main/lib/idds/tests/fix_trasnform_name.py new file mode 100644 index 00000000..be0080ec --- /dev/null +++ b/main/lib/idds/tests/fix_trasnform_name.py @@ -0,0 +1,27 @@ +import sys # noqa F401 +import datetime # noqa F401 + +from idds.common.utils import json_dumps, setup_logging # noqa F401 +from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 +from idds.core.requests import get_requests # noqa F401 +from idds.core.messages import retrieve_messages # noqa F401 +from idds.core.transforms import get_transforms, get_transform, update_transform # noqa F401 +from idds.core.workprogress import get_workprogresses # noqa F401 +from idds.core.processings import get_processings # noqa F401 +from idds.core import transforms as core_transforms # noqa F401 +from idds.orm.contents import get_input_contents # noqa F401 +from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old # noqa F401 +from idds.workflowv2.workflow import Workflow # noqa F401 +from idds.workflowv2.work import Work # noqa F401 + + +setup_logging(__name__) + +tfs = get_transforms() +# tfs = get_transforms(transform_id=350723) +for tf in tfs: + print(tf['transform_id']) + work = tf['transform_metadata']['work'] + name = work.get_work_name() + update_transform(transform_id=tf['transform_id'], parameters={'name': name}) + # break diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 5731dc3c..eca0d6ad 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -41,6 +41,8 @@ # task_queue = 'DOMA_LSST_GOOGLE_MERGE' # task_queue = 'SLAC_TEST' # task_queue = 'DOMA_LSST_SLAC_TEST' +task_queue = 'SLAC_Rubin' +task_queue = 'CC-IN2P3_TEST' def randStr(chars=string.ascii_lowercase + string.digits, N=10): diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 7f3e1d62..48467cf3 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -36,7 +36,7 @@ def migrate(): doma_google_host = 'https://34.133.138.229:443/idds' # noqa F841 cm1 = ClientManager(host=atlas_host) - # cm1 = ClientManager(host=doma_host) + cm1 = ClientManager(host=doma_host) # reqs = cm1.get_requests(request_id=290) # old_request_id = 298163 # old_request_id = 350723 @@ -45,6 +45,8 @@ def migrate(): old_request_id = 2400 old_request_id = 371204 old_request_id = 372930 + old_request_id = 2603 + old_request_id = 2802 # for old_request_id in [152]: # for old_request_id in [60]: # noqa E115 @@ -54,7 +56,7 @@ def migrate(): cm2 = ClientManager(host=dev_host) cm2 = ClientManager(host=doma_host) - cm2 = ClientManager(host=atlas_host) + # cm2 = ClientManager(host=atlas_host) # print(reqs) print("num requests: %s" % len(reqs)) diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh index 8117d503..b5fa762c 100644 --- a/main/tools/env/setup_panda.sh +++ b/main/tools/env/setup_panda.sh @@ -8,10 +8,13 @@ fi export X509_USER_PROXY=/afs/cern.ch/user/w/wguan/workdisk/iDDS/test/x509up export RUCIO_ACCOUNT=pilot +#export IDDS_HOST=https://panda-idds-dev.cern.ch:443/idds + export PANDA_BEHIND_REAL_LB=true # export PANDA_SYS=/opt/idds/ if [ "$instance" == "k8s" ]; then + export IDDS_HOST=https://panda-idds-dev.cern.ch:443/idds export PANDA_AUTH=oidc export PANDA_BEHIND_REAL_LB=true export PANDA_VERIFY_HOST=off @@ -21,15 +24,19 @@ if [ "$instance" == "k8s" ]; then export PANDA_AUTH_VO=panda_dev export PANDACACHE_URL=$PANDA_URL_SSL + export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ export PANDA_CONFIG_ROOT=~/.panda/ elif [ "$instance" == "slac" ]; then export PANDA_AUTH=oidc - export PANDA_URL_SSL=https://rubin-panda-server-dev.slac.stanford.edu:443/server/panda + export PANDA_BEHIND_REAL_LB=true + export PANDA_VERIFY_HOST=off + export PANDA_URL_SSL=https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda export PANDA_URL=http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda export PANDAMON_URL=https://rubin-panda-bigmon-dev.slac.stanford.edu export PANDA_AUTH_VO=Rubin + export PANDACACHE_URL=$PANDA_URL_SSL # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ export PANDA_CONFIG_ROOT=~/.panda/ else diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 69dede90..b8a48092 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus8s05.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus8s05.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus8s05.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus8s05.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus8s05.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus8s05.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus8s12.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus8s12.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus8s12.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/lib/idds/workflow/work.py b/workflow/lib/idds/workflow/work.py index 486f728f..779cce15 100644 --- a/workflow/lib/idds/workflow/work.py +++ b/workflow/lib/idds/workflow/work.py @@ -68,6 +68,9 @@ def __init__(self, scope=None, name=None, coll_type=CollectionType.Dataset, coll self.processed_files = 0 self.processing_files = 0 self.bytes = 0 + self.new_files = 0 + self.failed_files = 0 + self.missing_files = 0 @property def internal_id(self): diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index 24306b3c..7ffe4e51 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -68,6 +68,9 @@ def __init__(self, scope=None, name=None, coll_type=CollectionType.Dataset, coll self.processed_files = 0 self.processing_files = 0 self.bytes = 0 + self.new_files = 0 + self.failed_files = 0 + self.missing_files = 0 @property def internal_id(self):