Skip to content

Commit

Permalink
Double-check file sizes before & after uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
jwodder committed Dec 11, 2023
1 parent a51e0a7 commit d7cb4e9
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 79 deletions.
143 changes: 75 additions & 68 deletions dandi/files/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from dandi.dandiapi import RemoteAsset, RemoteDandiset, RESTFullAPIClient
from dandi.metadata.core import get_default_metadata
from dandi.misctypes import DUMMY_DANDI_ETAG, Digest, LocalReadableFile, P
from dandi.utils import yaml_load
from dandi.utils import post_upload_size_check, pre_upload_size_check, yaml_load
from dandi.validate_types import Scope, Severity, ValidationOrigin, ValidationResult

lgr = dandi.get_logger()
Expand Down Expand Up @@ -350,7 +350,7 @@ def iter_upload(
)
yield {"status": "initiating upload"}
lgr.debug("%s: Beginning upload", asset_path)
total_size = self.size
total_size = pre_upload_size_check(self.filepath)
try:
resp = client.post(
"/uploads/initialize/",
Expand All @@ -370,73 +370,80 @@ def iter_upload(
else:
raise
else:
upload_id = resp["upload_id"]
parts = resp["parts"]
if len(parts) != etagger.part_qty:
raise RuntimeError(
f"Server and client disagree on number of parts for upload;"
f" server says {len(parts)}, client says {etagger.part_qty}"
)
parts_out = []
bytes_uploaded = 0
lgr.debug("Uploading %s in %d parts", self.filepath, len(parts))
with RESTFullAPIClient("http://nil.nil") as storage:
with self.filepath.open("rb") as fp:
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
lock = Lock()
futures = [
executor.submit(
_upload_blob_part,
storage_session=storage,
fp=fp,
lock=lock,
etagger=etagger,
asset_path=asset_path,
part=part,
try:
upload_id = resp["upload_id"]
parts = resp["parts"]
if len(parts) != etagger.part_qty:
raise RuntimeError(

Check warning on line 377 in dandi/files/bases.py

View check run for this annotation

Codecov / codecov/patch

dandi/files/bases.py#L377

Added line #L377 was not covered by tests
f"Server and client disagree on number of parts for upload;"
f" server says {len(parts)}, client says {etagger.part_qty}"
)
parts_out = []
bytes_uploaded = 0
lgr.debug("Uploading %s in %d parts", self.filepath, len(parts))
with RESTFullAPIClient("http://nil.nil") as storage:
with self.filepath.open("rb") as fp:
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
lock = Lock()
futures = [
executor.submit(
_upload_blob_part,
storage_session=storage,
fp=fp,
lock=lock,
etagger=etagger,
asset_path=asset_path,
part=part,
)
for part in parts
]
for fut in as_completed(futures):
out_part = fut.result()
bytes_uploaded += out_part["size"]
yield {
"status": "uploading",
"upload": 100 * bytes_uploaded / total_size,
"current": bytes_uploaded,
}
parts_out.append(out_part)
lgr.debug("%s: Completing upload", asset_path)
resp = client.post(
f"/uploads/{upload_id}/complete/",
json={"parts": parts_out},
)
lgr.debug(
"%s: Announcing completion to %s",
asset_path,
resp["complete_url"],
)
r = storage.post(
resp["complete_url"], data=resp["body"], json_resp=False
)
lgr.debug(
"%s: Upload completed. Response content: %s",
asset_path,
r.content,
)
rxml = fromstring(r.text)
m = re.match(r"\{.+?\}", rxml.tag)
ns = m.group(0) if m else ""
final_etag = rxml.findtext(f"{ns}ETag")
if final_etag is not None:
final_etag = final_etag.strip('"')
if final_etag != filetag:
raise RuntimeError(

Check warning on line 434 in dandi/files/bases.py

View check run for this annotation

Codecov / codecov/patch

dandi/files/bases.py#L434

Added line #L434 was not covered by tests
"Server and client disagree on final ETag of"
f" uploaded file; server says {final_etag},"
f" client says {filetag}"
)
for part in parts
]
for fut in as_completed(futures):
out_part = fut.result()
bytes_uploaded += out_part["size"]
yield {
"status": "uploading",
"upload": 100 * bytes_uploaded / total_size,
"current": bytes_uploaded,
}
parts_out.append(out_part)
lgr.debug("%s: Completing upload", asset_path)
resp = client.post(
f"/uploads/{upload_id}/complete/",
json={"parts": parts_out},
)
lgr.debug(
"%s: Announcing completion to %s",
asset_path,
resp["complete_url"],
)
r = storage.post(
resp["complete_url"], data=resp["body"], json_resp=False
)
lgr.debug(
"%s: Upload completed. Response content: %s",
asset_path,
r.content,
)
rxml = fromstring(r.text)
m = re.match(r"\{.+?\}", rxml.tag)
ns = m.group(0) if m else ""
final_etag = rxml.findtext(f"{ns}ETag")
if final_etag is not None:
final_etag = final_etag.strip('"')
if final_etag != filetag:
raise RuntimeError(
"Server and client disagree on final ETag of uploaded file;"
f" server says {final_etag}, client says {filetag}"
)
# else: Error? Warning?
resp = client.post(f"/uploads/{upload_id}/validate/")
blob_id = resp["blob_id"]
# else: Error? Warning?
resp = client.post(f"/uploads/{upload_id}/validate/")
blob_id = resp["blob_id"]
except Exception:
post_upload_size_check(self.filepath, total_size, True)
raise

Check warning on line 444 in dandi/files/bases.py

View check run for this annotation

Codecov / codecov/patch

dandi/files/bases.py#L442-L444

Added lines #L442 - L444 were not covered by tests
else:
post_upload_size_check(self.filepath, total_size, False)
lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path)
yield {"status": "producing asset"}
if replacing is not None:
Expand Down
39 changes: 28 additions & 11 deletions dandi/files/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@
)
from dandi.metadata.core import get_default_metadata
from dandi.misctypes import DUMMY_DANDI_ZARR_CHECKSUM, BasePath, Digest
from dandi.utils import chunked, exclude_from_zarr, pluralize
from dandi.utils import (
chunked,
exclude_from_zarr,
pluralize,
post_upload_size_check,
pre_upload_size_check,
)

from .bases import LocalDirectoryAsset
from ..validate_types import Scope, Severity, ValidationOrigin, ValidationResult
Expand Down Expand Up @@ -551,15 +557,21 @@ def mkzarr() -> str:
def _upload_zarr_file(
    storage_session: RESTFullAPIClient, upload_url: str, item: UploadItem
) -> int:
    """
    Upload a single Zarr entry to ``upload_url`` and return its size in bytes.

    The file's size is re-checked after the upload (success or failure) via
    ``post_upload_size_check()`` in order to detect flaky-NFS size changes;
    on failure the check merely logs an error so the original exception
    propagates unchanged.
    """
    try:
        with item.filepath.open("rb") as fp:
            storage_session.put(
                upload_url,
                data=fp,
                json_resp=False,
                retry_if=_retry_zarr_file,
                headers={"Content-MD5": item.base64_digest},
            )
    except Exception:
        # erroring=True: only log a size mismatch so we don't mask the
        # exception that actually caused the upload to fail
        post_upload_size_check(item.filepath, item.size, True)
        raise
    else:
        # erroring=False: a size mismatch here raises RuntimeError
        post_upload_size_check(item.filepath, item.size, False)
    return item.size


def _retry_zarr_file(r: requests.Response) -> bool:
Expand Down Expand Up @@ -634,7 +646,12 @@ class UploadItem:

@classmethod
def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem:
    """
    Construct an UploadItem for a local Zarr entry.

    The size is taken via ``pre_upload_size_check()`` rather than ``e.size``
    so that a spurious zero size from a flaky (e.g. NFS) filesystem is
    re-checked before the upload begins.
    """
    return cls(
        entry_path=str(e),
        filepath=e.filepath,
        digest=digest,
        size=pre_upload_size_check(e.filepath),
    )

@property
def base64_digest(self) -> str:
Expand Down
30 changes: 30 additions & 0 deletions dandi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
import subprocess
import sys
from time import sleep
import traceback
import types
from typing import IO, Any, List, Optional, Protocol, TypeVar, Union
Expand Down Expand Up @@ -834,3 +835,32 @@ def _prepare_path_parts(paths: Iterable[str | PurePath]) -> list[tuple[str, ...]

def _starts_with(t: tuple[str, ...], prefix: tuple[str, ...]) -> bool:
return t[: len(prefix)] == prefix


def pre_upload_size_check(
    path: Path, attempts: int = 20, delay: float = 0.1
) -> int:
    """
    Return the size of ``path`` in bytes, re-checking if it reads as zero.

    If the filesystem reports a size of zero for a file we're about to
    upload, double-check the size in case we're on a flaky NFS system: the
    stat is retried up to ``attempts`` times total, sleeping ``delay``
    seconds between checks (defaults preserve the original 20 checks /
    0.1 s schedule).  If the size is still zero after all attempts, zero is
    returned and the caller proceeds with it.

    :param path: file about to be uploaded
    :param attempts: total number of ``stat()`` checks to perform
    :param delay: seconds to sleep between consecutive checks
    :return: the (possibly zero) file size in bytes
    """
    size = path.stat().st_size
    for _ in range(attempts - 1):
        if size != 0:
            return size
        sleep(delay)
        size = path.stat().st_size
    return size


def post_upload_size_check(path: Path, pre_check_size: int, erroring: bool) -> None:
    """
    Verify that ``path``'s size still equals ``pre_check_size`` after upload.

    More checks for NFS flakiness: if the size changed during the upload,
    the uploaded data cannot be trusted.

    :param path: the file that was just uploaded
    :param pre_check_size: size measured by ``pre_upload_size_check()``
        before the upload started
    :param erroring: if true, an exception is already propagating, so a
        mismatch is only logged (to avoid masking the original error);
        if false, a mismatch raises ``RuntimeError``
    :raises RuntimeError: on size mismatch when ``erroring`` is false
    """
    size = path.stat().st_size
    if size != pre_check_size:
        if erroring:
            lgr.error(
                "Size of %s was %d at start of upload but is now %d after upload",
                path,
                pre_check_size,
                size,
            )
        else:
            raise RuntimeError(
                f"Size of {path} was {pre_check_size} at start of upload but is"
                f" now {size} after upload"
            )

0 comments on commit d7cb4e9

Please sign in to comment.