From 2921f394113db4bec27133308e406fab18bc4bca Mon Sep 17 00:00:00 2001 From: Luis Medel Date: Mon, 9 Dec 2024 00:44:38 +0100 Subject: [PATCH] Pass CLI parameters to workflows --- src/bluish/app.py | 14 +++++++++- src/bluish/nodes/__init__.py | 15 ++++------- src/bluish/nodes/job.py | 2 +- src/bluish/nodes/step.py | 2 +- src/bluish/nodes/workflow.py | 33 +++++++++++++++++++++++ src/bluish/schemas.py | 12 +++++++++ test/test_core.py | 52 ++++++++++++++++++++++++++++++++++++ 7 files changed, 117 insertions(+), 13 deletions(-) diff --git a/src/bluish/app.py b/src/bluish/app.py index 71959f7..5859cca 100644 --- a/src/bluish/app.py +++ b/src/bluish/app.py @@ -101,10 +101,12 @@ def workflow_from_file(file: str) -> Workflow: help="Log level", ) @click.version_option(PROJECT_VERSION) +@click.argument("args", nargs=-1, type=click.UNPROCESSED) def blu_cli( job_id: str, no_deps: bool, log_level: str, + args: tuple[str], ) -> None: init_logging(log_level) init_commands() @@ -117,7 +119,12 @@ def blu_cli( if not yaml_path: fatal("No workflow file found.") + logging.info(f"Loading workflow from {yaml_path}") + logging.info("") + wf = workflow_from_file(yaml_path) + wf.set_inputs({k: v for k, v in (arg.split("=", maxsplit=1) for arg in args)}) + job: Job | None = wf.jobs.get(job_id) if not job: fatal(f"Job '{job_id}' not found.") @@ -165,6 +172,9 @@ def bluish_cli( if not yaml_path: fatal("No workflow file found.") + logging.info(f"Loading workflow from {yaml_path}") + logging.info("") + with contextlib.suppress(FileNotFoundError): with open(yaml_path, "r") as yaml_file: yaml_contents = yaml_file.read() @@ -194,8 +204,10 @@ def list_jobs(wf: Workflow) -> None: @bluish_cli.command("run") @click.argument("job_id", type=str, required=True) @click.option("--no-deps", is_flag=True, help="Don't run job dependencies") +@click.argument("args", nargs=-1, type=click.UNPROCESSED) @click.pass_obj -def run_job(wf: Workflow, job_id: str, no_deps: bool) -> None: +def run_job(wf: Workflow, job_id: str, no_deps: bool, args: tuple[str]) -> None: + wf.set_inputs({k: v for k, v in (arg.split("=", maxsplit=1) for arg in args)}) job = wf.jobs.get(job_id) if not job: fatal(f"Job '{job_id}' not found.") diff --git a/src/bluish/nodes/__init__.py b/src/bluish/nodes/__init__.py index 8615744..a471910 100644 --- a/src/bluish/nodes/__init__.py +++ b/src/bluish/nodes/__init__.py @@ -97,6 +97,7 @@ def __init__(self, parent: Optional["Node"], definition: Definition): self.result = bluish.process.ProcessResult() self.failed = False self.status = bluish.core.ExecutionStatus.PENDING + self.sensitive_inputs: set[str] = {"password", "token"} self._expression_parser: Callable[[str], Any] | None = None @@ -192,12 +193,6 @@ def get_inherited_attr( return self.expand_expr(result) -class InputOutputNode(Node): - def __init__(self, parent: Node, definition: Definition): - super().__init__(parent, definition) - self.sensitive_inputs: set[str] = {"password", "token"} - - class CircularDependencyError(Exception): pass @@ -212,8 +207,8 @@ class VariableExpandError(Exception): ValueResult = namedtuple("ValueResult", ["value", "contains_secrets"]) -def _step_or_job(ctx: Node) -> InputOutputNode: - if isinstance(ctx, InputOutputNode): +def _step_or_job(ctx: Node) -> Node: + if ctx.NODE_TYPE == "step" or ctx.NODE_TYPE == "job": return ctx raise ValueError(f"Can't find step or job in context of type: {ctx.NODE_TYPE}") @@ -337,7 +332,7 @@ def prepare_value(value: Any) -> Any: elif root == "step": return _try_get_value(_step(ctx), varname, raw) elif root == "inputs": - node = _step_or_job(ctx) + node = ctx if varname in node.inputs: if varname in node.sensitive_inputs: return prepare_value(SafeString(node.inputs[varname], "********")) @@ -422,7 +417,7 @@ def _expand_expr( return ctx.expression_parser(value) -def can_dispatch(context: InputOutputNode) -> bool: +def can_dispatch(context: Node) -> bool: if context.attrs._if is None: return True diff --git a/src/bluish/nodes/job.py b/src/bluish/nodes/job.py index 2ae0699..cb1e629 100644 --- a/src/bluish/nodes/job.py +++ b/src/bluish/nodes/job.py @@ -9,7 +9,7 @@ from bluish.utils import decorate_for_log -class Job(bluish.nodes.InputOutputNode): +class Job(bluish.nodes.Node): NODE_TYPE = "job" def __init__( diff --git a/src/bluish/nodes/step.py b/src/bluish/nodes/step.py index 9db9b12..7878bed 100644 --- a/src/bluish/nodes/step.py +++ b/src/bluish/nodes/step.py @@ -5,7 +5,7 @@ from bluish.logging import info -class Step(nodes.InputOutputNode): +class Step(nodes.Node): NODE_TYPE = "step" def __init__(self, parent: nodes.Node, definition: nodes.Definition): diff --git a/src/bluish/nodes/workflow.py b/src/bluish/nodes/workflow.py index b114287..7287b93 100644 --- a/src/bluish/nodes/workflow.py +++ b/src/bluish/nodes/workflow.py @@ -20,6 +20,7 @@ def __init__(self, definition: bluish.nodes.Definition) -> None: self.sys_env: dict self.jobs: dict self.runs_on_host: dict[str, Any] | None + self._inputs: dict[str, str] | None = None self._job_definitions: dict = {} for k, v in self.attrs.jobs.items(): @@ -28,6 +29,10 @@ def __init__(self, definition: bluish.nodes.Definition) -> None: self.reset() + @property + def inputs(self) -> dict[str, str]: + return self._inputs or {} + def reset(self) -> None: self.matrix = {} self.sys_env = {} @@ -50,6 +55,34 @@ def reset(self) -> None: for k, v in self._job_definitions.items(): self.jobs[k] = bluish.nodes.job.Job(self, v) + def set_inputs(self, inputs: dict[str, str]) -> None: + def is_true(v: Any) -> bool: + return v in ("true", "1", True) + + self._inputs = {} + for param in self.attrs.inputs: + name = param.get("name") + if not name: + raise ValueError("Invalid input parameter (missing name)") + + if is_true(param.get("sensitive")): + self.sensitive_inputs.add(name) + + if name in inputs or "default" in param: + self._inputs[name] = self.expand_expr( + inputs.get(name, param.get("default")) + ) + elif is_true(param.get("required")): + raise ValueError(f"Missing required input parameter: {name}") + + # Check for unknown input parameters + unknowns = list(k for k in inputs.keys() if k not in self._inputs) + if unknowns: + if len(unknowns) == 1: + raise ValueError(f"Unknown input parameter: {unknowns[0]}") + else: + raise ValueError(f"Unknown input parameters: {unknowns}") + def dispatch(self) -> bluish.process.ProcessResult: self.reset() diff --git a/src/bluish/schemas.py b/src/bluish/schemas.py index ad21850..3d062c8 100644 --- a/src/bluish/schemas.py +++ b/src/bluish/schemas.py @@ -301,9 +301,21 @@ def __repr__(self) -> str: ) +WORKFLOW_INPUT_SCHEMA = Object( + { + "name": Str, + "description": Optional(Str), + "required": Bool(default=False), + "sensitive": Bool(default=False), + "default": Optional(AnyType), + } +) + + WORKFLOW_SCHEMA = Object( { **_COMMON_PROPERTIES, + "inputs": List(WORKFLOW_INPUT_SCHEMA, default=list), "runs_on": Optional(Str), "jobs": Dict(Str, JOB_SCHEMA), } diff --git a/test/test_core.py b/test/test_core.py index 2035892..d261e93 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -418,6 +418,58 @@ def test_mandatory_inputs() -> None: pass +def test_workflow_inputs() -> None: + wf = create_workflow(""" +inputs: + - name: TEST_INPUT + required: true + +jobs: + test: + steps: + - run: | + echo "Hello, ${{ workflow.inputs.TEST_INPUT }}!" +""") + wf.set_inputs({"TEST_INPUT": "World"}) + _ = wf.dispatch() + assert wf.get_value("jobs.test.stdout") == "Hello, World!" + + +def test_workflow_unexpected_inputs() -> None: + try: + wf = create_workflow(""" +jobs: + test: + steps: + - run: | + echo "Hello, ${{ workflow.inputs.TEST_INPUT }}!" +""") + wf.set_inputs({ + "UNEXPECTED_INPUT": "Unexpected" + }) + raise AssertionError("Unexpected input not detected") + except ValueError as ex: + assert str(ex) == "Unknown input parameter: UNEXPECTED_INPUT" + + +def test_workflow_optional_inputs() -> None: + wf = create_workflow(""" +inputs: + - name: TEST_INPUT + required: false + default: "World" + +jobs: + test: + steps: + - run: | + echo "Hello, ${{ workflow.inputs.TEST_INPUT }}!" +""") + wf.set_inputs({}) + _ = wf.dispatch() + assert wf.get_value("jobs.test.stdout") == "Hello, World!" + + def test_cwd() -> None: wf = create_workflow(""" working_directory: /tmp