diff --git a/lxm3/clusters/gridengine.py b/lxm3/clusters/gridengine.py
index 7d1f894..e8b6f2f 100644
--- a/lxm3/clusters/gridengine.py
+++ b/lxm3/clusters/gridengine.py
@@ -145,10 +145,10 @@ def _run_command(self, command: str) -> str:
         result = self._connection.run(command, hide="both")
         return result.stdout
 
-    def launch(self, command):
+    def launch(self, command) -> str:
         output = self._run_command(f"qsub {command}")
         match = parse_job_id(output)
-        return match
+        return match.group(0)
 
     def qstat(self):
         stats = parse_qstat(self._run_command("qstat -xml"))
diff --git a/lxm3/clusters/slurm.py b/lxm3/clusters/slurm.py
index 9f93b89..699bfbe 100644
--- a/lxm3/clusters/slurm.py
+++ b/lxm3/clusters/slurm.py
@@ -32,9 +32,9 @@ def close(self):
         if self._connection is not None:
             self._connection.close()  # type: ignore
 
-    def launch(self, command) -> int:
+    def launch(self, command) -> str:
        output = self._submit_command(command)
-        return parse_job_id(output)
+        return str(parse_job_id(output))
 
     def _run_command(self, command: str) -> str:
         if self._connection is None:
diff --git a/lxm3/experimental/image_cache.py b/lxm3/singularity/image_cache.py
similarity index 100%
rename from lxm3/experimental/image_cache.py
rename to lxm3/singularity/image_cache.py
diff --git a/lxm3/xm_cluster/artifacts.py b/lxm3/xm_cluster/artifacts.py
index 1e72f43..9ab5bf4 100644
--- a/lxm3/xm_cluster/artifacts.py
+++ b/lxm3/xm_cluster/artifacts.py
@@ -102,7 +102,7 @@ def put_fileobj(self, fileobj, rpath: str) -> str:
 
     def should_update(self, lpath: str, rpath: str) -> Tuple[bool, str]:
         if not self.exists(rpath):
-            return True, "file does not exist"
+            return True, "new file"
 
         local_stat = os.stat(lpath)
         local_size = local_stat.st_size
diff --git a/lxm3/xm_cluster/executables.py b/lxm3/xm_cluster/executables.py
index d73306a..509ec9d 100644
--- a/lxm3/xm_cluster/executables.py
+++ b/lxm3/xm_cluster/executables.py
@@ -1,3 +1,4 @@
+import enum
 from typing import Dict, Optional
 
 import attr
@@ -5,11 +6,21 @@
 from lxm3 import xm
 
 
+class ContainerImageType(enum.Enum):
+    DOCKER = "docker"
+    SINGULARITY = "singularity"
+
+
+@attr.s(auto_attribs=True)
+class ContainerImage:
+    name: str
+    image_type: ContainerImageType
+
+
 @attr.s(auto_attribs=True)
 class AppBundle(xm.Executable):
     entrypoint_command: str
     resource_uri: str
     args: xm.SequentialArgs = attr.Factory(xm.SequentialArgs)
     env_vars: Dict[str, str] = attr.Factory(dict)
-    singularity_image: Optional[str] = None
-    docker_image: Optional[str] = None
+    container_image: Optional[ContainerImage] = None
diff --git a/lxm3/xm_cluster/execution/gridengine.py b/lxm3/xm_cluster/execution/gridengine.py
index b2d72b3..864121f 100644
--- a/lxm3/xm_cluster/execution/gridengine.py
+++ b/lxm3/xm_cluster/execution/gridengine.py
@@ -51,7 +51,11 @@ def _create_job_script_prologue(
             )
             cmds.append("nvidia-smi")
 
-        if executable.singularity_image is not None:
+        if (
+            executable.container_image
+            and executable.container_image.image_type
+            == executables.ContainerImageType.SINGULARITY
+        ):
             cmds.append(
                 'echo >&2 "INFO[$(basename "$0")]: Singularity version: $(singularity --version)"'
             )
@@ -82,24 +86,9 @@ class GridEngineHandle:
     def __init__(self, job_id: str) -> None:
         self.job_id = job_id
 
-    async def wait(self):
-        raise NotImplementedError()
-
-    async def monitor(self):
-        raise NotImplementedError()
-
-
-def _sge_job_predicate(job):
-    if isinstance(job, xm.Job):
-        return isinstance(job.executor, executors.GridEngine)
-    elif isinstance(job, array_job.ArrayJob):
-        return isinstance(job.executor, executors.GridEngine)
-    else:
-        raise ValueError(f"Unexpected job type: {type(job)}")
-
 
 class GridEngineClient:
