From 181035fea8bca3b99482c040f87a8cad967978dd Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 18 Jan 2023 14:39:03 +0100 Subject: [PATCH 01/31] optimize syslog-ng format --- main/tools/syslog-ng/config_syslog_ng.py | 7 ++++--- main/tools/syslog-ng/http.conf | 6 +++--- main/tools/syslog-ng/idds.conf | 8 ++++---- main/tools/syslog-ng/syslog-ng.conf | 2 +- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/main/tools/syslog-ng/config_syslog_ng.py b/main/tools/syslog-ng/config_syslog_ng.py index 63fcce6b..5656a2a7 100644 --- a/main/tools/syslog-ng/config_syslog_ng.py +++ b/main/tools/syslog-ng/config_syslog_ng.py @@ -14,10 +14,11 @@ def get_files(source): sources.append(name) return sources - +# multi-line-mode(indented) +# multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse) def get_file_template(): template = """source s_${filename} { - file("$source"); + file("$source" multi-line-mode(indented)); }; destination d_${filename} { file( @@ -32,7 +33,7 @@ def get_file_template(): def get_pipe_template(): template = """source s_${filename} { - file("$source"); + file("$source" multi-line-mode(indented)); }; destination d_${filename} { pipe( diff --git a/main/tools/syslog-ng/http.conf b/main/tools/syslog-ng/http.conf index 975d7f81..3c10ea0a 100644 --- a/main/tools/syslog-ng/http.conf +++ b/main/tools/syslog-ng/http.conf @@ -1,5 +1,5 @@ source s_httpd_error { - file("/var/log/idds/httpd_error_log"); + file("/var/log/idds/httpd_error_log" multi-line-mode(indented)); }; destination d_httpd_error { pipe( @@ -9,7 +9,7 @@ destination d_httpd_error { log { source(s_httpd_error); destination(d_httpd_error); }; source s_httpd_access { - file("/var/log/idds/httpd_access_log"); + file("/var/log/idds/httpd_access_log" multi-line-mode(indented)); }; destination d_httpd_access { pipe( @@ -19,7 +19,7 @@ destination d_httpd_access { log { source(s_httpd_access); destination(d_httpd_access); }; source s_httpd_ssl { - file("/var/log/idds/httpd_ssl_log"); + file("/var/log/idds/httpd_ssl_log" multi-line-mode(indented)); }; destination d_httpd_ssl { pipe( diff --git a/main/tools/syslog-ng/idds.conf b/main/tools/syslog-ng/idds.conf index 51b7cde9..98443ae9 100644 --- a/main/tools/syslog-ng/idds.conf +++ b/main/tools/syslog-ng/idds.conf @@ -1,5 +1,5 @@ source s_Receiver { - file("/var/log/idds/Receiver.log"); + file("/var/log/idds/Receiver.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse)); }; destination d_Receiver { pipe( @@ -9,7 +9,7 @@ destination d_Receiver { log { source(s_Receiver); destination(d_Receiver); }; source s_idds-server-stdout { - file("/var/log/idds/idds-server-stdout.log"); + file("/var/log/idds/idds-server-stdout.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse)); }; destination d_idds-server-stdout { pipe( @@ -19,7 +19,7 @@ destination d_idds-server-stdout { log { source(s_idds-server-stdout); destination(d_idds-server-stdout); }; source s_Conductor { - file("/var/log/idds/Conductor.log"); + file("/var/log/idds/Conductor.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse)); }; destination d_Conductor { pipe( @@ -29,7 +29,7 @@ destination d_Conductor { log { source(s_Conductor); destination(d_Conductor); }; source s_idds-server-stderr { - file("/var/log/idds/idds-server-stderr.log"); + file("/var/log/idds/idds-server-stderr.log" multi-line-mode(indented)); }; destination d_idds-server-stderr { pipe( diff --git a/main/tools/syslog-ng/syslog-ng.conf b/main/tools/syslog-ng/syslog-ng.conf index 608a9c2c..1f29d5ce 100644 --- a/main/tools/syslog-ng/syslog-ng.conf +++ b/main/tools/syslog-ng/syslog-ng.conf @@ -14,7 +14,7 @@ options { flush_lines (0); time_reopen (10); - log_fifo_size (1000); + log_fifo_size (1000000); chain_hostnames (off); use_dns (no); use_fqdn (no); From 09edd18f493e2c93eddf562608cbe19334d2b86a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 18 Jan 2023 14:51:25 +0100 Subject: [PATCH 02/31] optimize syslog-ng format --- main/tools/syslog-ng/config_syslog_ng.py | 37 +++++++++++++++--------- main/tools/syslog-ng/idds.conf | 6 ++-- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/main/tools/syslog-ng/config_syslog_ng.py b/main/tools/syslog-ng/config_syslog_ng.py index 5656a2a7..7778981a 100644 --- a/main/tools/syslog-ng/config_syslog_ng.py +++ b/main/tools/syslog-ng/config_syslog_ng.py @@ -16,51 +16,61 @@ def get_files(source): # multi-line-mode(indented) # multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse) -def get_file_template(): +def get_file_template(format_mult_lines): + if format_mult_lines == 'regexp': + format_mult_lines = 'multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\\-[0-9]{2}\\-[0-9]{2}") flags(no-parse)' + else: + format_mult_lines = "multi-line-mode(indented)" + template = """source s_${filename} { - file("$source" multi-line-mode(indented)); + file("$source" %s); }; destination d_${filename} { file( "${destination}" - template("$${ISODATE} ${flag} ${filename} $${HOST} $${MESSAGE}\\n")); + template("$${ISODATE} ${flag} ${filename} $${HOST} $$(indent-multi-line $${MESSAGE}\\n")); }; log { source(s_${filename}); destination(d_${filename}); }; -""" +""" % format_mult_lines return string.Template(template) -def get_pipe_template(): +def get_pipe_template(format_mult_lines): + if format_mult_lines == 'regexp': + format_mult_lines = 'multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\\-[0-9]{2}\\-[0-9]{2}") flags(no-parse)' + else: + format_mult_lines = "multi-line-mode(indented)" + template = """source s_${filename} { - file("$source" multi-line-mode(indented)); + file("$source" %s); }; destination d_${filename} { pipe( "${destination}" - template("$${ISODATE} ${flag} ${filename} $${HOST} $${MESSAGE}\\n")); + template("$${ISODATE} ${flag} ${filename} $${HOST} $$(indent-multi-line $${MESSAGE}\\n")); }; log { source(s_${filename}); destination(d_${filename}); }; -""" +""" % format_mult_lines return string.Template(template) -def generate_source_dest_pair(source, destination, flag, pipe=False): +def generate_source_dest_pair(source, destination, flag, pipe=False, format_mult_lines='indent'): filename = os.path.basename(source).replace(".log", "").replace("_log", "") if pipe: - template = get_pipe_template() + template = get_pipe_template(format_mult_lines) else: - template = get_file_template() + template = get_file_template(format_mult_lines) ret = template.substitute(filename=filename, source=source, destination=destination, flag=flag) return ret -def generate_config(config_file, source, destination, flag, pipe=False): +def generate_config(config_file, source, destination, flag, pipe=False, format_mult_lines='indent'): with open(config_file, 'w') as fd: sources = get_files(source) for src in sources: - src_dest = generate_source_dest_pair(src, destination, flag, pipe) + src_dest = generate_source_dest_pair(src, destination, flag, pipe, format_mult_lines=format_mult_lines) fd.write(src_dest) @@ -71,6 +81,7 @@ def generate_config(config_file, source, destination, flag, pipe=False): parser.add_argument('-f', '--flag', default=None, help='Flag name') parser.add_argument('-c', '--config', default=None, help='Configuration file to be generated') parser.add_argument('-p', "--pipe", action="store_true", default=False, help='Use pipe') +parser.add_argument('-m', "--mulLineFormat", default='indent', help='indent or regext') args = parser.parse_args() diff --git a/main/tools/syslog-ng/idds.conf b/main/tools/syslog-ng/idds.conf index 98443ae9..169eb9d7 100644 --- a/main/tools/syslog-ng/idds.conf +++ b/main/tools/syslog-ng/idds.conf @@ -1,5 +1,5 @@ source s_Receiver { - file("/var/log/idds/Receiver.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse)); + file("/var/log/idds/Receiver.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\-[0-9]{2}\-[0-9]{2}") flags(no-parse)); }; destination d_Receiver { pipe( @@ -9,7 +9,7 @@ destination d_Receiver { log { source(s_Receiver); destination(d_Receiver); }; source s_idds-server-stdout { - file("/var/log/idds/idds-server-stdout.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse)); + file("/var/log/idds/idds-server-stdout.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\-[0-9]{2}\-[0-9]{2}") flags(no-parse)); }; destination d_idds-server-stdout { pipe( @@ -19,7 +19,7 @@ destination d_idds-server-stdout { log { source(s_idds-server-stdout); destination(d_idds-server-stdout); }; source s_Conductor { - file("/var/log/idds/Conductor.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\.[0-9]{2}\.[0-9]{2}\.") flags(no-parse)); + file("/var/log/idds/Conductor.log" multi-line-mode(regexp) multi-line-prefix("[0-9]{4}\-[0-9]{2}\-[0-9]{2}") flags(no-parse)); }; destination d_Conductor { pipe( From 5a0055bf176ac6484387974407b4541f79b6d2a0 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 18 Jan 2023 16:45:25 +0100 Subject: [PATCH 03/31] validate doma dependency map --- .../lib/idds/doma/workflowv2/domapandawork.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index f2eaf666..ad2b0a73 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2020 - 2022 +# - Wen Guan, , 2020 - 2023 # - Sergey Padolski, , 2020 @@ -121,6 +121,36 @@ def my_condition(self): return True return False + @property + def dependency_map(self): + return self._dependency_map + + @dependency_map.setter + def dependency_map(self, value): + if value: + if type(value) not in [list, tuple]: + raise exceptions.IDDSException("dependency_map should be a list or tuple") + item_names = {} + for item in value: + item_name = item['name'] + inputs_dependency = item["dependencies"] + if item_name not in item_names: + item_names[item_name] = item + else: + raise exceptions.IDDSException("duplicated item with the same name: %s" % item_name) + + uni_input_name = {} + for input_d in inputs_dependency: + task_name = input_d['task'] + input_name = input_d['inputname'] + task_name_input_name = task_name + input_name + if task_name_input_name not in uni_input_name: + uni_input_name[task_name_input_name] = None + else: + raise exceptions.IDDSException("duplicated input dependency for item %s: %s" % (item_name, inputs_dependency)) + + self._dependency_map = value + def load_panda_config(self): panda_config = ConfigParser.ConfigParser() if os.environ.get('IDDS_PANDA_CONFIG', None): From a325bf3c98e0d16dcb86dfa711f0de2ee3761309 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 23 Jan 2023 18:40:22 +0100 Subject: [PATCH 04/31] fix postgresql trigger --- main/lib/idds/orm/base/models.py | 46 +++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index d39bb54a..4d2ff491 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 """ @@ -740,6 +740,27 @@ def register_models(engine): # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command) + func = DDL(""" + SET search_path TO %s; + CREATE OR REPLACE FUNCTION update_dep_contents_status() + RETURNS TRIGGER AS $$ + BEGIN + UPDATE %s.contents set substatus = old.substatus where %s.contents.content_dep_id = old.content_id; + RETURN OLD; + END; + $$ LANGUAGE PLPGSQL + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + trigger_ddl = DDL(""" + SET search_path TO %s; + DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update; + CREATE TRIGGER update_content_dep_status BEFORE DELETE ON %s.contents_update + for each row EXECUTE PROCEDURE update_dep_contents_status(); + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + event.listen(Content_update.__table__, "after_create", func.execute_if(dialect="postgresql")) + event.listen(Content_update.__table__, "after_create", trigger_ddl.execute_if(dialect="postgresql")) + for model in models: # if not engine.has_table(model.__tablename__, model.metadata.schema): model.metadata.create_all(engine) # pylint: disable=maybe-no-member @@ -753,16 +774,17 @@ def unregister_models(engine): # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command) - for model in models: - model.metadata.drop_all(engine) # pylint: disable=maybe-no-member + func = DDL(""" + SET search_path TO %s; + DROP FUNCTION IF EXISTS update_dep_contents_status; + """ % (DEFAULT_SCHEMA_NAME)) + trigger_ddl = DDL(""" + SET search_path TO %s; + DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update; + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + event.listen(Content_update.__table__, "before_drop", func.execute_if(dialect="postgresql")) + event.listen(Content_update.__table__, "before_drop", trigger_ddl.execute_if(dialect="postgresql")) -@event.listens_for(Content_update.__table__, "after_create") -def _update_content_dep_status(target, connection, **kw): - DDL(""" - CREATE TRIGGER update_content_dep_status BEFORE DELETE ON contents_update - for each row - BEGIN - UPDATE contents set substatus = :old.substatus where contents.content_dep_id = :old.content_id; - END; - """) + for model in models: + model.metadata.drop_all(engine) # pylint: disable=maybe-no-member From c246b9633735f24c02ada5c17e2cda6648f0d184 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 10:13:15 +0100 Subject: [PATCH 05/31] add alembic support --- main/config_default/alembic.ini | 107 ++++++++++++++++++ main/etc/idds/alembic.ini.template | 107 ++++++++++++++++++ main/lib/idds/orm/base/alembic/README | 1 + main/lib/idds/orm/base/alembic/env.py | 100 ++++++++++++++++ main/lib/idds/orm/base/alembic/script.py.mako | 34 ++++++ main/lib/idds/orm/base/utils.py | 11 +- main/tools/env/environment.yml | 1 + main/tools/env/setup_dev.sh | 1 + requirements.yaml | 1 + start-daemon.sh | 18 +++ 10 files changed, 380 insertions(+), 1 deletion(-) create mode 100644 main/config_default/alembic.ini create mode 100644 main/etc/idds/alembic.ini.template create mode 100644 main/lib/idds/orm/base/alembic/README create mode 100644 main/lib/idds/orm/base/alembic/env.py create mode 100644 main/lib/idds/orm/base/alembic/script.py.mako diff --git a/main/config_default/alembic.ini b/main/config_default/alembic.ini new file mode 100644 index 00000000..5909bd0e --- /dev/null +++ b/main/config_default/alembic.ini @@ -0,0 +1,107 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +# script_location = alembic +script_location = /opt/idds/lib/python3.9/site-packages/idds/orm/base/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = +timezone = UTC + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname +version_table_schema = DOMA_IDDS + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/main/etc/idds/alembic.ini.template b/main/etc/idds/alembic.ini.template new file mode 100644 index 00000000..fbf8ee81 --- /dev/null +++ b/main/etc/idds/alembic.ini.template @@ -0,0 +1,107 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +# script_location = alembic +script_location = main/lib/idds/orm/base/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = +timezone = UTC + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname +version_table_schema = DOMA_IDDS + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/main/lib/idds/orm/base/alembic/README b/main/lib/idds/orm/base/alembic/README new file mode 100644 index 00000000..98e4f9c4 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/main/lib/idds/orm/base/alembic/env.py b/main/lib/idds/orm/base/alembic/env.py new file mode 100644 index 00000000..5db5c611 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/env.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + + +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# from idds.orm.base.models import Base +from idds.orm.base.session import BASE + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +# target_metadata = None +target_metadata = BASE.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + + version_table_schema = config.get_main_option("version_table_schema") + + context.configure( + url=url, + version_table_schema=version_table_schema, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + include_schemas=True + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata, + version_table_schema=config.get_main_option("version_table_schema"), + include_schemas=True + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/main/lib/idds/orm/base/alembic/script.py.mako b/main/lib/idds/orm/base/alembic/script.py.mako new file mode 100644 index 00000000..d6f2e054 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/script.py.mako @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/main/lib/idds/orm/base/utils.py b/main/lib/idds/orm/base/utils.py index d3ffe4a8..6d820067 100644 --- a/main/lib/idds/orm/base/utils.py +++ b/main/lib/idds/orm/base/utils.py @@ -6,16 +6,20 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 +# - Wen Guan, , 2019 - 2023 """ Utils to create the database or destroy the database """ +import os import traceback from typing import Union +from alembic.config import Config +from alembic import command + import sqlalchemy # from sqlalchemy.engine import reflection from sqlalchemy.engine import Inspector @@ -44,6 +48,11 @@ def build_database(echo=True, tests=False): models.register_models(engine) + # record the head version of alembic + alembic_cfg_file = os.environ.get("ALEMBIC_CONFIG") + alembic_cfg = Config(alembic_cfg_file) + command.stamp(alembic_cfg, "head") + def destroy_database(echo=True): """Destroy the database""" diff --git a/main/tools/env/environment.yml b/main/tools/env/environment.yml index ee34fd3f..343330f7 100644 --- a/main/tools/env/environment.yml +++ b/main/tools/env/environment.yml @@ -27,6 +27,7 @@ dependencies: - pyjwt - cryptography - redis + - alembic - idds-common==0.11.5 - idds-workflow==0.11.5 - idds-client==0.11.5 diff --git a/main/tools/env/setup_dev.sh b/main/tools/env/setup_dev.sh index bdc87d14..c6da726a 100644 --- a/main/tools/env/setup_dev.sh +++ b/main/tools/env/setup_dev.sh @@ -18,6 +18,7 @@ CondaDir=${RootDir}/../.conda/iDDS echo 'Root dir: ' $RootDir export IDDS_HOME=$RootDir +export ALEMBIC_CONFIG=${IDDS_HOME}/etc/idds/alembic.ini source /afs/cern.ch/user/w/wguan/workdisk/conda/setup.sh diff --git a/requirements.yaml b/requirements.yaml index 38b56e69..1e240f4f 100644 --- a/requirements.yaml +++ b/requirements.yaml @@ -27,3 +27,4 @@ dependencies: - pyjwt - cryptography - redis + - alembic diff --git a/start-daemon.sh b/start-daemon.sh index 809573a0..d6c7d7ae 100755 --- a/start-daemon.sh +++ b/start-daemon.sh @@ -39,6 +39,20 @@ else -d /opt/idds/config/idds/idds.cfg fi +if [ -f /opt/idds/config/idds/alembic.ini ]; then + echo "alembic.ini already mounted." +else + echo "alembic.ini not found. will generate one." + python3 /opt/idds/tools/env/merge_idds_configs.py \ + -s /opt/idds/config_default/alembic.ini $IDDS_OVERRIDE_IDDS_CONFIGS \ + --use-env \ + --prefix IDDS_CFG_ALEMBIC \ + -d /opt/idds/config/idds/alembic.ini + python3 /opt/idds/tools/env/merge_configmap.py \ + -s /opt/idds/configmap/idds_configmap.json \ + -d /opt/idds/config/idds/alembic.ini +fi + if [ -f /opt/idds/config/idds/auth.cfg ]; then echo "auth.cfg already mounted." else @@ -157,6 +171,10 @@ fi # create database if not exists python /opt/idds/tools/env/create_database.py +# upgrade database +export ALEMBIC_CONFIG=/opt/idds/config/idds/alembic.ini +alembic upgrade head +# configure monitor python /opt/idds/tools/env/config_monitor.py -s ${IDDS_HOME}/monitor/data/conf.js.template -d ${IDDS_HOME}/monitor/data/conf.js --host ${IDDS_SERVER} if ! [ -f /opt/idds/config/.token ]; then From dc2029ea748615fedd7cb08a083b536ee33abd64 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 13:48:44 +0100 Subject: [PATCH 06/31] fix setup alembic.ini --- start-daemon.sh | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/start-daemon.sh b/start-daemon.sh index d6c7d7ae..1f50670b 100755 --- a/start-daemon.sh +++ b/start-daemon.sh @@ -43,11 +43,7 @@ if [ -f /opt/idds/config/idds/alembic.ini ]; then echo "alembic.ini already mounted." else echo "alembic.ini not found. will generate one." - python3 /opt/idds/tools/env/merge_idds_configs.py \ - -s /opt/idds/config_default/alembic.ini $IDDS_OVERRIDE_IDDS_CONFIGS \ - --use-env \ - --prefix IDDS_CFG_ALEMBIC \ - -d /opt/idds/config/idds/alembic.ini + cp /opt/idds/config_default/alembic.ini /opt/idds/config/idds/alembic.ini python3 /opt/idds/tools/env/merge_configmap.py \ -s /opt/idds/configmap/idds_configmap.json \ -d /opt/idds/config/idds/alembic.ini From 4512d2de3d81f74b7a35f2efa05b19bc08b43397 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 14:01:05 +0100 Subject: [PATCH 07/31] add logg --- doma/lib/idds/doma/workflowv2/domapandawork.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index ad2b0a73..d6aa4b63 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -1252,8 +1252,8 @@ def poll_processing_updates(self, processing, input_output_maps, contents_ext=No log_prefix=log_prefix) processing_status, update_contents, update_contents_full, new_contents_ext, update_contents_ext = ret_poll_panda_task - # self.logger.debug(log_prefix + "poll_processing_updates, processing_status: %s" % str(processing_status)) - # self.logger.debug(log_prefix + "poll_processing_updates, update_contents[:10]: %s" % str(update_contents[:10])) + self.logger.debug(log_prefix + "poll_processing_updates, processing_status: %s" % str(processing_status)) + self.logger.debug(log_prefix + "poll_processing_updates, update_contents[:10]: %s" % str(update_contents[:10])) if update_contents: proc.has_new_updates() From 40b7b77955dfee24fc8654c21b1911900d21a461 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 14:01:36 +0100 Subject: [PATCH 08/31] add logg --- doma/lib/idds/doma/workflowv2/domapandawork.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index d6aa4b63..c4d11978 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -1253,7 +1253,7 @@ def poll_processing_updates(self, processing, input_output_maps, contents_ext=No processing_status, update_contents, update_contents_full, new_contents_ext, update_contents_ext = ret_poll_panda_task self.logger.debug(log_prefix + "poll_processing_updates, processing_status: %s" % str(processing_status)) - self.logger.debug(log_prefix + "poll_processing_updates, update_contents[:10]: %s" % str(update_contents[:10])) + self.logger.debug(log_prefix + "poll_processing_updates, update_contents[:3]: %s" % str(update_contents[:3])) if update_contents: proc.has_new_updates() From 72054a69967847f4b4fc9d5d5a4554b09f7d36f3 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 18:59:47 +0100 Subject: [PATCH 09/31] fix getting attributes for different work tag --- main/lib/idds/agents/carrier/poller.py | 7 ++++--- main/lib/idds/agents/clerk/clerk.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index ac244ec4..5eede04f 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -144,10 +144,11 @@ def get_processing(self, processing_id, status=None, locking=False): return None def get_work_tag_attribute(self, work_tag, attribute): - work_tag_attribute = work_tag + "_" + attribute work_tag_attribute_value = None - if hasattr(self, work_tag_attribute): - work_tag_attribute_value = int(getattr(self, work_tag_attribute)) + if work_tag: + work_tag_attribute = work_tag + "_" + attribute + if hasattr(self, work_tag_attribute): + work_tag_attribute_value = int(getattr(self, work_tag_attribute)) return work_tag_attribute_value def load_poll_period(self, processing, parameters): diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 83303ab8..8686b00d 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -278,10 +278,11 @@ def load_poll_period(self, req, parameters): return parameters def get_work_tag_attribute(self, work_tag, attribute): - work_tag_attribute = work_tag + "_" + attribute work_tag_attribute_value = None - if hasattr(self, work_tag_attribute): - work_tag_attribute_value = int(getattr(self, work_tag_attribute)) + if work_tag: + work_tag_attribute = work_tag + "_" + attribute + if hasattr(self, work_tag_attribute): + work_tag_attribute_value = int(getattr(self, work_tag_attribute)) return work_tag_attribute_value def generate_transform(self, req, work, build=False): From ee1a1c39356fae7ee47617191715b4a53df9b10d Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 23:12:36 +0100 Subject: [PATCH 10/31] fix data carousel when rule not exists --- atlas/lib/idds/atlas/workflow/atlasstageinwork.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/atlas/lib/idds/atlas/workflow/atlasstageinwork.py b/atlas/lib/idds/atlas/workflow/atlasstageinwork.py index d700df60..f64275ae 100644 --- a/atlas/lib/idds/atlas/workflow/atlasstageinwork.py +++ b/atlas/lib/idds/atlas/workflow/atlasstageinwork.py @@ -311,7 +311,9 @@ def poll_rule(self, processing): scope_name = '%s:%s' % (lock['scope'], lock['name']) if lock['state'] == 'OK': replicases_status[scope_name] = ContentStatus.Available # 'OK' - return processing, rule['state'], replicases_status + return processing, rule['state'], replicases_status + else: + return processing, 'notOk', replicases_status except RucioRuleNotFound as ex: msg = "rule(%s) not found: %s" % (str(rule_id), str(ex)) raise exceptions.ProcessNotFound(msg) From d14b7635146bded0d853324f1a936d4d2442d575 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 24 Jan 2023 23:16:51 +0100 Subject: [PATCH 11/31] fix hpo by adding is_starting funct --- atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py | 4 +++- workflow/lib/idds/workflow/work.py | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py b/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py index d56917fd..deb9730a 100644 --- a/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py +++ b/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py @@ -311,7 +311,9 @@ def poll_rule(self, processing): scope_name = '%s:%s' % (lock['scope'], lock['name']) if lock['state'] == 'OK': replicases_status[scope_name] = ContentStatus.Available # 'OK' - return processing, rule['state'], replicases_status + return processing, rule['state'], replicases_status + else: + return processing, 'notOk', replicases_status except RucioRuleNotFound as ex: msg = "rule(%s) not found: %s" % (str(rule_id), str(ex)) raise exceptions.ProcessNotFound(msg) diff --git a/workflow/lib/idds/workflow/work.py b/workflow/lib/idds/workflow/work.py index 8432e469..b6dafe0c 100644 --- a/workflow/lib/idds/workflow/work.py +++ b/workflow/lib/idds/workflow/work.py @@ -1372,6 +1372,9 @@ def get_backup_to_release_inputs(self): self.backup_to_release_inputs['0'] = [] return to_release_inputs + def is_starting(self): + return self.transforming + def is_started(self): return self.started or self.submitted From deb9b6a52e8618692f24a03fbea84411ee73fa1b Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 25 Jan 2023 10:25:02 +0100 Subject: [PATCH 12/31] fix receiver logs --- main/lib/idds/agents/carrier/receiver.py | 6 ++++-- main/lib/idds/agents/carrier/utils.py | 7 ++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index 39d400e8..1d62cf4a 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import time import traceback @@ -93,7 +93,9 @@ def run(self): try: time_start = time.time() output_messages = self.get_output_messages() - update_processings, terminated_processings, update_contents, msgs = handle_messages_processing(output_messages) + update_processings, terminated_processings, update_contents, msgs = handle_messages_processing(output_messages, + logger=self.logger, + log_prefix=log_prefix) if msgs: # self.logger.debug(log_prefix + "adding messages[:3]: %s" % json_dumps(msgs[:3])) diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 8f5baacd..fee84865 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -1104,9 +1104,10 @@ def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, in return content_id, to_update_jobid -def handle_messages_processing(messages): - logger = get_logger() - log_prefix = "" +def handle_messages_processing(messages, logger=None, log_prefix=''): + logger = get_logger(logger) + if not log_prefix: + log_prefix = "" update_processings = [] terminated_processings = [] From e9d32bcf2de83d71349e9ed5644f6700aea4c0d0 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 25 Jan 2023 15:26:49 +0100 Subject: [PATCH 13/31] fix to get content_dep_id --- main/lib/idds/core/processings.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py index e39cbc8c..6b778a29 100644 --- a/main/lib/idds/core/processings.py +++ b/main/lib/idds/core/processings.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 """ @@ -16,7 +16,7 @@ import datetime from idds.orm.base.session import read_session, transactional_session -from idds.common.constants import ProcessingLocking, ProcessingStatus, GranularityType +from idds.common.constants import ProcessingLocking, ProcessingStatus, GranularityType, ContentRelationType from idds.orm import (processings as orm_processings, collections as orm_collections, contents as orm_contents, @@ -288,7 +288,7 @@ def resolve_input_dependency_id(new_input_dependency_contents, session=None): coll_ids = [] for content in new_input_dependency_contents: coll_ids.append(content['coll_id']) - contents = orm_contents.get_contents(coll_id=coll_ids, session=session) + contents = orm_contents.get_contents(coll_id=coll_ids, relation_type=ContentRelationType.Output, session=session) content_name_id_map = {} for content in contents: if content['coll_id'] not in content_name_id_map: From ce8a93c2997060b510eb6ad758a1512d74189f47 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 26 Jan 2023 14:19:57 +0100 Subject: [PATCH 14/31] fix resolve_dep_id avoiding duplication --- main/lib/idds/core/processings.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py index 6b778a29..49549680 100644 --- a/main/lib/idds/core/processings.py +++ b/main/lib/idds/core/processings.py @@ -287,7 +287,8 @@ def update_processing_with_collection_contents(updated_processing, new_processin def resolve_input_dependency_id(new_input_dependency_contents, session=None): coll_ids = [] for content in new_input_dependency_contents: - coll_ids.append(content['coll_id']) + if content['coll_id'] not in coll_ids: + coll_ids.append(content['coll_id']) contents = orm_contents.get_contents(coll_id=coll_ids, relation_type=ContentRelationType.Output, session=session) content_name_id_map = {} for content in contents: From b4dd67e3d0d1bd85da6d7075353c473d484c3f1e Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 26 Jan 2023 14:26:43 +0100 Subject: [PATCH 15/31] trigger new events when new files available --- main/lib/idds/agents/carrier/trigger.py | 21 +++++++++++-- main/lib/idds/agents/carrier/utils.py | 31 +++++++++++++++---- .../idds/agents/transformer/transformer.py | 18 +++++++---- main/lib/idds/orm/base/models.py | 4 +++ main/lib/idds/orm/contents.py | 14 +++++++++ 5 files changed, 73 insertions(+), 15 deletions(-) diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 68426855..21ec4e3c 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -15,6 +15,7 @@ from idds.common.utils import setup_logging, truncate_string from idds.core import processings as core_processings from idds.agents.common.eventbus.event import (EventType, + UpdateTransformEvent, TriggerProcessingEvent, TerminatedProcessingEvent, SyncProcessingEvent) @@ -78,7 +79,9 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): trigger_new_updates=trigger_new_updates, logger=self.logger, log_prefix=log_prefix) - process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents = ret_trigger_processing + process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms = ret_trigger_processing + + self.logger.debug(log_prefix + "handle_trigger_processing: ret_update_transforms: %s" % str(ret_update_transforms)) new_process_status = process_status if is_process_terminated(process_status): @@ -98,6 +101,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): 'update_contents': update_contents, 'messages': ret_msgs, 'new_update_contents': new_update_contents, + 'update_transforms': ret_update_transforms, 'update_dep_contents': (processing['request_id'], update_dep_contents_status_name, update_dep_contents_status), 'processing_status': new_process_status} except exceptions.ProcessFormatNotSupported as ex: @@ -168,8 +172,19 @@ def process_trigger_processing(self, event): new_update_contents = ret.get('new_update_contents', None) if new_update_contents: - ret = self.handle_trigger_processing(pr, trigger_new_updates=True) - self.update_processing(ret, pr) + ret1 = self.handle_trigger_processing(pr, trigger_new_updates=True) + self.logger.info(log_pre + "process_trigger_processing1 result: %s" % str(ret1)) + self.update_processing(ret1, pr) + if 'update_transforms' in ret1 and ret1['update_transforms']: + # contents for some other transforms are updated. + for update_transform in ret1['update_transforms']: + if 'transform_id' in update_transform: + update_transform_id = update_transform['transform_id'] + event = UpdateTransformEvent(publisher_id=self.id, + transform_id=update_transform_id, + content={'event': 'Trigger'}) + self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s" % update_transform_id) + self.event_bus.send(event) if (('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating) or (event._content and 'Terminated' in event._content and event._content['Terminated'])): # noqa W503 diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index fee84865..e7b15fc3 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -535,7 +535,11 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, logger=Non 'status': content['substatus']} updated_contents.append(u_content) u_content_substatus = {'content_id': content['content_id'], - 'substatus': content['substatus']} + 'substatus': content['substatus'], + 'request_id': content['request_id'], + 'transform_id': content['transform_id'], + 'workload_id': content['workload_id'], + 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) updated_contents_full_input.append(content) for content in outputs: @@ -544,7 +548,11 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, logger=Non 'status': content['substatus']} updated_contents.append(u_content) u_content_substatus = {'content_id': content['content_id'], - 'substatus': content['substatus']} + 'substatus': content['substatus'], + 'request_id': content['request_id'], + 'transform_id': content['transform_id'], + 'workload_id': content['workload_id'], + 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) updated_contents_full_output.append(content) for content in inputs_dependency: @@ -570,7 +578,11 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, logger=Non content['substatus'] = input_content_update_status updated_contents_full_input.append(content) u_content_substatus = {'content_id': content['content_id'], - 'substatus': content['substatus']} + 'substatus': content['substatus'], + 'request_id': content['request_id'], + 'transform_id': content['transform_id'], + 'workload_id': content['workload_id'], + 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) output_content_update_status = None @@ -590,7 +602,11 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, logger=Non content['substatus'] = output_content_update_status updated_contents_full_output.append(content) u_content_substatus = {'content_id': content['content_id'], - 'substatus': content['substatus']} + 'substatus': content['substatus'], + 'request_id': content['request_id'], + 'transform_id': content['transform_id'], + 'workload_id': content['workload_id'], + 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents @@ -939,6 +955,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= ret_msgs = [] content_updates = [] + ret_update_transforms = [] request_id = processing['request_id'] transform_id = processing['transform_id'] @@ -953,7 +970,9 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= else: if trigger_new_updates: # delete information in the contents_update table, to invoke the trigger. - core_catalog.delete_contents_update() + ret_update_transforms = core_catalog.delete_contents_update() + logger.debug(log_prefix + "delete_contents_update: %s" % str(ret_update_transforms)) + input_output_maps = get_input_output_maps(transform_id, work) logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2])) @@ -975,7 +994,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= content_updates = content_updates + updated_contents - return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents + return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms def get_content_status_from_panda_msg_status(status): diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index 02f662a5..df295cc8 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -418,10 +418,12 @@ def handle_update_transform_real(self, transform, event): # link processings new_processing_model, processing_model = None, None + ret_processing_id = None processing = work.get_processing(input_output_maps=[], without_creating=True) self.logger.debug(log_pre + "work get_processing: %s" % processing) if processing and processing.processing_id: + ret_processing_id = processing.processing_id processing_model = core_processings.get_processing(processing_id=processing.processing_id) work.sync_processing(processing, processing_model) proc = processing_model['processing_metadata']['processing'] @@ -479,7 +481,7 @@ def handle_update_transform_real(self, transform, event): ret = {'transform': transform, 'transform_parameters': transform_parameters, 'new_processing': new_processing_model} - return ret, is_terminated + return ret, is_terminated, ret_processing_id def handle_update_transform(self, transform, event): """ @@ -489,9 +491,9 @@ def handle_update_transform(self, transform, event): log_pre = self.get_log_prefix(transform) self.logger.info(log_pre + "handle_update_transform: %s" % transform) - ret, is_terminated = self.handle_update_transform_real(transform, event) + ret, is_terminated, ret_processing_id = self.handle_update_transform_real(transform, event) self.logger.info(log_pre + "handle_update_transform result: %s" % str(ret)) - return ret, is_terminated + return ret, is_terminated, ret_processing_id except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) @@ -517,7 +519,7 @@ def handle_update_transform(self, transform, event): ret = {'transform': transform, 'transform_parameters': transform_parameters} self.logger.warn(log_pre + "handle_update_transform exception result: %s" % str(ret)) - return ret, False + return ret, False, None def process_update_transform(self, event): self.number_workers += 1 @@ -535,7 +537,7 @@ def process_update_transform(self, event): else: log_pre = self.get_log_prefix(tf) - ret, is_terminated = self.handle_update_transform(tf, event) + ret, is_terminated, ret_processing_id = self.handle_update_transform(tf, event) new_pr_ids, update_pr_ids = self.update_transform(ret) if is_terminated or (event._content and 'event' in event._content and event._content['event'] == 'submitted'): @@ -547,9 +549,13 @@ def process_update_transform(self, event): event = NewProcessingEvent(publisher_id=self.id, processing_id=pr_id, content=event._content) self.event_bus.send(event) for pr_id in update_pr_ids: - self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % pr_id) + self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr_id) event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr_id, content=event._content) self.event_bus.send(event) + if ret_processing_id and (event._content and 'event' in event._content and event._content['event'] == 'Trigger'): + self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % ret_processing_id) + event = UpdateProcessingEvent(publisher_id=self.id, processing_id=ret_processing_id) + self.event_bus.send(event) except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index 4d2ff491..aa095589 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -569,6 +569,10 @@ class Content_update(BASE, ModelBase): __tablename__ = 'contents_update' content_id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True) substatus = Column(EnumWithValue(ContentStatus)) + request_id = Column(BigInteger().with_variant(Integer, "sqlite")) + transform_id = Column(BigInteger().with_variant(Integer, "sqlite")) + workload_id = Column(Integer()) + coll_id = Column(BigInteger().with_variant(Integer, "sqlite")) class Content_ext(BASE, ModelBase): diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index 80f6eb8d..efbfb8ee 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -589,7 +589,21 @@ def delete_contents_update(session=None): :raises DatabaseException: If there is a database error. """ try: + subquery = session.query(models.Content_update.content_id).subquery() + query = session.query(models.Content.request_id, + models.Content.transform_id, + models.Content.workload_id, + models.Content.coll_id) + query = query.filter(models.Content.content_id.in_(subquery)) + + tmp = query.distinct() + rets = [] + if tmp: + for t in tmp: + t2 = dict(zip(t.keys(), t)) + rets.append(t2) session.query(models.Content_update).delete() + return rets except Exception as error: raise exceptions.NoObject('Content_update deletion error: %s' % (error)) From b95a361d3eb4bf96184c71d3fbf64a97d07a9939 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 26 Jan 2023 15:21:30 +0100 Subject: [PATCH 16/31] oracle updates --- main/etc/sql/oracle_update.sql | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index c46d1f70..5ef63790 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -219,3 +219,22 @@ INTERVAL ( 100000 ) ( PARTITION initial_part VALUES LESS THAN (1) ); CREATE INDEX CONTENTS_EXT_RTF_IDX ON CONTENTS_ext (request_id, transform_id, workload_id, coll_id, content_id, panda_id, status) LOCAL; + + +-- 2022.12.29 +create table contents_update(content_id number(12), substatus number(2)) +CREATE TRIGGER update_content_dep_status before delete ON contents_update + for each row + BEGIN + update contents set substatus = :old.substatus where contents.content_dep_id = :old.content_id; + END; + +-- 2023.01.24 +alter table CONTENTS_ext modify max_cpu_count NUMBER(12); + +-- 2023.01.25 +alter table contents_update add ( + request_id NUMBER(12), + transform_id NUMBER(12), + workload_id NUMBER(10), + coll_id NUMBER(14)); From b900789c20d18c8e715149dbb7c32f2ea74b49a0 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 26 Jan 2023 15:22:12 +0100 Subject: [PATCH 17/31] update tests --- main/lib/idds/tests/core_tests.py | 22 +++++++++++++------- main/lib/idds/tests/core_tests_stat.py | 18 +++++++++++----- main/lib/idds/tests/panda_test.py | 6 +++++- main/lib/idds/tests/test_migrate_requests.py | 5 +++-- monitor/data/conf.js | 12 +++++------ 5 files changed, 41 insertions(+), 22 deletions(-) diff --git a/main/lib/idds/tests/core_tests.py b/main/lib/idds/tests/core_tests.py index 9696faf2..bb92d36a 100644 --- a/main/lib/idds/tests/core_tests.py +++ b/main/lib/idds/tests/core_tests.py @@ -9,7 +9,7 @@ from idds.core.workprogress import get_workprogresses # noqa F401 from idds.core.processings import get_processings # noqa F401 from idds.core import transforms as core_transforms # noqa F401 -from idds.orm.contents import get_input_contents +from idds.orm.contents import get_contents from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old # noqa F401 from idds.workflowv2.workflow import Workflow # noqa F401 from idds.workflowv2.work import Work # noqa F401 @@ -65,9 +65,9 @@ def release_inputs_test(): {'map_id': 7, 'status': ContentStatus.Available, 'retries': 0, 'scope': 'pseudo_dataset', 'substatus': ContentStatus.Available, 'path': None, 'name': 'u_huanlin_panda_test_ci_imsim_w26_20210714T214732Z.qgraph+31_isr_226983_36+1626299263.3909254-24148+31', 'locking': ContentLocking.Idle, 'created_at': datetime.datetime(2021, 7, 14, 21, 48, 10), 'content_id': 2254919, 'min_id': 0, 'bytes': 1, 'updated_at': datetime.datetime(2021, 7, 14, 22, 8, 30), 'coll_id': 4042, 'max_id': 1, 'md5': None, 'accessed_at': datetime.datetime(2021, 7, 14, 22, 8, 30), 'request_id': 107, 'content_type': ContentType.File, 'adler32': '12345678', 'expired_at': datetime.datetime(2021, 8, 13, 21, 48, 10), 'workload_id': 1626299273, 'content_relation_type': ContentRelationType.Output, 'processing_id': None, 'content_metadata': {'events': 1, 'panda_id': 1412278}, 'transform_id': 2021, 'storage_id': None}]} # noqa E501 for coll_id in to_release_inputs: - contents = get_input_contents(request_id=to_release_inputs[coll_id][0]['request_id'], - coll_id=coll_id, - name=None) + contents = get_contents(request_id=to_release_inputs[coll_id][0]['request_id'], + coll_id=coll_id, + name=None) print(len(contents)) in_dep_contents = [] for content in contents: @@ -170,7 +170,8 @@ def print_workflow_template(workflow, layers=0): # reqs = get_requests(request_id=380474, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=381520, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=28182323, with_request=True, with_detail=False, with_metadata=True) -reqs = get_requests(request_id=385554, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=385554, with_request=True, with_detail=False, with_metadata=True) +reqs = get_requests(request_id=474, with_request=True, with_detail=False, with_metadata=True) for req in reqs: # print(req['request_id']) # print(req) @@ -178,11 +179,11 @@ def print_workflow_template(workflow, layers=0): # print(json_dumps(req, sort_keys=True, indent=4)) # show_works(req) pass - workflow = req['request_metadata']['workflow'] + workflow = req['request_metadata']['build_workflow'] # workflow.get_new_works() print(workflow.runs.keys()) # print(workflow.runs["1"]) - # print(json_dumps(workflow.runs["1"], sort_keys=True, indent=4)) + print(json_dumps(workflow.runs["1"], sort_keys=True, indent=4)) # print(workflow.runs["1"].works.keys()) # print(workflow.runs["1"].has_loop_condition()) @@ -201,6 +202,11 @@ def print_workflow_template(workflow, layers=0): print_workflow(workflow) new_works = workflow.get_new_works() print('new_works:' + str(new_works)) + all_works = workflow.get_all_works() + print('all_works:' + str(all_works)) + for work in all_works: + print("work %s signature: %s" % (work.get_work_id(), work.signature)) + # print("workflow template") # print_workflow_template(workflow) @@ -229,7 +235,7 @@ def print_workflow_template(workflow, layers=0): """ -tfs = get_transforms(request_id=2818) +tfs = get_transforms(request_id=470) # tfs = get_transforms(transform_id=350723) for tf in tfs: # print(tf) diff --git a/main/lib/idds/tests/core_tests_stat.py b/main/lib/idds/tests/core_tests_stat.py index 46bda88c..9e1be43a 100644 --- a/main/lib/idds/tests/core_tests_stat.py +++ b/main/lib/idds/tests/core_tests_stat.py @@ -1,3 +1,4 @@ +from datetime import datetime from idds.common.utils import json_dumps, setup_logging # noqa F401 from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 @@ -14,14 +15,21 @@ setup_logging(__name__) +time_start = "Jan 1 00:00:00 2023" +time_start = datetime.strptime(time_start, "%b %d %H:%M:%S %Y") + +time_end = "Jan 1 00:00:00 2024" +time_end = datetime.strptime(time_end, "%b %d %H:%M:%S %Y") + output_total = 0 output_processed = 0 reqs = get_requests(with_transform=True) for req in reqs: - if "HSC" in req['name'] or "hsc" in req['name']: - print("name: %s, output_total: %s, output_processed: %s" % (req['name'], req['output_total_files'], req['output_processed_files'])) - if req['output_total_files'] and req['output_processed_files']: - output_total += req['output_total_files'] - output_processed += req['output_processed_files'] + if "HSC" in req['name'] or "hsc" in req['name'] or True: + if req['created_at'] > time_start and req['created_at'] < time_end: + print("id: %s, created_at: %s, name: %s, output_total: %s, output_processed: %s" % (req['request_id'], req['created_at'], req['name'], req['output_total_files'], req['output_processed_files'])) + if req['output_total_files'] and req['output_processed_files']: + output_total += req['output_total_files'] + output_processed += req['output_processed_files'] print("Total: %s, processed: %s" % (output_total, output_processed)) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 0bf0db2b..a2f48a33 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -93,7 +93,11 @@ # task_ids = [i for i in range(3692, 3723)] # task_ids = [3834, 3835, 3836] # task_ids = [i for i in range(141294, 142200)] + [i for i in range(141003, 141077)] + [i for i in range(141145, 141255)] -task_ids = [140954, 140955, 142228] +# task_ids = [140954, 140955, 142228] +# task_ids = [i for i in range(142507, 142651)] +# task_ids = [i for i in range(140349, 140954)] + [142268, 142651] +# task_ids = [1851] + [i for i in range(4336, 4374)] + [i for i in range(133965, 136025)] +task_ids = [832, 2347, 3045, 66860, 67036] + [i for i in range(121273, 140349)] for task_id in task_ids: print("Killing %s" % task_id) Client.killTask(task_id) diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 6b6bcac7..5dcd41fc 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -53,6 +53,7 @@ def migrate(): old_request_id = 2603 old_request_id = 2802 old_request_id = 2816 + old_request_id = 3178 # old_request_id = 1 # for old_request_id in [152]: @@ -62,9 +63,9 @@ def migrate(): reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True) # cm2 = ClientManager(host=dev_host) - # cm2 = ClientManager(host=doma_host) + cm2 = ClientManager(host=doma_host) # cm2 = ClientManager(host=atlas_host) - cm2 = ClientManager(host=slac_k8s_dev_host) + # cm2 = ClientManager(host=slac_k8s_dev_host) # cm2 = ClientManager(host=cern_k8s_dev_host) # print(reqs) diff --git a/monitor/data/conf.js b/monitor/data/conf.js index dd0f9da8..b8a48092 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus8s08.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus8s08.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus8s08.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus8s08.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus8s08.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus8s08.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus8s12.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus8s12.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus8s12.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/false/false/true" } From 89261ecee16da88876d6a477b8a8e2260f3e165e Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 26 Jan 2023 15:22:50 +0100 Subject: [PATCH 18/31] update tests --- main/lib/idds/tests/fix_content_dep_id.py | 69 +++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 main/lib/idds/tests/fix_content_dep_id.py diff --git a/main/lib/idds/tests/fix_content_dep_id.py b/main/lib/idds/tests/fix_content_dep_id.py new file mode 100644 index 00000000..2a53ac62 --- /dev/null +++ b/main/lib/idds/tests/fix_content_dep_id.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + + +""" +Test client. +""" + + +from idds.client.clientmanager import ClientManager # noqa F401 +from idds.common.utils import json_dumps # noqa F401 +from idds.rest.v1.utils import convert_old_req_2_workflow_req # noqa F401 +from idds.common.utils import setup_logging + +from idds.common.utils import json_dumps, setup_logging # noqa F401 +from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 +from idds.core.requests import get_requests # noqa F401 +from idds.core.messages import retrieve_messages # noqa F401 +from idds.core.transforms import get_transforms, get_transform # noqa F401 +from idds.core.workprogress import get_workprogresses # noqa F401 +from idds.core.processings import get_processings # noqa F401 +from idds.core import transforms as core_transforms # noqa F401 +from idds.core import catalog as core_catalog # noqa F401 +from idds.orm.contents import get_contents # noqa F401 +from idds.orm import contents as orm_contents # noqa F401 +from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old # noqa F401 +from idds.workflowv2.workflow import Workflow # noqa F401 +from idds.workflowv2.work import Work # noqa F401 + + +setup_logging(__name__) + + +def fix_content_dep_id(request_id): + contents = core_catalog.get_contents_by_request_transform(request_id=request_id) + available_output_contents = {} + output_content_name_id = {} + contents_updates = [] + for content in contents: + if content['content_relation_type'] == ContentRelationType.Output: + if content['coll_id'] not in output_content_name_id: + output_content_name_id[content['coll_id']] = {} + output_content_name_id[content['coll_id']][content['name']] = content['content_id'] + if content['substatus'] in [ContentStatus.Available]: + available_output_contents[content['content_id']] = content + n_content = {'content_id': content['content_id'], + 'substatus': content['substatus']} + contents_updates.append(n_content) + to_update_contents = [] + for content in contents: + if content['content_relation_type'] == ContentRelationType.InputDependency: + u_content = {'content_id': content['content_id'], + 'content_dep_id': output_content_name_id[content['coll_id']][content['name']]} + to_update_contents.append(u_content) + orm_contents.update_contents(to_update_contents) + orm_contents.add_contents_update(contents_updates) + + +if __name__ == "__main__": + request_id = 3188 + fix_content_dep_id(request_id) + core_catalog.delete_contents_update() From 1cbb4ab3750fafd2047a4fd0559f4d903830b438 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 3 Feb 2023 09:33:00 +0100 Subject: [PATCH 19/31] add database procedure to trigger updating contents --- main/lib/idds/orm/base/models.py | 139 ++++++++++++++++++++++++++---- main/lib/idds/orm/base/session.py | 1 + 2 files changed, 121 insertions(+), 19 deletions(-) diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index aa095589..979ed359 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -736,14 +736,7 @@ class Command(BASE, ModelBase): Index('COMMANDS_TYPE_ST_PR_IDX', 'cmd_type', 'status', 'destination', 'processing_id')) -def register_models(engine): - """ - Creates database tables for all models with the given engine - """ - - # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) - models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command) - +def create_trigger(): func = DDL(""" SET search_path TO %s; CREATE OR REPLACE FUNCTION update_dep_contents_status() @@ -765,6 +758,124 @@ def register_models(engine): event.listen(Content_update.__table__, "after_create", func.execute_if(dialect="postgresql")) event.listen(Content_update.__table__, "after_create", trigger_ddl.execute_if(dialect="postgresql")) + +def delete_trigger(): + func = DDL(""" + SET search_path TO %s; + DROP FUNCTION IF EXISTS update_dep_contents_status; + """ % (DEFAULT_SCHEMA_NAME)) + trigger_ddl = DDL(""" + SET search_path TO %s; + DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update; + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + event.listen(Content_update.__table__, "before_drop", func.execute_if(dialect="postgresql")) + event.listen(Content_update.__table__, "before_drop", trigger_ddl.execute_if(dialect="postgresql")) + + +def create_func_to_update_contents(): + func1 = DDL(""" + SET search_path TO %s; + CREATE OR REPLACE FUNCTION update_contents_to_others(request_id_in int, transform_id_in int) + RETURNS INTEGER + AS $$ + DECLARE num_rows INTEGER; + BEGIN + num_rows := 0; + + UPDATE %s.contents set substatus = d.substatus from + (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d + where %s.contents.request_id = request_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id; + GET DIAGNOSTICS num_rows = ROW_COUNT; + return num_rows; + END; + $$ LANGUAGE PLPGSQL + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, + DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + func2 = DDL(""" + SET search_path TO %s; + CREATE OR REPLACE FUNCTION update_contents_from_others(request_id_in int, transform_id_in int) + RETURNS INTEGER + AS $$ + DECLARE num_rows INTEGER; + BEGIN + num_rows := 0; + + UPDATE %s.contents set substatus = d.substatus from + (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d + where %s.contents.request_id = request_id_in and %s.contents.transform_id = transform_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id; + GET DIAGNOSTICS num_rows = ROW_COUNT; + return num_rows; + END; + $$ LANGUAGE PLPGSQL + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, + DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + event.listen(Content.__table__, "after_create", func1.execute_if(dialect="postgresql")) + event.listen(Content.__table__, "after_create", func2.execute_if(dialect="postgresql")) + + +def drop_func_to_update_contents(): + func = DDL(""" + SET search_path TO %s; + DROP FUNCTION IF EXISTS update_contents_to_others; + DROP FUNCTION IF EXISTS update_contents_from_others; + """ % (DEFAULT_SCHEMA_NAME)) + event.listen(Content.__table__, "before_drop", func.execute_if(dialect="postgresql")) + + +def create_proc_to_update_contents(): + func1 = DDL(""" + SET search_path TO %s; + CREATE OR REPLACE PROCEDURE update_contents_to_others(request_id_in int, transform_id_in int) + AS $$ + BEGIN + UPDATE %s.contents set substatus = d.substatus from + (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d + where %s.contents.request_id = request_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id; + END; + $$ LANGUAGE PLPGSQL + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, + DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + func2 = DDL(""" + SET search_path TO %s; + CREATE OR REPLACE PROCEDURE update_contents_from_others(request_id_in int, transform_id_in int) + AS $$ + BEGIN + + UPDATE %s.contents set substatus = d.substatus from + (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d + where %s.contents.request_id = request_id_in and %s.contents.transform_id = transform_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id; + END; + $$ LANGUAGE PLPGSQL + """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, + DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) + + event.listen(Content.__table__, "after_create", func1.execute_if(dialect="postgresql")) + event.listen(Content.__table__, "after_create", func2.execute_if(dialect="postgresql")) + + +def drop_proc_to_update_contents(): + func = DDL(""" + SET search_path TO %s; + DROP PROCEDURE IF EXISTS update_contents_to_others; + DROP PROCEDURE IF EXISTS update_contents_from_others; + """ % (DEFAULT_SCHEMA_NAME)) + event.listen(Content.__table__, "before_drop", func.execute_if(dialect="postgresql")) + + +def register_models(engine): + """ + Creates database tables for all models with the given engine + """ + + # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) + models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command) + + create_proc_to_update_contents() + for model in models: # if not engine.has_table(model.__tablename__, model.metadata.schema): model.metadata.create_all(engine) # pylint: disable=maybe-no-member @@ -778,17 +889,7 @@ def unregister_models(engine): # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message) models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command) - func = DDL(""" - SET search_path TO %s; - DROP FUNCTION IF EXISTS update_dep_contents_status; - """ % (DEFAULT_SCHEMA_NAME)) - trigger_ddl = DDL(""" - SET search_path TO %s; - DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update; - """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME)) - - event.listen(Content_update.__table__, "before_drop", func.execute_if(dialect="postgresql")) - event.listen(Content_update.__table__, "before_drop", trigger_ddl.execute_if(dialect="postgresql")) + drop_proc_to_update_contents() for model in models: model.metadata.drop_all(engine) # pylint: disable=maybe-no-member diff --git a/main/lib/idds/orm/base/session.py b/main/lib/idds/orm/base/session.py index 03a610c4..11d40aa6 100644 --- a/main/lib/idds/orm/base/session.py +++ b/main/lib/idds/orm/base/session.py @@ -210,6 +210,7 @@ def get_session(): _LOCK.release() assert _MAKER session = scoped_session(_MAKER) + session.schema = DEFAULT_SCHEMA_NAME return session From 7542e038e3194484c507fdd3c520a69be39acaf5 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 3 Feb 2023 09:41:26 +0100 Subject: [PATCH 20/31] add funcs to update contents by dep id --- main/lib/idds/core/catalog.py | 41 +++++++++++++- main/lib/idds/core/processings.py | 2 + main/lib/idds/orm/contents.py | 92 +++++++++++++++++++++++++------ 3 files changed, 114 insertions(+), 21 deletions(-) diff --git a/main/lib/idds/core/catalog.py b/main/lib/idds/core/catalog.py index 06bbd8ae..169bc8aa 100644 --- a/main/lib/idds/core/catalog.py +++ b/main/lib/idds/core/catalog.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2021 +# - Wen Guan, , 2019 - 2023 """ @@ -621,6 +621,41 @@ def get_output_contents_by_request_id_status(request_id, name, content_status, l return contents +@read_session +def get_updated_transforms_by_content_status(request_id=None, transform_id=None): + """ + Get updated transform ids by content status + + :param request_id: The Request id. + :param transfomr_id: The transform id. + + :returns list + """ + return orm_contents.get_updated_transforms_by_content_status(request_id=request_id, transform_id=transform_id) + + +@transactional_session +def update_contents_to_others_by_dep_id(request_id=None, transform_id=None): + """ + Update contents to others by content_dep_id. + + :param request_id: The Request id. + :param transfomr_id: The transform id. + """ + return orm_contents.update_contents_to_others_by_dep_id(request_id=request_id, transform_id=transform_id) + + +@transactional_session +def update_contents_from_others_by_dep_id(request_id=None, transform_id=None): + """ + Update contents from others by content_dep_id + + :param request_id: The Request id. + :param transfomr_id: The transform id. + """ + return orm_contents.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) + + @transactional_session def add_contents_update(contents, bulk_size=10000, session=None): """ @@ -638,7 +673,7 @@ def add_contents_update(contents, bulk_size=10000, session=None): @transactional_session -def delete_contents_update(session=None): +def delete_contents_update(request_id=None, transform_id=None, session=None): """ delete a content. @@ -647,7 +682,7 @@ def delete_contents_update(session=None): :raises NoObject: If no content is founded. :raises DatabaseException: If there is a database error. """ - return orm_contents.delete_contents_update(session=session) + return orm_contents.delete_contents_update(request_id=request_id, transform_id=transform_id, session=session) def get_contents_ext_maps(): diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py index 49549680..e16c0896 100644 --- a/main/lib/idds/core/processings.py +++ b/main/lib/idds/core/processings.py @@ -319,8 +319,10 @@ def update_processing_contents(update_processing, update_contents, update_messag orm_contents.update_contents(update_contents, session=session) if new_update_contents: # first add and then delete, to trigger the trigger 'update_content_dep_status'. + # too slow orm_contents.add_contents_update(new_update_contents, session=session) # orm_contents.delete_contents_update(session=session) + pass if new_contents: orm_contents.add_contents(new_contents, session=session) if new_contents_ext: diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index efbfb8ee..731c53b2 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 """ @@ -455,7 +455,7 @@ def update_contents(parameters, session=None): @transactional_session -def update_dep_contents(request_id, content_dep_ids, status, bulk_size=1000, session=None): +def update_dep_contents(request_id, content_dep_ids, status, bulk_size=10000, session=None): """ update dependency contents. @@ -496,6 +496,70 @@ def delete_content(content_id=None, session=None): raise exceptions.NoObject('Content %s cannot be found: %s' % (content_id, error)) +@transactional_session +def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, session=None): + """ + Update contents to others by content_dep_id. + + :param request_id: The Request id. + :param transfomr_id: The transform id. + """ + try: + idds_proc = sqlalchemy.text("CALL %s.update_contents_from_others(:request_id, :transform_id)" % session.schema) + session.execute(idds_proc, {"request_id": request_id, "transform_id": transform_id}) + except Exception as ex: + raise ex + + +@transactional_session +def update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None): + """ + Update contents from others by content_dep_id + + :param request_id: The Request id. + :param transfomr_id: The transform id. + """ + try: + idds_proc = sqlalchemy.text("CALL %s.update_contents_from_others(:request_id, :transform_id)" % session.schema) + session.execute(idds_proc, {"request_id": request_id, "transform_id": transform_id}) + except Exception as ex: + raise ex + + +@read_session +def get_updated_transforms_by_content_status(request_id=None, transform_id=None, session=None): + """ + Get updated transform ids by content status + + :param request_id: The Request id. + + :returns list + """ + try: + query = session.query(models.Content.request_id, + models.Content.transform_id, + models.Content.workload_id, + models.Content.coll_id) + query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_ID_NAME_IDX)", 'oracle') + + if request_id: + query = query.filter(models.Content.request_id_id == request_id) + if transform_id: + query = query.filter(models.Content.transform_id == transform_id) + + query = query.filter(models.Content.status != models.Content.substatus) + tmp = query.distinct() + + rets = [] + if tmp: + for t in tmp: + t2 = dict(zip(t.keys(), t)) + rets.append(t2) + return rets + except Exception as error: + raise error + + def get_contents_ext_items(): default_params = {'pandaID': None, 'jobDefinitionID': None, 'schedulerID': None, 'pilotID': None, 'creationTime': None, 'modificationTime': None, @@ -579,7 +643,7 @@ def add_contents_update(contents, bulk_size=10000, session=None): @transactional_session -def delete_contents_update(session=None): +def delete_contents_update(request_id=None, transform_id=None, session=None): """ delete a content. @@ -589,21 +653,13 @@ def delete_contents_update(session=None): :raises DatabaseException: If there is a database error. """ try: - subquery = session.query(models.Content_update.content_id).subquery() - query = session.query(models.Content.request_id, - models.Content.transform_id, - models.Content.workload_id, - models.Content.coll_id) - query = query.filter(models.Content.content_id.in_(subquery)) - - tmp = query.distinct() - rets = [] - if tmp: - for t in tmp: - t2 = dict(zip(t.keys(), t)) - rets.append(t2) - session.query(models.Content_update).delete() - return rets + del_query = session.query(models.Content_update) + if request_id: + del_query = del_query.filter(models.Content_update.request_id == request_id) + if transform_id: + del_query = del_query.filter(models.Content_update.transform_id == transform_id) + del_query.with_for_update(nowait=True, skip_locked=True) + del_query.delete() except Exception as error: raise exceptions.NoObject('Content_update deletion error: %s' % (error)) From c5614639d62a598915dd34a0c5604ac81cced880 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 3 Feb 2023 09:43:27 +0100 Subject: [PATCH 21/31] optimize carrier.trigger --- main/lib/idds/agents/carrier/trigger.py | 31 +++++++++++++------------ main/lib/idds/agents/carrier/utils.py | 20 ++++++++++++---- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 21ec4e3c..47c50983 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -6,13 +6,14 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import traceback from idds.common import exceptions from idds.common.constants import ProcessingStatus, ProcessingLocking from idds.common.utils import setup_logging, truncate_string +from idds.core import catalog as core_catalog from idds.core import processings as core_processings from idds.agents.common.eventbus.event import (EventType, UpdateTransformEvent, @@ -168,23 +169,23 @@ def process_trigger_processing(self, event): ret = self.handle_trigger_processing(pr) # self.logger.info(log_pre + "process_trigger_processing result: %s" % str(ret)) + new_update_contents = ret.get('new_update_contents', None) + ret['new_update_contents'] = None self.update_processing(ret, pr) - new_update_contents = ret.get('new_update_contents', None) if new_update_contents: - ret1 = self.handle_trigger_processing(pr, trigger_new_updates=True) - self.logger.info(log_pre + "process_trigger_processing1 result: %s" % str(ret1)) - self.update_processing(ret1, pr) - if 'update_transforms' in ret1 and ret1['update_transforms']: - # contents for some other transforms are updated. - for update_transform in ret1['update_transforms']: - if 'transform_id' in update_transform: - update_transform_id = update_transform['transform_id'] - event = UpdateTransformEvent(publisher_id=self.id, - transform_id=update_transform_id, - content={'event': 'Trigger'}) - self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s" % update_transform_id) - self.event_bus.send(event) + core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id']) + # core_catalog.delete_contents_update(request_id=pr['request_id'], transform_id=pr['transform_id']) + update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=pr['request_id']) + self.logger.info(log_pre + "update_transforms: %s" % str(update_transforms)) + for update_transform in update_transforms: + if 'transform_id' in update_transform: + update_transform_id = update_transform['transform_id'] + event = UpdateTransformEvent(publisher_id=self.id, + transform_id=update_transform_id, + content={'event': 'Trigger'}) + self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s" % update_transform_id) + self.event_bus.send(event) if (('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating) or (event._content and 'Terminated' in event._content and event._content['Terminated'])): # noqa W503 diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index e7b15fc3..8b4ef8df 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2022 +# - Wen Guan, , 2022 - 2023 import json import logging @@ -970,9 +970,11 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= else: if trigger_new_updates: # delete information in the contents_update table, to invoke the trigger. - ret_update_transforms = core_catalog.delete_contents_update() - logger.debug(log_prefix + "delete_contents_update: %s" % str(ret_update_transforms)) + # ret_update_transforms = core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id) + # logger.debug(log_prefix + "delete_contents_update: %s" % str(ret_update_transforms)) + pass + core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) input_output_maps = get_input_output_maps(transform_id, work) logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2])) @@ -994,7 +996,17 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= content_updates = content_updates + updated_contents - return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms + # update_dep_contents_status_name = {} + # update_dep_contents_status = {} + # for content in new_update_contents: + # if content['substatus'] not in update_dep_contents_status_name: + # update_dep_contents_status_name[content['substatus'].name] = content['substatus'] + # update_dep_contents_status[content['substatus'].name] = [] + # update_dep_contents_status[content['substatus'].name].append(content['content_id']) + + # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms + # return processing['substatus'], content_updates, ret_msgs, {}, update_dep_contents_status_name, update_dep_contents_status, [], ret_update_transforms + return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms def get_content_status_from_panda_msg_status(status): From 1e95e72a4da9f7e50342f66ffdd7c49e48574791 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 3 Feb 2023 09:44:36 +0100 Subject: [PATCH 22/31] add tests --- main/lib/idds/tests/core_tests_dep_id.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 main/lib/idds/tests/core_tests_dep_id.py diff --git a/main/lib/idds/tests/core_tests_dep_id.py b/main/lib/idds/tests/core_tests_dep_id.py new file mode 100644 index 00000000..c0a8dc45 --- /dev/null +++ b/main/lib/idds/tests/core_tests_dep_id.py @@ -0,0 +1,24 @@ +from datetime import datetime # noqa F401 + +from idds.common.utils import json_dumps, setup_logging # noqa F401 +from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 +from idds.core.requests import get_requests # noqa F401 +from idds.core.messages import retrieve_messages # noqa F401 +from idds.core.transforms import get_transforms, get_transform # noqa F401 +from idds.core.workprogress import get_workprogresses # noqa F401 +from idds.core.processings import get_processings # noqa F401 +from idds.core import transforms as core_transforms # noqa F401 +from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old # noqa F401 +from idds.workflowv2.workflow import Workflow # noqa F401 +from idds.workflowv2.work import Work # noqa F401 + +from idds.orm import contents as orm_contents # noqa F401 + + +setup_logging(__name__) + +request_id = 486 +transform_id = 3027 # 3028, 3029 + +ret = orm_contents.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) +print(ret) From c88e5a191cf8cd99299c72eb5e39688cf0de0d11 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 3 Feb 2023 13:47:11 +0100 Subject: [PATCH 23/31] fix session --- main/lib/idds/core/catalog.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/main/lib/idds/core/catalog.py b/main/lib/idds/core/catalog.py index 169bc8aa..b96a2717 100644 --- a/main/lib/idds/core/catalog.py +++ b/main/lib/idds/core/catalog.py @@ -635,25 +635,25 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None) @transactional_session -def update_contents_to_others_by_dep_id(request_id=None, transform_id=None): +def update_contents_to_others_by_dep_id(request_id=None, transform_id=None, session=None): """ Update contents to others by content_dep_id. :param request_id: The Request id. :param transfomr_id: The transform id. """ - return orm_contents.update_contents_to_others_by_dep_id(request_id=request_id, transform_id=transform_id) + return orm_contents.update_contents_to_others_by_dep_id(request_id=request_id, transform_id=transform_id, session=session) @transactional_session -def update_contents_from_others_by_dep_id(request_id=None, transform_id=None): +def update_contents_from_others_by_dep_id(request_id=None, transform_id=None, session=None): """ Update contents from others by content_dep_id :param request_id: The Request id. :param transfomr_id: The transform id. """ - return orm_contents.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) + return orm_contents.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id, session=session) @transactional_session From 38cde9900dd6c71f1156793cd2892e44ac476b79 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 3 Feb 2023 14:13:42 +0100 Subject: [PATCH 24/31] fix dn parse --- common/lib/idds/common/authentication.py | 2 ++ main/lib/idds/tests/test_get_dn.py | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/common/lib/idds/common/authentication.py b/common/lib/idds/common/authentication.py index f10ad72e..c44541b9 100644 --- a/common/lib/idds/common/authentication.py +++ b/common/lib/idds/common/authentication.py @@ -392,6 +392,7 @@ def get_user_name_from_dn1(dn): username = username.replace('/CN=limited proxy', '') username = username.replace('limited proxy', '') username = re.sub('/CN=Robot:[^/]+', '', username) + username = re.sub('/CN=Robot[^/]+', '', username) username = re.sub('/CN=nickname:[^/]+', '', username) pat = re.compile('.*/CN=([^\/]+)/CN=([^\/]+)') # noqa W605 mat = pat.match(username) @@ -428,6 +429,7 @@ def get_user_name_from_dn2(dn): username = username.replace(',CN=limited proxy', '') username = username.replace('limited proxy', '') username = re.sub(',CN=Robot:[^/]+', '', username) + username = re.sub(',CN=Robot[^/]+', '', username) username = re.sub(',CN=nickname:[^/]+', '', username) pat = re.compile('.*,CN=([^\,]+),CN=([^\,]+)') # noqa W605 mat = pat.match(username) diff --git a/main/lib/idds/tests/test_get_dn.py b/main/lib/idds/tests/test_get_dn.py index be2177d9..8ac7f662 100644 --- a/main/lib/idds/tests/test_get_dn.py +++ b/main/lib/idds/tests/test_get_dn.py @@ -10,6 +10,8 @@ import re +from idds.common import authentication + # /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=wguan/CN=667815/CN=Wen Guan/CN=1883443395 def get_user_name_from_dn1(dn): @@ -93,7 +95,17 @@ def get_user_name_from_dn(dn): dn = "/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=wguan/CN=667815/CN=Wen Guan/CN=1883443395" username = get_user_name_from_dn(dn) print(username) + username = authentication.get_user_name_from_dn(dn) + print("auth: " + username) dn = 'CN=203633261,CN=Wen Guan,CN=667815,CN=wguan,OU=Users,OU=Organic Units,DC=cern,DC=ch' username = get_user_name_from_dn(dn) print(username) + username = authentication.get_user_name_from_dn(dn) + print("auth: " + username) + + dn = "/DC=ch/DC=cern/OU=Organic+Units/OU=Users/CN=atlpilo1/CN=614260/CN=Robot%3A+ATLAS+Pilot1" + username = get_user_name_from_dn(dn) + print(username) + username = authentication.get_user_name_from_dn(dn) + print("auth: " + username) From 456f10c87b1d929b6cfd169afdba3906eb9da6b8 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 6 Feb 2023 18:24:17 +0100 Subject: [PATCH 25/31] fix sqlalchemy 1.4.46 --- main/tools/env/environment.yml | 2 +- requirements.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/main/tools/env/environment.yml b/main/tools/env/environment.yml index 343330f7..9747a65d 100644 --- a/main/tools/env/environment.yml +++ b/main/tools/env/environment.yml @@ -5,7 +5,7 @@ dependencies: - pip: - argcomplete - requests # requests - - SQLAlchemy # db orm + - SQLAlchemy==1.4.46 # db orm, SQLAlchemy 2.0 has been deployed. Howerever it crashed iDDS. - urllib3 # url connections - retrying # retrying behavior # - mysqlclient # mysql python client diff --git a/requirements.yaml b/requirements.yaml index 1e240f4f..cf87039a 100644 --- a/requirements.yaml +++ b/requirements.yaml @@ -5,7 +5,7 @@ dependencies: - pip: - argcomplete - requests # requests - - SQLAlchemy # db orm + - SQLAlchemy==1.4.46 # db orm - urllib3 # url connections - retrying # retrying behavior # - mysqlclient # mysql python client From 75df82c42e889dece69b2e2b0019bf40d3ae02c2 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 6 Feb 2023 18:24:43 +0100 Subject: [PATCH 26/31] fix alembic --- main/setup.py | 6 ++++-- start-daemon.sh | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/main/setup.py b/main/setup.py index 3d5da21e..8db70ce4 100644 --- a/main/setup.py +++ b/main/setup.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import glob @@ -130,6 +130,8 @@ def replace_data_path(wsgi_file, install_data_path): ('tools/env/', glob.glob('tools/env/*')), ] +print(data_files) + scripts = glob.glob('bin/*') setup( @@ -142,7 +144,7 @@ def replace_data_path(wsgi_file, install_data_path): author='IRIS-HEP Team', author_email='atlas-adc-panda@cern.ch', python_requires='>=3.6', - packages=find_packages('lib/'), + packages=find_packages(where='lib'), package_dir={'': 'lib'}, install_requires=install_requires, extras_require=extras_requires, diff --git a/start-daemon.sh b/start-daemon.sh index 1f50670b..d6b11c3f 100755 --- a/start-daemon.sh +++ b/start-daemon.sh @@ -14,6 +14,7 @@ source /etc/profile.d/conda.sh conda activate /opt/idds; export IDDS_HOME=/opt/idds +export ALEMBIC_CONFIG=/opt/idds/config/idds/alembic.ini if [ -f /etc/grid-security/hostkey.pem ]; then echo "host certificate is already created." @@ -168,8 +169,8 @@ fi # create database if not exists python /opt/idds/tools/env/create_database.py # upgrade database -export ALEMBIC_CONFIG=/opt/idds/config/idds/alembic.ini alembic upgrade head + # configure monitor python /opt/idds/tools/env/config_monitor.py -s ${IDDS_HOME}/monitor/data/conf.js.template -d ${IDDS_HOME}/monitor/data/conf.js --host ${IDDS_SERVER} From 7e33ff8843af580df446e90db2a1d2fde4451914 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 6 Feb 2023 18:25:27 +0100 Subject: [PATCH 27/31] fix alembic --- main/lib/idds/orm/base/alembic/__init__.py | 9 +++++++++ main/lib/idds/orm/base/alembic/versions/__init__.py | 9 +++++++++ 2 files changed, 18 insertions(+) create mode 100644 main/lib/idds/orm/base/alembic/__init__.py create mode 100644 main/lib/idds/orm/base/alembic/versions/__init__.py diff --git a/main/lib/idds/orm/base/alembic/__init__.py b/main/lib/idds/orm/base/alembic/__init__.py new file mode 100644 index 00000000..865b774e --- /dev/null +++ b/main/lib/idds/orm/base/alembic/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2019 diff --git a/main/lib/idds/orm/base/alembic/versions/__init__.py b/main/lib/idds/orm/base/alembic/versions/__init__.py new file mode 100644 index 00000000..865b774e --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2019 From c064ced8b0dcf7d9f7ed43a7e7cacfb998e7915a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 8 Feb 2023 13:51:50 +0100 Subject: [PATCH 28/31] fix msg broking --- .../idds/agents/common/plugins/messaging.py | 120 ++++++++++-------- 1 file changed, 66 insertions(+), 54 deletions(-) diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py index d1ffa7c4..6045b430 100644 --- a/main/lib/idds/agents/common/plugins/messaging.py +++ b/main/lib/idds/agents/common/plugins/messaging.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import logging @@ -103,60 +103,69 @@ def connect_to_messaging_brokers(self, sender=True): channel_conns = {} for name in self.channels: channel = self.channels[name] - brokers = channel['brokers'] - if type(brokers) in [list, tuple]: - pass + if channel and 'brokers' in channel: + brokers = channel['brokers'] + if type(brokers) in [list, tuple]: + pass + else: + brokers = brokers.split(",") + # destination = channel['destination'] + # username = channel['username'] + # password = channel['password'] + broker_timeout = channel['broker_timeout'] + + broker_addresses = [] + for b in brokers: + try: + b, port = b.split(":") + + addrinfos = socket.getaddrinfo(b, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) + for addrinfo in addrinfos: + b_addr = addrinfo[4][0] + broker_addresses.append((b_addr, port)) + except socket.gaierror as error: + self.logger.error('Cannot resolve hostname %s: %s' % (b, str(error))) + + self.logger.info("Resolved broker addresses for channel %s: %s" % (name, broker_addresses)) + + timeout = broker_timeout + + conns = [] + for broker, port in broker_addresses: + conn = stomp.Connection12(host_and_ports=[(broker, port)], + keepalive=True, + heartbeats=(60000, 60000), # one minute + timeout=timeout) + conns.append(conn) + channel_conns[name] = conns else: - brokers = brokers.split(",") - # destination = channel['destination'] - # username = channel['username'] - # password = channel['password'] - broker_timeout = channel['broker_timeout'] - - broker_addresses = [] - for b in brokers: - try: - b, port = b.split(":") - - addrinfos = socket.getaddrinfo(b, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) - for addrinfo in addrinfos: - b_addr = addrinfo[4][0] - broker_addresses.append((b_addr, port)) - except socket.gaierror as error: - self.logger.error('Cannot resolve hostname %s: %s' % (b, str(error))) - - self.logger.info("Resolved broker addresses for channel %s: %s" % (name, broker_addresses)) - - timeout = broker_timeout - - conns = [] - for broker, port in broker_addresses: - conn = stomp.Connection12(host_and_ports=[(broker, port)], - keepalive=True, - heartbeats=(60000, 60000), # one minute - timeout=timeout) - conns.append(conn) - channel_conns[name] = conns + channel_conns[name] = None return channel_conns def disconnect(self, conns): for name in conns: - for conn in conns[name]: - try: - conn.disconnect() - except Exception: - pass + if conns[name]: + for conn in conns[name]: + try: + conn.disconnect() + except Exception: + pass def get_connection(self, destination): try: if destination not in self.conns: destination = 'default' - conn = random.sample(self.conns[destination], 1)[0] - queue_dest = self.channels[destination]['destination'] - if not conn.is_connected(): - # conn.start() - conn.connect(self.username, self.password, wait=True) - return conn, queue_dest, destination + if self.conns[destination]: + conn = random.sample(self.conns[destination], 1)[0] + queue_dest = self.channels[destination]['destination'] + username = self.channels[destination]['username'] + password = self.channels[destination]['password'] + if not conn.is_connected(): + # conn.start() + conn.connect(username, password, wait=True) + return conn, queue_dest, destination + else: + return None, None, destination except Exception as error: self.logger.error("Failed to connect to message broker(will re-resolve brokers): %s" % str(error)) @@ -178,15 +187,18 @@ def send_message(self, msg): destination = msg['destination'] if 'destination' in msg else 'default' conn, queue_dest, destination = self.get_connection(destination) - self.logger.info("Sending message to message broker(%s): %s" % (destination, msg['msg_id'])) - self.logger.debug("Sending message to message broker(%s): %s" % (destination, json.dumps(msg['msg_content']))) - conn.send(body=json.dumps(msg['msg_content']), - destination=queue_dest, - id='atlas-idds-messaging', - ack='auto', - headers={'persistent': 'true', - 'vo': 'atlas', - 'msg_type': str(msg['msg_type']).lower()}) + if conn: + self.logger.info("Sending message to message broker(%s): %s" % (destination, msg['msg_id'])) + self.logger.debug("Sending message to message broker(%s): %s" % (destination, json.dumps(msg['msg_content']))) + conn.send(body=json.dumps(msg['msg_content']), + destination=queue_dest, + id='atlas-idds-messaging', + ack='auto', + headers={'persistent': 'true', + 'vo': 'atlas', + 'msg_type': str(msg['msg_type']).lower()}) + else: + self.logger.info("No brokers defined, discard(%s): %s" % (destination, msg['msg_id'])) def execute_send(self): try: From ef50e8563a1a298149cdfc06a69c764b9a7a94c7 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 8 Feb 2023 13:52:03 +0100 Subject: [PATCH 29/31] add oracle updates --- main/etc/sql/oracle_update.sql | 47 ++++++++++++++++++++ main/lib/idds/tests/test_migrate_requests.py | 8 ++-- monitor/data/conf.js | 12 ++--- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index 5ef63790..6a0594c1 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -238,3 +238,50 @@ alter table contents_update add ( transform_id NUMBER(12), workload_id NUMBER(10), coll_id NUMBER(14)); + +-- 2023.02.01 +--- update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join +-- (select content_id, substatus from contents where request_id=486 and transform_id=3027 and content_relation_type =1 and status != substatus) t +--- on c.content_dep_id = t.content_id where c.request_id=486 and c.substatus != t.substatus) set c_substatus = t_substatus; + +--- remove +""" +CREATE OR REPLACE FUNCTION update_contents_to_others(request_id_in IN NUMBER, transform_id_in IN NUMBER) +RETURN NUMBER +IS num_rows NUMBER; +BEGIN + update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join + (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != substatus) t + on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; + + num_rows := SQL%rowcount; + RETURN (num_rows); +END; + +CREATE OR REPLACE FUNCTION update_contents_from_others(request_id_in IN NUMBER, transform_id_in IN NUMBER) +RETURN NUMBER +IS num_rows NUMBER; +BEGIN + update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join + (select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1 and status != 0) t + on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; + + num_rows := SQL%rowcount; + RETURN (num_rows); +END; +""" + +CREATE OR REPLACE procedure update_contents_from_others(request_id_in NUMBER, transform_id_in NUMBER) AS +BEGIN + update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join + (select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1 and status != 0) t + on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; +END; + + +CREATE OR REPLACE procedure update_contents_to_others(request_id_in NUMBER, transform_id_in NUMBER) AS +BEGIN + update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join + (select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != substatus) t + on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus; +END; diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 5dcd41fc..3e6b7257 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -52,8 +52,8 @@ def migrate(): old_request_id = 372930 old_request_id = 2603 old_request_id = 2802 - old_request_id = 2816 - old_request_id = 3178 + old_request_id = 2816 # big tasks + # old_request_id = 3178 # 125 tasks # old_request_id = 1 # for old_request_id in [152]: @@ -63,9 +63,9 @@ def migrate(): reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True) # cm2 = ClientManager(host=dev_host) - cm2 = ClientManager(host=doma_host) + # cm2 = ClientManager(host=doma_host) # cm2 = ClientManager(host=atlas_host) - # cm2 = ClientManager(host=slac_k8s_dev_host) + cm2 = ClientManager(host=slac_k8s_dev_host) # cm2 = ClientManager(host=cern_k8s_dev_host) # print(reqs) diff --git a/monitor/data/conf.js b/monitor/data/conf.js index b8a48092..69e7d923 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus8s12.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus8s12.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus8s12.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus8s12.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus8s15.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus8s15.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus8s15.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus8s15.cern.ch:443/idds/monitor/null/null/false/false/true" } From 9baad905be0406734d8ad5606c5c7ef2c6989169 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 10 Feb 2023 09:59:47 +0100 Subject: [PATCH 30/31] only duplicated sending collection messages --- main/lib/idds/agents/conductor/conductor.py | 12 +++++++++--- main/lib/idds/orm/messages.py | 9 +++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index 7ed50a16..c3167ebf 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2022 +# - Wen Guan, , 2019 - 2023 import time import traceback @@ -17,7 +17,7 @@ # Python 2 from Queue import Queue -from idds.common.constants import (Sections, MessageStatus, MessageDestination) +from idds.common.constants import (Sections, MessageStatus, MessageDestination, MessageType) from idds.common.exceptions import AgentPluginError, IDDSException from idds.common.utils import setup_logging, get_logger from idds.core import messages as core_messages @@ -73,6 +73,11 @@ def get_messages(self): if messages: self.logger.info("Main thread get %s new messages" % len(messages)) + msg_type = [MessageType.StageInCollection, MessageType.StageInWork, + MessageType.ActiveLearningCollection, MessageType.ActiveLearningWork, + MessageType.HyperParameterOptCollection, MessageType.HyperParameterOptWork, + MessageType.ProcessingCollection, MessageType.ProcessingWork, + MessageType.UnknownCollection, MessageType.UnknownWork] retry_messages = [] for retry in range(1, self.replay_times + 1): delay = int(self.delay) * (retry ** 3) @@ -80,7 +85,8 @@ def get_messages(self): messages_d = core_messages.retrieve_messages(status=MessageStatus.Delivered, retries=retry, delay=delay, bulk_size=self.retrieve_bulk_size, - destination=destination) + destination=destination, + msg_type=msg_type) if messages_d: self.logger.info("Main thread get %s retries messages" % len(messages_d)) retry_messages += messages_d diff --git a/main/lib/idds/orm/messages.py b/main/lib/idds/orm/messages.py index 50f090d9..f4619175 100644 --- a/main/lib/idds/orm/messages.py +++ b/main/lib/idds/orm/messages.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 +# - Wen Guan, , 2019 - 2023 """ @@ -137,6 +137,11 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None, destination = [destination] if len(destination) == 1: destination = [destination[0], destination[0]] + if msg_type is not None: + if not isinstance(msg_type, (list, tuple)): + msg_type = [msg_type] + if len(msg_type) == 1: + msg_type = [msg_type[0], msg_type[0]] query = session.query(models.Message) if request_id is not None: @@ -149,7 +154,7 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None, query = query.with_hint(models.Message, "INDEX(MESSAGES MESSAGES_TYPE_ST_IDX)", 'oracle') if msg_type is not None: - query = query.filter_by(msg_type=msg_type) + query = query.filter(models.Message.msg_type.in_(msg_type)) if status is not None: query = query.filter_by(status=status) if source is not None: From 6d6b859d5b0e8511eff4458d6436f6440feacb61 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 10 Feb 2023 10:00:09 +0100 Subject: [PATCH 31/31] send input collection closed messages --- main/lib/idds/agents/carrier/utils.py | 49 +++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 8b4ef8df..975c852f 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -405,6 +405,25 @@ def generate_messages(request_id, transform_id, workload_id, work, msg_type='fil 'num_contents': num_msg_content, 'msg_content': msg_content} return [msg] + elif msg_type == 'collection': + msg_type_contents = [] + for coll in files: + msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type=relation_type) + msg_type_contents.append(msg_type_content) + + msgs = [] + for i_msg_type, msg_content, num_msg_content in msg_type_contents: + msg = {'msg_type': i_msg_type, + 'status': MessageStatus.New, + 'source': MessageSource.Carrier, + 'destination': MessageDestination.Outside, + 'request_id': request_id, + 'workload_id': workload_id, + 'transform_id': transform_id, + 'num_contents': num_msg_content, + 'msg_content': msg_content} + msgs.append(msg) + return msgs elif msg_type == 'work': # link collections input_collections = work.get_input_collections() @@ -1204,6 +1223,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou all_updates_flushed = True coll_status = {} + messages = [] for map_id in input_output_maps: inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else [] # inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else [] @@ -1296,6 +1316,20 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou 'processed_ext_files': coll.processed_ext_files, 'failed_ext_files': coll.failed_ext_files, 'missing_ext_files': coll.missing_ext_files} + + if coll in input_collections: + if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files: + coll_db = core_catalog.get_collection(coll_id=coll.coll_id) + coll.status = coll_db['status'] + if coll.status is not None and coll.status != CollectionStatus.Closed: + u_coll['status'] = CollectionStatus.Closed + u_coll['substatus'] = CollectionStatus.Closed + coll.status = CollectionStatus.Closed + coll.substatus = CollectionStatus.Closed + + msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='collection', files=[coll], relation_type='input') + messages += msgs + if terminate: if coll in output_collections and work.require_ext_contents(): if coll.processed_files == coll.processed_ext_files and coll.failed_files == coll.failed_ext_files: @@ -1313,7 +1347,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou coll.substatus = CollectionStatus.Closed update_collections.append(u_coll) - return update_collections, all_updates_flushed + return update_collections, all_updates_flushed, messages def sync_work_status(request_id, transform_id, workload_id, work): @@ -1353,16 +1387,19 @@ def sync_processing(processing, agent_attributes, terminate=False, logger=None, work = proc.work work.set_agent_attributes(agent_attributes, processing) + messages = [] input_output_maps = get_input_output_maps(transform_id, work) - update_collections, all_updates_flushed = sync_collection_status(request_id, transform_id, workload_id, work, - input_output_maps=input_output_maps, - close_collection=True, terminate=terminate) + update_collections, all_updates_flushed, msgs = sync_collection_status(request_id, transform_id, workload_id, work, + input_output_maps=input_output_maps, + close_collection=True, terminate=terminate) + + messages += msgs - messages = [] sync_work_status(request_id, transform_id, workload_id, work) logger.info(log_prefix + "sync_processing: work status: %s" % work.get_status()) if terminate and work.is_terminated(): - messages = generate_messages(request_id, transform_id, workload_id, work, msg_type='work') + msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='work') + messages += msgs if work.is_finished(): # processing['status'] = ProcessingStatus.Finished processing['status'] = processing['substatus']