Skip to content

Commit

Permalink
Merge pull request #133 from HSF/dev
Browse files Browse the repository at this point in the history
optimize
  • Loading branch information
wguanicedew authored Feb 10, 2023
2 parents ef06693 + 4a8673d commit f5ed375
Show file tree
Hide file tree
Showing 45 changed files with 1,161 additions and 170 deletions.
4 changes: 3 additions & 1 deletion atlas/lib/idds/atlas/workflow/atlasstageinwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ def poll_rule(self, processing):
scope_name = '%s:%s' % (lock['scope'], lock['name'])
if lock['state'] == 'OK':
replicases_status[scope_name] = ContentStatus.Available # 'OK'
return processing, rule['state'], replicases_status
return processing, rule['state'], replicases_status
else:
return processing, 'notOk', replicases_status
except RucioRuleNotFound as ex:
msg = "rule(%s) not found: %s" % (str(rule_id), str(ex))
raise exceptions.ProcessNotFound(msg)
Expand Down
4 changes: 3 additions & 1 deletion atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ def poll_rule(self, processing):
scope_name = '%s:%s' % (lock['scope'], lock['name'])
if lock['state'] == 'OK':
replicases_status[scope_name] = ContentStatus.Available # 'OK'
return processing, rule['state'], replicases_status
return processing, rule['state'], replicases_status
else:
return processing, 'notOk', replicases_status
except RucioRuleNotFound as ex:
msg = "rule(%s) not found: %s" % (str(rule_id), str(ex))
raise exceptions.ProcessNotFound(msg)
Expand Down
2 changes: 2 additions & 0 deletions common/lib/idds/common/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ def get_user_name_from_dn1(dn):
username = username.replace('/CN=limited proxy', '')
username = username.replace('limited proxy', '')
username = re.sub('/CN=Robot:[^/]+', '', username)
username = re.sub('/CN=Robot[^/]+', '', username)
username = re.sub('/CN=nickname:[^/]+', '', username)
pat = re.compile('.*/CN=([^\/]+)/CN=([^\/]+)') # noqa W605
mat = pat.match(username)
Expand Down Expand Up @@ -428,6 +429,7 @@ def get_user_name_from_dn2(dn):
username = username.replace(',CN=limited proxy', '')
username = username.replace('limited proxy', '')
username = re.sub(',CN=Robot:[^/]+', '', username)
username = re.sub(',CN=Robot[^/]+', '', username)
username = re.sub(',CN=nickname:[^/]+', '', username)
pat = re.compile('.*,CN=([^\,]+),CN=([^\,]+)') # noqa W605
mat = pat.match(username)
Expand Down
36 changes: 33 additions & 3 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2020 - 2022
# - Wen Guan, <[email protected]>, 2020 - 2023
# - Sergey Padolski, <[email protected]>, 2020


Expand Down Expand Up @@ -121,6 +121,36 @@ def my_condition(self):
return True
return False

@property
def dependency_map(self):
    """The task dependency map.

    A list/tuple of items; each item is a dict with a unique 'name' and a
    'dependencies' list of {'task': ..., 'inputname': ...} dicts.
    """
    return self._dependency_map

@dependency_map.setter
def dependency_map(self, value):
    """Validate and store the dependency map.

    Raises IDDSException when value is not a list/tuple, when two items
    share the same name, or when one item lists the same
    (task, inputname) dependency twice.
    """
    if value:
        # isinstance also accepts list/tuple subclasses, which the old
        # type(...) in [list, tuple] check wrongly rejected.
        if not isinstance(value, (list, tuple)):
            raise exceptions.IDDSException("dependency_map should be a list or tuple")
        seen_names = set()
        for item in value:
            item_name = item['name']
            inputs_dependency = item["dependencies"]
            if item_name in seen_names:
                raise exceptions.IDDSException("duplicated item with the same name: %s" % item_name)
            seen_names.add(item_name)

            # Use a (task, inputname) tuple as the uniqueness key; plain
            # string concatenation can collide ("ab"+"c" == "a"+"bc").
            seen_inputs = set()
            for input_d in inputs_dependency:
                dep_key = (input_d['task'], input_d['inputname'])
                if dep_key in seen_inputs:
                    raise exceptions.IDDSException("duplicated input dependency for item %s: %s" % (item_name, inputs_dependency))
                seen_inputs.add(dep_key)

    self._dependency_map = value

def load_panda_config(self):
panda_config = ConfigParser.ConfigParser()
if os.environ.get('IDDS_PANDA_CONFIG', None):
Expand Down Expand Up @@ -1222,8 +1252,8 @@ def poll_processing_updates(self, processing, input_output_maps, contents_ext=No
log_prefix=log_prefix)

processing_status, update_contents, update_contents_full, new_contents_ext, update_contents_ext = ret_poll_panda_task
# self.logger.debug(log_prefix + "poll_processing_updates, processing_status: %s" % str(processing_status))
# self.logger.debug(log_prefix + "poll_processing_updates, update_contents[:10]: %s" % str(update_contents[:10]))
self.logger.debug(log_prefix + "poll_processing_updates, processing_status: %s" % str(processing_status))
self.logger.debug(log_prefix + "poll_processing_updates, update_contents[:3]: %s" % str(update_contents[:3]))

