Skip to content

Commit

Permalink
feature: 支持 Blobs 的 Mount API 和 Delete API
Browse files Browse the repository at this point in the history
  • Loading branch information
shabbywu committed Dec 27, 2021
1 parent 094cf86 commit 2d69c98
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 11 deletions.
2 changes: 1 addition & 1 deletion moby_distribution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from moby_distribution.spec.endpoint import OFFICIAL_ENDPOINT, APIEndpoint
from moby_distribution.spec.manifest import ManifestSchema1, ManifestSchema2, OCIManifestSchema1

__version__ = "0.1.1"
__version__ = "0.1.2"
__ALL__ = [
"DockerRegistryV2Client",
"Blob",
Expand Down
51 changes: 44 additions & 7 deletions moby_distribution/registry/resources/blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ def __init__(
if isinstance(local_path, str):
local_path = Path(local_path)

self._accessor = Accessor(local_path=local_path, fileobj=fileobj)
self.digest = digest
self.local_path = local_path
self.fileobj = fileobj
self._accessor = None

@property
def accessor(self):
if self._accessor is None:
self._accessor = Accessor(local_path=self.local_path, fileobj=self.fileobj)
return self._accessor

def download(self, digest: Optional[str] = None):
"""download the blob from registry to `local_path` or `fileobj`"""
Expand All @@ -33,7 +41,7 @@ def download(self, digest: Optional[str] = None):

url = URLBuilder.build_blobs_url(self.client.api_base_url, repo=self.repo, digest=digest)
resp = self.client.get(url=url, stream=True)
with self._accessor.open(mode="wb") as fh:
with self.accessor.open(mode="wb") as fh:
for chunk in resp.iter_content(chunk_size=1024):
fh.write(chunk)

Expand All @@ -42,7 +50,7 @@ def upload(self) -> bool:
uuid, location = self._initiate_blob_upload()
blob = BlobWriter(uuid, location, client=self.client)
sha256 = hashlib.sha256()
with self._accessor.open(mode="rb") as fh:
with self.accessor.open(mode="rb") as fh:
chunk = fh.read(1024 * 1024 * 4)
sha256.update(chunk)
blob.write(chunk)
Expand All @@ -55,7 +63,7 @@ def upload(self) -> bool:

def upload_at_one_time(self):
"""upload the monolithic blob from `local_path` or `fileobj` to the registry at one time."""
data = self._accessor.read_bytes()
data = self.accessor.read_bytes()
digest = f"sha256:{hashlib.sha256(data).hexdigest()}"

headers = {"content_type": "application/octect-stream"}
Expand All @@ -64,7 +72,7 @@ def upload_at_one_time(self):
uuid, location = self._initiate_blob_upload()
resp = self.client.put(url=location, headers=headers, params=params, data=data)

if not resp.ok:
if resp.status_code != 201:
raise exceptions.RequestErrorWithResponse("failed to upload", status_code=resp.status_code, response=resp)
self.digest = digest
return True
Expand Down Expand Up @@ -96,6 +104,35 @@ def _initiate_blob_upload(self) -> Tuple[str, str]:
location = f"{self.client.api_base_url}/{location.lstrip('/')}"
return uuid, location

def mount_from(self, from_repo: str) -> bool:
"""Mount the blob from the given repo, if the client has read access to."""
if self.digest is None:
raise RuntimeError("unknown digest")

url = URLBuilder.build_upload_blobs_url(self.client.api_base_url, self.repo)
resp = self.client.post(url=url, params={"from": from_repo, "mount": self.digest})

# If the blob is successfully mounted, the client will receive a `201` Created response
if resp.status_code != 201:
raise exceptions.RequestErrorWithResponse(
f"failed to mount blob({self.digest}) from `{from_repo}`", status_code=resp.status_code, response=resp
)
return True

def delete(self, digest: Optional[str] = None):
"""Delete the blob identified by repo and digest"""
digest = digest or self.digest
if digest is None:
raise RuntimeError("unknown digest")

url = URLBuilder.build_blobs_url(self.client.api_base_url, repo=self.repo, digest=digest)
resp = self.client.delete(url=url)
if resp.status_code != 202:
raise exceptions.RequestErrorWithResponse(
f"failed to delete blob({self.digest}) from `{self.repo}`", status_code=resp.status_code, response=resp
)
return True


class BlobWriter:
def __init__(self, uuid: str, location: str, client: DockerRegistryV2Client):
Expand All @@ -112,7 +149,7 @@ def write(self, buffer: Union[bytes, bytearray]) -> int:
}
resp = self.client.patch(url=self.location, data=buffer, headers=headers)

if not resp.ok:
if resp.status_code != 202:
raise exceptions.RequestErrorWithResponse(
"fail to upload a chunk of blobs",
status_code=resp.status_code,
Expand All @@ -131,7 +168,7 @@ def write(self, buffer: Union[bytes, bytearray]) -> int:
def commit(self, digest: str) -> bool:
params = {"digest": digest}
resp = self.client.put(url=self.location, params=params)
if not resp.ok:
if resp.status_code != 201:
raise exceptions.RequestErrorWithResponse(
"can't commit an upload process",
status_code=resp.status_code,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "moby-distribution"
version = "0.1.1"
version = "0.1.2"
description = "Yet another moby(docker) distribution implement by python."
authors = ["shabbywu <[email protected]>"]
license = "Apache-2.0"
Expand Down
9 changes: 7 additions & 2 deletions tests/integration/resources/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ def repo() -> str:


@pytest.fixture
def reference():
def reference() -> str:
return "2.7.1"


@pytest.fixture
def temp_reference():
def temp_repo() -> str:
return "".join(random.choices(string.ascii_lowercase, k=10))


@pytest.fixture
def temp_reference() -> str:
return "".join(random.choices(string.ascii_lowercase, k=10))


Expand Down
21 changes: 21 additions & 0 deletions tests/integration/resources/test_blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest

from moby_distribution.registry.exceptions import ResourceNotFound
from moby_distribution.registry.resources.blobs import Blob
from moby_distribution.registry.resources.manifests import ManifestRef

Expand Down Expand Up @@ -37,3 +38,23 @@ def test_upload(self, repo, reference, registry_client, method):
fh2.seek(0)

assert fh1.read() == fh2.read()

def test_mount_from_and_delete(self, repo, reference, temp_repo, registry_client):
ref = ManifestRef(repo=repo, reference=reference, client=registry_client)
manifest = ref.get()

fh1 = BytesIO()
Blob(fileobj=fh1, repo=repo, digest=manifest.config.digest, client=registry_client).download()
fh1.seek(0)

assert Blob(repo=temp_repo, digest=manifest.config.digest, client=registry_client).mount_from(repo)

fh2 = BytesIO()
Blob(fileobj=fh2, repo=temp_repo, digest=manifest.config.digest, client=registry_client).download()
fh2.seek(0)

assert fh2.read() == fh1.read()

Blob(repo=temp_repo, client=registry_client).delete(manifest.config.digest)
with pytest.raises(ResourceNotFound):
Blob(fileobj=fh2, repo=temp_repo, client=registry_client).download(digest=manifest.config.digest)

0 comments on commit 2d69c98

Please sign in to comment.