From d5293bd9a92a3d8a9e805b577214845b9eea9440 Mon Sep 17 00:00:00 2001 From: wguanicedew Date: Fri, 12 Apr 2024 16:52:20 +0200 Subject: [PATCH] Revert "Env var to ignore work / workflow decorators" --- common/lib/idds/common/utils.py | 32 ------------------------- main/etc/sql/postgres_partition.sql | 15 ------------ main/etc/sql/postgresql.sql | 15 ++++++------ website/data/index.html | 2 +- workflow/lib/idds/iworkflow/work.py | 10 ++++---- workflow/lib/idds/iworkflow/workflow.py | 15 +++++++----- 6 files changed, 24 insertions(+), 65 deletions(-) delete mode 100644 main/etc/sql/postgres_partition.sql diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index f2470343..1a73cde2 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -25,7 +25,6 @@ import tarfile import time # import traceback -import contextlib from enum import Enum from functools import wraps @@ -943,34 +942,3 @@ def idds_mask(dict_): else: ret[k] = dict_[k] return ret - - -@contextlib.contextmanager -def modified_environ(*remove, **update): - """ - Temporarily updates the ``os.environ`` dictionary in-place. - - The ``os.environ`` dictionary is updated in-place so that the modification - is sure to work in all situations. - - :param remove: Environment variables to remove. - :param update: Dictionary of environment variables and values to add/update. - """ - env = os.environ - update = update or {} - remove = remove or [] - - # List of environment variables being updated or removed. - stomped = (set(update.keys()) | set(remove)) & set(env.keys()) - # Environment variables and values to restore on exit. - update_after = {k: env[k] for k in stomped} - # Environment variables and values to remove on exit. - remove_after = frozenset(k for k in update if k not in env) - - try: - env.update(update) - [env.pop(k, None) for k in remove] - yield - finally: - env.update(update_after) - [env.pop(k) for k in remove_after] \ No newline at end of file diff --git a/main/etc/sql/postgres_partition.sql b/main/etc/sql/postgres_partition.sql deleted file mode 100644 index 42eb4843..00000000 --- a/main/etc/sql/postgres_partition.sql +++ /dev/null @@ -1,15 +0,0 @@ -SELECT partman.create_parent( -p_parent_table => 'doma_idds.contents', -p_control => 'request_id', -p_type => 'native', -p_interval=> '1000', -p_premake => 3 -); - -SELECT partman.create_parent( -p_parent_table => 'doma_idds.contents_ext', -p_control => 'request_id', -p_type => 'native', -p_interval=> '1000', -p_premake => 3 -); \ No newline at end of file diff --git a/main/etc/sql/postgresql.sql b/main/etc/sql/postgresql.sql index c8d1f1d1..792a94cf 100644 --- a/main/etc/sql/postgresql.sql +++ b/main/etc/sql/postgresql.sql @@ -181,8 +181,8 @@ CREATE TABLE doma_idds.contents_ext ( memory_leak VARCHAR(10), memory_leak_x2 VARCHAR(10), job_label VARCHAR(20), - CONSTRAINT "CONTENTS_EXT_PK" PRIMARY KEY (content_id, request_id) -) PARTITION BY RANGE (request_id) ; + CONSTRAINT "CONTENTS_EXT_PK" PRIMARY KEY (content_id) +); CREATE INDEX "CONTENTS_EXT_RTW_IDX" ON doma_idds.contents_ext (request_id, transform_id, workload_id); @@ -500,13 +500,13 @@ CREATE TABLE doma_idds.contents ( accessed_at TIMESTAMP WITHOUT TIME ZONE, expired_at TIMESTAMP WITHOUT TIME ZONE, content_metadata VARCHAR(1000), - CONSTRAINT "CONTENTS_PK_TEST" PRIMARY KEY (content_id, request_id), - CONSTRAINT "CONTENT_ID_UQ_TEST" UNIQUE (transform_id, coll_id, request_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id), - CONSTRAINT "CONTENTS_TRANSFORM_ID_FK" FOREIGN KEY(transform_id) REFERENCES doma_idds.transforms (transform_id), + CONSTRAINT "CONTENTS_PK" PRIMARY KEY (content_id), + CONSTRAINT "CONTENT_ID_UQ" UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id), + CONSTRAINT "CONTENTS_TRANSFORM_ID_FK" FOREIGN KEY(transform_id) REFERENCES doma_idds.transforms (transform_id), CONSTRAINT "CONTENTS_COLL_ID_FK" FOREIGN KEY(coll_id) REFERENCES doma_idds.collections (coll_id), CONSTRAINT "CONTENTS_STATUS_ID_NN" CHECK (status IS NOT NULL), CONSTRAINT "CONTENTS_COLL_ID_NN" CHECK (coll_id IS NOT NULL) -) PARTITION BY RANGE (request_id) ; +); CREATE INDEX "CONTENTS_STATUS_UPDATED_IDX" ON doma_idds.contents (status, locking, updated_at, created_at); @@ -518,7 +518,8 @@ CREATE INDEX "CONTENTS_REQ_TF_COLL_IDX" ON doma_idds.contents (request_id, trans CREATE INDEX "CONTENTS_TF_IDX" ON doma_idds.contents (transform_id, request_id, coll_id, map_id, content_relation_type); -CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5('name'), status); +CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5('name'); +, status); SET search_path TO doma_idds; diff --git a/website/data/index.html b/website/data/index.html index 01434d84..5535ef1f 100644 --- a/website/data/index.html +++ b/website/data/index.html @@ -61,7 +61,7 @@