if update_contents:
proc.has_new_updates()
Expand Down
107 changes: 107 additions & 0 deletions main/config_default/alembic.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
# script_location = alembic
script_location = /opt/idds/lib/python3.9/site-packages/idds/orm/base/alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
timezone = UTC

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = driver://user:pass@localhost/dbname
version_table_schema = DOMA_IDDS

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
107 changes: 107 additions & 0 deletions main/etc/idds/alembic.ini.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
# script_location = alembic
script_location = main/lib/idds/orm/base/alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
timezone = UTC

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = driver://user:pass@localhost/dbname
version_table_schema = DOMA_IDDS

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
66 changes: 66 additions & 0 deletions main/etc/sql/oracle_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,69 @@ INTERVAL ( 100000 )
( PARTITION initial_part VALUES LESS THAN (1) );

CREATE INDEX CONTENTS_EXT_RTF_IDX ON CONTENTS_ext (request_id, transform_id, workload_id, coll_id, content_id, panda_id, status) LOCAL;


-- 2022.12.29
-- Queue table: rows carry a (content_id, substatus) update; the trigger
-- fans the substatus out to dependent contents rows when entries are deleted.
create table contents_update(content_id number(12), substatus number(2));

CREATE TRIGGER update_content_dep_status before delete ON contents_update
for each row
BEGIN
update contents set substatus = :old.substatus where contents.content_dep_id = :old.content_id;
END;
-- NOTE(review): if this script is run through SQL*Plus, the PL/SQL trigger
-- body above also needs a terminating "/" on its own line — confirm runner.

-- 2023.01.24
-- Widen CONTENTS_ext.max_cpu_count precision to NUMBER(12).
alter table CONTENTS_ext modify max_cpu_count NUMBER(12);

-- 2023.01.25
-- Add identifier columns to the contents_update queue table
-- (presumably so queued rows can be scoped per request/transform — verify
-- against the carrier code that writes this table).
alter table contents_update add (
request_id NUMBER(12),
transform_id NUMBER(12),
workload_id NUMBER(10),
coll_id NUMBER(14));

-- 2023.02.01
--- update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
-- (select content_id, substatus from contents where request_id=486 and transform_id=3027 and content_relation_type =1 and status != substatus) t
--- on c.content_dep_id = t.content_id where c.request_id=486 and c.substatus != t.substatus) set c_substatus = t_substatus;

--- removed: earlier FUNCTION variants, superseded by the procedures below.
--- Kept for reference inside a SQL block comment; the previous Python-style
--- triple-quote markers (""") are not valid SQL and would break this script.
/*
CREATE OR REPLACE FUNCTION update_contents_to_others(request_id_in IN NUMBER, transform_id_in IN NUMBER)
RETURN NUMBER
IS num_rows NUMBER;
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
(select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != substatus) t
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus;

num_rows := SQL%rowcount;
RETURN (num_rows);
END;

CREATE OR REPLACE FUNCTION update_contents_from_others(request_id_in IN NUMBER, transform_id_in IN NUMBER)
RETURN NUMBER
IS num_rows NUMBER;
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
(select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1 and status != 0) t
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus;

num_rows := SQL%rowcount;
RETURN (num_rows);
END;
*/

-- Copy substatus IN to the given transform's contents: joins on
-- c.content_dep_id = t.content_id, taking substatus from source rows
-- (content_relation_type = 1, status != 0) anywhere in the request.
CREATE OR REPLACE procedure update_contents_from_others(request_id_in NUMBER, transform_id_in NUMBER) AS
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
(select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1 and status != 0) t
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus;
END;


-- Copy substatus OUT from the given transform's contents
-- (content_relation_type = 1, status != substatus) to rows anywhere in the
-- same request whose content_dep_id points at them.
CREATE OR REPLACE procedure update_contents_to_others(request_id_in NUMBER, transform_id_in NUMBER) AS
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
(select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != substatus) t
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus;
END;
7 changes: 4 additions & 3 deletions main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,11 @@ def get_processing(self, processing_id, status=None, locking=False):
return None

def get_work_tag_attribute(self, work_tag, attribute):
    """Return the int value of the attribute named '<work_tag>_<attribute>' on self.

    Returns None when work_tag is falsy or when the combined attribute is
    not set on this object.
    """
    # The pasted diff conflated the old and new bodies: the old, un-guarded
    # concatenation raised TypeError when work_tag was None. Keep only the
    # guarded version.
    work_tag_attribute_value = None
    if work_tag:
        work_tag_attribute = work_tag + "_" + attribute
        if hasattr(self, work_tag_attribute):
            work_tag_attribute_value = int(getattr(self, work_tag_attribute))
    return work_tag_attribute_value

def load_poll_period(self, processing, parameters):
Expand Down
Loading

0 comments on commit f5ed375

Please sign in to comment.