Merge pull request #1374 from dandi/gh-1373
Double-check file sizes before & after uploading
yarikoptic authored Dec 13, 2023
2 parents cb20d8d + cec2e23 commit 7b906e6
Showing 4 changed files with 157 additions and 79 deletions.
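All four files apply the same guard: stat() the file before the upload starts (retrying while a flaky filesystem briefly reports zero bytes), then stat() it again afterwards and compare. A minimal sketch of the shape of the change, assuming a hypothetical do_upload() step in place of the real multipart logic in dandi/files/bases.py and dandi/files/zarr.py below:

    from pathlib import Path

    from dandi.utils import post_upload_size_check, pre_upload_size_check

    def do_upload(path: Path, size: int) -> None:
        # Hypothetical stand-in for the real multipart upload.
        pass

    def guarded_upload(path: Path) -> None:
        total_size = pre_upload_size_check(path)  # re-stats while the size reads as 0
        try:
            do_upload(path, total_size)
        except Exception:
            # An upload error is already propagating: only log a size mismatch.
            post_upload_size_check(path, total_size, True)
            raise
        else:
            # Upload succeeded: a size change now is itself an error.
            post_upload_size_check(path, total_size, False)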
dandi/files/bases.py: 143 changes (75 additions, 68 deletions)
@@ -27,7 +27,7 @@
 from dandi.dandiapi import RemoteAsset, RemoteDandiset, RESTFullAPIClient
 from dandi.metadata.core import get_default_metadata
 from dandi.misctypes import DUMMY_DANDI_ETAG, Digest, LocalReadableFile, P
-from dandi.utils import yaml_load
+from dandi.utils import post_upload_size_check, pre_upload_size_check, yaml_load
 from dandi.validate_types import Scope, Severity, ValidationOrigin, ValidationResult
 
 lgr = dandi.get_logger()
@@ -350,7 +350,7 @@ def iter_upload(
                 )
         yield {"status": "initiating upload"}
         lgr.debug("%s: Beginning upload", asset_path)
-        total_size = self.size
+        total_size = pre_upload_size_check(self.filepath)
         try:
             resp = client.post(
                 "/uploads/initialize/",
@@ -370,73 +370,80 @@ def iter_upload(
             else:
                 raise
         else:
-            upload_id = resp["upload_id"]
-            parts = resp["parts"]
-            if len(parts) != etagger.part_qty:
-                raise RuntimeError(
-                    f"Server and client disagree on number of parts for upload;"
-                    f" server says {len(parts)}, client says {etagger.part_qty}"
-                )
-            parts_out = []
-            bytes_uploaded = 0
-            lgr.debug("Uploading %s in %d parts", self.filepath, len(parts))
-            with RESTFullAPIClient("http://nil.nil") as storage:
-                with self.filepath.open("rb") as fp:
-                    with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
-                        lock = Lock()
-                        futures = [
-                            executor.submit(
-                                _upload_blob_part,
-                                storage_session=storage,
-                                fp=fp,
-                                lock=lock,
-                                etagger=etagger,
-                                asset_path=asset_path,
-                                part=part,
+            try:
+                upload_id = resp["upload_id"]
+                parts = resp["parts"]
+                if len(parts) != etagger.part_qty:
+                    raise RuntimeError(
+                        f"Server and client disagree on number of parts for upload;"
+                        f" server says {len(parts)}, client says {etagger.part_qty}"
+                    )
+                parts_out = []
+                bytes_uploaded = 0
+                lgr.debug("Uploading %s in %d parts", self.filepath, len(parts))
+                with RESTFullAPIClient("http://nil.nil") as storage:
+                    with self.filepath.open("rb") as fp:
+                        with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
+                            lock = Lock()
+                            futures = [
+                                executor.submit(
+                                    _upload_blob_part,
+                                    storage_session=storage,
+                                    fp=fp,
+                                    lock=lock,
+                                    etagger=etagger,
+                                    asset_path=asset_path,
+                                    part=part,
+                                )
+                                for part in parts
+                            ]
+                            for fut in as_completed(futures):
+                                out_part = fut.result()
+                                bytes_uploaded += out_part["size"]
+                                yield {
+                                    "status": "uploading",
+                                    "upload": 100 * bytes_uploaded / total_size,
+                                    "current": bytes_uploaded,
+                                }
+                                parts_out.append(out_part)
+                    lgr.debug("%s: Completing upload", asset_path)
+                    resp = client.post(
+                        f"/uploads/{upload_id}/complete/",
+                        json={"parts": parts_out},
+                    )
+                    lgr.debug(
+                        "%s: Announcing completion to %s",
+                        asset_path,
+                        resp["complete_url"],
+                    )
+                    r = storage.post(
+                        resp["complete_url"], data=resp["body"], json_resp=False
+                    )
+                    lgr.debug(
+                        "%s: Upload completed. Response content: %s",
+                        asset_path,
+                        r.content,
+                    )
+                    rxml = fromstring(r.text)
+                    m = re.match(r"\{.+?\}", rxml.tag)
+                    ns = m.group(0) if m else ""
+                    final_etag = rxml.findtext(f"{ns}ETag")
+                    if final_etag is not None:
+                        final_etag = final_etag.strip('"')
+                        if final_etag != filetag:
+                            raise RuntimeError(
+                                "Server and client disagree on final ETag of"
+                                f" uploaded file; server says {final_etag},"
+                                f" client says {filetag}"
                             )
-                            for part in parts
-                        ]
-                        for fut in as_completed(futures):
-                            out_part = fut.result()
-                            bytes_uploaded += out_part["size"]
-                            yield {
-                                "status": "uploading",
-                                "upload": 100 * bytes_uploaded / total_size,
-                                "current": bytes_uploaded,
-                            }
-                            parts_out.append(out_part)
-                lgr.debug("%s: Completing upload", asset_path)
-                resp = client.post(
-                    f"/uploads/{upload_id}/complete/",
-                    json={"parts": parts_out},
-                )
-                lgr.debug(
-                    "%s: Announcing completion to %s",
-                    asset_path,
-                    resp["complete_url"],
-                )
-                r = storage.post(
-                    resp["complete_url"], data=resp["body"], json_resp=False
-                )
-                lgr.debug(
-                    "%s: Upload completed. Response content: %s",
-                    asset_path,
-                    r.content,
-                )
-                rxml = fromstring(r.text)
-                m = re.match(r"\{.+?\}", rxml.tag)
-                ns = m.group(0) if m else ""
-                final_etag = rxml.findtext(f"{ns}ETag")
-                if final_etag is not None:
-                    final_etag = final_etag.strip('"')
-                    if final_etag != filetag:
-                        raise RuntimeError(
-                            "Server and client disagree on final ETag of uploaded file;"
-                            f" server says {final_etag}, client says {filetag}"
-                        )
-                # else: Error? Warning?
-                resp = client.post(f"/uploads/{upload_id}/validate/")
-                blob_id = resp["blob_id"]
+                    # else: Error? Warning?
+                    resp = client.post(f"/uploads/{upload_id}/validate/")
+                    blob_id = resp["blob_id"]
+            except Exception:
+                post_upload_size_check(self.filepath, total_size, True)
+                raise
+            else:
+                post_upload_size_check(self.filepath, total_size, False)
         lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path)
         yield {"status": "producing asset"}
         if replacing is not None:
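The completion step above parses an S3-style XML response in which ElementTree renders the root tag in Clark notation, i.e. {namespace}Tag, which is why the code extracts the {...} prefix with a regex before looking up ETag. A standalone sketch of that parsing, with a made-up response body:

    import re
    from xml.etree.ElementTree import fromstring

    body = (
        '<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
        "<ETag>&quot;abc123-2&quot;</ETag>"
        "</CompleteMultipartUploadResult>"
    )
    rxml = fromstring(body)
    m = re.match(r"\{.+?\}", rxml.tag)  # tag reads as "{namespace}CompleteMultipartUploadResult"
    ns = m.group(0) if m else ""
    final_etag = rxml.findtext(f"{ns}ETag")
    print(final_etag.strip('"') if final_etag is not None else None)  # -> abc123-2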
dandi/files/zarr.py: 39 changes (28 additions, 11 deletions)
@@ -33,7 +33,13 @@
 )
 from dandi.metadata.core import get_default_metadata
 from dandi.misctypes import DUMMY_DANDI_ZARR_CHECKSUM, BasePath, Digest
-from dandi.utils import chunked, exclude_from_zarr, pluralize
+from dandi.utils import (
+    chunked,
+    exclude_from_zarr,
+    pluralize,
+    post_upload_size_check,
+    pre_upload_size_check,
+)
 
 from .bases import LocalDirectoryAsset
 from ..validate_types import Scope, Severity, ValidationOrigin, ValidationResult
@@ -551,15 +557,21 @@ def mkzarr() -> str:
 def _upload_zarr_file(
     storage_session: RESTFullAPIClient, upload_url: str, item: UploadItem
 ) -> int:
-    with item.filepath.open("rb") as fp:
-        storage_session.put(
-            upload_url,
-            data=fp,
-            json_resp=False,
-            retry_if=_retry_zarr_file,
-            headers={"Content-MD5": item.base64_digest},
-        )
-    return item.size
+    try:
+        with item.filepath.open("rb") as fp:
+            storage_session.put(
+                upload_url,
+                data=fp,
+                json_resp=False,
+                retry_if=_retry_zarr_file,
+                headers={"Content-MD5": item.base64_digest},
+            )
+    except Exception:
+        post_upload_size_check(item.filepath, item.size, True)
+        raise
+    else:
+        post_upload_size_check(item.filepath, item.size, False)
+        return item.size
 
 
 def _retry_zarr_file(r: requests.Response) -> bool:
@@ -634,7 +646,12 @@ class UploadItem:
 
     @classmethod
     def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem:
-        return cls(entry_path=str(e), filepath=e.filepath, digest=digest, size=e.size)
+        return cls(
+            entry_path=str(e),
+            filepath=e.filepath,
+            digest=digest,
+            size=pre_upload_size_check(e.filepath),
+        )
 
     @property
     def base64_digest(self) -> str:
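Each Zarr entry upload sends a Content-MD5 header built from the entry's digest via item.base64_digest. That property is outside this hunk; the conventional hex-digest-to-header conversion it would perform looks like this (a sketch, assuming the stored digest is a plain hex MD5 string):

    import base64
    import hashlib

    data = b"zarr chunk bytes"
    hex_digest = hashlib.md5(data).hexdigest()
    # Content-MD5 is the base64 encoding of the raw 16-byte MD5 digest:
    content_md5 = base64.b64encode(bytes.fromhex(hex_digest)).decode("ascii")
    print(content_md5)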
dandi/tests/test_utils.py: 28 changes (28 additions, 0 deletions)
@@ -2,6 +2,7 @@
 
 from collections.abc import Iterable
 import inspect
+import logging
 import os.path as op
 from pathlib import Path
 import time
@@ -29,6 +30,7 @@
     is_page2_url,
     is_same_time,
     on_windows,
+    post_upload_size_check,
     under_paths,
 )
 
@@ -561,3 +563,29 @@ def test_under_paths(
     paths: list[str], filter_paths: list[str], results: list[str]
 ) -> None:
     assert list(map(str, under_paths(paths, filter_paths))) == results
+
+
+def test_post_upload_size_check_not_erroring(tmp_path: Path) -> None:
+    p = tmp_path / "file.txt"
+    # Write bytes so the size is the same on Unix and Windows:
+    p.write_bytes(b"This is test text.\n")
+    with pytest.raises(RuntimeError) as excinfo:
+        post_upload_size_check(p, 42, False)
+    assert (
+        str(excinfo.value)
+        == f"Size of {p} was 42 at start of upload but is now 19 after upload"
+    )
+
+
+def test_post_upload_size_check_erroring(
+    caplog: pytest.LogCaptureFixture, tmp_path: Path
+) -> None:
+    p = tmp_path / "file.txt"
+    # Write bytes so the size is the same on Unix and Windows:
+    p.write_bytes(b"This is test text.\n")
+    post_upload_size_check(p, 42, True)
+    assert (
+        "dandi",
+        logging.ERROR,
+        f"Size of {p} was 42 at start of upload but is now 19 after upload",
+    ) in caplog.record_tuples
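Both tests fix the written content at 19 bytes so the expected message is stable across platforms. They can be run in isolation with pytest's -k selector (invocation assumed, any recent pytest):

    python -m pytest -k post_upload_size_check dandi/tests/test_utils.py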
dandi/utils.py: 26 changes (26 additions, 0 deletions)
@@ -19,6 +19,7 @@
 import shutil
 import subprocess
 import sys
+from time import sleep
 import traceback
 import types
 from typing import IO, Any, List, Optional, Protocol, TypeVar, Union
@@ -834,3 +835,28 @@ def _prepare_path_parts(paths: Iterable[str | PurePath]) -> list[tuple[str, ...]
 
 def _starts_with(t: tuple[str, ...], prefix: tuple[str, ...]) -> bool:
     return t[: len(prefix)] == prefix
+
+
+def pre_upload_size_check(path: Path) -> int:
+    # If the filesystem reports a size of zero for a file we're about to
+    # upload, double-check the size in case we're on a flaky NFS system.
+    for naptime in [0] + [0.1] * 19:
+        sleep(naptime)
+        size = path.stat().st_size
+        if size != 0:
+            return size
+    return size
+
+
+def post_upload_size_check(path: Path, pre_check_size: int, erroring: bool) -> None:
+    # More checks for NFS flakiness
+    size = path.stat().st_size
+    if size != pre_check_size:
+        msg = (
+            f"Size of {path} was {pre_check_size} at start of upload but is"
+            f" now {size} after upload"
+        )
+        if erroring:
+            lgr.error(msg)
+        else:
+            raise RuntimeError(msg)
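The retry loop in pre_upload_size_check() makes at most 20 stat() calls: one immediately, then up to 19 more at 0.1 s intervals, so a file that keeps reading as zero bytes delays the upload by just under two seconds before the zero size is accepted. The arithmetic, taken directly from the naptime list:

    naps = [0] + [0.1] * 19
    print(len(naps))            # 20 stat() attempts in the worst case
    print(round(sum(naps), 1))  # 1.9 seconds of sleeping before giving up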