-    builder_cls = GridEngineJobScriptBuilder
+    builder_cls: type[GridEngineJobScriptBuilder] = GridEngineJobScriptBuilder
     _settings: config_lib.ClusterSettings
 
     def __init__(
@@ -134,17 +123,15 @@ def launch(self, job_name: str, job: job_script_builder.JobType):
         console.info(
             f"Launching {num_jobs} job on {self._settings.hostname} with [cyan bold dim]qsub {job_script_path}[/]"
         )
-        group = self._cluster.launch(job_script_path)
+        job_id = self._cluster.launch(job_script_path)
 
-        handles = [
-            GridEngineHandle(job_id) for job_id in gridengine.split_job_ids(group)
-        ]
+        handles = [GridEngineHandle(job_id)]
 
-        self._save_job_id(job_name, group.group(0))
+        self._save_job_id(job_name, job_id)
 
         console.info(
             f"""\
-Successfully launched job [green bold]{group.group(0)}[/]
+Successfully launched job [green bold]{job_id}[/]
 - Saved job id in [dim]{os.path.dirname(job_script_path)}/job_id[/]
 - Find job logs in [dim]{job_log_dir}"""
         )
@@ -162,16 +149,17 @@ def client():
     return GridEngineClient(settings, artifact_store)
 
 
-async def launch(
-    job_name: str, job: job_script_builder.JobType
-) -> List[GridEngineHandle]:
-    if isinstance(job, array_job.ArrayJob):
-        jobs = [job]  # type: ignore
-    elif isinstance(job, xm.JobGroup):
-        jobs: List[xm.Job] = xm.job_operators.flatten_jobs(job)
+def _sge_job_predicate(job):
+    if isinstance(job, xm.Job):
+        return isinstance(job.executor, executors.GridEngine)
+    elif isinstance(job, array_job.ArrayJob):
+        return isinstance(job.executor, executors.GridEngine)
     else:
-        raise NotImplementedError()
+        raise ValueError(f"Unexpected job type: {type(job)}")
+
 
+async def launch(job_name: str, job) -> List[GridEngineHandle]:
+    jobs = job_script_builder.flatten_job(job)
     jobs = [job for job in jobs if _sge_job_predicate(job)]
 
     if not jobs:
diff --git a/lxm3/xm_cluster/execution/job_script_builder.py b/lxm3/xm_cluster/execution/job_script_builder.py
index 2fc1d53..8aea621 100644
--- a/lxm3/xm_cluster/execution/job_script_builder.py
+++ b/lxm3/xm_cluster/execution/job_script_builder.py
@@ -106,13 +106,15 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
         executable = job.executable
         if not isinstance(executable, executables.AppBundle):
             raise ValueError("Only Command executable is supported")
-        executor = cast(executors.SupportsContainer, job.executor)
+        executor = job.executor
+        if not isinstance(executor, executors.SupportsContainer):
+            raise TypeError("Executor should support container configuration")
+
+        if executable.container_image is not None:
+            image = executable.container_image.name
+            image_type = executable.container_image.image_type
 
-        if executable.singularity_image or executable.docker_image:
-            if executable.docker_image and executable.singularity_image:
-                raise ValueError("Only docker or singularity image should be used.")
-            if executable.singularity_image:
-                image = executable.singularity_image
+            if image_type == executables.ContainerImageType.SINGULARITY:
                 get_container_cmd = create_singularity_command
                 singularity_options = (
                     executor.singularity_options or executors.SingularityOptions()
                 )
@@ -121,8 +123,7 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
                 bind_mounts = [
                     BindMount(src, dst) for src, dst in singularity_options.bind
                 ]
                 runtime_options = [*singularity_options.extra_options]
-            elif executable.docker_image:
-                image = executable.docker_image
+            elif image_type == executables.ContainerImageType.DOCKER:
                 get_container_cmd = create_docker_command
                 docker_options = executor.docker_options or executors.DockerOptions()
                 bind_mounts = [
@@ -136,8 +137,8 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
             [
                 BindMount(install_dir, self.CONTAINER_WORKDIR),
                 BindMount(
-                    f"{install_dir}/{self.JOB_PARAM_NAME}",
-                    f"{self.CONTAINER_JOB_PARAM_PATH}",
+                    os.path.join(install_dir, self.JOB_PARAM_NAME),
+                    self.CONTAINER_JOB_PARAM_PATH,
                     read_only=True,
                 ),
             ]
@@ -153,7 +154,7 @@ def _create_entrypoint_commands(self, job: JobType, install_dir: str) -> str:
                 options=runtime_options,
                 working_dir=self.CONTAINER_WORKDIR,
                 use_gpu=self._is_gpu_requested(executor),
-                env_file=f"{install_dir}/.environment",
+                env_file=os.path.join(install_dir, ".environment"),
             )
 
         else:
@@ -402,3 +403,17 @@ def job_script_path(job_name: str):
 
 def job_log_path(job_name: str):
     return os.path.join("logs", job_name)
+
+
+def flatten_job(
+    job: Union[xm.JobGroup, array_job.ArrayJob],
+) -> List[Union[xm.Job, array_job.ArrayJob]]:
+    if isinstance(job, array_job.ArrayJob):
+        return [job]  # type: ignore
+    elif isinstance(job, xm.JobGroup):
+        jobs = xm.job_operators.flatten_jobs(job)
+        if len(jobs) > 1:
+            raise NotImplementedError("JobGroup is not supported.")
+        return jobs  # type: ignore
+    else:
+        raise NotImplementedError()
diff --git a/lxm3/xm_cluster/execution/local.py b/lxm3/xm_cluster/execution/local.py
index 790bb25..057dddf 100644
--- a/lxm3/xm_cluster/execution/local.py
+++ b/lxm3/xm_cluster/execution/local.py
@@ -6,7 +6,7 @@
 import re
 import shutil
 import subprocess
-from typing import List, Optional
+from typing import Optional
 
 from absl import logging
 
@@ -52,11 +52,6 @@ def _create_job_script_prologue(
     ) -> str:
         del executor
         cmds = ['echo >&2 "INFO[$(basename "$0")]: Running on host $(hostname)"']
-
-        if executable.singularity_image is not None:
-            cmds.append(
-                'echo >&2 "INFO[$(basename "$0")]: Singularity version: $(singularity --version)"'
-            )
         return "\n".join(cmds)
 
     @classmethod
@@ -89,17 +84,8 @@ async def monitor(self) -> None:
         await asyncio.wrap_future(self.future)
 
 
-def _local_job_predicate(job):
-    if isinstance(job, xm.Job):
-        return isinstance(job.executor, executors.Local)
-    elif isinstance(job, array_job_lib.ArrayJob):
-        return isinstance(job.executor, executors.Local)
-    else:
-        raise ValueError(f"Unexpected job type: {type(job)}")
-
-
 class LocalClient:
-    builder_cls = LocalJobScriptBuilder
+    builder_cls: type[LocalJobScriptBuilder] = LocalJobScriptBuilder
 
     def __init__(
         self,
@@ -157,14 +143,17 @@ def client() -> LocalClient:
     return LocalClient(local_settings, artifact_store)
 
 
-async def launch(job_name: str, job: job_script_builder.JobType):
-    if isinstance(job, array_job_lib.ArrayJob):
-        jobs = [job]  # type: ignore
-    elif isinstance(job, xm.JobGroup):
-        jobs: List[xm.Job] = xm.job_operators.flatten_jobs(job)
-    elif isinstance(job, xm.Job):
-        jobs = [job]
+def _local_job_predicate(job):
+    if isinstance(job, xm.Job):
+        return isinstance(job.executor, executors.Local)
+    elif isinstance(job, array_job_lib.ArrayJob):
+        return isinstance(job.executor, executors.Local)
+    else:
+        raise ValueError(f"Unexpected job type: {type(job)}")
+
 
+async def launch(job_name: str, job):
+    jobs = job_script_builder.flatten_job(job)
     jobs = [job for job in jobs if _local_job_predicate(job)]
 
     if not jobs:
diff --git a/lxm3/xm_cluster/execution/slurm.py b/lxm3/xm_cluster/execution/slurm.py
index 08f6b10..ce1268f 100644
--- a/lxm3/xm_cluster/execution/slurm.py
+++ b/lxm3/xm_cluster/execution/slurm.py
@@ -36,12 +36,7 @@ def _create_job_script_prologue(cls, executable, executor: executors.Slurm) -> s
             cmds.append(
                 'echo >&2 "INFO[$(basename "$0")]: CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES"'
             )
-            # cmds.append("nvidia-smi")
 
-        if executable.singularity_image is not None:
-            cmds.append(
-                'echo >&2 "INFO[$(basename "$0")]: Singularity version: $(singularity --version)"'
-            )
         return "\n".join(cmds)
 
     @classmethod
@@ -69,24 +64,9 @@ class SlurmHandle:
     def __init__(self, job_id: str) -> None:
         self.job_id = job_id
 
-    async def wait(self):
-        raise NotImplementedError()
-
-    async def monitor(self):
-        raise NotImplementedError()
-
-
-def _slurm_job_predicate(job):
-    if isinstance(job, xm.Job):
-        return isinstance(job.executor, executors.Slurm)
-    elif isinstance(job, array_job.ArrayJob):
-        return isinstance(job.executor, executors.Slurm)
-    else:
-        raise ValueError(f"Unexpected job type: {type(job)}")
-
 
 class SlurmClient:
-    builder_cls = SlurmJobScriptBuilder
+    builder_cls: type[SlurmJobScriptBuilder] = SlurmJobScriptBuilder
 
     def __init__(
         self,
@@ -121,11 +101,7 @@ def launch(self, job_name: str, job: job_script_builder.JobType):
         console.info(f"Successfully launched job {job_id}")
         self._artifact_store.put_text(str(job_id), f"jobs/{job_name}/job_id")
 
-        if num_jobs > 1:
-            job_ids = [f"{job_id}_{i}" for i in range(num_jobs)]
-        else:
-            job_ids = [f"{job_id}"]
-        handles = [SlurmHandle(j) for j in job_ids]
+        handles = [SlurmHandle(job_id)]
 
         return handles
 
@@ -137,14 +113,17 @@ def client() -> SlurmClient:
     return SlurmClient(settings, artifact_store)
 
 
-async def launch(job_name: str, job: job_script_builder.JobType) -> List[SlurmHandle]:
-    if isinstance(job, array_job.ArrayJob):
-        jobs = [job]  # type: ignore
-    elif isinstance(job, xm.JobGroup):
-        jobs: List[xm.Job] = xm.job_operators.flatten_jobs(job)
-    elif isinstance(job, xm.Job):
-        jobs = [job]
+def _slurm_job_predicate(job):
+    if isinstance(job, xm.Job):
+        return isinstance(job.executor, executors.Slurm)
+    elif isinstance(job, array_job.ArrayJob):
+        return isinstance(job.executor, executors.Slurm)
+    else:
+        raise ValueError(f"Unexpected job type: {type(job)}")
+
 
+async def launch(job_name: str, job) -> List[SlurmHandle]:
+    jobs = job_script_builder.flatten_job(job)
     jobs = [job for job in jobs if _slurm_job_predicate(job)]
 
     if not jobs:
diff --git a/lxm3/xm_cluster/executors.py b/lxm3/xm_cluster/executors.py
index b9f77ea..268910c 100644
--- a/lxm3/xm_cluster/executors.py
+++ b/lxm3/xm_cluster/executors.py
@@ -1,4 +1,5 @@
 import datetime
+import typing
 from typing import Any, Dict, Optional, Protocol, Sequence, Union
 
 import attr
@@ -62,6 +63,7 @@ class DockerOptions:
     extra_options: Sequence[str] = attr.Factory(list)
 
 
+@typing.runtime_checkable
 class SupportsContainer(Protocol):
     singularity_options: Optional[SingularityOptions]
     docker_options: Optional[DockerOptions]
diff --git a/lxm3/xm_cluster/packaging/archive_builder.py b/lxm3/xm_cluster/packaging/build_archive.py
similarity index 90%
rename from lxm3/xm_cluster/packaging/archive_builder.py
rename to lxm3/xm_cluster/packaging/build_archive.py
index 1fb7c99..d43f62a 100644
--- a/lxm3/xm_cluster/packaging/archive_builder.py
+++ b/lxm3/xm_cluster/packaging/build_archive.py
@@ -7,7 +7,7 @@
 import tempfile
 
 from lxm3.xm_cluster import console
-from lxm3.xm_cluster import executable_specs as cluster_executable_specs
+from lxm3.xm_cluster import executable_specs
 
 ENTRYPOINT_SCRIPT = "./entrypoint.sh"
 
@@ -16,10 +16,10 @@ class PackagingError(Exception):
     """Error raised when packaging fails."""
 
 
-def _create_entrypoint_cmds(python_package: cluster_executable_specs.PythonPackage):
-    if isinstance(python_package.entrypoint, cluster_executable_specs.ModuleName):
+def _create_entrypoint_cmds(python_package: executable_specs.PythonPackage) -> str:
+    if isinstance(python_package.entrypoint, executable_specs.ModuleName):
         cmds = ["python3 -m {}".format(python_package.entrypoint.module_name)]
-    elif isinstance(python_package.entrypoint, cluster_executable_specs.CommandList):
+    elif isinstance(python_package.entrypoint, executable_specs.CommandList):
         cmds = python_package.entrypoint.commands
     else:
         raise ValueError("Unexpected entrypoint: {}".format(python_package.entrypoint))
@@ -31,8 +31,8 @@ def _create_entrypoint_cmds(python_package: cluster_executable_specs.PythonPacka
 
 
 def create_python_archive(
-    staging_directory: str, py_package: cluster_executable_specs.PythonPackage
-):
+    staging_directory: str, py_package: executable_specs.PythonPackage
+) -> str:
     package_name = py_package.name
     version = datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
     archive_name = f"{package_name}-{version}"
@@ -109,8 +109,8 @@ def create_python_archive(
 
 
 def create_universal_archive(
-    staging_directory: str, universal_package: cluster_executable_specs.UniversalPackage
-):
+    staging_directory: str, universal_package: executable_specs.UniversalPackage
+) -> str:
     version = datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
     archive_name = f"{universal_package.name}-{version}"
diff --git a/lxm3/xm_cluster/packaging/cluster.py b/lxm3/xm_cluster/packaging/cluster.py
deleted file mode 100644
index b9d7dda..0000000
--- a/lxm3/xm_cluster/packaging/cluster.py
+++ /dev/null
@@ -1,297 +0,0 @@
-import contextlib
-import os
-import shutil
-import subprocess
-import tempfile
-from typing import Any
-
-import appdirs
-import fsspec
-import fsspec.implementations
-import fsspec.implementations.local
-import rich.progress
-
-from lxm3 import singularity
-from lxm3 import xm
-from lxm3._vendor.xmanager.xm import pattern_matching
-from lxm3.docker import build_image
-from lxm3.experimental import image_cache
-from lxm3.xm_cluster import artifacts
-from lxm3.xm_cluster import console
-from lxm3.xm_cluster import executable_specs as cluster_executable_specs
-from lxm3.xm_cluster import executables as cluster_executables
-from lxm3.xm_cluster.packaging import archive_builder
-
-_IMAGE_CACHE_DIR = os.path.join(appdirs.user_cache_dir("lxm3"), "image_cache")
-
-
-def singularity_image_path(image_name: str):
-    return os.path.join("containers", image_name)
-
-
-def archive_path(archive_name: str):
-    return os.path.join("archives", archive_name)
-
-
-def _transfer_file_with_progress(
-    artifact_store: artifacts.ArtifactStore,
-    lpath: str,
-    rpath: str,
-) -> str:
-    should_update, reason = artifact_store.should_update(lpath, rpath)
-
-    basename = os.path.basename(lpath)
-    if should_update:
-        console.info(f"Transferring {basename} ({reason})")
-        with (
-            rich.progress.Progress(
-                rich.progress.TextColumn("[progress.description]{task.description}"),
-                rich.progress.BarColumn(),
-                rich.progress.TaskProgressColumn(),
-                rich.progress.TimeRemainingColumn(),
-                rich.progress.TransferSpeedColumn(),
-                console=console.console,
-            ) as progress,
-            progress.open(lpath, mode="rb", description=os.path.basename(lpath)) as fin,
-        ):
-            put_path = artifact_store.put_fileobj(fin, rpath)
-        return put_path
-    else:
-        console.info(f"Skipped {basename}")
-        return artifact_store.get_file_info(rpath).path
-
-
-def _package_python_package(
-    py_package: cluster_executable_specs.PythonPackage,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    with tempfile.TemporaryDirectory() as staging:
-        archive_name = archive_builder.create_python_archive(staging, py_package)
-        local_archive_path = os.path.join(staging, archive_name)
-        entrypoint_cmd = archive_builder.ENTRYPOINT_SCRIPT
-        push_archive_name = os.path.basename(local_archive_path)
-        deployed_archive_path = _transfer_file_with_progress(
-            artifact_store, local_archive_path, archive_path(push_archive_name)
-        )
-
-    return cluster_executables.AppBundle(
-        entrypoint_command=entrypoint_cmd,
-        resource_uri=deployed_archive_path,
-        name=py_package.name,
-        args=packageable.args,
-        env_vars=packageable.env_vars,
-    )
-
-
-@contextlib.contextmanager
-def _chdir(directory):
-    cwd = os.getcwd()
-    os.chdir(directory)
-    yield
-    os.chdir(cwd)
-
-
-def _package_pex_binary(
-    spec: cluster_executable_specs.PexBinary,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    pex_executable = shutil.which("pex")
-    pex_name = f"{spec.name}.pex"
-
-    assert pex_executable is not None, "pex executable not found"
-    with tempfile.TemporaryDirectory() as staging:
-        install_dir = os.path.join(staging, "install")
-        pex_path = os.path.join(install_dir, pex_name)
-        with _chdir(spec.path):
-            pex_options = []
-            for pkg in spec.packages:
-                pex_options.extend(["--package", pkg])
-            for pkg in spec.modules:
-                pex_options.extend(["--module", pkg])
-            pex_options.extend(["--inherit-path=fallback"])
-            pex_options.extend(["--entry-point", spec.entrypoint.module_name])
-            pex_options.extend(["--runtime-pex-root=./.pex"])
-            with console.status(f"Creating pex {pex_name}"):
-                pex_cmd = [pex_executable, "-o", pex_path, *pex_options]
-                console.info(f"Running pex command: {' '.join(pex_cmd)}")
-                subprocess.run(pex_cmd, check=True)
-
-        # Add resources to the archive
-        for resource in spec.dependencies:
-            for src, dst in resource.files:
-                target_file = os.path.join(install_dir, dst)
-                if not os.path.exists(os.path.dirname(target_file)):
-                    os.makedirs(os.path.dirname(target_file))
-                if not os.path.exists(target_file):
-                    shutil.copy(src, target_file)
-                else:
-                    raise ValueError(
-                        "Additional resource overwrites existing file: %s", src
-                    )
-
-        local_archive_path = shutil.make_archive(
-            os.path.join(staging, spec.name), "zip", install_dir
-        )
-        push_archive_name = os.path.basename(local_archive_path)
-        deployed_archive_path = _transfer_file_with_progress(
-            artifact_store, local_archive_path, archive_path(push_archive_name)
-        )
-
-    return cluster_executables.AppBundle(
-        entrypoint_command=f"./{pex_name}",
-        resource_uri=deployed_archive_path,
-        name=spec.name,
-        args=packageable.args,
-        env_vars=packageable.env_vars,
-    )
-
-
-def _package_universal_package(
-    universal_package: cluster_executable_specs.UniversalPackage,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    with tempfile.TemporaryDirectory() as staging:
-        archive_name = archive_builder.create_universal_archive(
-            staging, universal_package
-        )
-        local_archive_path = os.path.join(staging, os.path.basename(archive_name))
-        push_archive_name = os.path.basename(local_archive_path)
-        deployed_archive_path = _transfer_file_with_progress(
-            artifact_store, local_archive_path, archive_path(push_archive_name)
-        )
-
-    return cluster_executables.AppBundle(
-        entrypoint_command=" ".join(universal_package.entrypoint),
-        resource_uri=deployed_archive_path,
-        name=universal_package.name,
-        args=packageable.args,
-        env_vars=packageable.env_vars,
-    )
-
-
-def _package_pdm_project(
-    pdm_project: cluster_executable_specs.PDMProject,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    py_package = cluster_executable_specs.PythonPackage(
-        pdm_project.entrypoint,
-        path=pdm_project.path,
-    )
-    dockerfile = build_image.pdm_dockerfile(
-        pdm_project.base_image, pdm_project.lock_file
-    )
-    build_image.build_image_by_dockerfile_content(
-        py_package.name, dockerfile, py_package.path
-    )
-
-    singularity_image = "docker-daemon://{}:latest".format(py_package.name)
-    spec = cluster_executable_specs.SingularityContainer(py_package, singularity_image)
-    return _package_singularity_container(spec, packageable, artifact_store)
-
-
-def _package_python_container(
-    python_container: cluster_executable_specs.PythonContainer,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    py_package = cluster_executable_specs.PythonPackage(
-        python_container.entrypoint, path=python_container.path
-    )
-    dockerfile = build_image.python_container_dockerfile(
-        base_image=python_container.base_image,
-        requirements=python_container.requirements,
-    )
-    build_image.build_image_by_dockerfile_content(
-        py_package.name, dockerfile, py_package.path
-    )
-    singularity_image = "docker-daemon://{}:latest".format(py_package.name)
-    spec = cluster_executable_specs.SingularityContainer(py_package, singularity_image)
-    return _package_singularity_container(spec, packageable, artifact_store)
-
-
-def _package_singularity_container(
-    container: cluster_executable_specs.SingularityContainer,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    executable = _PACKAGING_ROUTER(container.entrypoint, packageable, artifact_store)
-
-    singularity_image = container.image_path
-
-    transport, _ = singularity.uri.split(singularity_image)
-    # TODO(yl): Add support for other transports.
-    # TODO(yl): think about keeping multiple versions of the container in the storage.
-    if not transport:
-        push_image_name = os.path.basename(singularity_image)
-        if isinstance(
-            artifact_store.filesystem, fsspec.implementations.local.LocalFileSystem
-        ):
-            # Do not copy SIF image if executor is local
-            deploy_container_path = os.path.realpath(singularity_image)
-        else:
-            deploy_container_path = _transfer_file_with_progress(
-                artifact_store,
-                singularity_image,
-                singularity_image_path(push_image_name),
-            )
-    elif transport == "docker-daemon":
-        cache_image_info = image_cache.get_cached_image(
-            singularity_image, cache_dir=_IMAGE_CACHE_DIR
-        )
-        push_image_name = singularity.uri.filename(singularity_image, "sif")
-        deploy_container_path = _transfer_file_with_progress(
-            artifact_store,
-            cache_image_info.blob_path,
-            singularity_image_path(push_image_name),
-        )
-    else:
-        # For other transports, just use the image as is for now.
-        # TODO(yl): Consider adding support for specifying pulling behavior.
-        deploy_container_path = singularity_image
-
-    executable.singularity_image = deploy_container_path
-    return executable
-
-
-def _package_docker_container(
-    container: cluster_executable_specs.DockerContainer,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    executable = _PACKAGING_ROUTER(container.entrypoint, packageable, artifact_store)
-    docker_image = container.image
-    executable.docker_image = docker_image
-    return executable
-
-
-def _throw_on_unknown_executable(
-    executable: Any,
-    packageable: xm.Packageable,
-    artifact_store: artifacts.ArtifactStore,
-):
-    del artifact_store
-    raise TypeError(
-        f"Unsupported executable specification: {executable!r}. "
-        f"Packageable: {packageable!r}"
-    )
-
-
-_PACKAGING_ROUTER = pattern_matching.match(
-    _package_python_package,
-    _package_pex_binary,
-    _package_universal_package,
-    _package_pdm_project,
-    _package_python_container,
-    _package_singularity_container,
-    _package_docker_container,
-    _throw_on_unknown_executable,
-)
-
-
-def package_for_cluster_executor(packageable: xm.Packageable):
-    artifact_store = artifacts.get_cluster_artifact_store()
-    return _PACKAGING_ROUTER(packageable.executable_spec, packageable, artifact_store)
diff --git a/lxm3/xm_cluster/packaging/local.py b/lxm3/xm_cluster/packaging/local.py
deleted file mode 100644
index 2f1d9a3..0000000
--- a/lxm3/xm_cluster/packaging/local.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from lxm3 import xm
-from lxm3.xm_cluster import artifacts
-from lxm3.xm_cluster.packaging import cluster
-
-
-def package_for_local_executor(packageable: xm.Packageable):
-    artifact_store = artifacts.get_local_artifact_store()
-    return cluster._PACKAGING_ROUTER(
-        packageable.executable_spec, packageable, artifact_store
-    )
diff --git a/lxm3/xm_cluster/packaging/router.py b/lxm3/xm_cluster/packaging/router.py
index 0996217..2105e53 100644
--- a/lxm3/xm_cluster/packaging/router.py
+++ b/lxm3/xm_cluster/packaging/router.py
@@ -1,23 +1,322 @@
-from typing import Sequence
+import contextlib
+import os
+import shutil
+import subprocess
+import tempfile
+from typing import Any, Sequence
 
+import appdirs
+import fsspec
+import fsspec.implementations
+import fsspec.implementations.local
+import rich.progress
+
+from lxm3 import singularity
 from lxm3 import xm
 from lxm3._vendor.xmanager.xm import pattern_matching
+from lxm3.docker import build_image
+from lxm3.singularity import image_cache
+from lxm3.xm_cluster import artifacts
+from lxm3.xm_cluster import console
+from lxm3.xm_cluster import executable_specs as cluster_executable_specs
+from lxm3.xm_cluster import executables as cluster_executables
 from lxm3.xm_cluster import executors
-from lxm3.xm_cluster.packaging import cluster as cluster_packaging
-from lxm3.xm_cluster.packaging import local as local_packaging
+from lxm3.xm_cluster.packaging import build_archive
+
+_IMAGE_CACHE_DIR = os.path.join(appdirs.user_cache_dir("lxm3"), "image_cache")
+
+
+def singularity_image_path(image_name: str):
+    return os.path.join("containers", image_name)
+
+
+def archive_path(archive_name: str):
+    return os.path.join("archives", archive_name)
 
 
+def _transfer_file_with_progress(
+    artifact_store: artifacts.ArtifactStore,
+    lpath: str,
+    rpath: str,
+) -> str:
+    should_update, reason = artifact_store.should_update(lpath, rpath)
-def packaging_router(
+
+    basename = os.path.basename(lpath)
+    if should_update:
+        console.info(f"Transferring {basename} ({reason})")
+        with (
+            rich.progress.Progress(
+                rich.progress.TextColumn("[progress.description]{task.description}"),
+                rich.progress.BarColumn(),
+                rich.progress.TaskProgressColumn(),
+                rich.progress.TimeRemainingColumn(),
+                rich.progress.TransferSpeedColumn(),
+                console=console.console,
+            ) as progress,
+            progress.open(lpath, mode="rb", description=os.path.basename(lpath)) as fin,
+        ):
+            put_path = artifact_store.put_fileobj(fin, rpath)
+        return put_path
+    else:
+        console.info(f"Skipped {basename}")
+        return artifact_store.get_file_info(rpath).path
+
+
+def _package_python_package(
+    py_package: cluster_executable_specs.PythonPackage,
     packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
 ):
+    with tempfile.TemporaryDirectory() as staging:
+        archive_name = build_archive.create_python_archive(staging, py_package)
+        local_archive_path = os.path.join(staging, archive_name)
+        entrypoint_cmd = build_archive.ENTRYPOINT_SCRIPT
+        push_archive_name = os.path.basename(local_archive_path)
+        deployed_archive_path = _transfer_file_with_progress(
+            artifact_store, local_archive_path, archive_path(push_archive_name)
+        )
+
+    return cluster_executables.AppBundle(
+        entrypoint_command=entrypoint_cmd,
+        resource_uri=deployed_archive_path,
+        name=py_package.name,
+        args=packageable.args,
+        env_vars=packageable.env_vars,
+    )
+
+
+@contextlib.contextmanager
+def _chdir(directory):
+    cwd = os.getcwd()
+    os.chdir(directory)
+    yield
+    os.chdir(cwd)
+
+
+def _package_pex_binary(
+    spec: cluster_executable_specs.PexBinary,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    pex_executable = shutil.which("pex")
+    pex_name = f"{spec.name}.pex"
+
+    assert pex_executable is not None, "pex executable not found"
+    with tempfile.TemporaryDirectory() as staging:
+        install_dir = os.path.join(staging, "install")
+        pex_path = os.path.join(install_dir, pex_name)
+        with _chdir(spec.path):
+            pex_options = []
+            for pkg in spec.packages:
+                pex_options.extend(["--package", pkg])
+            for pkg in spec.modules:
+                pex_options.extend(["--module", pkg])
+            pex_options.extend(["--inherit-path=fallback"])
+            pex_options.extend(["--entry-point", spec.entrypoint.module_name])
+            pex_options.extend(["--runtime-pex-root=./.pex"])
+            with console.status(f"Creating pex {pex_name}"):
+                pex_cmd = [pex_executable, "-o", pex_path, *pex_options]
+                console.info(f"Running pex command: {' '.join(pex_cmd)}")
+                subprocess.run(pex_cmd, check=True)
+
+        # Add resources to the archive
+        for resource in spec.dependencies:
+            for src, dst in resource.files:
+                target_file = os.path.join(install_dir, dst)
+                if not os.path.exists(os.path.dirname(target_file)):
+                    os.makedirs(os.path.dirname(target_file))
+                if not os.path.exists(target_file):
+                    shutil.copy(src, target_file)
+                else:
+                    raise ValueError(
+                        "Additional resource overwrites existing file: %s", src
+                    )
+
+        local_archive_path = shutil.make_archive(
+            os.path.join(staging, spec.name), "zip", install_dir
+        )
+        push_archive_name = os.path.basename(local_archive_path)
+        deployed_archive_path = _transfer_file_with_progress(
+            artifact_store, local_archive_path, archive_path(push_archive_name)
+        )
+
+    return cluster_executables.AppBundle(
+        entrypoint_command=f"./{pex_name}",
+        resource_uri=deployed_archive_path,
+        name=spec.name,
+        args=packageable.args,
+        env_vars=packageable.env_vars,
+    )
+
+
+def _package_universal_package(
+    universal_package: cluster_executable_specs.UniversalPackage,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    with tempfile.TemporaryDirectory() as staging:
+        archive_name = build_archive.create_universal_archive(
+            staging, universal_package
+        )
+        local_archive_path = os.path.join(staging, os.path.basename(archive_name))
+        push_archive_name = os.path.basename(local_archive_path)
+        deployed_archive_path = _transfer_file_with_progress(
+            artifact_store, local_archive_path, archive_path(push_archive_name)
+        )
+
+    return cluster_executables.AppBundle(
+        entrypoint_command=" ".join(universal_package.entrypoint),
+        resource_uri=deployed_archive_path,
+        name=universal_package.name,
+        args=packageable.args,
+        env_vars=packageable.env_vars,
+    )
+
+
+def _package_pdm_project(
+    pdm_project: cluster_executable_specs.PDMProject,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    py_package = cluster_executable_specs.PythonPackage(
+        pdm_project.entrypoint,
+        path=pdm_project.path,
+    )
+    dockerfile = build_image.pdm_dockerfile(
+        pdm_project.base_image, pdm_project.lock_file
+    )
+    build_image.build_image_by_dockerfile_content(
+        py_package.name, dockerfile, py_package.path
+    )
+
+    singularity_image = "docker-daemon://{}:latest".format(py_package.name)
+    spec = cluster_executable_specs.SingularityContainer(py_package, singularity_image)
+    return _package_singularity_container(spec, packageable, artifact_store)
+
+
+def _package_python_container(
+    python_container: cluster_executable_specs.PythonContainer,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    py_package = cluster_executable_specs.PythonPackage(
+        python_container.entrypoint, path=python_container.path
+    )
+    dockerfile = build_image.python_container_dockerfile(
+        base_image=python_container.base_image,
+        requirements=python_container.requirements,
+    )
+    build_image.build_image_by_dockerfile_content(
+        py_package.name, dockerfile, py_package.path
+    )
+    singularity_image = "docker-daemon://{}:latest".format(py_package.name)
+    spec = cluster_executable_specs.SingularityContainer(py_package, singularity_image)
+    return _package_singularity_container(spec, packageable, artifact_store)
+
+
+def _package_singularity_container(
+    container: cluster_executable_specs.SingularityContainer,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    executable = _PACKAGING_ROUTER(container.entrypoint, packageable, artifact_store)
+
+    singularity_image = container.image_path
+
+    transport, _ = singularity.uri.split(singularity_image)
+    # TODO(yl): Add support for other transports.
+    # TODO(yl): think about keeping multiple versions of the container in the storage.
+    if not transport:
+        push_image_name = os.path.basename(singularity_image)
+        if isinstance(
+            artifact_store.filesystem, fsspec.implementations.local.LocalFileSystem
+        ):
+            # Do not copy SIF image if executor is local
+            deploy_container_path = os.path.realpath(singularity_image)
+        else:
+            deploy_container_path = _transfer_file_with_progress(
+                artifact_store,
+                singularity_image,
+                singularity_image_path(push_image_name),
+            )
+    elif transport == "docker-daemon":
+        cache_image_info = image_cache.get_cached_image(
+            singularity_image, cache_dir=_IMAGE_CACHE_DIR
+        )
+        push_image_name = singularity.uri.filename(singularity_image, "sif")
+        deploy_container_path = _transfer_file_with_progress(
+            artifact_store,
+            cache_image_info.blob_path,
+            singularity_image_path(push_image_name),
+        )
+    else:
+        # For other transports, just use the image as is for now.
+        # TODO(yl): Consider adding support for specifying pulling behavior.
+        deploy_container_path = singularity_image
+
+    executable.container_image = cluster_executables.ContainerImage(
+        name=deploy_container_path,
+        image_type=cluster_executables.ContainerImageType.SINGULARITY,
+    )
+    return executable
+
+
+def _package_docker_container(
+    container: cluster_executable_specs.DockerContainer,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    executable = _PACKAGING_ROUTER(container.entrypoint, packageable, artifact_store)
+    docker_image = container.image
+    executable.container_image = cluster_executables.ContainerImage(
+        name=docker_image,
+        image_type=cluster_executables.ContainerImageType.DOCKER,
+    )
+    return executable
+
+
+def _throw_on_unknown_executable(
+    executable: Any,
+    packageable: xm.Packageable,
+    artifact_store: artifacts.ArtifactStore,
+):
+    del artifact_store
+    raise TypeError(
+        f"Unsupported executable specification: {executable!r}. "
+        f"Packageable: {packageable!r}"
+    )
+
+
+_PACKAGING_ROUTER = pattern_matching.match(
+    _package_python_package,
+    _package_pex_binary,
+    _package_universal_package,
+    _package_pdm_project,
+    _package_python_container,
+    _package_singularity_container,
+    _package_docker_container,
+    _throw_on_unknown_executable,
+)
+
+
+def packaging_router(packageable: xm.Packageable):
     def package_local(executable_spec: executors.LocalSpec):
-        return local_packaging.package_for_local_executor(packageable)
+        artifact_store = artifacts.get_local_artifact_store()
+        return _PACKAGING_ROUTER(
+            packageable.executable_spec, packageable, artifact_store
+        )
 
     def package_gridengine(executable_spec: executors.GridEngineSpec):
-        return cluster_packaging.package_for_cluster_executor(packageable)
+        artifact_store = artifacts.get_cluster_artifact_store()
+        return _PACKAGING_ROUTER(
+            packageable.executable_spec, packageable, artifact_store
+        )
 
     def package_slurm(executable_spec: executors.SlurmSpec):
-        return cluster_packaging.package_for_cluster_executor(packageable)
+        artifact_store = artifacts.get_cluster_artifact_store()
+        return _PACKAGING_ROUTER(
+            packageable.executable_spec, packageable, artifact_store
+        )
 
     return pattern_matching.match(
         package_local,
diff --git a/pyproject.toml b/pyproject.toml
index b5a5b18..6590d0b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,6 +83,7 @@ cov = {composite = ["test-cov", "coverage report"]}
 docs = "sphinx-build -b html docs docs/build/html -j auto"
 lint.shell = "ruff check ."
 fmt.shell = "ruff format ."
+typecheck.shell = "pyright"
 
 [tool.ruff]
 exclude = ["_vendor", "xm", ".venv"]
@@ -114,7 +115,6 @@ omit = [
     "lxm3/_vendor/*",
     "tests/*",
     "conftest.py",
-    "lxm3/xm_cluster/execution/utils.py" # not used
 ]
 
 [tool.coverage.report]
diff --git a/lxm3/experimental/__init__.py b/tests/__init__.py
similarity index 100%
rename from lxm3/experimental/__init__.py
rename to tests/__init__.py
diff --git a/tests/clusters/gridengine_test.py b/tests/clusters/gridengine_test.py
index dd4d708..b75c012 100644
--- a/tests/clusters/gridengine_test.py
+++ b/tests/clusters/gridengine_test.py
@@ -133,8 +133,8 @@ def test_cluster(self, mock_connection):
             stdout='Your job 9830196 ("MyTESTJOBNAME") has been submitted',
         )
         cluster = gridengine.GridEngineCluster(hostname="host", username="user")
-        match = cluster.launch("job.qsub")
-        self.assertEqual(match.group(0), "9830196")
+        job_id = cluster.launch("job.qsub")
+        self.assertEqual(job_id, "9830196")
         cluster.close()
 
diff --git a/tests/clusters/slurm_test.py b/tests/clusters/slurm_test.py
index ed16795..d3020ea 100644
--- a/tests/clusters/slurm_test.py
+++ b/tests/clusters/slurm_test.py
@@ -10,11 +10,7 @@ class SlurmTest(parameterized.TestCase):
     @parameterized.named_parameters(
         [
-            {
-                "testcase_name": "job",
-                "text": "Submitted batch job 6",  # noqa
-                "expected": 6,
-            },
+            {"testcase_name": "job", "text": "Submitted batch job 6", "expected": 6},
         ]
     )
     def test_parse_job_id(self, text, expected):
@@ -36,7 +32,7 @@ def test_cluster(self, mock_connection):
         )
         cluster = slurm.SlurmCluster(hostname="host", username="user")
         job_id = cluster.launch("job.sbatch")
-        self.assertEqual(job_id, 6)
+        self.assertEqual(job_id, "6")
         cluster.close()
 
diff --git a/tests/execution_test.py b/tests/execution_test.py
index 3a3a109..2305743 100644
--- a/tests/execution_test.py
+++ b/tests/execution_test.py
@@ -1,7 +1,5 @@
 import os
-import shutil
 import subprocess
-import sys
 import textwrap
 import unittest
 import zipfile
@@ -26,14 +24,19 @@
 from lxm3.xm_cluster.execution import job_script_builder as job_script
 from lxm3.xm_cluster.execution import local
 from lxm3.xm_cluster.execution import slurm
+from tests import utils
 
 
-def is_singularity_installed():
-    return shutil.which("singularity") is not None
+def _create_singularity_image(name):
+    return executables.ContainerImage(
+        name=name, image_type=executables.ContainerImageType.SINGULARITY
+    )
 
 
-def is_docker_installed():
-    return shutil.which("docker") is not None
+def _create_docker_image(name):
+    return executables.ContainerImage(
+        name=name, image_type=executables.ContainerImageType.DOCKER
+    )
 
 
 class JobScriptBuilderTest(parameterized.TestCase):
@@ -92,6 +95,19 @@ def test_empty_args(self):
             ),
         )
 
+    def test_ml_collections_quoting(self):
+        args = xm.SequentialArgs.from_collection({"config.name": "train[:90%]"})
+        args = args.to_list()
+        self.assertEqual(
+            job_script._create_args([args], "LXM_TASK_ID", 0),
+            textwrap.dedent(
+                """\
+                TASK_CMD_ARGS_0="--config.name='train[:90%]'"
+                TASK_CMD_ARGS=$(eval echo \\$"TASK_CMD_ARGS_$LXM_TASK_ID")
+                eval set -- $TASK_CMD_ARGS""",
+            ),
+        )
+
 
 class LocalExecutionTest(parameterized.TestCase):
     @parameterized.named_parameters(
@@ -117,7 +133,7 @@ def test_local_launch(self):
             name="test",
             entrypoint_command="echo hello",
             resource_uri=archive.full_path,
-            singularity_image=container.full_path,
+            container_image=_create_singularity_image(container.full_path),
         )
         executor = executors.Local()
         job = xm.Job(executable, executor, name="test")
@@ -149,18 +165,22 @@ def _run_job_script(self, job_script_content, env=None):
                 raise
         return process
 
-    def test_job_script_run_single_job(self):
+    def _create_bundle(self, entrypoint: str, container_image=None):
         tmpf = self.create_tempfile("test.zip")
         with zipfile.ZipFile(tmpf.full_path, "w") as z:
             info = zipfile.ZipInfo("entrypoint.sh")
             info.external_attr = 0o777 << 16  # give full access to included file
-            z.writestr(
-                info,
-                "#!/usr/bin/env bash\necho $@ $FOO",
-            )
-        executable = xm_cluster.AppBundle(
-            "foo", "./entrypoint.sh", resource_uri=tmpf.full_path
+            z.writestr(info, entrypoint)
+
+        return xm_cluster.AppBundle(
+            "foo",
+            "./entrypoint.sh",
+            resource_uri=tmpf.full_path,
+            container_image=container_image,
         )
+
+    def test_job_script_run_single_job(self):
+        executable = self._create_bundle("#!/usr/bin/env bash\necho $@ $FOO")
         job = xm.Job(
             executable,
             executor=xm_cluster.Local(),
@@ -173,17 +193,7 @@ def test_job_script_run_single_job(self):
         self.assertEqual(process.stdout.decode("utf-8").strip(), "--seed=1 FOO_0")
 
     def test_job_script_run_array_job(self):
-        tmpf = self.create_tempfile("test.zip")
-        with zipfile.ZipFile(tmpf.full_path, "w") as z:
-            info = zipfile.ZipInfo("entrypoint.sh")
-            info.external_attr = 0o777 << 16  # give full access to included file
-            z.writestr(
-                info,
-                "#!/usr/bin/env bash\necho $@ $FOO",
-            )
-        executable = xm_cluster.AppBundle(
-            "foo", "./entrypoint.sh", resource_uri=tmpf.full_path
-        )
+        executable = self._create_bundle("#!/usr/bin/env bash\necho $@ $FOO")
         job = xm_cluster.ArrayJob(
             executable,
             executor=xm_cluster.Local(),
@@ -201,68 +211,14 @@ def test_job_script_run_array_job(self):
         )
         self.assertEqual(process.stdout.decode("utf-8").strip(), "--seed=2 FOO_1")
 
-    def test_job_script_handles_ml_collections_quoting(self):
-        tmpf = self.create_tempfile("test.zip")
-        with zipfile.ZipFile(tmpf.full_path, "w") as z:
-            info = zipfile.ZipInfo("entrypoint.sh")
-            info.external_attr = 0o777 << 16  # give full access to included file
-            z.writestr(
-                "run.py",
-                textwrap.dedent("""\
-                from absl import app
-                from ml_collections import config_dict
-                from ml_collections import config_flags
-
-                def _get_config():
-                    config = config_dict.ConfigDict()
-                    config.name = ""
-                    return config
-
-                _CONFIG = config_flags.DEFINE_config_dict("config", _get_config())
-
-                def main(_):
-                    config = _CONFIG.value
-                    print(config.name)
-
-                if __name__ == "__main__":
-                    config = _get_config()
-                    app.run(main)"""),
-            )
-            z.writestr(
-                info,
-                "#!/usr/bin/env bash\n{} run.py $@\n".format(sys.executable),
-            )
-        executable = xm_cluster.AppBundle(
-            "foo", "./entrypoint.sh", resource_uri=tmpf.full_path
-        )
-        job = xm_cluster.ArrayJob(
-            executable,
-            executor=xm_cluster.Local(),
-            args=[{"config.name": "train[:90%]"}],
-            env_vars=[{"FOO": "FOO_0"}],
-        )
-        builder = local.LocalJobScriptBuilder()
-        job_script_content = builder.build(job, "foo", "/tmp")
-        process = self._run_job_script(job_script_content, env={"LOCAL_TASK_ID": "1"})
-        self.assertEqual(process.stdout.decode("utf-8").strip(), "train[:90%]")
-
     @pytest.mark.integration
-    @unittest.skipIf(not is_singularity_installed(), "Singularity is not installed")
+    @unittest.skipIf(
+        not utils.is_singularity_installed(), "Singularity is not installed"
+    )
     def test_singularity(self):
-        tmpf = self.create_tempfile("test.zip")
-        with zipfile.ZipFile(tmpf.full_path, "w") as z:
-            info = zipfile.ZipInfo("entrypoint.sh")
-            info.external_attr = 0o777 << 16  # give full access to included file
-            z.writestr(
-                info,
-                "#!/usr/bin/env bash\necho $FOO",
-            )
-
-        executable = xm_cluster.AppBundle(
-            "test",
-            "./entrypoint.sh",
-            resource_uri=tmpf.full_path,
-            singularity_image="docker://python:3.10-slim",
+        executable = self._create_bundle(
+            "#!/usr/bin/env bash\necho $FOO",
+            container_image=_create_singularity_image("docker://python:3.10-slim"),
         )
         job = xm_cluster.ArrayJob(
             executable,
@@ -280,22 +236,11 @@ def test_singularity(self):
         self.assertEqual(process.stdout.decode("utf-8").strip(), "FOO_0")
 
     @pytest.mark.integration
-    @unittest.skipIf(not is_docker_installed(), "Docker is not installed")
+    @unittest.skipIf(not utils.is_docker_installed(), "Docker is not installed")
     def test_docker_image(self):
-        tmpf = self.create_tempfile("test.zip")
-        with zipfile.ZipFile(tmpf.full_path, "w") as z:
-            info = zipfile.ZipInfo("entrypoint.sh")
-            info.external_attr = 0o777 << 16  # give full access to included file
-            z.writestr(
-                info,
-                "#!/usr/bin/env bash\necho $FOO\n",
-            )
-
-        executable = xm_cluster.AppBundle(
-            "test",
-            "./entrypoint.sh",
-            resource_uri=tmpf.full_path,
-            docker_image="python:3.10-slim",
+        executable = self._create_bundle(
+            "#!/usr/bin/env bash\necho $FOO",
+            container_image=_create_docker_image("python:3.10-slim"),
         )
         job = xm_cluster.ArrayJob(
             executable, executor=xm_cluster.Local(), env_vars=[{"FOO": "FOO_0"}]
@@ -346,7 +291,7 @@ def test_setup_cmds(self):
             name="test",
             entrypoint_command="echo hello",
             resource_uri="",
-            singularity_image="docker://python:3.10-slim",
+            container_image=_create_singularity_image("docker://python:3.10-slim"),
         )
         executor = executors.GridEngine(modules=["module1"])
         with patch.object(
@@ -376,7 +321,7 @@ def test_launch(self):
             name="test",
             entrypoint_command="echo hello",
             resource_uri=archive.full_path,
-            singularity_image=container.full_path,
+            container_image=_create_singularity_image(container.full_path),
         )
         executor = executors.GridEngine()
         job = xm.Job(executable, executor, name="test")
@@ -388,7 +333,7 @@ def test_launch(self):
         with mock.patch.object(
             gridengine_cluster.GridEngineCluster, "launch"
         ) as mock_launch:
-            mock_launch.return_value = gridengine_cluster.parse_job_id("1")
+            mock_launch.return_value = "1"
 
             client.launch("test_job", job)
@@ -417,7 +362,7 @@ def test_setup_cmds(self):
             name="test",
             entrypoint_command="echo hello",
             resource_uri="",
-            singularity_image="docker://python:3.10-slim",
+            container_image=_create_singularity_image("docker://python:3.10-slim"),
         )
         executor = executors.Slurm(modules=["module1"])
         with patch.object(
@@ -429,7 +374,6 @@ def test_setup_cmds(self):
                 executable, executor
             )
             self.assertIn("module load module1", setup_cmds)
-            self.assertIn("singularity --version", setup_cmds)
 
     def test_slurm_launch(self):
         staging_dir = self.create_tempdir(name="staging")
@@ -444,7 +388,9 @@ def test_slurm_launch(self):
             name="test",
             entrypoint_command="echo hello",
             resource_uri=archive.full_path,
-            singularity_image=container.full_path,
+            container_image=_create_singularity_image(
+                container.full_path,
+            ),
         )
         executor = executors.Slurm()
         job = xm.Job(executable, executor, name="test")
diff --git a/tests/main.py b/tests/main.py
deleted file mode 100644
index aaf62d1..0000000
--- a/tests/main.py
+++ /dev/null
@@ -1,6 +0,0 @@
-import os
-import sys
-
-print(os.getenv("a"))
-print(os.getenv("b"))
-print(sys.argv[1:])
diff --git a/tests/packaging_test.py b/tests/packaging_test.py
index 71b0b06..47deb96 100644
--- a/tests/packaging_test.py
+++ b/tests/packaging_test.py
@@ -11,7 +11,7 @@
 from lxm3 import xm
 from lxm3 import xm_cluster
 from lxm3.xm_cluster import artifacts
-from lxm3.xm_cluster.packaging import cluster
+from lxm3.xm_cluster.packaging import router
 
 _HERE = os.path.abspath(os.path.dirname(__file__))
 
@@ -22,7 +22,7 @@ def _create_artifact_store(staging, project):
 
 class PackagingTest(parameterized.TestCase):
     @parameterized.parameters(
-        (cluster._package_python_package,),
+        (router._package_python_package,),
     )
     def test_package_python(self, pkg_fun):
         spec = xm_cluster.PythonPackage(
@@ -54,7 +54,7 @@ def test_package_default_pip_args(self):
         )
 
     @parameterized.parameters(
-        (cluster._package_universal_package,),
+        (router._package_universal_package,),
     )
     @absltest.skipIf("darwin" in sys.platform, "Not working on MacOS")
     def test_package_universal(self, pkg_fun):
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 0000000..1803279
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,9 @@
+import shutil
+
+
+def is_singularity_installed():
+    return shutil.which("singularity") is not None
+
+
+def is_docker_installed():
+    return shutil.which("docker") is not None