Skip to content

Commit

Permalink
Improve Docker runner
Browse files Browse the repository at this point in the history
  • Loading branch information
luismedel committed Dec 11, 2024
1 parent 2921f39 commit 5dbf252
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 62 deletions.
10 changes: 4 additions & 6 deletions .bluish/bluish.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@

var:
project_version: "0.8.4"
project_version: "0.9.0"
python_version: "3.12"

jobs:
publish:

name: Publish the latest release of Bluish

runs_on: docker://python:${{ python_version }}-alpine
runs_on:
host: docker://python:${{ python_version }}-alpine
automount: true

steps:
- name: Install required packages
Expand All @@ -31,10 +33,6 @@ jobs:
echo "Didn't get a valid PYPI_VERSION"
false
- uses: git/checkout
with:
repository: https://github.com/luismedel/bluish

- name: Abort if upload is not needed
if: ${{ project_version == pypi_version }}
run: |
Expand Down
7 changes: 7 additions & 0 deletions .bluish/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
var:
python_version: "3.12"

inputs:
- name: test
default: 1

runs_on:
host: docker://python:${{ python_version }}-slim

jobs:
_prepare_lint:
name: Prepare linters
Expand Down
6 changes: 4 additions & 2 deletions src/bluish/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
from collections import ChainMap, namedtuple
from itertools import product
from typing import Any, Callable, Generator, Optional, Sequence, TypeVar, cast
from typing import Any, Callable, Generator, Iterable, Optional, Sequence, TypeVar, cast

import bluish.core
import bluish.process
Expand All @@ -23,7 +23,7 @@ def log_dict(
_dict: dict | ChainMap,
header: str,
ctx: "Node | None" = None,
sensitive_keys: Sequence[str] = [],
sensitive_keys: Sequence[str] | Iterable[str] = (),
) -> None:
if not _dict:
return
Expand Down Expand Up @@ -160,6 +160,8 @@ def expand_expr(self, value: Any) -> Any:
return _expand_expr(self, value)
elif isinstance(value, list):
return [_expand_expr(self, v) for v in value]
elif isinstance(value, dict):
return {k: _expand_expr(self, v) for k, v in value.items()}
else:
return value

