From a0ba0cbe6616122dbb6f379642d29a5d1e26342d Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 16 Dec 2024 10:28:14 -0500 Subject: [PATCH 1/5] Do not update on progress more frequently than once in two seconds per file or when done This is primarily to resolve the problem of pyout "dragging us behind" which slows down entire download process. https://github.com/dandi/dandi-cli/issues/1549 --- dandi/download.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/dandi/download.py b/dandi/download.py index d1bf4234b..27b66a219 100644 --- a/dandi/download.py +++ b/dandi/download.py @@ -358,13 +358,37 @@ def download_generator(self) -> Iterator[dict]: lock=lock, ) + def _progress_filter(gen): + """To reduce load on pyout etc, make progress reports only if enough time + from prior report has passed (over 2 seconds) or we are done (got 100%). + + Note that it requires "awareness" from the code below to issue other messages + with bundling with done% reporting if reporting on progress of some kind (e.g., + adjusting "message"). 
+ """ + prior_time = 0 + warned = False + for rec in gen: + current_time = time.time() + if done_perc := rec.get("done%", 0): + if isinstance(done_perc, (int, float)): + if current_time - prior_time < 2 and done_perc != 100: + continue + elif not warned: + warned = True + lgr.warning( + "Received non numeric done%%: %r", done_perc + ) + prior_time = current_time + yield rec + # If exception is raised we might just raise it, or yield # an error record gen = { "raise": _download_generator, "yield": _download_generator_guard(path, _download_generator), }[self.on_error] - + gen = _progress_filter(gen) if self.yield_generator_for_fields: yield {"path": path, self.yield_generator_for_fields: gen} else: @@ -1247,9 +1271,9 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: self.files[path].downloaded = size self.maxsize += size self.set_status(out) - yield out if self.zarr_size: - yield self.get_done() + out.update(self.get_done()) + yield out elif keys == ["size"]: self.files[path].size = size self.maxsize += status["size"] @@ -1274,11 +1298,11 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: self.files[path].state = DLState.ERROR out = {"message": self.message} self.set_status(out) - yield out sz = self.files[path].size if sz is not None: self.maxsize -= sz - yield self.get_done() + out.update(self.get_done()) + yield out elif keys == ["checksum"]: pass elif status == {"status": "setting mtime"}: @@ -1287,6 +1311,7 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: self.files[path].state = DLState.DONE out = {"message": self.message} self.set_status(out) + out.update(self.get_done()) yield out else: lgr.warning( From e02642a7e9dd882a5aac08b33c353bb0af331e85 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 16 Dec 2024 10:28:22 -0500 Subject: [PATCH 2/5] Do not add unique summaries over how many done, limit counts to 5 max shown --- dandi/support/pyout.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff 
--git a/dandi/support/pyout.py b/dandi/support/pyout.py index 7ceaf5680..ccceb54de 100644 --- a/dandi/support/pyout.py +++ b/dandi/support/pyout.py @@ -3,6 +3,7 @@ from collections import Counter import datetime import logging +import re import sys import time @@ -54,7 +55,11 @@ def summary_dates(values): def counts(values): - return [f"{v:d} {k}" for k, v in Counter(values).items()] + vals = [f"{v:d} {k}" for k, v in Counter(values).items()] + # limit to 5 flines + if len(vals) > 5: + vals = vals[:4] + ["+%d more" % (len(vals) - 4)] + return vals # class mapped_counts(object): @@ -158,7 +163,8 @@ def get_style(hide_if_missing=True): color=dict( re_lookup=[["^exists", "yellow"], ["^(failed|error|ERROR)", "red"]] ), - aggregate=counts, + # replace numbers since we do not want unique among them + aggregate=lambda values: counts(re.sub(r"\d+", "some", v) for v in values), ), "progress": progress_style, "done%": progress_style, From 21dcfbc52596feaaab8f1c350b0d9ae84ba7c488 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 16 Dec 2024 15:49:27 -0500 Subject: [PATCH 3/5] RF: centralize some logic to avoid duplication + fixup tests for adjusted here/before logic Now, that to filter out some progress records we rely on having "done%" reported, reflect that we are reportint it for final records --- dandi/download.py | 11 +++---- dandi/tests/test_download.py | 64 +++++++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/dandi/download.py b/dandi/download.py index 27b66a219..3f03f8ffe 100644 --- a/dandi/download.py +++ b/dandi/download.py @@ -1291,18 +1291,15 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: elif status.get("status") == "error": if "checksum" in status: self.files[path].state = DLState.CHECKSUM_ERROR - out = {"message": self.message} - self.set_status(out) - yield out else: self.files[path].state = DLState.ERROR - out = {"message": self.message} - self.set_status(out) sz = self.files[path].size 
if sz is not None: self.maxsize -= sz - out.update(self.get_done()) - yield out + out = {"message": self.message} + self.set_status(out) + out.update(self.get_done()) + yield out elif keys == ["checksum"]: pass elif status == {"status": "setting mtime"}: diff --git a/dandi/tests/test_download.py b/dandi/tests/test_download.py index a06d94aec..fb9626d36 100644 --- a/dandi/tests/test_download.py +++ b/dandi/tests/test_download.py @@ -579,7 +579,7 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 20, "done%": 20 / 42 * 100}, {"done": 40, "done%": 40 / 42 * 100}, {"done": 42, "done%": 100.0}, - {"status": "done", "message": "1 done"}, + {"done": 42, "done%": 100.0, "status": "done", "message": "1 done"}, ], ), ( # 1 @@ -618,8 +618,8 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 122, "done%": 122 / 169 * 100}, {"done": 162, "done%": 162 / 169 * 100}, {"done": 169, "done%": 100.0}, - {"message": "1 done"}, - {"status": "done", "message": "2 done"}, + {"done": 169, "done%": 100.0, "message": "1 done"}, + {"done": 169, "done%": 100.0, "status": "done", "message": "2 done"}, ], ), ( # 2 @@ -657,10 +657,15 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 42, "done%": 42 / 169 * 100}, {"done": 82, "done%": 82 / 169 * 100}, {"done": 122, "done%": 122 / 169 * 100}, - {"message": "1 done"}, + {"done": 122, "done%": 122 / 169 * 100, "message": "1 done"}, {"done": 162, "done%": 162 / 169 * 100}, {"done": 169, "done%": 169 / 169 * 100}, - {"status": "done", "message": "2 done"}, + { + "done": 169, + "done%": 169 / 169 * 100, + "status": "done", + "message": "2 done", + }, ], ), ( # 3 @@ -694,13 +699,13 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 20, "done%": 20 / 169 * 100}, {"done": 40, "done%": 40 / 169 * 100}, {"done": 42, "done%": 42 / 169 * 100}, - {"message": "1 done"}, + {"done": 42, "done%": 42 / 169 * 100, "message": "1 done"}, {"done": 42, "done%": 42 / 169 * 100}, {"done": 82, "done%": 82 / 169 * 100}, {"done": 122, 
"done%": 122 / 169 * 100}, {"done": 162, "done%": 162 / 169 * 100}, {"done": 169, "done%": 100.0}, - {"status": "done", "message": "2 done"}, + {"done": 169, "done%": 100.0, "status": "done", "message": "2 done"}, ], ), ( # 4 @@ -730,10 +735,14 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 20, "done%": 20 / 169 * 100}, {"done": 60, "done%": 60 / 169 * 100}, {"done": 80, "done%": 80 / 169 * 100}, - {"message": "1 errored"}, - {"done": 40, "done%": 40 / 169 * 100}, + {"done": 40, "done%": 40 / 169 * 100, "message": "1 errored"}, {"done": 42, "done%": 42 / 169 * 100}, - {"status": "error", "message": "1 done, 1 errored"}, + { + "done": 42, + "done%": 42 / 169 * 100, + "status": "error", + "message": "1 done, 1 errored", + }, ], ), ( # 5 @@ -762,14 +771,18 @@ def test_download_zarr_subdir_has_only_subdirs( ], [ {"size": 169}, - {"message": "1 skipped"}, - {"done": 127, "done%": (127 + 0) / 169 * 100}, + {"done": 127, "done%": (127 + 0) / 169 * 100, "message": "1 skipped"}, {"status": "downloading"}, {"done": 127 + 0, "done%": (127 + 0) / 169 * 100}, {"done": 127 + 20, "done%": (127 + 20) / 169 * 100}, {"done": 127 + 40, "done%": (127 + 40) / 169 * 100}, {"done": 127 + 42, "done%": 100.0}, - {"status": "done", "message": "1 done, 1 skipped"}, + { + "done": 127 + 42, + "done%": 100.0, + "status": "done", + "message": "1 done, 1 skipped", + }, ], ), ( # 7 @@ -813,8 +826,13 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 122, "done%": 122 / 169 * 100}, {"done": 162, "done%": 162 / 169 * 100}, {"done": 169, "done%": 100.0}, - {"message": "1 errored"}, - {"status": "error", "message": "1 done, 1 errored"}, + {"done": 169, "done%": 100.0, "message": "1 errored"}, + { + "done": 169, + "done%": 100.0, + "status": "error", + "message": "1 done, 1 errored", + }, ], ), ( # 8 @@ -853,8 +871,7 @@ def test_download_zarr_subdir_has_only_subdirs( [ {"size": 179}, {"status": "downloading"}, - {"message": "1 skipped"}, - {"done": 10, "done%": 10 / 179 * 100}, + 
{"done": 10, "done%": 10 / 179 * 100, "message": "1 skipped"}, {"done": 10, "done%": 10 / 179 * 100}, {"done": 10, "done%": 10 / 179 * 100}, {"done": 10 + 20, "done%": (10 + 20) / 179 * 100}, @@ -862,10 +879,19 @@ def test_download_zarr_subdir_has_only_subdirs( {"done": 10 + 80, "done%": (10 + 80) / 179 * 100}, {"done": 10 + 120, "done%": (10 + 120) / 179 * 100}, {"done": 10 + 122, "done%": (10 + 122) / 179 * 100}, - {"message": "1 errored, 1 skipped"}, + { + "done": 10 + 122, + "done%": (10 + 122) / 179 * 100, + "message": "1 errored, 1 skipped", + }, {"done": 10 + 162, "done%": (10 + 162) / 179 * 100}, {"done": 179, "done%": 100.0}, - {"status": "error", "message": "1 done, 1 errored, 1 skipped"}, + { + "done": 179, + "done%": 100.0, + "status": "error", + "message": "1 done, 1 errored, 1 skipped", + }, ], ), ], From fe30c7b43853fd3184f7cbc5a57d7ea243885546 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 16 Dec 2024 16:17:20 -0500 Subject: [PATCH 4/5] RF+OPT: centralize logic on creation of record on zarr download progress OPT: this avoids 2nd loop (after Counter) over the .files in .messge property which was used only to report stats anyways. 
This could be notable while reporting on a zarr with 100_000 or more files --- dandi/download.py | 67 +++++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 40 deletions(-) diff --git a/dandi/download.py b/dandi/download.py index 3f03f8ffe..497fdaad7 100644 --- a/dandi/download.py +++ b/dandi/download.py @@ -1193,27 +1193,6 @@ class ProgressCombiner: prev_status: str = "" yielded_size: bool = False - @property - def message(self) -> str: - done = 0 - errored = 0 - skipped = 0 - for s in self.files.values(): - if s.state is DLState.DONE: - done += 1 - elif s.state in (DLState.ERROR, DLState.CHECKSUM_ERROR): - errored += 1 - elif s.state is DLState.SKIPPED: - skipped += 1 - parts = [] - if done: - parts.append(f"{done} done") - if errored: - parts.append(f"{errored} errored") - if skipped: - parts.append(f"{skipped} skipped") - return ", ".join(parts) - def get_done(self) -> dict: total_downloaded = sum( s.downloaded @@ -1231,7 +1210,8 @@ def get_done(self) -> dict: "done%": total_downloaded / self.zarr_size * 100 if self.zarr_size else 0, } - def set_status(self, statusdict: dict) -> None: + def get_status(self, report_done: bool = True) -> dict: + state_qtys = Counter(s.state for s in self.files.values()) total = len(self.files) if ( @@ -1250,9 +1230,28 @@ def set_status(self, statusdict: dict) -> None: new_status = "downloading" else: new_status = "" + + statusdict = {} + + if report_done: + msg_comps = [] + for msg_label, states in { + "done": (DLState.DONE,), + "errored": (DLState.ERROR, DLState.CHECKSUM_ERROR), + "skipped": (DLState.SKIPPED,), + }.items(): + if count := sum(state_qtys.get(state, 0) for state in states): + msg_comps.append(f"{count} {msg_label}") + if msg_comps: + statusdict["message"] = ", ".join(msg_comps) + if new_status != self.prev_status: - statusdict["status"] = new_status - self.prev_status = new_status + self.prev_status = statusdict["status"] = new_status + + if report_done and self.zarr_size: + 
statusdict.update(self.get_done()) + + return statusdict def feed(self, path: str, status: dict) -> Iterator[dict]: keys = list(status.keys()) @@ -1265,15 +1264,11 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: yield {"size": self.zarr_size} if status.get("status") == "skipped": self.files[path].state = DLState.SKIPPED - out = {"message": self.message} # Treat skipped as "downloaded" for the matter of accounting if size is not None: self.files[path].downloaded = size self.maxsize += size - self.set_status(out) - if self.zarr_size: - out.update(self.get_done()) - yield out + yield self.get_status() elif keys == ["size"]: self.files[path].size = size self.maxsize += status["size"] @@ -1281,9 +1276,7 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: yield self.get_done() elif status == {"status": "downloading"}: self.files[path].state = DLState.DOWNLOADING - out = {} - self.set_status(out) - if out: + if out := self.get_status(report_done=False): yield out elif "done" in status: self.files[path].downloaded = status["done"] @@ -1296,20 +1289,14 @@ def feed(self, path: str, status: dict) -> Iterator[dict]: sz = self.files[path].size if sz is not None: self.maxsize -= sz - out = {"message": self.message} - self.set_status(out) - out.update(self.get_done()) - yield out + yield self.get_status() elif keys == ["checksum"]: pass elif status == {"status": "setting mtime"}: pass elif status == {"status": "done"}: self.files[path].state = DLState.DONE - out = {"message": self.message} - self.set_status(out) - out.update(self.get_done()) - yield out + yield self.get_status() else: lgr.warning( "Unexpected download status dict for %r received: %r", path, status From 2f740752f0d18ab68a9920f636b6ccec3f73811c Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 16 Dec 2024 16:22:41 -0500 Subject: [PATCH 5/5] Yield error if attempts_allowed is not int MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For that 
exception we did get logged: ❯ grep Assertion /home/yoh/.local/state/dandi-cli/log/2024.12.16* /home/yoh/.local/state/dandi-cli/log/2024.12.16-15.24.12Z-119733.log:AssertionError: attempts_allowed_or_not is None of type which makes little sense (besides some thread unsafe handling or None becoming bool(True)). To overcome and avoid need for assert, just fold that condition directly into "if" statement. --- dandi/download.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dandi/download.py b/dandi/download.py index 497fdaad7..ca94ae7db 100644 --- a/dandi/download.py +++ b/dandi/download.py @@ -803,14 +803,9 @@ def _download_file( attempts_allowed=attempts_allowed, downloaded_in_attempt=downloaded_in_attempt, ) - if not attempts_allowed: + if not isinstance(attempts_allowed_or_not, int) or not attempts_allowed: yield {"status": "error", "message": str(exc)} return - # for clear(er) typing, here we get only with int - assert isinstance(attempts_allowed_or_not, int), ( - f"attempts_allowed_or_not is {attempts_allowed_or_not!r} " - f"of type {type(attempts_allowed_or_not)}" - ) attempts_allowed = attempts_allowed_or_not else: lgr.warning("downloader logic: We should not be here!")