Skip to content

Commit

Permalink
Merge packaging
Browse files Browse the repository at this point in the history
* Clean up tests
* Move image cache
  • Loading branch information
ethanluoyc committed May 3, 2024
1 parent 675b6c7 commit 4a62ba4
Show file tree
Hide file tree
Showing 22 changed files with 472 additions and 551 deletions.
4 changes: 2 additions & 2 deletions lxm3/clusters/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ def _run_command(self, command: str) -> str:
result = self._connection.run(command, hide="both")
return result.stdout

def launch(self, command):
def launch(self, command) -> str:
output = self._run_command(f"qsub {command}")
match = parse_job_id(output)
return match
return match.group(0)

def qstat(self):
stats = parse_qstat(self._run_command("qstat -xml"))
Expand Down
4 changes: 2 additions & 2 deletions lxm3/clusters/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def close(self):
if self._connection is not None:
self._connection.close() # type: ignore

def launch(self, command) -> int:
def launch(self, command) -> str:
output = self._submit_command(command)
return parse_job_id(output)
return str(parse_job_id(output))

def _run_command(self, command: str) -> str:
if self._connection is None:
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion lxm3/xm_cluster/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def put_fileobj(self, fileobj, rpath: str) -> str:

def should_update(self, lpath: str, rpath: str) -> Tuple[bool, str]:
if not self.exists(rpath):
return True, "file does not exist"
return True, "new file"

local_stat = os.stat(lpath)
local_size = local_stat.st_size
Expand Down
15 changes: 13 additions & 2 deletions lxm3/xm_cluster/executables.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import enum
from typing import Dict, Optional

import attr

from lxm3 import xm


class ContainerImageType(enum.Enum):
DOCKER = "docker"
SINGULARITY = "singularity"


@attr.s(auto_attribs=True)
class ContainerImage:
name: str
image_type: ContainerImageType


@attr.s(auto_attribs=True)
class AppBundle(xm.Executable):
entrypoint_command: str
resource_uri: str
args: xm.SequentialArgs = attr.Factory(xm.SequentialArgs)
env_vars: Dict[str, str] = attr.Factory(dict)
singularity_image: Optional[str] = None
docker_image: Optional[str] = None
container_image: Optional[ContainerImage] = None
50 changes: 19 additions & 31 deletions lxm3/xm_cluster/execution/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ def _create_job_script_prologue(
)
cmds.append("nvidia-smi")

if executable.singularity_image is not None:
if (
executable.container_image
and executable.container_image.image_type
== executables.ContainerImageType.SINGULARITY
):
cmds.append(
'echo >&2 "INFO[$(basename "$0")]: Singularity version: $(singularity --version)"'
)
Expand Down Expand Up @@ -82,24 +86,9 @@ class GridEngineHandle:
def __init__(self, job_id: str) -> None:
self.job_id = job_id

async def wait(self):
raise NotImplementedError()

async def monitor(self):
raise NotImplementedError()


def _sge_job_predicate(job):
if isinstance(job, xm.Job):
return isinstance(job.executor, executors.GridEngine)
elif isinstance(job, array_job.ArrayJob):
return isinstance(job.executor, executors.GridEngine)
else:
raise ValueError(f"Unexpected job type: {type(job)}")


class GridEngineClient:
builder_cls = GridEngineJobScriptBuilder
builder_cls: type[GridEngineJobScriptBuilder] = GridEngineJobScriptBuilder
_settings: config_lib.ClusterSettings

