From 9ff783d3feb1fa75bf1a444cf4c944d17161fca6 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Wed, 27 Sep 2023 20:54:58 +0200
Subject: [PATCH 1/2] use md5 to hash long contents name

---
 Review notes (this free-text area is ignored when the patch is applied):
 * oracle_update.sql: "update table contents" is not valid UPDATE syntax;
   the backfill now reads "update contents set ...".
 * alembic migration: the backfill statement only qualifies the table name
   when a schema is configured (an empty schema used to render
   "update .contents"), and uses concat(scope, name) because "||" is a
   logical OR on default MySQL, which is one of the guarded dialects.
 * alembic migration / models.py: func.md5('name') hashes the string literal
   'name', not the column; the functional index now uses text('md5(name)').
 * models.py: Column(String(String(33))) passed a type object as the length;
   it is now Column(String(33)).
 * contents.py: scope_name_md5 is now derived from scope + name so that new
   rows agree with the values the migration backfills.
 * NOTE(review): md5() is not an Oracle built-in function; confirm that a
   schema-level md5 function exists or switch to STANDARD_HASH(..., 'MD5').

 main/etc/sql/oracle_update.sql                |  9 +++++
 ...2e8514_using_hash_on_contents_long_name.py | 37 ++++++++++---------
 main/lib/idds/orm/base/models.py              |  7 +++-
 main/lib/idds/orm/contents.py                 |  4 ++
 4 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql
index 26a200f9..143b0eed 100644
--- a/main/etc/sql/oracle_update.sql
+++ b/main/etc/sql/oracle_update.sql
@@ -442,3 +442,12 @@ alter table contents add (sub_map_id NUMBER(12) default 0);
 alter table contents add (dep_sub_map_id NUMBER(12) default 0);
 alter table contents drop constraint CONTENT_ID_UQ;
 alter table contents add constraint CONTENT_ID_UQ UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name, min_id, max_id) USING INDEX LOCAL;
+
+
+--- 20230927
+alter table contents add (name_md5 varchar2(33), scope_name_md5 varchar2(33));
+update contents set name_md5=md5(name), scope_name_md5=md5(scope || name);
+alter table contents drop constraint CONTENT_ID_UQ;
+alter table contents add constraint CONTENT_ID_UQ UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id) USING INDEX LOCAL;
+drop index CONTENTS_ID_NAME_IDX;
+CREATE INDEX CONTENTS_ID_NAME_IDX ON CONTENTS (coll_id, scope, md5(name), status);
diff --git a/main/lib/idds/orm/base/alembic/versions/1bc6e82e8514_using_hash_on_contents_long_name.py b/main/lib/idds/orm/base/alembic/versions/1bc6e82e8514_using_hash_on_contents_long_name.py
index 567f0901..49f7e33d 100644
--- a/main/lib/idds/orm/base/alembic/versions/1bc6e82e8514_using_hash_on_contents_long_name.py
+++ b/main/lib/idds/orm/base/alembic/versions/1bc6e82e8514_using_hash_on_contents_long_name.py
@@ -30,31 +30,32 @@
 def upgrade() -> None:
     if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
         schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''
-        try:
-            op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
-        except Exception as ex:
-            print(ex)
-        try:
-            op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)
-        except Exception as ex:
-            print(ex)
+
+        op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
+        op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)
+
+        op.add_column('contents', sa.Column('name_md5', sa.String(33)), schema=schema)
+        op.add_column('contents', sa.Column('scope_name_md5', sa.String(33)), schema=schema)
+
+        # fill values for existing rows; qualify the table only when a schema is configured
+        op.execute('update %s set name_md5=md5(name), scope_name_md5=md5(concat(scope, name))' % ('%s.contents' % schema if schema else 'contents'))
         op.create_unique_constraint('CONTENT_ID_UQ', 'contents',
-                                    ['transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', sa.func.adler32('name'), sa.func.md5('name'), sa.func.hash('name'), 'min_id', 'max_id'],
+                                    ['transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name_md5', 'scope_name_md5', 'min_id', 'max_id'],
                                     schema=schema)
