Skip to content

Commit

Permalink
Merge pull request #300 from HSF/revert-299-dev_lino
Browse files Browse the repository at this point in the history
Revert "Env var to ignore work / workflow decorators"
  • Loading branch information
wguanicedew authored Apr 12, 2024
2 parents a837287 + d5293bd commit f93f77d
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 65 deletions.
32 changes: 0 additions & 32 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import tarfile
import time
# import traceback
import contextlib

from enum import Enum
from functools import wraps
Expand Down Expand Up @@ -943,34 +942,3 @@ def idds_mask(dict_):
else:
ret[k] = dict_[k]
return ret


@contextlib.contextmanager
def modified_environ(*remove, **update):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification
is sure to work in all situations.
:param remove: Environment variables to remove.
:param update: Dictionary of environment variables and values to add/update.
"""
env = os.environ
update = update or {}
remove = remove or []

# List of environment variables being updated or removed.
stomped = (set(update.keys()) | set(remove)) & set(env.keys())
# Environment variables and values to restore on exit.
update_after = {k: env[k] for k in stomped}
# Environment variables and values to remove on exit.
remove_after = frozenset(k for k in update if k not in env)

try:
env.update(update)
[env.pop(k, None) for k in remove]
yield
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]
15 changes: 0 additions & 15 deletions main/etc/sql/postgres_partition.sql

This file was deleted.

15 changes: 8 additions & 7 deletions main/etc/sql/postgresql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ CREATE TABLE doma_idds.contents_ext (
memory_leak VARCHAR(10),
memory_leak_x2 VARCHAR(10),
job_label VARCHAR(20),
CONSTRAINT "CONTENTS_EXT_PK" PRIMARY KEY (content_id, request_id)
) PARTITION BY RANGE (request_id) ;
CONSTRAINT "CONTENTS_EXT_PK" PRIMARY KEY (content_id)
);

CREATE INDEX "CONTENTS_EXT_RTW_IDX" ON doma_idds.contents_ext (request_id, transform_id, workload_id);

Expand Down Expand Up @@ -500,13 +500,13 @@ CREATE TABLE doma_idds.contents (
accessed_at TIMESTAMP WITHOUT TIME ZONE,
expired_at TIMESTAMP WITHOUT TIME ZONE,
content_metadata VARCHAR(1000),
CONSTRAINT "CONTENTS_PK_TEST" PRIMARY KEY (content_id, request_id),
CONSTRAINT "CONTENT_ID_UQ_TEST" UNIQUE (transform_id, coll_id, request_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id),
CONSTRAINT "CONTENTS_TRANSFORM_ID_FK" FOREIGN KEY(transform_id) REFERENCES doma_idds.transforms (transform_id),
CONSTRAINT "CONTENTS_PK" PRIMARY KEY (content_id),
CONSTRAINT "CONTENT_ID_UQ" UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id),
CONSTRAINT "CONTENTS_TRANSFORM_ID_FK" FOREIGN KEY(transform_id) REFERENCES doma_idds.transforms (transform_id),
CONSTRAINT "CONTENTS_COLL_ID_FK" FOREIGN KEY(coll_id) REFERENCES doma_idds.collections (coll_id),
CONSTRAINT "CONTENTS_STATUS_ID_NN" CHECK (status IS NOT NULL),
CONSTRAINT "CONTENTS_COLL_ID_NN" CHECK (coll_id IS NOT NULL)
) PARTITION BY RANGE (request_id) ;
);

CREATE INDEX "CONTENTS_STATUS_UPDATED_IDX" ON doma_idds.contents (status, locking, updated_at, created_at);

Expand All @@ -518,7 +518,8 @@ CREATE INDEX "CONTENTS_REQ_TF_COLL_IDX" ON doma_idds.contents (request_id, trans

CREATE INDEX "CONTENTS_TF_IDX" ON doma_idds.contents (transform_id, request_id, coll_id, map_id, content_relation_type);

CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5('name'), status);
CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5('name');
, status);


SET search_path TO doma_idds;
Expand Down
2 changes: 1 addition & 1 deletion website/data/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ <h2>Intelligent</h2>
<div class="inner">
<header>
<p><b>Fine grained data transformation</b></p>
<p><b>Remote data transformation/reduction</b></p>
<p><b>Remtoe data transformation/reduction</b></p>
<p><b>On-demand production of analysis format data transformation</b></p>
<h2>Transformation</h2>
</header>
Expand Down
10 changes: 6 additions & 4 deletions workflow/lib/idds/iworkflow/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from idds.common import exceptions
from idds.common.constants import WorkflowType, TransformStatus
from idds.common.imports import get_func_name
from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64, modified_environ
from idds.common.utils import setup_logging, json_dumps, json_loads, encode_base64
from .asyncresult import AsyncResult, MapResult
from .base import Base, Context
from .workflow import WorkflowCanvas
Expand Down Expand Up @@ -779,8 +779,10 @@ def load(self, func_name):
:raise Exception
"""
with modified_environ(IDDS_IGNORE_WORK_DECORATOR='true'):
func = super(Work, self).load(func_name)
os.environ['IDDS_IWORKFLOW_LOAD_WORK'] = 'true'
func = super(Work, self).load(func_name)
del os.environ['IDDS_IWORKFLOW_LOAD_WORK']

return func

def pre_run(self):
Expand Down Expand Up @@ -920,7 +922,7 @@ def work(func=None, *, map_results=False, lazy=False, init_env=None):
if func is None:
return functools.partial(work, map_results=map_results, lazy=lazy, init_env=init_env)

if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ:
if 'IDDS_IWORKFLOW_LOAD_WORK' in os.environ:
return func

@functools.wraps(func)
Expand Down
15 changes: 9 additions & 6 deletions workflow/lib/idds/iworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@

# from idds.common import exceptions
from idds.common.constants import WorkflowType
from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64,\
modified_environ
from idds.common.utils import setup_logging, create_archive_file, json_dumps, json_loads, encode_base64
from .asyncresult import AsyncResult
from .base import Base, Context

Expand Down Expand Up @@ -639,7 +638,9 @@ def get_idds_env(self):
def get_idds_server(self):
if 'IDDS_HOST' in self._idds_env:
return self._idds_env['IDDS_HOST']
return os.environ.get('IDDS_HOST', None)
if os.environ.get('IDDS_HOST', None):
return os.environ.get('IDDS_HOST', None)
return None

def prepare_with_idds(self):
"""
Expand Down Expand Up @@ -1058,8 +1059,10 @@ def load(self, func_name):
:raise Exception
"""
with modified_environ(IDDS_IGNORE_WORKFLOW_DECORATOR='true'):
func = super(Workflow, self).load(func_name)
os.environ['IDDS_IWORKFLOW_LOAD_WORKFLOW'] = 'true'
func = super(Workflow, self).load(func_name)
del os.environ['IDDS_IWORKFLOW_LOAD_WORKFLOW']

return func

def pre_run(self):
Expand Down Expand Up @@ -1158,7 +1161,7 @@ def workflow(func=None, *, local=False, service='idds', source_dir=None, primary
return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud,
max_walltime=max_walltime, distributed=distributed, init_env=init_env)

if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ:
if 'IDDS_IWORKFLOW_LOAD_WORKFLOW' in os.environ:
return func

@functools.wraps(func)
Expand Down

0 comments on commit f93f77d

Please sign in to comment.