Expand Down
30 changes: 13 additions & 17 deletions src/bluish/nodes/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(

super().__init__(parent, definition)

self.runs_on_host: dict[str, Any] | None
self.runs_on_host: dict[str, Any] | None = None
self.matrix: dict[str, Any]
self.steps: list[bluish.nodes.step.Step]

Expand All @@ -35,7 +35,7 @@ def __init__(
def reset(self) -> None:
import bluish.nodes.step

self.runs_on_host = None
self._runs_on_host = None
self.matrix = {}
self.steps = []

Expand All @@ -48,13 +48,6 @@ def dispatch(self) -> bluish.process.ProcessResult | None:

info(f"** Run job '{self.display_name}'")

if self.attrs.runs_on:
self.runs_on_host = bluish.process.prepare_host(
self.expand_expr(self.attrs.runs_on)
)
else:
self.runs_on_host = self.parent.runs_on_host # type: ignore

try:
bluish.nodes.log_dict(self.matrix, header="matrix", ctx=self)

Expand All @@ -77,8 +70,9 @@ def dispatch(self) -> bluish.process.ProcessResult | None:
finally:
if self.status == bluish.core.ExecutionStatus.RUNNING:
self.status = bluish.core.ExecutionStatus.FINISHED
bluish.process.cleanup_host(self.runs_on_host)
self.runs_on_host = None
if self._runs_on_host:
bluish.process.cleanup_host(self._runs_on_host)
self._runs_on_host = None

return self.result

Expand All @@ -100,15 +94,15 @@ def exec(
if "${{" in command:
raise ValueError("Command contains unexpanded variables")

host = self.runs_on_host

if context.get_inherited_attr("is_sensitive", False):
stream_output = False

# Define where to capture the output with the >> operator
capture_filename = f"/tmp/{uuid4().hex}"
debug(f"Capture file: {capture_filename}")
touch_result = bluish.process.run(f"touch {capture_filename}", host)
touch_result = bluish.process.run(
f"touch {capture_filename}", self.runs_on_host
)
if touch_result.failed:
error(
f"Failed to create capture file {capture_filename}: {touch_result.error}"
Expand Down Expand Up @@ -141,7 +135,9 @@ def exec(
if working_dir:
debug(f"Working dir: {working_dir}")
debug("Making sure working directory exists...")
mkdir_result = bluish.process.run(f"mkdir -p {working_dir}", host)
mkdir_result = bluish.process.run(
f"mkdir -p {working_dir}", self.runs_on_host
)
if mkdir_result.failed:
error(
f"Failed to create working directory {working_dir}: {mkdir_result.error}"
Expand All @@ -158,14 +154,14 @@ def stderr_handler(line: str) -> None:

run_result = bluish.process.run(
command,
host_opts=host,
host_opts=self.runs_on_host,
stdout_handler=stdout_handler if stream_output else None,
stderr_handler=stderr_handler if stream_output else None,
)

# HACK: We should use our own process.read_file here,
# but it currently causes an infinite recursion
output_result = bluish.process.run(f"cat {capture_filename}", host)
output_result = bluish.process.run(f"cat {capture_filename}", self.runs_on_host)
if output_result.failed:
error(f"Failed to read capture file: {output_result.error}")
return output_result
Expand Down
36 changes: 30 additions & 6 deletions src/bluish/nodes/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,21 @@ def is_true(v: Any) -> bool:
def dispatch(self) -> bluish.process.ProcessResult:
self.reset()

self.status = bluish.core.ExecutionStatus.RUNNING

if self.attrs.runs_on:
self.runs_on_host = bluish.process.prepare_host(
cleanup_host = False
if not self.runs_on_host:
self.runs_on_host = self.runs_on_host or bluish.process.prepare_host(
self.expand_expr(self.attrs.runs_on)
)
cleanup_host = True

self.status = bluish.core.ExecutionStatus.RUNNING

bluish.nodes.log_dict(
self.inputs,
header="with",
ctx=self,
sensitive_keys=self.sensitive_inputs,
)

try:
for job in self.jobs.values():
Expand All @@ -109,8 +118,10 @@ def dispatch(self) -> bluish.process.ProcessResult:
finally:
if self.status == bluish.core.ExecutionStatus.RUNNING:
self.status = bluish.core.ExecutionStatus.FINISHED
bluish.process.cleanup_host(self.runs_on_host)
self.runs_on_host = None

if cleanup_host:
bluish.process.cleanup_host(self.runs_on_host)
self.runs_on_host = None

def dispatch_job(
self, job: bluish.nodes.job.Job, no_deps: bool
Expand Down Expand Up @@ -146,8 +157,21 @@ def __dispatch_job(
for wf_matrix in bluish.nodes._generate_matrices(self):
for job_matrix in bluish.nodes._generate_matrices(job):
job.reset()

if job.attrs.runs_on:
job.runs_on_host = bluish.process.prepare_host(
self.expand_expr(job.attrs.runs_on)
)
else:
job.runs_on_host = self.runs_on_host

job.matrix = {**wf_matrix, **job_matrix}
result = job.dispatch()

if job.runs_on_host and job.runs_on_host is not self.runs_on_host:
bluish.process.cleanup_host(job.runs_on_host)
job.runs_on_host = None

if result and result.failed:
return result

Expand Down
67 changes: 48 additions & 19 deletions src/bluish/process.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import contextlib
import logging
import subprocess
from typing import Any, Callable

from bluish.logging import debug
from bluish.logging import debug, info

SHELLS = {
"bash": "bash -euo pipefail",
Expand Down Expand Up @@ -44,28 +43,58 @@ def _escape_command(command: str) -> str:
return command.replace("\\", r"\\\\").replace("$", "\\$")


def _get_docker_pid(host: str) -> str:
def _get_docker_pid(host: str, docker_args: dict[str, Any]) -> str:
"""Gets the container id from the container name or id."""

docker_pid = run(f"docker ps -f name={host} -qa").stdout.strip()
if not docker_pid:
docker_pid = run(f"docker ps -f id={host} -qa").stdout.strip()
if not docker_pid:
logging.info(f"Preparing container {host}...")
command = f"docker run --detach {host} sleep infinity"
debug(f" > {command}")
run_result = run(command)
if run_result.failed:
raise ValueError(f"Could not start container {host}: {run_result.error}")
docker_pid = run_result.stdout.strip()
debug(f"Docker pid {docker_pid}")
if docker_pid:
info(f"Found container {host} with pid {docker_pid}")
return docker_pid

docker_pid = run(f"docker ps -f id={host} -qa").stdout.strip()
if docker_pid:
info(f"Found container {host} with pid {docker_pid}")
return docker_pid

info(f"Preparing container {host}...")

opts: str = ""

if docker_args.get("automount", False):
if "-v" in docker_args or "--volume" in docker_args:
raise ValueError("To use custom volumes, set automount to false")
if "-w" in docker_args or "--workdir" in docker_args:
raise ValueError("To use custom workdir, set automount to false")

opts += " -v .:/mnt"
opts += " -w /mnt"

for k, v in docker_args.items():
if k == "automount":
continue
opts += f" {k}" if isinstance(v, bool) else f" {k} {v}"

command = f"docker run {opts} --detach {host} sleep infinity"
debug(f" > {command}")
run_result = run(command)
if run_result.failed:
raise ValueError(f"Could not start container {host}: {run_result.error}")
docker_pid = run_result.stdout.strip()
info(f" - Container pid {docker_pid}")

return docker_pid


def prepare_host(opts: str | dict[str, Any] | None) -> dict[str, Any]:
"""Prepares a host for running commands."""

host = opts.get("host", None) if isinstance(opts, dict) else opts
host_args: dict[str, Any] | None = None
if isinstance(opts, dict):
host = opts.get("host", None)
host_args = {k: v for k, v in opts.items() if k != "host"}
else:
host = opts

if not host:
return {}

Expand All @@ -74,12 +103,12 @@ def prepare_host(opts: str | dict[str, Any] | None) -> dict[str, Any]:

if host.startswith("docker://"):
host = host[9:]
docker_pid = _get_docker_pid(host)
docker_pid = _get_docker_pid(host, host_args or {})
if not docker_pid:
raise ValueError(f"Could not find container with name or id {host}")
return {"host": f"docker://{docker_pid}"}
return {"host": f"docker://{docker_pid}", **(host_args if host_args else {})}
elif host.startswith("ssh://"):
return {"host": host, **(opts if isinstance(opts, dict) else {})}
return {"host": host, **(host_args if host_args else {})}
else:
raise ValueError(f"Unsupported host: {host}")

Expand All @@ -98,7 +127,7 @@ def cleanup_host(host_opts: dict[str, Any] | None) -> None:

if host.startswith("docker://"):
host = host[9:]
logging.info(f"Stopping and removing container {host}...")
info(f"Stopping and removing container {host}...")

with contextlib.suppress(Exception):
run(f"docker stop {host}")
Expand Down
25 changes: 13 additions & 12 deletions src/bluish/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,7 @@ def __repr__(self) -> str:
)


JOB_SCHEMA = Object(
{
**_COMMON_PROPERTIES,
"runs_on": Optional(Str),
"depends_on": DefaultStringList,
"steps": List(STEP_SCHEMA),
}
)


WORKFLOW_INPUT_SCHEMA = Object(
INPUT_DEFINITION_SCHEMA = Object(
{
"name": Str,
"description": Optional(Str),
Expand All @@ -312,10 +302,21 @@ def __repr__(self) -> str:
)


JOB_SCHEMA = Object(
{
**_COMMON_PROPERTIES,
"inputs": List(INPUT_DEFINITION_SCHEMA, default=list),
"runs_on": Optional(Str),
"depends_on": DefaultStringList,
"steps": List(STEP_SCHEMA),
}
)


WORKFLOW_SCHEMA = Object(
{
**_COMMON_PROPERTIES,
"inputs": List(WORKFLOW_INPUT_SCHEMA, default=list),
"inputs": List(INPUT_DEFINITION_SCHEMA, default=list),
"runs_on": Optional(Str),
"jobs": Dict(Str, JOB_SCHEMA),
}
Expand Down
Loading

0 comments on commit 5dbf252

Please sign in to comment.