-        op.create_index('CONTENTS_ID_NAME_IDX', 'contents', ['coll_id', 'scope', sa.func.hash('name'), 'status'], schema=schema)
+
+        op.create_index('CONTENTS_ID_NAME_IDX', 'contents', ['coll_id', 'scope', sa.text('md5(name)'), 'status'], schema=schema)
 
 
 def downgrade() -> None:
     if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
         schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''
-        try:
-            op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
-        except Exception as ex:
-            print(ex)
-        try:
-            op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)
-        except Exception as ex:
-            print(ex)
+
+        op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
+        op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)
+
+        op.drop_column('contents', 'name_md5', schema=schema)
+        op.drop_column('contents', 'scope_name_md5', schema=schema)
+
         op.create_unique_constraint('CONTENT_ID_UQ', 'contents',
                                     ['transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name', 'min_id', 'max_id'], schema=schema)
         op.create_index('CONTENTS_ID_NAME_IDX', 'contents', ['coll_id', 'scope', 'name', 'status'], schema=schema)
diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py
index b94dc7cd..59d2545d 100644
--- a/main/lib/idds/orm/base/models.py
+++ b/main/lib/idds/orm/base/models.py
@@ -16,6 +16,7 @@
 import datetime
 from enum import Enum
 
+from sqlalchemy import text
 from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, Float, event, DDL, Interval
 from sqlalchemy.ext.compiler import compiles
 # from sqlalchemy.ext.hybrid import hybrid_property
@@ -543,6 +544,8 @@ class Content(BASE, ModelBase):
     content_dep_id = Column(BigInteger())
     scope = Column(String(SCOPE_LENGTH))
     name = Column(String(LONG_NAME_LENGTH))
+    name_md5 = Column(String(33))
+    scope_name_md5 = Column(String(33))
     min_id = Column(Integer(), default=0)
     max_id = Column(Integer(), default=0)
     content_type = Column(EnumWithValue(ContentType), nullable=False)
@@ -572,13 +575,13 @@ class Content(BASE, ModelBase):
                       # UniqueConstraint('name', 'scope', 'coll_id', 'min_id', 'max_id', name='CONTENT_SCOPE_NAME_UQ'),
                       # UniqueConstraint('content_id', 'coll_id', name='CONTENTS_UQ'),
                       # UniqueConstraint('transform_id', 'coll_id', 'map_id', 'name', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
-                      UniqueConstraint('transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
+                      UniqueConstraint('transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name_md5', 'scope_name_md5', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
                       ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='CONTENTS_TRANSFORM_ID_FK'),
                       ForeignKeyConstraint(['coll_id'], ['collections.coll_id'], name='CONTENTS_COLL_ID_FK'),
                       CheckConstraint('status IS NOT NULL', name='CONTENTS_STATUS_ID_NN'),
                       CheckConstraint('coll_id IS NOT NULL', name='CONTENTS_COLL_ID_NN'),
                       Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'),
-                      Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', 'name', 'status'),
+                      Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', text('md5(name)'), 'status'),
                       Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'),
                       Index('CONTENTS_REL_IDX', 'request_id', 'content_relation_type', 'transform_id', 'substatus'),
                       Index('CONTENTS_TF_IDX', 'transform_id', 'request_id', 'coll_id', 'map_id', 'content_relation_type'),
diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py
index e8244df4..8cbe26c6 100644
--- a/main/lib/idds/orm/contents.py
+++ b/main/lib/idds/orm/contents.py
@@ -65,6 +65,7 @@ def create_content(request_id, workload_id, transform_id, coll_id, map_id, scope
                                  scope=scope, name=name, min_id=min_id, max_id=max_id,
                                  content_type=content_type, content_relation_type=content_relation_type,
                                  status=status, bytes=bytes, md5=md5,
+                                 name_md5=func.md5(name), scope_name_md5=func.md5('%s%s' % (scope or '', name)),
                                  adler32=adler32, processing_id=processing_id, storage_id=storage_id,
                                  retries=retries, path=path, expired_at=expired_at, locking=locking,
                                  content_metadata=content_metadata)
@@ -144,6 +145,7 @@ def add_contents(contents, bulk_size=10000, session=None):
                       'locking': ContentLocking.Idle, 'content_relation_type': ContentRelationType.Input,
                       'bytes': 0, 'md5': None, 'adler32': None, 'processing_id': None,
                       'storage_id': None, 'retries': 0, 'path': None,
+                      'name_md5': None, 'scope_name_md5': None,
                       'expired_at': datetime.datetime.utcnow() + datetime.timedelta(days=30),
                       'content_metadata': None}
 
