diff --git a/src/bluish/actions/base.py b/src/bluish/actions/base.py
index bc6166e..8a2537f 100644
--- a/src/bluish/actions/base.py
+++ b/src/bluish/actions/base.py
@@ -2,6 +2,7 @@
 import bluish.contexts.step
 import bluish.process
+from bluish.contexts import Definition
 from bluish.logging import debug
 
 
@@ -15,9 +16,9 @@ def __init__(self, param: str):
         super().__init__(f"Missing required attribute: {param}")
 
 
-def _key_exists(key: str, values: dict) -> bool:
+def _key_exists(key: str, attrs: Definition) -> bool:
     """Checks if a key (or pipe-separated alternative keys) exists in a dictionary."""
-    return ("|" in key and any(i in values for i in key.split("|"))) or key in values
+    return ("|" in key and any(i in attrs for i in key.split("|"))) or key in attrs
 
 
 class Action:
@@ -35,7 +36,7 @@ def execute(
         self, step: bluish.contexts.step.StepContext
     ) -> bluish.process.ProcessResult:
         for attr in self.REQUIRED_ATTRS:
-            if not _key_exists(attr, step.attrs.__dict__):
+            if not _key_exists(attr, step.attrs):
                 raise RequiredAttributeError(attr)
 
         if self.REQUIRED_INPUTS and not step.attrs._with:
diff --git a/src/bluish/actions/core.py b/src/bluish/actions/core.py
index dd5ebc7..9898bc8 100644
--- a/src/bluish/actions/core.py
+++ b/src/bluish/actions/core.py
@@ -17,7 +17,7 @@ class RunCommand(bluish.actions.base.Action):
     def run(
         self, step: bluish.contexts.step.StepContext
     ) -> bluish.process.ProcessResult:
-        env = ChainMap(step.env, step.job.env, step.workflow.env)  # type: ignore
+        env = ChainMap(step.env, step.parent.env, step.parent.parent.env)  # type: ignore
 
         if env:
             info("env:")
@@ -39,7 +39,7 @@ def run(
         if echo_commands:
             info(command)
 
-        return step.job.exec(command, step, env=env, stream_output=echo_output)  # type: ignore
+        return step.parent.exec(command, step, env=env, stream_output=echo_output)  # type: ignore
 
 
 class ExpandTemplate(bluish.actions.base.Action):
@@ -51,7 +51,7 @@ def run(
     ) -> bluish.process.ProcessResult:
         inputs = step.inputs
 
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
 
         template_content: str
         if "input_file" in inputs:
@@ -98,8 +98,8 @@ def run(
         destination_file = inputs.get("destination_file")
         assert destination_file is not None
-
-        job = cast(bluish.contexts.job.JobContext, step.job)
+
+        job = cast(bluish.contexts.job.JobContext, step.parent)
 
         info(f"Writing file to: {destination_file}...")
         try:
@@ -132,7 +132,7 @@ def run(
         source_file = inputs["source_file"]
 
         info(f"Reading file: {source_file}...")
         try:
-            job = cast(bluish.contexts.job.JobContext, step.job)
+            job = cast(bluish.contexts.job.JobContext, step.parent)
             raw_contents = job.read_file(source_file)
         except IOError as e:
             error(f"Failed to read file: {str(e)}")
diff --git a/src/bluish/actions/docker.py b/src/bluish/actions/docker.py
index aa3f74a..a1c4ac6 100644
--- a/src/bluish/actions/docker.py
+++ b/src/bluish/actions/docker.py
@@ -32,7 +32,7 @@ def _is_valid_docker_id(id: str) -> bool:
 
 
 def run_and_get_pid(command: str, step: bluish.contexts.step.StepContext) -> str:
-    job = cast(bluish.contexts.job.JobContext, step.job)
+    job = cast(bluish.contexts.job.JobContext, step.parent)
     result = job.exec(command, step)
     return result.stdout.strip() if result.returncode == 0 else ""
 
 
@@ -43,7 +43,7 @@ def docker_ps(
     pid: str | None = None,
 ) -> bluish.process.ProcessResult:
     filter = f"name={name}" if name else f"id={pid}"
-    job = cast(bluish.contexts.job.JobContext, step.job)
+    job = cast(bluish.contexts.job.JobContext, step.parent)
     return job.exec(f"docker ps -f {filter} --all --quiet", step)
@@ -70,7 +70,7 @@ def run(
         )
 
         info(f"Docker login:\n -> {protected_command}")
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
         login_result = job.exec(command, step, stream_output=True)
         if login_result.failed:
             error(f"Login failed: {login_result.error}")
@@ -86,7 +86,7 @@ def run(
         command = "docker logout"
         if step.get_inherited_attr("echo_commands", True):
             info("Logging out of Docker...")
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
         return job.exec(command, step)
 
 
@@ -109,7 +109,7 @@ def run(
         if step.get_inherited_attr("echo_commands", True):
             info(f"Building image:\n -> {command}")
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
         build_result = job.exec(command, step, stream_output=True)
         if build_result.failed:
             error(f"Failed to build image: {build_result.error}")
@@ -184,7 +184,7 @@ def run(
         for flag in ["quiet"]:
             options += _build_flag(f"--{flag}", inputs.get(flag))
 
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
         run_result = job.exec(f"docker run {options} {image}", step)
         if run_result.failed:
             error(f"Failed to start container with image {image}: {run_result.error}")
@@ -216,7 +216,7 @@ def run(
         remove_container = inputs.get("remove", False)
         stop_container = True
 
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
 
         info(f"Stopping container with {input_attr}...")
 
@@ -328,7 +328,7 @@ def run(
             if echo_commands:
                 info(line)
-            job = cast(bluish.contexts.job.JobContext, step.job)
+            job = cast(bluish.contexts.job.JobContext, step.parent)
             result = job.exec(f"docker exec {options} {container_pid} {line}", step)
 
             output += result.stdout
@@ -355,7 +355,7 @@ def run(
         inputs = step.inputs
         name = inputs["name"]
 
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
 
         info(f"Creating network {name}...")
diff --git a/src/bluish/actions/git.py b/src/bluish/actions/git.py
index c36f8e5..ea41ef1 100644
--- a/src/bluish/actions/git.py
+++ b/src/bluish/actions/git.py
@@ -18,7 +18,7 @@ def run_git_command(
     if key_file:
         preamble = f"export GIT_SSH_COMMAND='ssh -i {key_file} -o IdentitiesOnly=yes -o StrictHostKeychecking=no';"
 
-    job = cast(bluish.contexts.job.JobContext, step.job)
+    job = cast(bluish.contexts.job.JobContext, step.parent)
     return job.exec(f"{preamble} {command}", step)
 
 
@@ -30,7 +30,7 @@ def prepare_environment(
         "openssh-client": "ssh",
     }
 
-    job = cast(bluish.contexts.job.JobContext, step.job)
+    job = cast(bluish.contexts.job.JobContext, step.parent)
 
     required_packages = [
         package
@@ -93,7 +93,7 @@ def run(
             # Update the current job working dir to the newly cloned repo
             info(f"Setting working directory to: {repo_name}...")
             wd = step.get_inherited_attr("working_directory", ".")
-            step.job.set_attr("working_directory", f"{wd}/{repo_name}")
+            step.parent.set_attr("working_directory", f"{wd}/{repo_name}")  # type: ignore
 
             return clone_result
         finally:
diff --git a/src/bluish/actions/linux.py b/src/bluish/actions/linux.py
index 54db42e..d47dd7d 100644
--- a/src/bluish/actions/linux.py
+++ b/src/bluish/actions/linux.py
@@ -19,7 +19,7 @@ def run(
 
         info(f"Installing packages {package_str}...")
 
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
 
         result = bluish.process.install_package(
             job.runs_on_host, step.inputs["packages"], flavor=flavor
diff --git a/src/bluish/actions/macos.py b/src/bluish/actions/macos.py
index 0a711e4..2c749b3 100644
--- a/src/bluish/actions/macos.py
+++ b/src/bluish/actions/macos.py
@@ -17,7 +17,7 @@ def run(self, step: bluish.contexts.step.StepContext) -> ProcessResult:
 
         info(f"Installing packages {package_str}...")
 
-        job = cast(bluish.contexts.job.JobContext, step.job)
+        job = cast(bluish.contexts.job.JobContext, step.parent)
 
         result = install_package(
             job.runs_on_host, step.inputs["packages"], flavor=flavor
diff --git a/src/bluish/app.py b/src/bluish/app.py
index b5049be..db8ee0a 100644
--- a/src/bluish/app.py
+++ b/src/bluish/app.py
@@ -9,6 +9,7 @@
 from flask import Flask, abort, jsonify, request
 
 from bluish.__main__ import PROJECT_VERSION
+from bluish.contexts import WorkflowDefinition
 from bluish.contexts.workflow import WorkflowContext
 from bluish.core import (
     init_commands,
@@ -80,7 +81,8 @@ def workflow_from_file(file: str) -> WorkflowContext:
     if not yaml_contents:
         fatal("No workflow file found.")
 
-    return WorkflowContext(yaml.safe_load(yaml_contents))
+    definition = WorkflowDefinition(yaml.safe_load(yaml_contents))
+    return WorkflowContext(definition)
 
 
 @click.command("blu")
diff --git a/src/bluish/contexts/__init__.py b/src/bluish/contexts/__init__.py
index 7312253..b6c9f94 100644
--- a/src/bluish/contexts/__init__.py
+++ b/src/bluish/contexts/__init__.py
@@ -7,46 +7,86 @@
 import bluish.process
 from bluish.logging import info
 from bluish.redacted_string import RedactedString
+from bluish.schemas import (
+    JOB_SCHEMA,
+    STEP_SCHEMA,
+    WORKFLOW_SCHEMA,
+    get_extra_properties,
+    validate_schema,
+)
 from bluish.utils import safe_string
 
 TResult = TypeVar("TResult")
 
 
-class DictAttrs:
-    def __init__(self, definition: dict[str, Any]):
-        self.__dict__.update(definition)
+class Definition:
+    SCHEMA: dict[str, Any] = {}
 
-    def __getattr__(self, name: str) -> Any:
-        if name == "_with":
-            return getattr(self, "with")
-        elif name == "_if":
-            return getattr(self, "if")
+    def __init__(self, attrs: dict[str, Any]):
+        self.__dict__["_attrs"] = attrs
+        remaining = self._validate_attrs(attrs)
+        if remaining:
+            raise ValueError(f"Invalid attributes: {remaining.keys()}")
+
+    @property
+    def attrs(self) -> dict[str, Any]:
+        return self._attrs
+
+    def get(self, name: str, default: Any = None) -> Any:
+        return self.__dict__["_attrs"].get(name, default)
+
+    def _validate_attrs(self, attrs: dict[str, Any]) -> dict[str, Any]:
+        if self.SCHEMA:
+            validate_schema(self.SCHEMA, attrs)
+            return get_extra_properties(self.SCHEMA, attrs)
         else:
-            return None
+            return attrs
+
+    def ensure_property(self, name: str, default_value: Any) -> None:
+        if name.startswith("_"):
+            name = name[1:]
+        if name not in self.attrs:
+            self.__dict__["_attrs"][name] = default_value
+
+    def __getattr__(self, name: str) -> Any:
+        if name == "attrs":
+            return self.__dict__["_attrs"]
+        if name.startswith("_"):
+            name = name[1:]
+        return self.__dict__["_attrs"].get(name)
 
     def __setattr__(self, name: str, value: Any) -> None:
-        if name == "_with":
-            setattr(self, "with", value)
-        elif name == "_if":
-            setattr(self, "if", value)
-        else:
-            self.__dict__[name] = value
+        if name.startswith("_"):
+            name = name[1:]
+        self.__dict__["_attrs"][name] = value
 
     def __contains__(self, name: str) -> bool:
-        return name in self.__dict__
+        if name.startswith("_"):
+            name = name[1:]
+        return name in self.__dict__["_attrs"]
 
-    def ensure_property(self, name: str, default_value: Any) -> None:
-        value = getattr(self, name, None)
-        if value is None:
-            setattr(self, name, default_value)
+
+
+class WorkflowDefinition(Definition):
+    SCHEMA = WORKFLOW_SCHEMA
+    pass
+
+
+class JobDefinition(Definition):
+    SCHEMA = JOB_SCHEMA
+    pass
+
+
+class StepDefinition(Definition):
+    SCHEMA = STEP_SCHEMA
+    pass
 
 
 class ContextNode:
     NODE_TYPE: str = ""
 
-    def __init__(self, parent: Optional["ContextNode"], definition: dict[str, Any]):
+    def __init__(self, parent: Optional["ContextNode"], definition: Definition):
         self.parent = parent
-        self.attrs = DictAttrs(definition)
+        self.attrs = definition
 
         self.attrs.ensure_property("env", {})
         self.attrs.ensure_property("var", {})
@@ -55,42 +95,13 @@ def __init__(self, parent: Optional["ContextNode"], definition: dict[str, Any]):
         self.env = dict(self.attrs.env)
         self.var = dict(self.attrs.var)
         self.outputs: dict[str, Any] = {}
+        self.inputs: dict[str, Any] = {}
         self.result = bluish.process.ProcessResult()
         self.failed = False
         self.status = bluish.core.ExecutionStatus.PENDING
         self._expression_parser: Callable[[str], Any] | None = None
 
-    @property
-    def step_or_job(self) -> "InputOutputNode":
-        if isinstance(self, InputOutputNode):
-            return self
-        raise ValueError(f"Can't find step or job in context of type: {self.NODE_TYPE}")
-
-    @property
-    def step(self) -> "ContextNode":
-        if self.NODE_TYPE == "step":
-            return self
-        raise ValueError(f"Can't find step in context of type: {self.NODE_TYPE}")
-
-    @property
-    def job(self) -> "ContextNode":
-        if self.NODE_TYPE == "job":
-            return self
-        elif self.NODE_TYPE == "step":
-            return self.parent  # type: ignore
-        raise ValueError(f"Can't find job in context of type: {self.NODE_TYPE}")
-
-    @property
-    def workflow(self) -> "ContextNode":
-        if self.NODE_TYPE == "workflow":
-            return self
-        elif self.NODE_TYPE == "job":
-            return self.parent  # type: ignore
-        elif self.NODE_TYPE == "step":
-            return self.parent.parent  # type: ignore
-        raise ValueError(f"Can't find workflow in context of type: {self.NODE_TYPE}")
-
     @property
     def display_name(self) -> str:
         return self.attrs.name if self.attrs.name else self.id
@@ -144,12 +155,9 @@ def get_inherited_attr(
 
 
 class InputOutputNode(ContextNode):
-    def __init__(self, parent: ContextNode, definition: dict[str, Any]):
+    def __init__(self, parent: ContextNode, definition: Definition):
         super().__init__(parent, definition)
 
-        self.inputs: dict[str, Any] = {}
-        self.outputs: dict[str, Any] = {}
-
         self.sensitive_inputs: set[str] = {"password", "token"}
 
     def log_inputs(self) -> None:
@@ -179,6 +187,36 @@ class VariableExpandError(Exception):
 ValueResult = namedtuple("ValueResult", ["value", "contains_secrets"])
 
 
+def _step_or_job(ctx: ContextNode) -> InputOutputNode:
+    if isinstance(ctx, InputOutputNode):
+        return ctx
+    raise ValueError(f"Can't find step or job in context of type: {ctx.NODE_TYPE}")
+
+
+def _step(ctx: ContextNode) -> ContextNode:
+    if ctx.NODE_TYPE == "step":
+        return ctx
+    raise ValueError(f"Can't find step in context of type: {ctx.NODE_TYPE}")
+
+
+def _job(ctx: ContextNode) -> ContextNode:
+    if ctx.NODE_TYPE == "job":
+        return ctx
+    elif ctx.NODE_TYPE == "step":
+        return ctx.parent  # type: ignore
+    raise ValueError(f"Can't find job in context of type: {ctx.NODE_TYPE}")
+
+
+def _workflow(ctx: ContextNode) -> ContextNode:
+    if ctx.NODE_TYPE == "workflow":
+        return ctx
+    elif ctx.NODE_TYPE == "job":
+        return ctx.parent  # type: ignore
+    elif ctx.NODE_TYPE == "step":
+        return ctx.parent.parent  # type: ignore
+    raise ValueError(f"Can't find workflow in context of type: {ctx.NODE_TYPE}")
{ctx.NODE_TYPE}") + + def _try_get_value(ctx: ContextNode, name: str, raw: bool = False) -> Any: import bluish.contexts.job import bluish.contexts.step @@ -234,44 +272,44 @@ def prepare_value(value: Any) -> Any: return prepare_value(current.var[varname]) current = current.parent elif root == "workflow": - return _try_get_value(ctx.workflow, varname, raw) + return _try_get_value(_workflow(ctx), varname, raw) elif root == "secrets": - wf = cast(bluish.contexts.workflow.WorkflowContext, ctx.workflow) + wf = cast(bluish.contexts.workflow.WorkflowContext, _workflow(ctx)) if varname in wf.secrets: return prepare_value( RedactedString(cast(str, wf.secrets[varname]), "********") ) elif root == "jobs": - wf = cast(bluish.contexts.workflow.WorkflowContext, ctx.workflow) + wf = cast(bluish.contexts.workflow.WorkflowContext, _workflow(ctx)) job_id, varname = varname.split(".", maxsplit=1) job = wf.jobs.get(job_id) if not job: raise ValueError(f"Job {job_id} not found") return _try_get_value(job, varname, raw) elif root == "job": - return _try_get_value(ctx.job, varname, raw) + return _try_get_value(_job(ctx), varname, raw) elif root == "steps": - job = cast(bluish.contexts.job.JobContext, ctx.job) + job = cast(bluish.contexts.job.JobContext, _job(ctx)) step_id, varname = varname.split(".", maxsplit=1) step = job.steps.get(step_id) if not step: raise ValueError(f"Step {step_id} not found") return _try_get_value(step, varname, raw) elif root == "matrix": - job = cast(bluish.contexts.job.JobContext, ctx.job) + job = cast(bluish.contexts.job.JobContext, _job(ctx)) if varname in job.matrix: return prepare_value(job.matrix[varname]) elif root == "step": - return _try_get_value(ctx.step, varname, raw) + return _try_get_value(_step(ctx), varname, raw) elif root == "inputs": - node = ctx.step_or_job + node = _step_or_job(ctx) if varname in node.inputs: if varname in node.sensitive_inputs: return prepare_value(RedactedString(node.inputs[varname], "********")) else: return prepare_value(node.inputs[varname]) elif root == "outputs": - node = ctx.step_or_job + node = _step_or_job(ctx) if varname in node.outputs: return prepare_value(node.outputs[varname]) @@ -298,31 +336,31 @@ def _try_set_value(ctx: "ContextNode", name: str, value: str) -> bool: ctx.var[varname] = value return True elif root == "workflow": - return _try_set_value(ctx.workflow, varname, value) + return _try_set_value(_workflow(ctx), varname, value) elif root == "jobs": - wf = cast(bluish.contexts.workflow.WorkflowContext, ctx.workflow) + wf = cast(bluish.contexts.workflow.WorkflowContext, _workflow(ctx)) job_id, varname = varname.split(".", maxsplit=1) job = wf.jobs.get(job_id) if not job: raise ValueError(f"Job {job_id} not found") return _try_set_value(job, varname, value) elif root == "job": - return _try_set_value(ctx.job, varname, value) + return _try_set_value(_job(ctx), varname, value) elif root == "steps": - job = cast(bluish.contexts.job.JobContext, ctx.job) + job = cast(bluish.contexts.job.JobContext, _job(ctx)) step_id, varname = varname.split(".", maxsplit=1) step = job.steps.get(step_id) if not step: raise ValueError(f"Step {step_id} not found") return _try_set_value(step, varname, value) elif root == "step": - return _try_set_value(ctx.step, varname, value) + return _try_set_value(_step(ctx), varname, value) elif root == "inputs": - step = cast(bluish.contexts.step.StepContext, ctx.step) + step = cast(bluish.contexts.step.StepContext, _step(ctx)) step.inputs[varname] = value return True elif root == "outputs": - node = 
+        node = _step_or_job(ctx)
         node.outputs[varname] = value
         return True
@@ -374,7 +412,7 @@ def _read_file(ctx: ContextNode, file_path: str) -> bytes:
 
     import bluish.contexts.job
 
-    job = cast(bluish.contexts.job.JobContext, ctx.job)
+    job = cast(bluish.contexts.job.JobContext, _job(ctx))
     result = job.exec(f"base64 -i '{file_path}'", ctx)
     if result.failed:
         raise IOError(f"Failure reading from {file_path}: {result.error}")
@@ -387,7 +425,7 @@ def _write_file(ctx: ContextNode, file_path: str, content: bytes) -> None:
 
     import bluish.contexts.job
 
-    job = cast(bluish.contexts.job.JobContext, ctx.job)
+    job = cast(bluish.contexts.job.JobContext, _job(ctx))
     b64 = base64.b64encode(content).decode()
     result = job.exec(f"echo {b64} | base64 -di - > {file_path}", ctx)
diff --git a/src/bluish/contexts/job.py b/src/bluish/contexts/job.py
index 065d1e7..3f8a631 100644
--- a/src/bluish/contexts/job.py
+++ b/src/bluish/contexts/job.py
@@ -13,7 +13,10 @@ class JobContext(contexts.InputOutputNode):
     NODE_TYPE = "job"
 
     def __init__(
-        self, parent: contexts.ContextNode, step_id: str, definition: dict[str, Any]
+        self,
+        parent: contexts.ContextNode,
+        step_id: str,
+        definition: contexts.Definition,
     ):
         import bluish.contexts.step
 
@@ -39,13 +42,10 @@ def __init__(
         }
 
         self.steps: dict[str, bluish.contexts.step.StepContext] = {}
-        for i, step in enumerate(self.attrs.steps):
-            if "id" not in step:
-                step["id"] = f"step_{i+1}"
-            step_id = step["id"]
-            step = bluish.contexts.step.StepContext(self, step)
-            if not step.id:
-                step.id = step_id
-            self.steps[step_id] = step
+        for i, step_dict in enumerate(self.attrs.steps):
+            step_def = contexts.StepDefinition(step_dict)
+            step_def.ensure_property("id", f"step_{i+1}")
+            step = bluish.contexts.step.StepContext(self, step_def)
+            self.steps[step_def.id] = step
 
     def dispatch(self) -> bluish.process.ProcessResult | None:
@@ -58,7 +58,7 @@ def dispatch(self) -> bluish.process.ProcessResult | None:
                 self.expand_expr(self.attrs.runs_on)
             )
         else:
-            self.runs_on_host = self.workflow.runs_on_host  # type: ignore
+            self.runs_on_host = self.parent.runs_on_host  # type: ignore
 
         try:
             if self.matrix:
diff --git a/src/bluish/contexts/step.py b/src/bluish/contexts/step.py
index 02bcb91..f0d62f9 100644
--- a/src/bluish/contexts/step.py
+++ b/src/bluish/contexts/step.py
@@ -1,5 +1,3 @@
-from typing import Any
-
 import bluish.actions
 import bluish.contexts as contexts
 import bluish.core
@@ -10,14 +8,14 @@ class StepContext(contexts.InputOutputNode):
     NODE_TYPE = "step"
 
-    def __init__(self, parent: contexts.ContextNode, definition: dict[str, Any]):
+    def __init__(self, parent: contexts.ContextNode, definition: contexts.Definition):
         super().__init__(parent, definition)
 
         self.attrs.ensure_property("name", "")
         self.attrs.ensure_property("uses", "")
         self.attrs.ensure_property("continue_on_error", False)
         self.attrs.ensure_property("shell", bluish.process.DEFAULT_SHELL)
-
+
         if bluish.actions.get_action(self.attrs.uses) is None:
             raise ValueError(f"Unknown action: {self.attrs.uses}")
diff --git a/src/bluish/contexts/workflow.py b/src/bluish/contexts/workflow.py
index 5a0bad3..689a327 100644
--- a/src/bluish/contexts/workflow.py
+++ b/src/bluish/contexts/workflow.py
@@ -14,7 +14,7 @@ class WorkflowContext(bluish.contexts.ContextNode):
     NODE_TYPE = "workflow"
 
-    def __init__(self, definition: dict[str, Any]) -> None:
+    def __init__(self, definition: bluish.contexts.Definition) -> None:
         super().__init__(None, definition)
 
         self.attrs.ensure_property("var", {})
@@ -36,7 +36,7 @@ def __init__(self, definition: dict[str, Any]) -> None:
         }
 
         self.jobs = {
-            k: bluish.contexts.job.JobContext(self, k, v)
+            k: bluish.contexts.job.JobContext(self, k, bluish.contexts.JobDefinition(v))
             for k, v in self.attrs.jobs.items()
         }
         self.var = dict(self.attrs.var)
diff --git a/src/bluish/schemas.py b/src/bluish/schemas.py
new file mode 100644
index 0000000..ce61ada
--- /dev/null
+++ b/src/bluish/schemas.py
@@ -0,0 +1,186 @@
+from typing import Any, Union
+
+KV = {
+    "type": dict,
+    "key_schema": {"type": str},
+    "value_schema": {"type": str},
+}
+
+STR_LIST = {
+    "type": list,
+    "item_schema": str,
+}
+
+STEP_SCHEMA = {
+    "type": dict,
+    "properties": {
+        "name": [str, None],
+        "env": [KV, None],
+        "var": [KV, None],
+        "secrets": [KV, None],
+        "secrets_file": [str, None],
+        "env_file": [str, None],
+        "uses": [str, None],
+        "run": [str, None],
+        "with": [KV, None],
+        "if": [str, None],
+    },
+}
+
+JOB_SCHEMA = {
+    "type": dict,
+    "properties": {
+        "name": [str, None],
+        "env": [KV, None],
+        "var": [KV, None],
+        "secrets": [KV, None],
+        "secrets_file": [str, None],
+        "env_file": [str, None],
+        "runs_on": [str, None],
+        "depends_on": [STR_LIST, None],
+        "continue_on_error": [bool, None],
+        "steps": {
+            "type": list,
+            "item_schema": STEP_SCHEMA,
+        },
+    },
+}
+
+WORKFLOW_SCHEMA = {
+    "type": dict,
+    "properties": {
+        "name": [str, None],
+        "env": [KV, None],
+        "var": [KV, None],
+        "secrets": [KV, None],
+        "secrets_file": [str, None],
+        "env_file": [str, None],
+        "runs_on": [str, None],
+        "jobs": {
+            "type": dict,
+            "key_schema": str,
+            "value_schema": JOB_SCHEMA,
+        },
+    },
+}
+
+
+type_def = Union[type, dict, list, None]
+
+
+def _get_type_repr(t: type_def | None) -> str:
+    if t is None:
+        return "None"
+    elif isinstance(t, list):
+        return " | ".join(_get_type_repr(tt) for tt in t)
+    elif isinstance(t, dict):
+        return f"{t['type']}"
+    else:
+        return f"{t}"
+
+
+def _find_type(value: Any, t: type_def | None) -> dict | type | None:
+    if t is None:
+        return None
+    elif isinstance(t, list):
+        return next((tt for tt in t if _find_type(value, tt)), None)  # type: ignore
+    elif isinstance(t, dict):
+        if t["type"] is Any:
+            return t
+        return t if type(value) is t["type"] else None
+    else:
+        if t is Any:
+            return t  # type: ignore
+        return t if type(value) is t else None
+
+
+def _is_required(t: type_def | None) -> bool:
+    if t is None:
+        return False
+    elif isinstance(t, list):
+        return all(_is_required(tt) for tt in t)
+    elif isinstance(t, dict):
+        return t.get("required", True)
+    else:
+        return True
+
+
+def validate_schema(
+    schema: type_def, data: Any, reject_extra_keys: bool = False
+) -> None:
+    """
+    Validate a data structure against a schema.
+
+    >>> validate_schema({"type": str}, "hello")
+    >>> validate_schema({"type": str}, 42)
+    Traceback (most recent call last):
+    ...
+    ValueError: 42 is not any of the allowed types: <class 'str'>
+    >>> validate_schema({"type": str}, {"hello": "world"})
+    Traceback (most recent call last):
+    ...
+    ValueError: {'hello': 'world'} is not any of the allowed types: <class 'str'>
+    >>> validate_schema({"type": dict, "key_schema": {"type": str}, "value_schema": {"type": str}}, {"hello": "world"})
+    >>> validate_schema({"type": dict, "key_schema": {"type": str}, "value_schema": {"type": str}}, {"hello": 42})
+    Traceback (most recent call last):
+    ...
+    ValueError: 42 is not any of the allowed types: <class 'str'>
+    >>> validate_schema({"type": list, "item_schema": {"type": str}}, ["hello", "world"])
+    >>> validate_schema({"type": list, "item_schema": {"type": str}}, ["hello", 42])
+    Traceback (most recent call last):
+    ...
+    ValueError: 42 is not any of the allowed types: <class 'str'>
+    """
+
+    type_def = _find_type(data, schema)
+    if type_def is None:
+        raise ValueError(
+            f"{data} is not any of the allowed types: {_get_type_repr(schema)}"
+        )
+
+    if isinstance(type_def, type) or type_def is Any:
+        return
+
+    if type_def["type"] == dict:
+        assert isinstance(data, dict)
+        if "key_schema" in type_def:
+            key_schema = type_def["key_schema"]
+            for key in data.keys():
+                validate_schema(key_schema, key)
+        if "value_schema" in type_def:
+            value_schema = type_def["value_schema"]
+            for val in data.values():
+                validate_schema(value_schema, val)
+        if "properties" in type_def:
+            properties: dict = type_def["properties"]
+            for prop, tdef in properties.items():
+                if prop not in data:
+                    if _is_required(tdef):
+                        raise ValueError(f"Missing required key: {prop}")
+                    continue
+                validate_schema(tdef, data[prop])
+
+        if reject_extra_keys and "properties" in type_def:
+            extra_keys = set(data.keys()) - set(type_def["properties"].keys())
+            if extra_keys:
+                raise ValueError(f"Extra keys: {extra_keys}")
+    elif type_def["type"] == list:
+        assert isinstance(data, list)
+        item_schema = type_def["item_schema"]
+        for item in data:
+            validate_schema(item_schema, item)
+    elif (
+        type_def["type"] not in (str, int, float, bool) and type_def["type"] is not Any
+    ):
+        raise ValueError(f"Invalid type: {type_def['type']}")
+
+
+def get_extra_properties(schema: type_def, data: dict) -> dict:
+    """
+    Get the properties in the data not present in the schema properties.
+    """
+    if schema is Any or not isinstance(schema, dict) or not isinstance(data, dict):
+        return {}
+
+    properties = schema["properties"].keys()
+    return {k: v for k, v in data.items() if k not in properties}
diff --git a/test/test_core.py b/test/test_core.py
index 7fc4bdf..381c406 100644
--- a/test/test_core.py
+++ b/test/test_core.py
@@ -1,11 +1,11 @@
 import logging
 from io import FileIO
-from bluish.actions.base import RequiredAttributeError, RequiredInputError
-from bluish.contexts import CircularDependencyError
 from test.utils import create_workflow
 
 import pytest
 
+from bluish.actions.base import RequiredAttributeError, RequiredInputError
+from bluish.contexts import CircularDependencyError
 from bluish.core import (
     ExecutionStatus,
     init_commands,
diff --git a/test/test_schemas.py b/test/test_schemas.py
new file mode 100644
index 0000000..d179aad
--- /dev/null
+++ b/test/test_schemas.py
@@ -0,0 +1,115 @@
+
+from typing import Any
+
+from bluish.schemas import KV, validate_schema
+
+
+def test_happy() -> None:
+    schema = {
+        "type": dict,
+        "properties": {
+            "name": [str, None],
+            "env": [KV, None],
+        },
+    }
+
+    validate_schema(schema, {
+        "name": "test",
+        "env": {"a": "b"},
+    })
+
+    validate_schema(schema, {
+        "name": "test",
+    })
+
+
+def test_missing_key() -> None:
+    schema = {
+        "type": dict,
+        "properties": {
+            "name": str,
+            "env": KV,
+        },
+    }
+
+    data = {
+        "name": "test",
+    }
+
+    try:
+        validate_schema(schema, data)
+        raise AssertionError("Should have raised an exception")
+    except ValueError as e:
+        assert str(e) == "Missing required key: env"
+
+
+def test_optional_key() -> None:
+    schema = {
+        "type": dict,
+        "properties": {
+            "name": [str, None],
+        },
+    }
+
+    data: dict = {
+    }
+
+    validate_schema(schema, data)
+
+
+def test_lists() -> None:
+    schema = {
+        "type": dict,
+        "properties": {
+            "values": {
+                "type": list,
+                "item_schema": str,
+            },
+        },
+    }
+
+    data = {
+        "values": ["a", "b"],
+    }
+
+    validate_schema(schema, data)
+
+
+def test_lists_any() -> None:
+    schema = {
+        "type": dict,
+        "properties": {
+            "values": {
+                "type": list,
+                "item_schema": Any,
+            },
+        },
+    }
+
+    data = {
+        "values": ["a", "b", 1, {"a": "b"}, [1, 2, 3]],
+    }
+
+    validate_schema(schema, data)
+
+
+def test_lists_ko() -> None:
+    schema = {
+        "type": dict,
+        "properties": {
+            "values": {
+                "type": list,
+                "item_schema": str,
+            },
+        },
+    }
+
+    data = {
+        "values": ["a", "b", 1],
+    }
+
+    try:
+        validate_schema(schema, data)
+        raise AssertionError("Should have raised an exception")
+    except ValueError as e:
+        assert str(e) == "1 is not any of the allowed types: <class 'str'>"
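
For reference, a minimal usage sketch of the new schema validator (not part of the patch above); it assumes the bluish.schemas module added in this diff, and the workflow dictionary and its values ("ci", "localhost", "echo hello") are invented for illustration:

# Hypothetical example, not included in the patch.
from bluish.schemas import WORKFLOW_SCHEMA, get_extra_properties, validate_schema

workflow = {
    "name": "ci",            # illustrative values only
    "runs_on": "localhost",
    "jobs": {
        "build": {
            "steps": [
                {"run": "echo hello"},
            ],
        },
    },
}

# Raises ValueError when a value has the wrong type or a required key is
# missing (e.g. a job without "steps"); returns None on success.
validate_schema(WORKFLOW_SCHEMA, workflow)

# Keys not declared under "properties" are reported separately; the new
# Definition.__init__ rejects the input when this dict is non-empty.
extra = get_extra_properties(WORKFLOW_SCHEMA, {**workflow, "unknown": 1})
assert extra == {"unknown": 1}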
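
Similarly, a hedged sketch of how the new Definition wrapper resolves attributes, based on the Definition/StepDefinition code in this diff; the step values below are invented for illustration:

# Hypothetical example, not included in the patch.
from bluish.contexts import StepDefinition

step = StepDefinition({"run": "make test", "if": "env.CI == 'true'"})

# Leading underscores are stripped, so Python-reserved keys stay reachable:
# step._if reads the "if" attribute and step._with reads the "with" attribute.
assert step._if == "env.CI == 'true'"
assert step._with is None    # absent keys resolve to None
assert "run" in step         # __contains__ checks the wrapped dict

# ensure_property only fills in a default when the key is absent,
# mirroring how JobContext assigns default step ids.
step.ensure_property("id", "step_1")
assert step.get("id") == "step_1"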