def __init__(
Expand Down Expand Up @@ -134,17 +123,15 @@ def launch(self, job_name: str, job: job_script_builder.JobType):
console.info(
f"Launching {num_jobs} job on {self._settings.hostname} with [cyan bold dim]qsub {job_script_path}[/]"
)
group = self._cluster.launch(job_script_path)
job_id = self._cluster.launch(job_script_path)

handles = [
GridEngineHandle(job_id) for job_id in gridengine.split_job_ids(group)
]
handles = [GridEngineHandle(job_id)]

self._save_job_id(job_name, group.group(0))
self._save_job_id(job_name, job_id)

console.info(
f"""\
Successfully launched job [green bold]{group.group(0)}[/]
Successfully launched job [green bold]{job_id}[/]
- Saved job id in [dim]{os.path.dirname(job_script_path)}/job_id[/]
- Find job logs in [dim]{job_log_dir}"""
)
Expand All @@ -162,16 +149,17 @@ def client():
return GridEngineClient(settings, artifact_store)


async def launch(
job_name: str, job: job_script_builder.JobType
) -> List[GridEngineHandle]:
if isinstance(job, array_job.ArrayJob):
jobs = [job] # type: ignore
elif isinstance(job, xm.JobGroup):
jobs: List[xm.Job] = xm.job_operators.flatten_jobs(job)
def _sge_job_predicate(job):
if isinstance(job, xm.Job):
return isinstance(job.executor, executors.GridEngine)
elif isinstance(job, array_job.ArrayJob):
return isinstance(job.executor, executors.GridEngine)
else:
raise NotImplementedError()
raise ValueError(f"Unexpected job type: {type(job)}")


async def launch(job_name: str, job) -> List[GridEngineHandle]:
jobs = job_script_builder.flatten_job(job)
jobs = [job for job in jobs if _sge_job_predicate(job)]

if not jobs:
Expand Down
37 changes: 26 additions & 11 deletions lxm3/xm_cluster/execution/job_script_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
executable = job.executable
if not isinstance(executable, executables.AppBundle):
raise ValueError("Only Command executable is supported")
executor = cast(executors.SupportsContainer, job.executor)
executor = job.executor
if not isinstance(executor, executors.SupportsContainer):
raise TypeError("Executor should support container configuration")

if executable.container_image is not None:
image = executable.container_image.name
image_type = executable.container_image.image_type

if executable.singularity_image or executable.docker_image:
if executable.docker_image and executable.singularity_image:
raise ValueError("Only docker or singularity image should be used.")
if executable.singularity_image:
image = executable.singularity_image
if image_type == executables.ContainerImageType.SINGULARITY:
get_container_cmd = create_singularity_command
singularity_options = (
executor.singularity_options or executors.SingularityOptions()
Expand All @@ -121,8 +123,7 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
BindMount(src, dst) for src, dst in singularity_options.bind
]
runtime_options = [*singularity_options.extra_options]
elif executable.docker_image:
image = executable.docker_image
elif image_type == executables.ContainerImageType.DOCKER:
get_container_cmd = create_docker_command
docker_options = executor.docker_options or executors.DockerOptions()
bind_mounts = [
Expand All @@ -136,8 +137,8 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
[
BindMount(install_dir, self.CONTAINER_WORKDIR),
BindMount(
f"{install_dir}/{self.JOB_PARAM_NAME}",
f"{self.CONTAINER_JOB_PARAM_PATH}",
os.path.join(install_dir, self.JOB_PARAM_NAME),
self.CONTAINER_JOB_PARAM_PATH,
read_only=True,
),
]
Expand All @@ -153,7 +154,7 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
options=runtime_options,
working_dir=self.CONTAINER_WORKDIR,
use_gpu=self._is_gpu_requested(executor),
env_file=f"{install_dir}/.environment",
env_file=os.path.join(install_dir, ".environment"),
)

else:
Expand Down Expand Up @@ -402,3 +403,17 @@ def job_script_path(job_name: str):

def job_log_path(job_name: str):
return os.path.join("logs", job_name)


def flatten_job(
job: Union[xm.JobGroup, array_job.ArrayJob],
) -> List[Union[xm.Job, array_job.ArrayJob]]:
if isinstance(job, array_job.ArrayJob):
return [job] # type: ignore
elif isinstance(job, xm.JobGroup):
jobs = xm.job_operators.flatten_jobs(job)
if len(jobs) > 1:
raise NotImplementedError("JobGroup is not supported.")
return jobs # type: ignore
else:
raise NotImplementedError()
35 changes: 12 additions & 23 deletions lxm3/xm_cluster/execution/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import re
import shutil
import subprocess
from typing import List, Optional
from typing import Optional

from absl import logging

Expand Down Expand Up @@ -52,11 +52,6 @@ def _create_job_script_prologue(
) -> str:
del executor
cmds = ['echo >&2 "INFO[$(basename "$0")]: Running on host $(hostname)"']

if executable.singularity_image is not None:
cmds.append(
'echo >&2 "INFO[$(basename "$0")]: Singularity version: $(singularity --version)"'
)
return "\n".join(cmds)

@classmethod
Expand Down Expand Up @@ -89,17 +84,8 @@ async def monitor(self) -> None:
await asyncio.wrap_future(self.future)


def _local_job_predicate(job):
if isinstance(job, xm.Job):
return isinstance(job.executor, executors.Local)
elif isinstance(job, array_job_lib.ArrayJob):
return isinstance(job.executor, executors.Local)
else:
raise ValueError(f"Unexpected job type: {type(job)}")


class LocalClient:
builder_cls = LocalJobScriptBuilder
builder_cls: type[LocalJobScriptBuilder] = LocalJobScriptBuilder

def __init__(
self,
Expand Down Expand Up @@ -157,14 +143,17 @@ def client() -> LocalClient:
return LocalClient(local_settings, artifact_store)


async def launch(job_name: str, job: job_script_builder.JobType):
if isinstance(job, array_job_lib.ArrayJob):
jobs = [job] # type: ignore
elif isinstance(job, xm.JobGroup):
jobs: List[xm.Job] = xm.job_operators.flatten_jobs(job)
elif isinstance(job, xm.Job):
jobs = [job]
def _local_job_predicate(job):
if isinstance(job, xm.Job):
return isinstance(job.executor, executors.Local)
elif isinstance(job, array_job_lib.ArrayJob):
return isinstance(job.executor, executors.Local)
else:
raise ValueError(f"Unexpected job type: {type(job)}")


async def launch(job_name: str, job):
jobs = job_script_builder.flatten_job(job)
jobs = [job for job in jobs if _local_job_predicate(job)]

if not jobs:
Expand Down
45 changes: 12 additions & 33 deletions lxm3/xm_cluster/execution/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@ def _create_job_script_prologue(cls, executable, executor: executors.Slurm) -> s
cmds.append(
'echo >&2 "INFO[$(basename "$0")]: CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES"'
)
# cmds.append("nvidia-smi")

if executable.singularity_image is not None:
cmds.append(
'echo >&2 "INFO[$(basename "$0")]: Singularity version: $(singularity --version)"'
)
return "\n".join(cmds)

@classmethod
Expand Down Expand Up @@ -69,24 +64,9 @@ class SlurmHandle:
def __init__(self, job_id: str) -> None:
self.job_id = job_id

async def wait(self):
raise NotImplementedError()

async def monitor(self):
raise NotImplementedError()


def _slurm_job_predicate(job):
if isinstance(job, xm.Job):
return isinstance(job.executor, executors.Slurm)
elif isinstance(job, array_job.ArrayJob):
return isinstance(job.executor, executors.Slurm)
else:
raise ValueError(f"Unexpected job type: {type(job)}")


class SlurmClient:
builder_cls = SlurmJobScriptBuilder
builder_cls: type[SlurmJobScriptBuilder] = SlurmJobScriptBuilder

def __init__(
self,
Expand Down Expand Up @@ -121,11 +101,7 @@ def launch(self, job_name: str, job: job_script_builder.JobType):
console.info(f"Successfully launched job {job_id}")
self._artifact_store.put_text(str(job_id), f"jobs/{job_name}/job_id")

if num_jobs > 1:
job_ids = [f"{job_id}_{i}" for i in range(num_jobs)]
else:
job_ids = [f"{job_id}"]
handles = [SlurmHandle(j) for j in job_ids]
handles = [SlurmHandle(job_id)]

return handles

Expand All @@ -137,14 +113,17 @@ def client() -> SlurmClient:
return SlurmClient(settings, artifact_store)


async def launch(job_name: str, job: job_script_builder.JobType) -> List[SlurmHandle]:
if isinstance(job, array_job.ArrayJob):
jobs = [job] # type: ignore
elif isinstance(job, xm.JobGroup):
jobs: List[xm.Job] = xm.job_operators.flatten_jobs(job)
elif isinstance(job, xm.Job):
jobs = [job]
def _slurm_job_predicate(job):
if isinstance(job, xm.Job):
return isinstance(job.executor, executors.Slurm)
elif isinstance(job, array_job.ArrayJob):
return isinstance(job.executor, executors.Slurm)
else:
raise ValueError(f"Unexpected job type: {type(job)}")


async def launch(job_name: str, job) -> List[SlurmHandle]:
jobs = job_script_builder.flatten_job(job)
jobs = [job for job in jobs if _slurm_job_predicate(job)]

if not jobs:
Expand Down
2 changes: 2 additions & 0 deletions lxm3/xm_cluster/executors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import typing
from typing import Any, Dict, Optional, Protocol, Sequence, Union

import attr
Expand Down Expand Up @@ -62,6 +63,7 @@ class DockerOptions:
extra_options: Sequence[str] = attr.Factory(list)


@typing.runtime_checkable
class SupportsContainer(Protocol):
singularity_options: Optional[SingularityOptions]
docker_options: Optional[DockerOptions]
Expand Down
Loading

0 comments on commit 4a62ba4

Please sign in to comment.