@@ -151,6 +153,8 @@ def add_contents(contents, bulk_size=10000, session=None):
         for key in default_params:
             if key not in content:
                 content[key] = default_params[key]
+        content['name_md5'] = func.md5(content['name'])
+        content['scope_name_md5'] = func.md5('%s%s' % (content.get('scope') or '', content['name']))
 
     sub_params = [contents[i:i + bulk_size] for i in range(0, len(contents), bulk_size)]
 

From 8c282786859664e3ae0f0b8ffe4a6eadbdbe5931 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 28 Sep 2023 10:07:19 +0200
Subject: [PATCH 2/2] fix combine contents and contents_ext

---
 Review notes (this free-text area is ignored when the patch is applied):
 * test_catalog_core.py: corrected the license URL typo "LICENSE-2.0OA".
 * test_core_debug.py: the input log file is now read inside a "with" block
   so the file handle is closed.
 * NOTE(review): the three new files under tests/ are ad-hoc debugging
   scripts with hard-coded request ids and an AFS path; confirm they are
   meant to be committed.

 main/lib/idds/orm/contents.py            |  6 ++-
 main/lib/idds/tests/panda_client_test.py | 23 +++++++++++
 main/lib/idds/tests/test_catalog_core.py | 48 +++++++++++++++++++++++
 main/lib/idds/tests/test_core_debug.py   | 50 ++++++++++++++++++++++++
 4 files changed, 126 insertions(+), 1 deletion(-)
 create mode 100644 main/lib/idds/tests/panda_client_test.py
 create mode 100644 main/lib/idds/tests/test_catalog_core.py
 create mode 100644 main/lib/idds/tests/test_core_debug.py

diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py
index 8cbe26c6..54c2c551 100644
--- a/main/lib/idds/orm/contents.py
+++ b/main/lib/idds/orm/contents.py
@@ -991,10 +991,14 @@ def combine_contents_ext(contents, contents_ext, with_status_name=False):
     rets = []
     for content in contents:
         content_id = content['content_id']
+        ret = content
         if content_id in contents_ext_map:
+            contents_ext_map[content_id].update(content)
             ret = contents_ext_map[content_id]
         else:
-            ret = {'content_id': content_id}
+            default_params = get_contents_ext_items()
+            default_params.update(content)
+            ret = default_params
         if with_status_name:
             ret['status'] = content['status'].name
         else:
diff --git a/main/lib/idds/tests/panda_client_test.py b/main/lib/idds/tests/panda_client_test.py
new file mode 100644
index 00000000..d142a981
--- /dev/null
+++ b/main/lib/idds/tests/panda_client_test.py
@@ -0,0 +1,23 @@
+import pandaclient.idds_api as idds_api
+import idds.common.utils as idds_utils
+
+# setup connection and get jtids
+conn = idds_api.get_api(idds_utils.json_dumps, idds_host=None, compress=True, manager=True)
+reqid = 5460
+ret = conn.get_requests(request_id=int(reqid), with_detail=True)
+jtids = [task["transform_workload_id"] for task in ret[1][1] if task["transform_status"]["attributes"]["_name_"] != "Finished"]
+print(jtids)
+
+# this first jtid works fine
+jtid = jtids[0]
+print(jtid)
+ret = conn.get_contents_output_ext(request_id=reqid, workload_id=jtid)
+print(ret[1][0])
+# print(ret[1][1])
+
+# other jtids do not work
+jtid = jtids[1]
+print(jtid)
+ret = conn.get_contents_output_ext(request_id=reqid, workload_id=jtid)
+print(ret[1][0])
+print(ret[1][1])
diff --git a/main/lib/idds/tests/test_catalog_core.py b/main/lib/idds/tests/test_catalog_core.py
new file mode 100644
index 00000000..ad21f43c
--- /dev/null
+++ b/main/lib/idds/tests/test_catalog_core.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Authors:
+# - Wen Guan, , 2023
+
+
+from idds.common.constants import CollectionRelationType
+from idds.common.utils import json_dumps
+from idds.core.catalog import get_contents, get_contents_ext, combine_contents_ext
+
+
+def test(request_id, workload_id, transform_id=None, group_by_jedi_task_id=False):
+    contents = get_contents(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
+                            relation_type=CollectionRelationType.Output)
+    print(len(contents))
+    contents_ext = get_contents_ext(request_id=request_id, workload_id=workload_id, transform_id=transform_id)
+
+    print((len(contents_ext)))
+    ret_contents = combine_contents_ext(contents, contents_ext, with_status_name=True)
+    print(len(ret_contents))
+    rets = {}
+    for content in ret_contents:
+        if group_by_jedi_task_id:
+            jedi_task_id = content.get('jedi_task_id', 'None')
+            if jedi_task_id not in rets:
+                rets[jedi_task_id] = []
+            rets[jedi_task_id].append(content)
+        else:
+            transform_id = content.get('transform_id')
+            if transform_id not in rets:
+                rets[transform_id] = []
+            rets[transform_id].append(content)
+    return rets
+
+
+if __name__ == '__main__':
+    print('164443')
+    rets = test(5460, 164443)
+    # print(json_dumps(rets, sort_keys=True, indent=4))
+    print('164448')
+    rets = test(5460, 164448)
+    # print(str(rets))
+    print(json_dumps(rets, sort_keys=True, indent=4))
diff --git a/main/lib/idds/tests/test_core_debug.py b/main/lib/idds/tests/test_core_debug.py
new file mode 100644
index 00000000..fb321351
--- /dev/null
+++ b/main/lib/idds/tests/test_core_debug.py
@@ -0,0 +1,50 @@
+import sys      # noqa F401
+
+from idds.common.constants import ContentStatus
+from idds.common.utils import json_loads
+from idds.orm.contents import update_contents_ext, update_contents     # noqa F401
+
+
+params = [{'content_id': 188308635, 'status': ContentStatus.Available},
+          {'content_id': 188308636, 'status': ContentStatus.Available},
+          {'content_id': 188308637, 'status': ContentStatus.Available},
+          {'content_id': 188308638, 'status': ContentStatus.Available},
+          {'content_id': 188308639, 'status': ContentStatus.Available}]
+params = [{'content_id': 188741016, 'status': ContentStatus.Available},
+          {'content_id': 188741017, 'status': ContentStatus.Available},
+          {'content_id': 188741018, 'status': ContentStatus.Available},
+          {'content_id': 188741019, 'status': ContentStatus.Available},
+          {'content_id': 188741020, 'status': ContentStatus.Available},
+          {'content_id': 188741021, 'status': ContentStatus.Available},
+          {'content_id': 188741022, 'status': ContentStatus.Available},
+          {'content_id': 188741023, 'status': ContentStatus.Available},
+          {'content_id': 188741024, 'status': ContentStatus.Available},
+          {'content_id': 188741025, 'status': ContentStatus.Available},
+          {'content_id': 188741026, 'status': ContentStatus.Available},
+          {'content_id': 188741027, 'status': ContentStatus.Available}]
+# update_contents(params)
+
+# sys.exit(0)
+
+file = '/afs/cern.ch/user/w/wguan/workdisk/iDDS/test/test.log'
+with open(file, "r") as f:
+    c = f.read()
+j = json_loads(c)
+contents_ext = j['update_contents_ext']
+
+new_list = []
+
+for col in contents_ext:
+    new_l = {}
+    for k in col:
+        if col[k] is not None:
+            new_l[k] = col[k]
+    # new_l['job_status'] = 10
+    new_list.append(new_l)
+    # break
+
+print("new_list")
+# print(new_list)
+print("new_list len: %s" % len(new_list))
+update_contents_ext(new_list)
+# update_contents_ext(contents_ext)