Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JP-3490: Allow run to check CRDS or user parameters for default overrides #192

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/192.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow parameter updates from CRDS or from the user in the ``step.run`` method.
13 changes: 9 additions & 4 deletions src/stpipe/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ def __init__(self, *args, **kwargs):
Step.__init__(self, *args, **kwargs)

# Configure all of the steps
step_parameters = kwargs.get("steps", {})
for key, val in self.step_defs.items():
cfg = self.steps.get(key)
if cfg is not None:
new_step = val.from_config_section(
cfg,
parent=self,
name=key,
config_file=self.config_file,
cfg, parent=self, name=key, config_file=self.config_file
)
else:
new_step = val(
Expand All @@ -54,6 +52,13 @@ def __init__(self, *args, **kwargs):
**kwargs.get(key, {}),
)

# Make sure explicitly passed parameters for sub-steps
# are marked as initialized
input_parameters = step_parameters.get(key, {})
for param in input_parameters:
if param in new_step._initialized:
new_step._initialized[param] = True

setattr(self, key, new_step)

@property
Expand Down
169 changes: 160 additions & 9 deletions src/stpipe/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,16 @@
Additional parameters to set. These will be set as member
variables on the new Step instance.
"""
self._initialized = {}
self._reference_files_used = []
# A list of logging.LogRecord emitted to the stpipe root logger
# during the most recent call to Step.run.
self._log_records = []
self._input_filename = None
self._input_dir = None
self._keywords = kws
if _validate_kwds:
self._validate_kwds = _validate_kwds
if self._validate_kwds:
spec = self.load_spec_file()
kws = config_parser.config_from_dict(
kws,
Expand All @@ -362,6 +364,11 @@
for key, val in kws.items():
setattr(self, key, val)

# Mark them as uninitialized, from the user standpoint,
# unless they were explicitly passed on instantiation
if key not in self._keywords or parent is not None:
self._initialized[key] = False

# Create a new logger for this step
self.log = log.getLogger(self.qualified_name)

Expand Down Expand Up @@ -402,6 +409,12 @@
i,
)

def __setattr__(self, key, value):
    """Set an attribute and flag public attributes as initialized.

    The assignment itself is always delegated to the parent
    implementation. Afterwards, any name that does not start with an
    underscore is marked ``True`` in ``self._initialized``.

    Parameters
    ----------
    key : str
        Name of the attribute being set.
    value : object
        Value to assign to the attribute.
    """
    super().__setattr__(key, value)
    if key.startswith("_"):
        # Private bookkeeping attributes are never tracked.
        return

    try:
        tracker = self._initialized
    except AttributeError:
        # A public attribute was assigned before ``_initialized`` was
        # created (for example by a subclass that sets attributes ahead
        # of the base initializer). Create the tracker rather than fail.
        tracker = {}
        super().__setattr__("_initialized", tracker)
    tracker[key] = True

@property
def log_records(self):
"""
Expand All @@ -413,11 +426,50 @@
"""
return self._log_records

def run(self, *args):
def run(self, *args, **kwargs):
"""
Set up and run a step process.

Run handles the generic setup and teardown that happens with
the running of each step. The real work that is unique to
each step type is done in the `process` method.

If this step was not created via a `call` or a command line process,
and if 'disable_crds_steppars' is set to False,
default parameters from CRDS are retrieved at runtime, if available.
Override parameters can also be passed as keyword arguments if desired.

The order of parameter checking and overrides is:

1. spec default value for the step
2. keyword parameters or configuration set on step initialization
3. CRDS parameters if available and if CRDS checks are not disabled
(disable_crds_steppars = False)
4. step attributes explicitly set by the user before calling run,
either on instantiation or by directly setting the attribute
after instantiation.
5. keyword parameters passed directly to the run call

Only 1 and 2 are checked if the step was created via `call`
or the command line.

Parameters
----------
disable_crds_steppars : bool, optional
If set to True, CRDS parameter checks are disabled.
If set to False, CRDS parameter checks are enabled.
If set to None, the environment variable STPIPE_DISABLE_CRDS_STEPPARS
is checked: if set to 'true', CRDS parameter checks
are disabled.

Notes
-----
Currently, the default value for disable_crds_steppars is True,
for backward compatibility with previous behavior for the `run`
function. In future builds, the default value will be None,
so that the default behavior is to check CRDS for default
parameters when possible.

