
Include resolved digest in pushing container
ethanluoyc committed Feb 9, 2024
1 parent 6053f13 commit d3f9aea
Showing 4 changed files with 34 additions and 25 deletions.
6 changes: 6 additions & 0 deletions lxm3/xm_cluster/__init__.py
@@ -1,3 +1,6 @@
# Disable verbose logging from paramiko
import logging

from lxm3.xm_cluster.array_job import ArrayJob
from lxm3.xm_cluster.config import Config
from lxm3.xm_cluster.executable_specs import CommandList
@@ -19,3 +22,6 @@
from lxm3.xm_cluster.experiment import ClusterWorkUnit
from lxm3.xm_cluster.experiment import create_experiment
from lxm3.xm_cluster.requirements import JobRequirements

logging.getLogger("paramiko").setLevel(logging.WARNING)
del logging
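
Note: moving the paramiko logger configuration into the package __init__ means it takes effect as soon as lxm3.xm_cluster is imported, and the trailing del logging keeps the logging module from becoming part of the package namespace. A minimal sketch of the effect, with logger names other than "paramiko" purely illustrative:

import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("paramiko").setLevel(logging.WARNING)

# Child loggers such as "paramiko.transport" inherit the WARNING threshold,
# so their DEBUG/INFO records are dropped; unrelated loggers still emit DEBUG.
logging.getLogger("paramiko.transport").debug("suppressed")
logging.getLogger("lxm3").debug("still emitted")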
8 changes: 2 additions & 6 deletions lxm3/xm_cluster/artifacts.py
@@ -1,6 +1,5 @@
import abc
import datetime
import logging
import os
from typing import Any, Mapping, Optional

@@ -11,9 +10,6 @@

from lxm3.xm_cluster.console import console

# Disable verbose logging from paramiko
logging.getLogger("paramiko").setLevel(logging.WARNING)


class ArtifactStore(abc.ABC):
def __init__(
@@ -151,7 +147,7 @@ def __init__(
):
if connect_kwargs is None:
connect_kwargs = {}
fs = fsspec.filesystem("sftp", host=hostname, username=user, **connect_kwargs)
fs = SFTPFileSystem("sftp", host=hostname, username=user, **connect_kwargs) # type: ignore
# Normalize the storage root to an absolute path.
self._host = hostname
self._user = user
@@ -160,7 +156,7 @@ def __init__(
super().__init__(fs, staging_directory, project)

def deploy_singularity_container(self, lpath, image_name):
assert not "/" not in image_name
assert "/" not in image_name
deploy_container_path = self.singularity_image_path(image_name)
should_update = self._should_update(lpath, deploy_container_path)
if should_update:
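
Note: the assertion change above fixes an operator-precedence bug. In Python, not binds more loosely than not in, so the old expression asserted that the image name does contain a slash, the opposite of the intent. A quick illustration with a hypothetical image name:

# Old form: assert not "/" not in image_name
# parses as: assert not ("/" not in image_name), i.e. assert "/" in image_name.
name = "myproject-cuda12.sif"   # hypothetical name without a slash
print("/" not in name)          # True  -> the new assertion passes
print(not "/" not in name)      # False -> the old assertion would have failed here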
14 changes: 12 additions & 2 deletions lxm3/xm_cluster/packaging/cluster.py
@@ -1,4 +1,5 @@
import os
import pathlib
import tempfile
from typing import Any

@@ -11,6 +12,13 @@
from lxm3.xm_cluster import executables as cluster_executables
from lxm3.xm_cluster.packaging import archive_builder
from lxm3.xm_cluster.packaging import container_builder
from lxm3.xm_cluster.packaging import digest_util


def _get_push_image_name(image_path: str):
digest = digest_util.sha256_digest(image_path)
path = pathlib.Path(image_path)
return path.with_stem(path.stem + "@" + digest.replace(":", ".")).name


def _package_python_package(
@@ -108,20 +116,22 @@ def _package_singularity_container(

transport, _ = singularity.uri.split(singularity_image)
if not transport:
push_image_name = os.path.basename(singularity_image)
push_image_name = _get_push_image_name(singularity_image)
deploy_container_path = artifact_store.singularity_image_path(push_image_name)
artifact_store.deploy_singularity_container(singularity_image, push_image_name)
elif transport == "docker-daemon":
# Try building singularity image using cache
# TODO(yl): Use the new image cache implementation
cache_image_path = (
singularity.images.build_singularity_image_from_docker_daemon(
singularity_image
)
)
push_image_name = os.path.basename(cache_image_path)
push_image_name = _get_push_image_name(cache_image_path)
deploy_container_path = artifact_store.singularity_image_path(push_image_name)
artifact_store.deploy_singularity_container(cache_image_path, push_image_name)
else:
# For other transports, just use the image as is for now.
deploy_container_path = singularity_image

executable.singularity_image = deploy_container_path
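
Note: with _get_push_image_name the pushed file name embeds the resolved sha256 digest rather than reusing os.path.basename, so each content version of an image maps to a distinct path in the artifact store. Roughly what the helper produces; the digest value below is made up and shortened:

import pathlib

digest = "sha256:9f2a0c1e"       # hypothetical; real digests are 64 hex characters
path = pathlib.Path("/tmp/jax-cuda.sif")
# ":" is replaced with "." so the digest fits in an ordinary file name.
# pathlib.Path.with_stem requires Python 3.9+.
push_name = path.with_stem(path.stem + "@" + digest.replace(":", ".")).name
print(push_name)                 # jax-cuda@sha256.9f2a0c1e.sif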
31 changes: 14 additions & 17 deletions lxm3/xm_cluster/packaging/digest_util.py
@@ -5,7 +5,6 @@
import hashlib
import os
import pathlib
from typing import NamedTuple

import appdirs
import cachetools
@@ -15,13 +14,6 @@
_DIGEST_CACHE = None


class _CacheKey(NamedTuple):
path: str
node_id: int
time_modified: float
size: int


def clear_cache():
_get_cache().clear()

@@ -39,20 +31,25 @@ def _get_cache():
return _DIGEST_CACHE


def _create_hash_key(stat: os.stat_result, path: str):
return (path, stat.st_ino, stat.st_mtime, stat.st_size)


def sha256_digest(filename: str):
path = os.path.abspath(filename)
stat = os.stat(path)

cache_key = cachetools.keys.hashkey(
_CacheKey(path, stat.st_ino, stat.st_mtime, stat.st_size)
)
cache = _get_cache()
if cache_key in cache:
return cache[cache_key]
if stat.st_size > 4096:
cache_key = cachetools.keys.hashkey(_create_hash_key(stat, path))
cache = _get_cache()
if cache_key in cache:
return cache[cache_key]
else:
digest = f"sha256:{_sha256sum(path)}"
cache[cache_key] = digest
return digest
else:
digest = f"sha256:{_sha256sum(path)}"
cache[cache_key] = digest
return digest
return f"sha256:{_sha256sum(path)}"


def _sha256sum(path: str, block_size=2**20):
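
Note: after this change sha256_digest only consults the persistent digest cache for files larger than 4096 bytes, keyed by (path, inode, mtime, size); smaller files are simply re-hashed on every call. The body of _sha256sum is collapsed in this view; a standard block-wise implementation under that signature would look roughly like the following sketch:

import hashlib

def _sha256sum(path: str, block_size=2**20):
    # Hash the file in 1 MiB chunks so large container images
    # never have to be read into memory at once.
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            h.update(block)
    return h.hexdigest()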
