Move executable deployment to packaging phase #12

Merged (2 commits, Oct 30, 2023)
11 changes: 6 additions & 5 deletions lxm3/contrib/ucl.py
@@ -117,12 +117,13 @@ def UclGridEngine(
         "myriad": _myriad_executor_fn,
         "cs": _cs_executor_fn,
     }
-    location = requirements.location or config.default().default_cluster()
-    if location not in executor_fns:
+    if requirements.location is not None:
+        raise ValueError("location is not supported requirements")
+    cluster = config.default().default_cluster()
+    if cluster not in executor_fns:
         raise ValueError(
-            f"Unsupported location {location} for UCL GridEngine. Supported locations: "
+            f"Unsupported location {cluster} for UCL GridEngine. Supported locations: "
             f"{list(executor_fns.keys())}"
         )
-    executor = executor_fns[location](requirements, walltime=walltime, **kwargs)
-    executor.requirements.location = location
+    executor = executor_fns[cluster](requirements, walltime=walltime, **kwargs)
     return executor
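
After this change UclGridEngine no longer honours requirements.location; the target cluster always comes from the config's default_cluster(). A tiny standalone sketch of that rule (the function and values below are illustrative only, not lxm3 API):

def pick_cluster(requested_location, default_cluster, supported):
    # Mirrors the new behaviour: an explicit location is rejected outright,
    # and the default cluster from the config must be one of the known ones.
    if requested_location is not None:
        raise ValueError("location is not supported requirements")
    if default_cluster not in supported:
        raise ValueError(f"Unsupported location {default_cluster}")
    return default_cluster

print(pick_cluster(None, "myriad", {"myriad", "cs"}))  # -> myriad
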
@@ -1,5 +1,6 @@
 import abc
 import datetime
+import logging
 import os
 from typing import Any, Mapping, Optional
 
@@ -10,6 +11,9 @@
 
 from lxm3.xm_cluster.console import console
 
+# Disable verbose logging from paramiko
+logging.getLogger("paramiko").setLevel(logging.WARNING)
+
 
 class Artifact(abc.ABC):
     def __init__(
@@ -34,9 +38,6 @@ def job_path(self, job_name: str):
     def job_script_path(self, job_name: str):
         return os.path.join(self.job_path(job_name), "job.sh")
 
-    def job_array_wrapper_path(self, job_name: str):
-        return os.path.join(self.job_path(job_name), "array_wrapper.sh")
-
     def singularity_image_path(self, image_name: str):
         return os.path.join(self._storage_root, "containers", image_name)
 
@@ -77,15 +78,13 @@ def _put_file(self, local_filename, dst):
         self._fs.makedirs(os.path.dirname(dst), exist_ok=True)
         self._fs.put(local_filename, dst)
 
-    def deploy_job_scripts(self, job_name, job_script, array_wrapper=None):
+    def deploy_job_scripts(self, job_name, job_script):
         job_path = self.job_path(job_name)
         job_log_path = os.path.join(job_path, "logs")
 
         self._fs.makedirs(job_path, exist_ok=True)
         self._fs.makedirs(job_log_path, exist_ok=True)
         self._put_content(self.job_script_path(job_name), job_script)
-        if array_wrapper is not None:
-            self._put_content(self.job_array_wrapper_path(job_name), array_wrapper)
         console.log(f"Created job script {self.job_script_path(job_name)}")
 
     def deploy_singularity_container(self, singularity_image):
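
A minimal sketch of the staging layout these helpers imply. job_script_path and singularity_image_path follow the code above; the job_path layout itself is not visible in this diff, so the jobs/<job_name> prefix below is an assumption.

import os

class ArtifactLayoutSketch:
    # Toy stand-in for the real Artifact class, paths only.
    def __init__(self, storage_root):
        self._storage_root = storage_root

    def job_path(self, job_name):
        # Assumed layout; the actual prefix is defined outside this diff.
        return os.path.join(self._storage_root, "jobs", job_name)

    def job_script_path(self, job_name):
        return os.path.join(self.job_path(job_name), "job.sh")

    def singularity_image_path(self, image_name):
        return os.path.join(self._storage_root, "containers", image_name)

layout = ArtifactLayoutSketch("/scratch/lxm3-staging")
print(layout.job_script_path("job-20231030.120000"))
print(layout.singularity_image_path("myproject-latest.sif"))

Note that with job_array_wrapper_path removed, the store no longer writes a separate array_wrapper.sh next to job.sh; only the job script itself is deployed per job.
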
28 changes: 24 additions & 4 deletions lxm3/xm_cluster/config.py
@@ -39,13 +39,33 @@ def default_cluster(self) -> str:
         cluster = self.data["clusters"][0]["name"]
         return cluster
 
-    def cluster_config(self, location: Optional[str] = None) -> Dict[str, Any]:
-        location = location or self.default_cluster()
+    def get_cluster_settings(self):
+        location = self.default_cluster()
         clusters = {cluster["name"]: cluster for cluster in self.data["clusters"]}
         if location not in clusters:
             raise ValueError("Unknown cluster")
-        cluster = clusters[location]
-        return cluster
+        cluster_config = clusters[location]
+        storage_root = cluster_config["storage"]["staging"]
+        hostname = cluster_config.get("server", None)
+        user = cluster_config.get("user", None)
+
+        connect_kwargs = {}
+
+        proxycommand = cluster_config.get("proxycommand", None)
+        if proxycommand is not None:
+            import paramiko
+
+            connect_kwargs["sock"] = paramiko.ProxyCommand(proxycommand)
+
+        ssh_private_key = cluster_config.get("ssh_private_key", None)
+        if ssh_private_key is not None:
+            connect_kwargs["key_filename"] = os.path.expanduser(ssh_private_key)
+
+        password = cluster_config.get("password", None)
+        if password is not None:
+            connect_kwargs["password"] = password
+
+        return storage_root, hostname, user, connect_kwargs
 
 
 @functools.lru_cache()
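
For reference, a minimal standalone sketch of how get_cluster_settings unpacks a cluster entry. Only the keys actually read above (storage.staging, server, user, proxycommand, ssh_private_key, password) come from the diff; the example entry, its values, and the exact config schema are hypothetical.

import os

def unpack_cluster(cluster_config):
    # Mirrors the logic in get_cluster_settings above on a plain dict.
    storage_root = cluster_config["storage"]["staging"]
    hostname = cluster_config.get("server")
    user = cluster_config.get("user")

    connect_kwargs = {}
    if cluster_config.get("proxycommand") is not None:
        import paramiko  # imported lazily, as in the diff
        connect_kwargs["sock"] = paramiko.ProxyCommand(cluster_config["proxycommand"])
    if cluster_config.get("ssh_private_key") is not None:
        connect_kwargs["key_filename"] = os.path.expanduser(cluster_config["ssh_private_key"])
    if cluster_config.get("password") is not None:
        connect_kwargs["password"] = cluster_config["password"]
    return storage_root, hostname, user, connect_kwargs

example = {
    "name": "myriad",                       # hypothetical values
    "server": "login.example.ac.uk",
    "user": "ucabxyz",
    "storage": {"staging": "/home/ucabxyz/lxm3-staging"},
    "ssh_private_key": "~/.ssh/id_ed25519",
}
print(unpack_cluster(example))
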
92 changes: 4 additions & 88 deletions lxm3/xm_cluster/execution/common.py
@@ -1,38 +1,27 @@
-import datetime
 import os
-from typing import List, Optional, Union, cast
+from typing import List, Optional
-
-import paramiko
-
-from lxm3 import singularity
 from lxm3 import xm
-from lxm3.xm_cluster import config as config_lib
 from lxm3.xm_cluster import executables
 from lxm3.xm_cluster import executors
-from lxm3.xm_cluster.execution import artifacts
 from lxm3.xm_cluster.execution import job_script
 
 
 def create_array_job(
     *,
-    artifact: artifacts.Artifact,
     executable: executables.Command,
     singularity_image: Optional[str],
     singularity_options: Optional[executors.SingularityOptions],
     jobs: List[xm.Job],
     use_gpu: bool,
-    version: Optional[str] = None,
     job_script_shebang: str,
     task_offset: int,
    task_id_var_name: str,
     setup: str,
     header: str,
 ):
-    version = version or datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
 
-    job_name = f"job-{version}"
     array_wrapper = _create_array_wrapper(executable, jobs, task_offset)
-    deploy_archive_path = artifact.archive_path(executable.resource_uri)
+    deploy_archive_path = executable.resource_uri
     setup_cmds = """
 set -e
 
@@ -79,17 +68,9 @@ def create_array_job(
     job_command = " ".join(["sh", "$ARRAY_WRAPPER_PATH", f"{task_offset}"])
 
     if singularity_image is not None:
-        transport, _ = singularity.uri.split(singularity_image)
-        if not transport:
-            deploy_container_path = artifact.singularity_image_path(
-                os.path.basename(singularity_image)
-            )
-        else:
-            deploy_container_path = singularity_image
-        singularity_options = singularity_options
         job_command = _wrap_singularity_cmd(
             job_command,
-            deploy_container_path,
+            singularity_image,
             singularity_options,
             use_gpu,
         )
@@ -100,38 +81,7 @@ def create_array_job(
         setup=setup_cmds,
         shebang=job_script_shebang,
     )
-
-    return _put_job_resources(
-        artifact=artifact,
-        executable=executable,
-        singularity_image=singularity_image,
-        job_name=job_name,
-        job_script_content=job_script_content,
-    )
-
-
-def _put_job_resources(
-    *,
-    artifact: artifacts.Artifact,
-    executable: executables.Command,
-    singularity_image: Optional[str],
-    job_name: str,
-    job_script_content: str,
-) -> str:
-    # Put artifacts on the staging fs
-    job_script_dir = artifact.job_path(job_name)
-    artifact.deploy_resource_archive(executable.resource_uri)
-
-    if singularity_image is not None:
-        transport, _ = singularity.uri.split(singularity_image)
-        if not transport:
-            artifact.deploy_singularity_container(singularity_image)
-
-    artifact.deploy_job_scripts(job_name, job_script_content)
-
-    deploy_job_script_path = os.path.join(job_script_dir, job_script.JOB_SCRIPT_NAME)
-
-    return deploy_job_script_path
+    return job_script_content
 
 
 def _create_array_wrapper(
@@ -158,40 +108,6 @@ def _wrap_singularity_cmd(
     return job_command
 
 
-def get_cluster_settings(config: config_lib.Config, jobs: List[xm.Job]):
-    executor = cast(Union[executors.Slurm, executors.GridEngine], jobs[0].executor)
-    location = executor.requirements.location
-
-    for job in jobs:
-        if not isinstance(job.executor, (executors.GridEngine, executors.Slurm)):
-            raise ValueError("Only GridEngine and Slurm executors are supported.")
-        if job.executor.requirements.location != location:
-            raise ValueError("All jobs must be launched on the same cluster.")
-
-    if location is None:
-        location = config.default_cluster()
-
-    cluster_config = config.cluster_config(location)
-    storage_root = cluster_config["storage"]["staging"]
-    hostname = cluster_config.get("server", None)
-    user = cluster_config.get("user", None)
-
-    connect_kwargs = {}
-    proxycommand = cluster_config.get("proxycommand", None)
-    if proxycommand is not None:
-        connect_kwargs["sock"] = paramiko.ProxyCommand(proxycommand)
-
-    ssh_private_key = cluster_config.get("ssh_private_key", None)
-    if ssh_private_key is not None:
-        connect_kwargs["key_filename"] = os.path.expanduser(ssh_private_key)
-
-    password = cluster_config.get("password", None)
-    if password is not None:
-        connect_kwargs["password"] = password
-
-    return storage_root, hostname, user, connect_kwargs
-
-
 def write_job_id(artifact, job_script_path: str, job_id: str):
     artifact._fs.write_text(
         os.path.join(os.path.dirname(job_script_path), "job_id"), f"{job_id}\n"
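
The net effect in this module: create_array_job now only renders the job script and returns its content; copying the resource archive, the Singularity image, and the script itself to the staging filesystem no longer happens here. A toy sketch of that render-then-deploy split (the helper and the in-memory store below are made up for illustration, not lxm3 code):

def render_job_script(shebang, header, setup, command):
    # Stand-in for the real job-script rendering; returns text only.
    return "\n".join([shebang, header, setup, command, ""])

class DictArtifactStore:
    # In-memory stand-in for the artifact store; deployment is a separate step.
    def __init__(self):
        self.files = {}

    def deploy_job_scripts(self, job_name, job_script):
        self.files[f"{job_name}/job.sh"] = job_script

script = render_job_script("#!/usr/bin/env bash", "#$ -N demo", "set -e", "sh $ARRAY_WRAPPER_PATH 0")
store = DictArtifactStore()
store.deploy_job_scripts("job-20231030.120000", script)  # deployment now lives with the caller
print(sorted(store.files))
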
33 changes: 16 additions & 17 deletions lxm3/xm_cluster/execution/gridengine.py
@@ -6,12 +6,12 @@
 
 from lxm3 import xm
 from lxm3.clusters import gridengine
+from lxm3.xm_cluster import artifacts
 from lxm3.xm_cluster import config as config_lib
 from lxm3.xm_cluster import executables
 from lxm3.xm_cluster import executors
 from lxm3.xm_cluster import requirements as cluster_requirements
 from lxm3.xm_cluster.console import console
-from lxm3.xm_cluster.execution import artifacts
 from lxm3.xm_cluster.execution import common
 from lxm3.xm_cluster.execution import job_script
 
@@ -158,34 +158,23 @@ def _get_setup_cmds(
     return "\n".join(cmds)
 
 
-def deploy_job_resources(
-    artifact: artifacts.Artifact,
-    jobs: List[xm.Job],
-    version: Optional[str] = None,
-) -> str:
-    version = version or datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
+def create_job_script(jobs: List[xm.Job], job_name: str, job_script_dir: str) -> str:
     executable = jobs[0].executable
     executor = jobs[0].executor
 
     assert isinstance(executor, executors.GridEngine)
     assert isinstance(executable, executables.Command)
     job_script.validate_same_job_configuration(jobs)
 
-    job_name = f"job-{version}"
-
-    job_script_dir = artifact.job_path(job_name)
-
     setup = _get_setup_cmds(executable, executor)
     header = _create_job_header(executor, jobs, job_script_dir, job_name)
 
     return common.create_array_job(
-        artifact=artifact,
         executable=executable,
         singularity_image=executable.singularity_image,
         singularity_options=executor.singularity_options,
         jobs=jobs,
         use_gpu=_is_gpu_requested(executor),
-        version=version,
         job_script_shebang=_JOB_SCRIPT_SHEBANG,
         task_offset=_TASK_OFFSET,
         task_id_var_name=_TASK_ID_VAR_NAME,
@@ -211,9 +200,12 @@ async def launch(
     if len(jobs) < 1:
         return []
 
-    storage_root, hostname, user, connect_kwargs = common.get_cluster_settings(
-        config, jobs
-    )
+    (
+        storage_root,
+        hostname,
+        user,
+        connect_kwargs,
+    ) = config_lib.default().get_cluster_settings()
 
     artifact = artifacts.create_artifact_store(
         storage_root,
@@ -223,9 +215,16 @@ async def launch(
         connect_kwargs=connect_kwargs,
     )
 
+    version = datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
+    job_name = f"job-{version}"
+    job_script_dir = artifact.job_path(job_name)
+    job_script_content = create_job_script(jobs, job_name, job_script_dir)
+
+    artifact.deploy_job_scripts(job_name, job_script_content)
+    job_script_path = os.path.join(job_script_dir, job_script.JOB_SCRIPT_NAME)
+
     client = gridengine.Client(hostname, user)
     console.log(f"Launching {len(jobs)} jobs on {hostname}")
-    job_script_path = deploy_job_resources(artifact, jobs)
 
     console.log(f"Launch with command:\n qsub {job_script_path}")
     group = client.launch(job_script_path)
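
Putting the pieces together, launch() now follows a fixed order: read the cluster settings from the config, build the artifact store, render the job script, deploy only that script, then qsub the deployed path. The outline below restates that order with stand-in objects rather than the real lxm3 modules (only deploy_job_scripts, job_path, Client.launch, get_cluster_settings and the job.sh name are taken from the diff; everything else is a stub for illustration). Under this PR, the executable archive and container are expected to be staged earlier, during packaging.

import datetime
import os

class StubConfig:
    # Stands in for config_lib.default(); returns fixed, hypothetical settings.
    def get_cluster_settings(self):
        return "/tmp/lxm3-staging", "login.example.ac.uk", "ucabxyz", {}

class StubArtifactStore:
    def __init__(self, storage_root):
        self._root = storage_root
        self.deployed = {}

    def job_path(self, job_name):
        return os.path.join(self._root, "jobs", job_name)  # assumed layout

    def deploy_job_scripts(self, job_name, content):
        self.deployed[self.job_path(job_name)] = content

class StubClient:
    def launch(self, job_script_path):
        return f"qsub {job_script_path}"  # the real client submits remotely

def launch_outline(config, jobs):
    storage_root, hostname, user, connect_kwargs = config.get_cluster_settings()
    artifact = StubArtifactStore(storage_root)

    version = datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
    job_name = f"job-{version}"
    job_script_dir = artifact.job_path(job_name)
    job_script_content = "#!/bin/bash\necho hello\n"  # rendered by create_job_script in the PR

    artifact.deploy_job_scripts(job_name, job_script_content)  # only the script is deployed here
    job_script_path = os.path.join(job_script_dir, "job.sh")
    return StubClient().launch(job_script_path)

print(launch_outline(StubConfig(), jobs=[]))
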
17 changes: 10 additions & 7 deletions lxm3/xm_cluster/execution/local.py
@@ -10,11 +10,11 @@
 from absl import logging
 
 from lxm3 import xm
+from lxm3.xm_cluster import artifacts
 from lxm3.xm_cluster import config as config_lib
 from lxm3.xm_cluster import executables
 from lxm3.xm_cluster import executors
 from lxm3.xm_cluster.console import console
-from lxm3.xm_cluster.execution import artifacts
 from lxm3.xm_cluster.execution import common
 from lxm3.xm_cluster.execution import job_script
 
@@ -63,12 +63,13 @@ def _get_setup_cmds(
     return "\n".join(cmds)
 
 
-def deploy_job_resources(
+def create_job_script(
     artifact: artifacts.Artifact,
     jobs: List[xm.Job],
     version: Optional[str] = None,
 ) -> str:
     version = version or datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
+    job_name = f"job-{version}"
 
     executable = jobs[0].executable
     executor = jobs[0].executor
@@ -77,21 +78,17 @@ def deploy_job_resources(
     assert isinstance(executable, executables.Command)
     job_script.validate_same_job_configuration(jobs)
 
-    job_name = f"job-{version}"
-
     job_script_dir = artifact.job_path(job_name)
 
     setup = _get_setup_cmds(executable, executor)
     header = _create_job_header(executor, jobs, job_script_dir, job_name)
 
     return common.create_array_job(
-        artifact=artifact,
         executable=executable,
         singularity_image=executable.singularity_image,
         singularity_options=executor.singularity_options,
         jobs=jobs,
         use_gpu=_is_gpu_requested(executor),
-        version=version,
         job_script_shebang=_JOB_SCRIPT_SHEBANG,
         task_offset=_TASK_OFFSET,
         task_id_var_name=_TASK_ID_VAR_NAME,
@@ -119,7 +116,13 @@ async def launch(config: config_lib.Config, jobs: List[xm.Job]):
     artifact = artifacts.LocalArtifact(
         local_config["storage"]["staging"], project=config.project()
     )
-    job_script_path = deploy_job_resources(artifact, jobs)
+    version = datetime.datetime.now().strftime("%Y%m%d.%H%M%S")
+    job_script_content = create_job_script(artifact, jobs, version)
+    job_name = f"job-{version}"
+
+    job_script_dir = artifact.job_path(job_name)
+    artifact.deploy_job_scripts(job_name, job_script_content)
+    job_script_path = os.path.join(job_script_dir, job_script.JOB_SCRIPT_NAME)
 
     console.print(f"Launching {len(jobs)} jobs locally...")
     handles = []