diff --git a/dandi/files/bases.py b/dandi/files/bases.py index 7236b55d8..45f5929f5 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -27,7 +27,7 @@ from dandi.dandiapi import RemoteAsset, RemoteDandiset, RESTFullAPIClient from dandi.metadata.core import get_default_metadata from dandi.misctypes import DUMMY_DANDI_ETAG, Digest, LocalReadableFile, P -from dandi.utils import yaml_load +from dandi.utils import post_upload_size_check, pre_upload_size_check, yaml_load from dandi.validate_types import Scope, Severity, ValidationOrigin, ValidationResult lgr = dandi.get_logger() @@ -350,7 +350,7 @@ def iter_upload( ) yield {"status": "initiating upload"} lgr.debug("%s: Beginning upload", asset_path) - total_size = self.size + total_size = pre_upload_size_check(self.filepath) try: resp = client.post( "/uploads/initialize/", @@ -370,73 +370,80 @@ def iter_upload( else: raise else: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with self.filepath.open("rb") as fp: - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=asset_path, - part=part, + try: + upload_id = resp["upload_id"] + parts = resp["parts"] + if len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with self.filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=asset_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "upload": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + lgr.debug("%s: Completing upload", asset_path) + resp = client.post( + f"/uploads/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + asset_path, + resp["complete_url"], + ) + r = storage.post( + resp["complete_url"], data=resp["body"], json_resp=False + ) + lgr.debug( + "%s: Upload completed. Response content: %s", + asset_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of" + f" uploaded file; server says {final_etag}," + f" client says {filetag}" ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "upload": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - parts_out.append(out_part) - lgr.debug("%s: Completing upload", asset_path) - resp = client.post( - f"/uploads/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - asset_path, - resp["complete_url"], - ) - r = storage.post( - resp["complete_url"], data=resp["body"], json_resp=False - ) - lgr.debug( - "%s: Upload completed. Response content: %s", - asset_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of uploaded file;" - f" server says {final_etag}, client says {filetag}" - ) - # else: Error? Warning? - resp = client.post(f"/uploads/{upload_id}/validate/") - blob_id = resp["blob_id"] + # else: Error? Warning? + resp = client.post(f"/uploads/{upload_id}/validate/") + blob_id = resp["blob_id"] + except Exception: + post_upload_size_check(self.filepath, total_size, True) + raise + else: + post_upload_size_check(self.filepath, total_size, False) lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path) yield {"status": "producing asset"} if replacing is not None: diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index a39b5153d..bcb5dba7f 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -33,7 +33,13 @@ ) from dandi.metadata.core import get_default_metadata from dandi.misctypes import DUMMY_DANDI_ZARR_CHECKSUM, BasePath, Digest -from dandi.utils import chunked, exclude_from_zarr, pluralize +from dandi.utils import ( + chunked, + exclude_from_zarr, + pluralize, + post_upload_size_check, + pre_upload_size_check, +) from .bases import LocalDirectoryAsset from ..validate_types import Scope, Severity, ValidationOrigin, ValidationResult @@ -551,15 +557,21 @@ def mkzarr() -> str: def _upload_zarr_file( storage_session: RESTFullAPIClient, upload_url: str, item: UploadItem ) -> int: - with item.filepath.open("rb") as fp: - storage_session.put( - upload_url, - data=fp, - json_resp=False, - retry_if=_retry_zarr_file, - headers={"Content-MD5": item.base64_digest}, - ) - return item.size + try: + with item.filepath.open("rb") as fp: + storage_session.put( + upload_url, + data=fp, + json_resp=False, + retry_if=_retry_zarr_file, + headers={"Content-MD5": item.base64_digest}, + ) + except Exception: + post_upload_size_check(item.filepath, item.size, True) + raise + else: + post_upload_size_check(item.filepath, item.size, False) + return item.size def _retry_zarr_file(r: requests.Response) -> bool: @@ -634,7 +646,12 @@ class UploadItem: @classmethod def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem: - return cls(entry_path=str(e), filepath=e.filepath, digest=digest, size=e.size) + return cls( + entry_path=str(e), + filepath=e.filepath, + digest=digest, + size=pre_upload_size_check(e.filepath), + ) @property def base64_digest(self) -> str: diff --git a/dandi/utils.py b/dandi/utils.py index 0046b7a5c..549bcbf4f 100644 --- a/dandi/utils.py +++ b/dandi/utils.py @@ -19,6 +19,7 @@ import shutil import subprocess import sys +from time import sleep import traceback import types from typing import IO, Any, List, Optional, Protocol, TypeVar, Union @@ -834,3 +835,32 @@ def _prepare_path_parts(paths: Iterable[str | PurePath]) -> list[tuple[str, ...] def _starts_with(t: tuple[str, ...], prefix: tuple[str, ...]) -> bool: return t[: len(prefix)] == prefix + + +def pre_upload_size_check(path: Path) -> int: + # If the filesystem reports a size of zero for a file we're about to + # upload, double-check the size in case we're on a flaky NFS system. + for naptime in [0] + [0.1] * 19: + sleep(naptime) + size = path.stat().st_size + if size != 0: + return size + return size + + +def post_upload_size_check(path: Path, pre_check_size: int, erroring: bool) -> None: + # More checks for NFS flakiness + size = path.stat().st_size + if size != pre_check_size: + if erroring: + lgr.error( + "Size of %s was %d at start of upload but is now %d after upload", + path, + pre_check_size, + size, + ) + else: + raise RuntimeError( + f"Size of {path} was {pre_check_size} at start of upload but is" + f" now {size} after upload" + )