Intelligent

Fine grained data transformation

-

Remote data transformation/reduction

+

Remtoe data transformation/reduction

On-demand production of analysis format data transformation

Transformation

diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 41663271..f15e9cb3 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -21,7 +21,7 @@ from idds.common import exceptions from idds.common.constants import WorkflowType, TransformStatus from idds.common.imports import get_func_name -from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64, modified_environ +from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64 from .asyncresult import AsyncResult, MapResult from .base import Base, Context from .workflow import WorkflowCanvas @@ -779,8 +779,10 @@ def load(self, func_name): :raise Exception """ - with modified_environ(IDDS_IGNORE_WORK_DECORATOR='true'): - func = super(Work, self).load(func_name) + os.environ['IDDS_IWORKFLOW_LOAD_WORK'] = 'true' + func = super(Work, self).load(func_name) + del os.environ['IDDS_IWORKFLOW_LOAD_WORK'] + return func def pre_run(self): @@ -920,7 +922,7 @@ def work(func=None, *, map_results=False, lazy=False, init_env=None): if func is None: return functools.partial(work, map_results=map_results, lazy=lazy, init_env=init_env) - if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ: + if 'IDDS_IWORKFLOW_LOAD_WORK' in os.environ: return func @functools.wraps(func) diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 5d1ee912..cf4eb719 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -23,8 +23,7 @@ # from idds.common import exceptions from idds.common.constants import WorkflowType -from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64,\ - modified_environ +from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64 from .asyncresult import AsyncResult from .base import Base, Context @@ -639,7 +638,9 @@ def get_idds_env(self): def get_idds_server(self): if 'IDDS_HOST' in self._idds_env: return self._idds_env['IDDS_HOST'] - return os.environ.get('IDDS_HOST', None) + if os.environ.get('IDDS_HOST', None): + return os.environ.get('IDDS_HOST', None) + return None def prepare_with_idds(self): """ @@ -1058,8 +1059,10 @@ def load(self, func_name): :raise Exception """ - with modified_environ(IDDS_IGNORE_WORKFLOW_DECORATOR='true'): - func = super(Workflow, self).load(func_name) + os.environ['IDDS_IWORKFLOW_LOAD_WORKFLOW'] = 'true' + func = super(Workflow, self).load(func_name) + del os.environ['IDDS_IWORKFLOW_LOAD_WORKFLOW'] + return func def pre_run(self): @@ -1158,7 +1161,7 @@ def workflow(func=None, *, local=False, service='idds', source_dir=None, primary return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env) - if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: + if 'IDDS_IWORKFLOW_LOAD_WORKFLOW' in os.environ: return func @functools.wraps(func)