Skip to content

Commit

Permalink
Merge pull request #214 from wguanicedew/dev
Browse files Browse the repository at this point in the history
improve
  • Loading branch information
wguanicedew authored Sep 28, 2023
2 parents 26e596b + 8c28278 commit dd7738b
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 21 deletions.
9 changes: 9 additions & 0 deletions main/etc/sql/oracle_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,12 @@ alter table contents add (sub_map_id NUMBER(12) default 0);
alter table contents add (dep_sub_map_id NUMBER(12) default 0);
alter table contents drop constraint CONTENT_ID_UQ;
alter table contents add constraint CONTENT_ID_UQ UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name, min_id, max_id) USING INDEX LOCAL;


--- 20230927
alter table contents add (name_md5 varchar2(33), scope_name_md5 varchar2(33));
update table contents set name_md5=md5(name), scope_name_md5=md5(scope || name);
alter table contents drop constraint CONTENT_ID_UQ;
alter table contents add constraint CONTENT_ID_UQ UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id) USING INDEX LOCAL;
drop index CONTENTS_ID_NAME_IDX;
CREATE INDEX CONTENTS_ID_NAME_IDX ON CONTENTS (coll_id, scope, md5(name), status);
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,32 @@
def upgrade() -> None:
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''
try:
op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
except Exception as ex:
print(ex)
try:
op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)
except Exception as ex:
print(ex)

op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)

op.add_column('contents', sa.Column('name_md5', sa.String(33)), schema=schema)
op.add_column('contents', sa.Column('scope_name_md5', sa.String(33)), schema=schema)

# fill values for existing rows
op.execute('update %s.contents set name_md5=md5(name), scope_name_md5=md5(scope || name)' % schema)

op.create_unique_constraint('CONTENT_ID_UQ', 'contents',
['transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', sa.func.adler32('name'), sa.func.md5('name'), sa.func.hash('name'), 'min_id', 'max_id'],
['transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name_md5', 'scope_name_md5', 'min_id', 'max_id'],
schema=schema)
op.create_index('CONTENTS_ID_NAME_IDX', 'contents', ['coll_id', 'scope', sa.func.hash('name'), 'status'], schema=schema)

op.create_index('CONTENTS_ID_NAME_IDX', 'contents', ['coll_id', 'scope', sa.func.md5('name'), 'status'], schema=schema)


def downgrade() -> None:
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''
try:
op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
except Exception as ex:
print(ex)
try:
op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)
except Exception as ex:
print(ex)

op.drop_constraint(constraint_name="CONTENT_ID_UQ", table_name="contents", schema=schema)
op.drop_index(index_name="CONTENTS_ID_NAME_IDX", table_name="contents", schema=schema)

op.drop_column('contents', 'name_md5', schema=schema)
op.drop_column('contents', 'scope_name_md5', schema=schema)

op.create_unique_constraint('CONTENT_ID_UQ', 'contents', ['transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name', 'min_id', 'max_id'], schema=schema)
op.create_index('CONTENTS_ID_NAME_IDX', 'contents', ['coll_id', 'scope', 'name', 'status'], schema=schema)
7 changes: 5 additions & 2 deletions main/lib/idds/orm/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import datetime
from enum import Enum

from sqlalchemy import func
from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, Float, event, DDL, Interval
from sqlalchemy.ext.compiler import compiles
# from sqlalchemy.ext.hybrid import hybrid_property
Expand Down Expand Up @@ -543,6 +544,8 @@ class Content(BASE, ModelBase):
content_dep_id = Column(BigInteger())
scope = Column(String(SCOPE_LENGTH))
name = Column(String(LONG_NAME_LENGTH))
name_md5 = Column(String(String(33)))
scope_name_md5 = Column(String(String(33)))
min_id = Column(Integer(), default=0)
max_id = Column(Integer(), default=0)
content_type = Column(EnumWithValue(ContentType), nullable=False)
Expand Down Expand Up @@ -572,13 +575,13 @@ class Content(BASE, ModelBase):
# UniqueConstraint('name', 'scope', 'coll_id', 'min_id', 'max_id', name='CONTENT_SCOPE_NAME_UQ'),
# UniqueConstraint('content_id', 'coll_id', name='CONTENTS_UQ'),
# UniqueConstraint('transform_id', 'coll_id', 'map_id', 'name', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
UniqueConstraint('transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
UniqueConstraint('transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name_md5', 'scope_name_md5', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='CONTENTS_TRANSFORM_ID_FK'),
ForeignKeyConstraint(['coll_id'], ['collections.coll_id'], name='CONTENTS_COLL_ID_FK'),
CheckConstraint('status IS NOT NULL', name='CONTENTS_STATUS_ID_NN'),
CheckConstraint('coll_id IS NOT NULL', name='CONTENTS_COLL_ID_NN'),
Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'),
Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', 'name', 'status'),
Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', func.md5('name'), 'status'),
Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'),
Index('CONTENTS_REL_IDX', 'request_id', 'content_relation_type', 'transform_id', 'substatus'),
Index('CONTENTS_TF_IDX', 'transform_id', 'request_id', 'coll_id', 'map_id', 'content_relation_type'),
Expand Down
10 changes: 9 additions & 1 deletion main/lib/idds/orm/contents.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def create_content(request_id, workload_id, transform_id, coll_id, map_id, scope
scope=scope, name=name, min_id=min_id, max_id=max_id,
content_type=content_type, content_relation_type=content_relation_type,
status=status, bytes=bytes, md5=md5,
name_md5=func.md5(name), scope_name_md5=func.md5(name),
adler32=adler32, processing_id=processing_id, storage_id=storage_id,
retries=retries, path=path, expired_at=expired_at, locking=locking,
content_metadata=content_metadata)
Expand Down Expand Up @@ -144,13 +145,16 @@ def add_contents(contents, bulk_size=10000, session=None):
'locking': ContentLocking.Idle, 'content_relation_type': ContentRelationType.Input,
'bytes': 0, 'md5': None, 'adler32': None, 'processing_id': None,
'storage_id': None, 'retries': 0, 'path': None,
'name_md5': None, 'scope_name_md5': None,
'expired_at': datetime.datetime.utcnow() + datetime.timedelta(days=30),
'content_metadata': None}

