Skip to content

Commit

Permalink
Pass CLI parameters to workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
luismedel committed Dec 8, 2024
1 parent afb14cf commit 2921f39
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 13 deletions.
14 changes: 13 additions & 1 deletion src/bluish/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ def workflow_from_file(file: str) -> Workflow:
help="Log level",
)
@click.version_option(PROJECT_VERSION)
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
def blu_cli(
job_id: str,
no_deps: bool,
log_level: str,
args: tuple[str],
) -> None:
init_logging(log_level)
init_commands()
Expand All @@ -117,7 +119,12 @@ def blu_cli(
if not yaml_path:
fatal("No workflow file found.")

logging.info(f"Loading workflow from {yaml_path}")
logging.info("")

wf = workflow_from_file(yaml_path)
wf.set_inputs({k: v for k, v in (arg.split("=", maxsplit=1) for arg in args)})

job: Job | None = wf.jobs.get(job_id)
if not job:
fatal(f"Job '{job_id}' not found.")
Expand Down Expand Up @@ -165,6 +172,9 @@ def bluish_cli(
if not yaml_path:
fatal("No workflow file found.")

logging.info(f"Loading workflow from {yaml_path}")
logging.info("")

with contextlib.suppress(FileNotFoundError):
with open(yaml_path, "r") as yaml_file:
yaml_contents = yaml_file.read()
Expand Down Expand Up @@ -194,8 +204,10 @@ def list_jobs(wf: Workflow) -> None:
@bluish_cli.command("run")
@click.argument("job_id", type=str, required=True)
@click.option("--no-deps", is_flag=True, help="Don't run job dependencies")
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
@click.pass_obj
def run_job(wf: Workflow, job_id: str, no_deps: bool) -> None:
def run_job(wf: Workflow, job_id: str, no_deps: bool, args: tuple[str]) -> None:
wf.set_inputs({k: v for k, v in (arg.split("=", maxsplit=1) for arg in args)})
job = wf.jobs.get(job_id)
if not job:
fatal(f"Job '{job_id}' not found.")
Expand Down
15 changes: 5 additions & 10 deletions src/bluish/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(self, parent: Optional["Node"], definition: Definition):
self.result = bluish.process.ProcessResult()
self.failed = False
self.status = bluish.core.ExecutionStatus.PENDING
self.sensitive_inputs: set[str] = {"password", "token"}

self._expression_parser: Callable[[str], Any] | None = None

Expand Down Expand Up @@ -192,12 +193,6 @@ def get_inherited_attr(
return self.expand_expr(result)


class InputOutputNode(Node):
def __init__(self, parent: Node, definition: Definition):
super().__init__(parent, definition)
self.sensitive_inputs: set[str] = {"password", "token"}


class CircularDependencyError(Exception):
pass

Expand All @@ -212,8 +207,8 @@ class VariableExpandError(Exception):
ValueResult = namedtuple("ValueResult", ["value", "contains_secrets"])


def _step_or_job(ctx: Node) -> InputOutputNode:
if isinstance(ctx, InputOutputNode):
def _step_or_job(ctx: Node) -> Node:
if ctx.NODE_TYPE == "step" or ctx.NODE_TYPE == "job":
return ctx
raise ValueError(f"Can't find step or job in context of type: {ctx.NODE_TYPE}")

Expand Down Expand Up @@ -337,7 +332,7 @@ def prepare_value(value: Any) -> Any:
elif root == "step":
return _try_get_value(_step(ctx), varname, raw)
elif root == "inputs":
node = _step_or_job(ctx)
node = ctx
if varname in node.inputs:
if varname in node.sensitive_inputs:
return prepare_value(SafeString(node.inputs[varname], "********"))
Expand Down Expand Up @@ -422,7 +417,7 @@ def _expand_expr(
return ctx.expression_parser(value)


def can_dispatch(context: InputOutputNode) -> bool:
def can_dispatch(context: Node) -> bool:
if context.attrs._if is None:
return True

Expand Down
2 changes: 1 addition & 1 deletion src/bluish/nodes/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from bluish.utils import decorate_for_log


class Job(bluish.nodes.InputOutputNode):
class Job(bluish.nodes.Node):
NODE_TYPE = "job"

def __init__(
Expand Down
2 changes: 1 addition & 1 deletion src/bluish/nodes/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from bluish.logging import info


class Step(nodes.InputOutputNode):
class Step(nodes.Node):
NODE_TYPE = "step"

def __init__(self, parent: nodes.Node, definition: nodes.Definition):
Expand Down
33 changes: 33 additions & 0 deletions src/bluish/nodes/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self, definition: bluish.nodes.Definition) -> None:
self.sys_env: dict
self.jobs: dict
self.runs_on_host: dict[str, Any] | None
self._inputs: dict[str, str] | None = None

self._job_definitions: dict = {}
for k, v in self.attrs.jobs.items():
Expand All @@ -28,6 +29,10 @@ def __init__(self, definition: bluish.nodes.Definition) -> None:

self.reset()

@property
def inputs(self) -> dict[str, str]:
return self._inputs or {}

def reset(self) -> None:
self.matrix = {}
self.sys_env = {}
Expand All @@ -50,6 +55,34 @@ def reset(self) -> None:
for k, v in self._job_definitions.items():
self.jobs[k] = bluish.nodes.job.Job(self, v)

def set_inputs(self, inputs: dict[str, str]) -> None:
def is_true(v: Any) -> bool:
return v in ("true", "1", True)

self._inputs = {}
for param in self.attrs.inputs:
name = param.get("name")
if not name:
raise ValueError("Invalid input parameter (missing name)")

if is_true(param.get("sensitive")):
self.sensitive_inputs.add(name)

if name in inputs or "default" in param:
self._inputs[name] = self.expand_expr(
inputs.get(name, param.get("default"))
)
elif is_true(param.get("required")):
raise ValueError(f"Missing required input parameter: {name}")

# Check for unknown input parameters
unknowns = list(k for k in inputs.keys() if k not in self._inputs)
if unknowns:
if len(unknowns) == 1:
raise ValueError(f"Unknown input parameter: {unknowns[0]}")
else:
raise ValueError(f"Unknown input parameters: {unknowns}")

def dispatch(self) -> bluish.process.ProcessResult:
self.reset()

Expand Down
12 changes: 12 additions & 0 deletions src/bluish/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,21 @@ def __repr__(self) -> str:
)


WORKFLOW_INPUT_SCHEMA = Object(
{
"name": Str,
"description": Optional(Str),
"required": Bool(default=False),
"sensitive": Bool(default=False),
"default": Optional(AnyType),
}
)


WORKFLOW_SCHEMA = Object(
{
**_COMMON_PROPERTIES,
"inputs": List(WORKFLOW_INPUT_SCHEMA, default=list),
"runs_on": Optional(Str),
"jobs": Dict(Str, JOB_SCHEMA),
}
Expand Down
52 changes: 52 additions & 0 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,58 @@ def test_mandatory_inputs() -> None:
pass


def test_workflow_inputs() -> None:
wf = create_workflow("""
inputs:
- name: TEST_INPUT
required: true
jobs:
test:
steps:
- run: |
echo "Hello, ${{ workflow.inputs.TEST_INPUT }}!"
""")
wf.set_inputs({"TEST_INPUT": "World"})
_ = wf.dispatch()
assert wf.get_value("jobs.test.stdout") == "Hello, World!"


def test_workflow_unexpected_inputs() -> None:
try:
wf = create_workflow("""
jobs:
test:
steps:
- run: |
echo "Hello, ${{ workflow.inputs.TEST_INPUT }}!"
""")
wf.set_inputs({
"UNEXPECTED_INPUT": "Unexpected"
})
raise AssertionError("Unexpected input not detected")
except ValueError as ex:
assert str(ex) == "Unknown input parameter: UNEXPECTED_INPUT"


def test_workflow_optional_inputs() -> None:
wf = create_workflow("""
inputs:
- name: TEST_INPUT
required: false
default: "World"
jobs:
test:
steps:
- run: |
echo "Hello, ${{ workflow.inputs.TEST_INPUT }}!"
""")
wf.set_inputs({})
_ = wf.dispatch()
assert wf.get_value("jobs.test.stdout") == "Hello, World!"


def test_cwd() -> None:
wf = create_workflow("""
working_directory: /tmp
Expand Down

0 comments on commit 2921f39

Please sign in to comment.