"""
gc.collect()

Expand All @@ -431,6 +483,59 @@
step_result = None

self.log.info("Step %s running with args %s.", self.name, args)

# Check for explicit disable for CRDS parameters
if "disable_crds_steppars" in kwargs:
disable_crds_steppars = kwargs.pop("disable_crds_steppars")
else:
disable_crds_steppars = True
if self.parent is None and self._validate_kwds:
self.log.warning(
"CRDS parameter checks are currently disabled by default."
)
self.log.warning(
"In future builds, they will be enabled by default."
)
self.log.warning(
"To turn them on now, set 'disable_crds_steppars' to False "
"in the arguments to 'run'."
)

# Get parameters from user
parameters = None
if kwargs:
# Unset initialization for any provided override keywords
for key in kwargs:
if key in self._initialized:
self._initialized[key] = False
parameters = kwargs

# Get parameters from CRDS
if self._validate_kwds and not disable_crds_steppars:
# Get the first filename, if available
filename = None
if len(args) > 0:
filename = args[0]

# Build config from CRDS + user keywords
try:
parameters, _ = self.build_config(
melanieclarke marked this conversation as resolved.
Show resolved Hide resolved
filename, disable=disable_crds_steppars, **kwargs
)
except (NotImplementedError, FileNotFoundError, RuntimeError):
# Catch steps that cannot build a config
# (e.g. post hooks created from local functions,
# missing input files)
raise ValueError(
f"Cannot retrieve CRDS keywords for "
f"{self.name} with input {str(filename)}."
)

# Update parameters from the retrieved config + keywords
if parameters:
self._validate_parameter_updates(parameters)
self.update_pars(parameters, skip_initialized=True)

# log Step or Pipeline parameters from top level only
if self.parent is None:
self.log.info(
Expand Down Expand Up @@ -462,7 +567,7 @@

hook_args = args
for pre_hook in self._pre_hooks:
hook_results = pre_hook.run(*hook_args)
hook_results = pre_hook.run(*hook_args, disable_crds_steppars=True)

Check warning on line 570 in src/stpipe/step.py

View check run for this annotation

Codecov / codecov/patch

src/stpipe/step.py#L570

Added line #L570 was not covered by tests
if hook_results is not None:
hook_args = hook_results
args = hook_args
Expand Down Expand Up @@ -534,7 +639,9 @@

# Run the post hooks
for post_hook in self._post_hooks:
hook_results = post_hook.run(step_result)
hook_results = post_hook.run(
step_result, disable_crds_steppars=True
)
if hook_results is not None:
step_result = hook_results

Expand Down Expand Up @@ -1276,7 +1383,7 @@
) as af:
af.write_to(filename)

def update_pars(self, parameters):
def update_pars(self, parameters, skip_initialized=False):
"""Update step parameters

Only existing parameters are updated. Otherwise, new keys
Expand All @@ -1287,6 +1394,9 @@
parameters : dict
Parameters to update.

skip_initialized : bool, optional
If True, values that have been initialized are not updated.

Notes
-----
``parameters`` is presumed to have been produced by the
Expand All @@ -1299,17 +1409,27 @@
for parameter, value in parameters.items():
if parameter in existing:
if parameter != "steps":
setattr(self, parameter, value)
if (
skip_initialized
and parameter in self._initialized
and self._initialized[parameter]
):
self.log.debug(f"Skipping initialized parameter {parameter}")
else:
self.log.debug(f"Setting parameter {parameter} to {value}")
setattr(self, parameter, value)
else:
for step_name, step_parameters in value.items():
getattr(self, step_name).update_pars(step_parameters)
getattr(self, step_name).update_pars(
step_parameters, skip_initialized=skip_initialized
)
else:
self.log.debug(
"Parameter %s is not valid for step %s. Ignoring.", parameter, self
)

@classmethod
def build_config(cls, input, **kwargs): # noqa: A002
def build_config(cls, input, disable=None, **kwargs): # noqa: A002
"""Build the ConfigObj to initialize a Step

A Step config is built in the following order:
Expand All @@ -1323,6 +1443,9 @@
input : str or None
Input file

disable : bool, optional
Do not retrieve parameters from CRDS. If None, check global settings.

kwargs : dict
Keyword arguments that specify Step parameters.

Expand All @@ -1334,7 +1457,7 @@
logger_name = cls.__name__
log_cls = log.getLogger(logger_name)
if input:
config = cls.get_config_from_reference(input)
config = cls.get_config_from_reference(input, disable=disable)
else:
log_cls.info("No filename given, cannot retrieve config from CRDS")
config = config_parser.ConfigObj()
Expand Down Expand Up @@ -1381,6 +1504,34 @@

return config, config_file

def _validate_parameter_updates(self, parameter_updates):
    """Validate new config keywords without adding the full spec.

    The ``class``, ``logcfg``, ``name`` and ``config_file`` entries are
    removed from ``parameter_updates``. The remainder is passed, together
    with the spec returned by ``self.load_spec_file()``, to
    ``config_parser.config_from_dict``. Every entry still present in
    ``parameter_updates`` is then overwritten with the matching entry of
    the returned config (presumably the validated value -- confirm
    against ``config_from_dict``). Entries under ``steps`` are handled
    the same way, one sub-step section at a time.

    Parameters
    ----------
    parameter_updates : dict
        Parameters to validate. Modified in place; nothing is returned.
    """
    ignored = {"class", "logcfg", "name", "config_file"}

    def _discard(section, unwanted):
        # Drop bookkeeping entries from a single config section.
        for name in unwanted:
            if name in section:
                del section[name]

    def _sync(section, validated, unwanted):
        # Replace each supplied value with its counterpart from the
        # validated config, descending into the sections under "steps".
        _discard(section, unwanted)
        for name in section:
            if name != "steps":
                section[name] = validated[name]
                continue
            for step_name, step_section in section[name].items():
                _sync(step_section, validated[name][step_name], unwanted)

    spec = self.load_spec_file()
    _discard(parameter_updates, ignored)
    validated_config = config_parser.config_from_dict(parameter_updates, spec)

    _sync(parameter_updates, validated_config, ignored)


# #########
# Utilities
Expand Down
Loading