Merge pull request #1535 from dandi/enh-digest
Refactor some download code for readability + ensure that we checksum resumed downloads
yarikoptic authored Nov 26, 2024
2 parents a832b95 + 7efa0b2 commit 433297b
Showing 2 changed files with 120 additions and 81 deletions.
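The substantive fix: an incrementally updated digest is only trusted when the whole file was streamed in the current session; a resumed download must re-hash the file on disk (see the `final_digest` logic in the diff below). A minimal standalone sketch of that decision, with `final_digest_for` as a hypothetical helper name rather than the dandi-cli API:

```python
import hashlib
from pathlib import Path

def final_digest_for(path: Path, algo: str, resuming: bool, live_digest) -> str:
    # live_digest: an in-flight hashlib object fed every downloaded block,
    # or None if no supported digest was available for online checking.
    if live_digest is not None and not resuming:
        # Every byte of the file passed through the hasher, so its state
        # is complete and can be used directly.
        return live_digest.hexdigest()
    # A resumed download never hashed the bytes that were already on disk,
    # so the only trustworthy checksum is a fresh one over the whole file.
    h = hashlib.new(algo)
    with path.open("rb") as f:
        for block in iter(lambda: f.read(1 << 20), b""):
            h.update(block)
    return h.hexdigest()
```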
186 changes: 107 additions & 79 deletions dandi/download.py
@@ -690,12 +690,12 @@ def _download_file(
"%s - found no digests in hashlib for any of %s", path, str(digests)
)

# TODO: how do we discover the total size????
# TODO: do not do it in-place, but rather into some "hidden" file
resuming = False
attempt = 0
nattempts = 3 # number to do, could be incremented if we downloaded a little
while attempt <= nattempts:
attempts_allowed: int = (
3 # number to do, could be incremented if we downloaded a little
)
while attempt <= attempts_allowed:
attempt += 1
try:
if digester:
@@ -724,7 +724,6 @@ def _download_file(
downloaded_digest.update(block)
downloaded += len(block)
downloaded_in_attempt += len(block)
# TODO: yield progress etc
out: dict[str, Any] = {"done": downloaded}
if size:
if downloaded > size and not warned:
@@ -737,7 +736,6 @@
size,
)
out["done%"] = 100 * downloaded / size
# TODO: ETA etc
yield out
dldir.append(block)
break
@@ -749,87 +747,36 @@
# Catching RequestException lets us retry on timeout & connection
# errors (among others) in addition to HTTP status errors.
except requests.RequestException as exc:
sleep_amount = random.random() * 5 * attempt
if os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY"):
# under aggressive retry, if we downloaded a little more,
# consider it a successful attempt
if downloaded_in_attempt > 0:
lgr.debug(
"%s - download failed on attempt #%d: %s, "
"but did download %d bytes, so considering "
"it a success and incrementing number of allowed attempts.",
path,
attempt,
exc,
downloaded_in_attempt,
)
nattempts += 1
# TODO: we should probably retry only on selected status codes,
if exc.response is not None:
if exc.response.status_code not in (
400, # Bad Request, but happened with girder:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
):
lgr.debug(
"%s - download failed due to response %d: %s",
path,
exc.response.status_code,
exc,
)
yield {"status": "error", "message": str(exc)}
return
elif retry_after := exc.response.headers.get("Retry-After"):
# playing safe
if not str(retry_after).isdigit():
# our assumption may be wrong; do not crash, but issue a warning
# so it can be reported and fixed
lgr.warning(
"%s - download failed due to response %d with non-integer"
" Retry-After=%r: %s",
path,
exc.response.status_code,
retry_after,
exc,
)
yield {"status": "error", "message": str(exc)}
return
sleep_amount = int(retry_after)
lgr.debug(
"%s - download failed due to response %d with "
"Retry-After=%d: %s, will sleep and retry",
path,
exc.response.status_code,
sleep_amount,
exc,
)
else:
lgr.debug("%s - download failed: %s", path, exc)
yield {"status": "error", "message": str(exc)}
return
elif attempt >= nattempts:
lgr.debug(
"%s - download failed after %d attempts: %s", path, attempt, exc
)
attempts_allowed_or_not = _check_if_more_attempts_allowed(
path=path,
exc=exc,
attempt=attempt,
attempts_allowed=attempts_allowed,
downloaded_in_attempt=downloaded_in_attempt,
)
if attempts_allowed_or_not is None:
yield {"status": "error", "message": str(exc)}
return
# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
else:
lgr.debug(
"%s - download failed on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(sleep_amount)
# for clear(er) typing: at this point the value can only be an int
assert isinstance(attempts_allowed_or_not, int)
attempts_allowed = attempts_allowed_or_not
else:
lgr.warning("downloader logic: We should not be here!")

final_digest = None
if downloaded_digest and not resuming:
assert downloaded_digest is not None
final_digest = downloaded_digest.hexdigest() # we care only about hex
elif digests:
if resuming:
lgr.debug("%s - resumed download. Need to check full checksum.", path)
else:
assert not downloaded_digest
lgr.debug(
"%s - no digest was checked online. Need to check full checksum", path
)
final_digest = get_digest(path, algo)
if final_digest:
if digest_callback is not None:
assert isinstance(algo, str)
digest_callback(algo, final_digest)
@@ -842,6 +789,7 @@ def _download_file(
yield {"checksum": "ok"}
lgr.debug("%s - verified that has correct %s %s", path, algo, digest)
else:
lgr.debug("%s - no digests were provided", path)
# shouldn't happen with more recent metadata etc
yield {
"checksum": "-",
@@ -1085,6 +1033,86 @@ def downloads_gen():
yield {"status": "done"}


def _check_if_more_attempts_allowed(
path: Path,
exc: requests.RequestException,
attempt: int,
attempts_allowed: int,
downloaded_in_attempt: int,
) -> int | None:
"""Check if we should retry the download, return potentially adjusted 'attempts_allowed'"""
sleep_amount = random.random() * 5 * attempt
if os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY"):
# under aggressive retry, if we downloaded a little more,
# consider it a successful attempt
if downloaded_in_attempt > 0:
lgr.debug(
"%s - download failed on attempt #%d: %s, "
"but did download %d bytes, so considering "
"it a success and incrementing number of allowed attempts.",
path,
attempt,
exc,
downloaded_in_attempt,
)
attempts_allowed += 1
# TODO: we should probably retry only on selected status codes,
if exc.response is not None:
if exc.response.status_code not in (
400, # Bad Request, but happened with girder:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
):
lgr.debug(
"%s - download failed due to response %d: %s",
path,
exc.response.status_code,
exc,
)
return None
elif retry_after := exc.response.headers.get("Retry-After"):
# playing safe
if not str(retry_after).isdigit():
# our assumption may be wrong; do not crash, but issue a warning
# so it can be reported and fixed
lgr.warning(
"%s - download failed due to response %d with non-integer"
" Retry-After=%r: %s",
path,
exc.response.status_code,
retry_after,
exc,
)
return None
sleep_amount = int(retry_after)
lgr.debug(
"%s - download failed due to response %d with "
"Retry-After=%d: %s, will sleep and retry",
path,
exc.response.status_code,
sleep_amount,
exc,
)
else:
lgr.debug("%s - download failed: %s", path, exc)
return None
elif attempt >= attempts_allowed:
lgr.debug("%s - download failed after %d attempts: %s", path, attempt, exc)
return None
# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
else:
lgr.debug(
"%s - download failed on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(sleep_amount)
return attempts_allowed


def pairing(p: str, gen: Iterator[dict]) -> Iterator[tuple[str, dict]]:
for d in gen:
yield (p, d)
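For readers skimming the refactor: the extracted `_check_if_more_attempts_allowed` (shown above) returns None when the download should be abandoned, and a possibly incremented attempts budget otherwise, sleeping before it returns in the retry case. A hedged sketch of a caller honoring that contract, with `do_fetch` as a hypothetical stand-in for the actual transfer:

```python
import requests

def fetch_with_retries(path, do_fetch, attempts_allowed: int = 3) -> None:
    attempt = 0
    while attempt <= attempts_allowed:
        attempt += 1
        try:
            do_fetch()
            return
        except requests.RequestException as exc:
            allowed = _check_if_more_attempts_allowed(
                path=path,
                exc=exc,
                attempt=attempt,
                attempts_allowed=attempts_allowed,
                downloaded_in_attempt=0,  # the real downloader tracks this
            )
            if allowed is None:
                # the real code yields an error status; a sketch just re-raises
                raise
            attempts_allowed = allowed  # may have grown under aggressive retry
```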
15 changes: 13 additions & 2 deletions dandi/tests/test_download.py
@@ -122,8 +122,9 @@ def test_download_000027_assets_only(url: str, tmp_path: Path) -> None:
@mark.skipif_no_network
@pytest.mark.parametrize("resizer", [lambda sz: 0, lambda sz: sz // 2, lambda sz: sz])
@pytest.mark.parametrize("version", ["0.210831.2033", DRAFT])
@pytest.mark.parametrize("break_download", [False, True])
def test_download_000027_resume(
tmp_path: Path, resizer: Callable[[int], int], version: str
tmp_path: Path, resizer: Callable[[int], int], version: str, break_download: bool
) -> None:
url = f"https://dandiarchive.org/dandiset/000027/{version}"
digester = Digester()
@@ -137,15 +138,25 @@ def test_download_000027_resume(
nwb.rename(dlfile)
size = dlfile.stat().st_size
os.truncate(dlfile, resizer(size))
if break_download:
bad_load = b"bad"
if resizer(size) == size: # no truncation
os.truncate(dlfile, size - len(bad_load))
with open(dlfile, "ab") as f:
f.write(bad_load)
with (dldir / "checksum").open("w") as fp:
json.dump(digests, fp)

download(url, tmp_path, get_metadata=False)
assert list_paths(dsdir, dirs=True) == [
dsdir / "sub-RAT123",
dsdir / "sub-RAT123" / "sub-RAT123.nwb",
]
assert nwb.stat().st_size == size
assert digester(str(nwb)) == digests
if break_download:
assert digester(str(nwb)) != digests
else:
assert digester(str(nwb)) == digests


def test_download_newest_version(text_dandiset: SampleDandiset, tmp_path: Path) -> None:

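The new `break_download` parametrization corrupts the tail of the partially downloaded file while keeping its size consistent with the chosen truncation point, so the resume path completes but the full-file checksum check must notice the damage. A standalone sketch of that corruption step, with `corrupt_tail` as a hypothetical name:

```python
import os

def corrupt_tail(dlfile: str, size: int, bad: bytes = b"bad") -> None:
    # If the file was left at full size, trim it first so that appending
    # the bad bytes preserves the expected total size.
    if os.path.getsize(dlfile) == size:
        os.truncate(dlfile, size - len(bad))
    with open(dlfile, "ab") as f:
        f.write(bad)
```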