for content in contents:
for key in default_params:
if key not in content:
content[key] = default_params[key]
content['name_md5'] = func.md5(content['name'])
content['scope_name_md5'] = func.md5(content['name'])

sub_params = [contents[i:i + bulk_size] for i in range(0, len(contents), bulk_size)]

Expand Down Expand Up @@ -987,10 +991,14 @@ def combine_contents_ext(contents, contents_ext, with_status_name=False):
rets = []
for content in contents:
content_id = content['content_id']
ret = content
if content_id in contents_ext_map:
contents_ext_map[content_id].update(content)
ret = contents_ext_map[content_id]
else:
ret = {'content_id': content_id}
default_params = get_contents_ext_items()
default_params.update(content)
ret = default_params
if with_status_name:
ret['status'] = content['status'].name
else:
Expand Down
23 changes: 23 additions & 0 deletions main/lib/idds/tests/panda_client_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pandaclient.idds_api as idds_api
import idds.common.utils as idds_utils

# setup connection and get jtids
conn = idds_api.get_api(idds_utils.json_dumps, idds_host=None, compress=True, manager=True)
reqid = 5460
ret = conn.get_requests(request_id=int(reqid), with_detail=True)
jtids = [task["transform_workload_id"] for task in ret[1][1] if task["transform_status"]["attributes"]["_name_"] != "Finished"]
print(jtids)

# this first jtid works fine
jtid = jtids[0]
print(jtid)
ret = conn.get_contents_output_ext(request_id=reqid, workload_id=jtid)
print(ret[1][0])
# print(ret[1][1])

# other jtids do not work
jtid = jtids[1]
print(jtid)
ret = conn.get_contents_output_ext(request_id=reqid, workload_id=jtid)
print(ret[1][0])
print(ret[1][1])
48 changes: 48 additions & 0 deletions main/lib/idds/tests/test_catalog_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2023


from idds.common.constants import CollectionRelationType
from idds.common.utils import json_dumps
from idds.core.catalog import get_contents, get_contents_ext, combine_contents_ext


def test(request_id, workload_id, transform_id=None, group_by_jedi_task_id=False):
contents = get_contents(request_id=request_id, workload_id=workload_id, transform_id=transform_id,
relation_type=CollectionRelationType.Output)
print(len(contents))
contents_ext = get_contents_ext(request_id=request_id, workload_id=workload_id, transform_id=transform_id)

print((len(contents_ext)))
ret_contents = combine_contents_ext(contents, contents_ext, with_status_name=True)
print(len(ret_contents))
rets = {}
for content in ret_contents:
if group_by_jedi_task_id:
jedi_task_id = content.get('jedi_task_id', 'None')
if jedi_task_id not in rets:
rets[jedi_task_id] = []
rets[jedi_task_id].append(content)
else:
transform_id = content.get('transform_id')
if transform_id not in rets:
rets[transform_id] = []
rets[transform_id].append(content)
return rets


if __name__ == '__main__':
print('164443')
rets = test(5460, 164443)
# print(json_dumps(rets, sort_keys=True, indent=4))
print('164448')
rets = test(5460, 164448)
# print(str(rets))
print(json_dumps(rets, sort_keys=True, indent=4))
50 changes: 50 additions & 0 deletions main/lib/idds/tests/test_core_debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import sys # noqa F401

from idds.common.constants import ContentStatus
from idds.common.utils import json_loads
from idds.orm.contents import update_contents_ext, update_contents # noqa F401


params = [{'content_id': 188308635, 'status': ContentStatus.Available},
{'content_id': 188308636, 'status': ContentStatus.Available},
{'content_id': 188308637, 'status': ContentStatus.Available},
{'content_id': 188308638, 'status': ContentStatus.Available},
{'content_id': 188308639, 'status': ContentStatus.Available}]
params = [{'content_id': 188741016, 'status': ContentStatus.Available},
{'content_id': 188741017, 'status': ContentStatus.Available},
{'content_id': 188741018, 'status': ContentStatus.Available},
{'content_id': 188741019, 'status': ContentStatus.Available},
{'content_id': 188741020, 'status': ContentStatus.Available},
{'content_id': 188741021, 'status': ContentStatus.Available},
{'content_id': 188741022, 'status': ContentStatus.Available},
{'content_id': 188741023, 'status': ContentStatus.Available},
{'content_id': 188741024, 'status': ContentStatus.Available},
{'content_id': 188741025, 'status': ContentStatus.Available},
{'content_id': 188741026, 'status': ContentStatus.Available},
{'content_id': 188741027, 'status': ContentStatus.Available}]
# update_contents(params)

# sys.exit(0)

file = '/afs/cern.ch/user/w/wguan/workdisk/iDDS/test/test.log'
f = open(file, "r")
c = f.read()
j = json_loads(c)
contents_ext = j['update_contents_ext']

new_list = []

for col in contents_ext:
new_l = {}
for k in col:
if col[k] is not None:
new_l[k] = col[k]
# new_l['job_status'] = 10
new_list.append(new_l)
# break

print("new_list")
# print(new_list)
print("new_list len: %s" % len(new_list))
update_contents_ext(new_list)
# update_contents_ext(contents_ext)

0 comments on commit dd7738b

Please sign in to comment.