Skip to content

Commit

Permalink
Merge pull request #1554 from dandi/rf-pyoutefficiency
Browse files Browse the repository at this point in the history
Reduce messaging to pyout on progress of downloads
  • Loading branch information
yarikoptic authored Dec 17, 2024
2 parents dc938f6 + 2f74075 commit 03af701
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 70 deletions.
102 changes: 53 additions & 49 deletions dandi/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,37 @@ def download_generator(self) -> Iterator[dict]:
lock=lock,
)

def _progress_filter(gen):
"""To reduce load on pyout etc, make progress reports only if enough time
from prior report has passed (over 2 seconds) or we are done (got 100%).
Note that it requires "awareness" from the code below to issue other messages
with bundling with done% reporting if reporting on progress of some kind (e.g.,
adjusting "message").
"""
prior_time = 0
warned = False
for rec in gen:
current_time = time.time()
if done_perc := rec.get("done%", 0):
if isinstance(done_perc, (int, float)):
if current_time - prior_time < 2 and done_perc != 100:
continue
elif not warned:
warned = True
lgr.warning(
"Received non numeric done%%: %r", done_perc
)
prior_time = current_time
yield rec

# If exception is raised we might just raise it, or yield
# an error record
gen = {
"raise": _download_generator,
"yield": _download_generator_guard(path, _download_generator),
}[self.on_error]

gen = _progress_filter(gen)
if self.yield_generator_for_fields:
yield {"path": path, self.yield_generator_for_fields: gen}
else:
Expand Down Expand Up @@ -779,14 +803,9 @@ def _download_file(
attempts_allowed=attempts_allowed,
downloaded_in_attempt=downloaded_in_attempt,
)
if not attempts_allowed:
if not isinstance(attempts_allowed_or_not, int) or not attempts_allowed:
yield {"status": "error", "message": str(exc)}
return
# for clear(er) typing, here we get only with int
assert isinstance(attempts_allowed_or_not, int), (
f"attempts_allowed_or_not is {attempts_allowed_or_not!r} "
f"of type {type(attempts_allowed_or_not)}"
)
attempts_allowed = attempts_allowed_or_not
else:
lgr.warning("downloader logic: We should not be here!")
Expand Down Expand Up @@ -1169,27 +1188,6 @@ class ProgressCombiner:
prev_status: str = ""
yielded_size: bool = False

@property
def message(self) -> str:
done = 0
errored = 0
skipped = 0
for s in self.files.values():
if s.state is DLState.DONE:
done += 1
elif s.state in (DLState.ERROR, DLState.CHECKSUM_ERROR):
errored += 1
elif s.state is DLState.SKIPPED:
skipped += 1
parts = []
if done:
parts.append(f"{done} done")
if errored:
parts.append(f"{errored} errored")
if skipped:
parts.append(f"{skipped} skipped")
return ", ".join(parts)

def get_done(self) -> dict:
total_downloaded = sum(
s.downloaded
Expand All @@ -1207,7 +1205,8 @@ def get_done(self) -> dict:
"done%": total_downloaded / self.zarr_size * 100 if self.zarr_size else 0,
}

def set_status(self, statusdict: dict) -> None:
def get_status(self, report_done: bool = True) -> dict:

state_qtys = Counter(s.state for s in self.files.values())
total = len(self.files)
if (
Expand All @@ -1226,9 +1225,28 @@ def set_status(self, statusdict: dict) -> None:
new_status = "downloading"
else:
new_status = ""

statusdict = {}

if report_done:
msg_comps = []
for msg_label, states in {
"done": (DLState.DONE,),
"errored": (DLState.ERROR, DLState.CHECKSUM_ERROR),
"skipped": (DLState.SKIPPED,),
}.items():
if count := sum(state_qtys.get(state, 0) for state in states):
msg_comps.append(f"{count} {msg_label}")
if msg_comps:
statusdict["message"] = ", ".join(msg_comps)

if new_status != self.prev_status:
statusdict["status"] = new_status
self.prev_status = new_status
self.prev_status = statusdict["status"] = new_status

if report_done and self.zarr_size:
statusdict.update(self.get_done())

return statusdict

def feed(self, path: str, status: dict) -> Iterator[dict]:
keys = list(status.keys())
Expand All @@ -1241,53 +1259,39 @@ def feed(self, path: str, status: dict) -> Iterator[dict]:
yield {"size": self.zarr_size}
if status.get("status") == "skipped":
self.files[path].state = DLState.SKIPPED
out = {"message": self.message}
# Treat skipped as "downloaded" for the matter of accounting
if size is not None:
self.files[path].downloaded = size
self.maxsize += size
self.set_status(out)
yield out
if self.zarr_size:
yield self.get_done()
yield self.get_status()
elif keys == ["size"]:
self.files[path].size = size
self.maxsize += status["size"]
if any(s.state is DLState.DOWNLOADING for s in self.files.values()):
yield self.get_done()
elif status == {"status": "downloading"}:
self.files[path].state = DLState.DOWNLOADING
out = {}
self.set_status(out)
if out:
if out := self.get_status(report_done=False):
yield out
elif "done" in status:
self.files[path].downloaded = status["done"]
yield self.get_done()
elif status.get("status") == "error":
if "checksum" in status:
self.files[path].state = DLState.CHECKSUM_ERROR
out = {"message": self.message}
self.set_status(out)
yield out
else:
self.files[path].state = DLState.ERROR
out = {"message": self.message}
self.set_status(out)
yield out
sz = self.files[path].size
if sz is not None:
self.maxsize -= sz
yield self.get_done()
yield self.get_status()
elif keys == ["checksum"]:
pass
elif status == {"status": "setting mtime"}:
pass
elif status == {"status": "done"}:
self.files[path].state = DLState.DONE
out = {"message": self.message}
self.set_status(out)
yield out
yield self.get_status()
else:
lgr.warning(
"Unexpected download status dict for %r received: %r", path, status
Expand Down
10 changes: 8 additions & 2 deletions dandi/support/pyout.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import Counter
import datetime
import logging
import re
import sys
import time

Expand Down Expand Up @@ -54,7 +55,11 @@ def summary_dates(values):


def counts(values):
return [f"{v:d} {k}" for k, v in Counter(values).items()]
vals = [f"{v:d} {k}" for k, v in Counter(values).items()]
# limit to 5 flines
if len(vals) > 5:
vals = vals[:4] + ["+%d more" % (len(vals) - 4)]
return vals


# class mapped_counts(object):
Expand Down Expand Up @@ -158,7 +163,8 @@ def get_style(hide_if_missing=True):
color=dict(
re_lookup=[["^exists", "yellow"], ["^(failed|error|ERROR)", "red"]]
),
aggregate=counts,
# replace numbers since we do not want unique among them
aggregate=lambda values: counts(re.sub(r"\d+", "some", v) for v in values),
),
"progress": progress_style,
"done%": progress_style,
Expand Down
64 changes: 45 additions & 19 deletions dandi/tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 20, "done%": 20 / 42 * 100},
{"done": 40, "done%": 40 / 42 * 100},
{"done": 42, "done%": 100.0},
{"status": "done", "message": "1 done"},
{"done": 42, "done%": 100.0, "status": "done", "message": "1 done"},
],
),
( # 1
Expand Down Expand Up @@ -618,8 +618,8 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 122, "done%": 122 / 169 * 100},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"message": "1 done"},
{"status": "done", "message": "2 done"},
{"done": 169, "done%": 100.0, "message": "1 done"},
{"done": 169, "done%": 100.0, "status": "done", "message": "2 done"},
],
),
( # 2
Expand Down Expand Up @@ -657,10 +657,15 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 42, "done%": 42 / 169 * 100},
{"done": 82, "done%": 82 / 169 * 100},
{"done": 122, "done%": 122 / 169 * 100},
{"message": "1 done"},
{"done": 122, "done%": 122 / 169 * 100, "message": "1 done"},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 169 / 169 * 100},
{"status": "done", "message": "2 done"},
{
"done": 169,
"done%": 169 / 169 * 100,
"status": "done",
"message": "2 done",
},
],
),
( # 3
Expand Down Expand Up @@ -694,13 +699,13 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 20, "done%": 20 / 169 * 100},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 42, "done%": 42 / 169 * 100},
{"message": "1 done"},
{"done": 42, "done%": 42 / 169 * 100, "message": "1 done"},
{"done": 42, "done%": 42 / 169 * 100},
{"done": 82, "done%": 82 / 169 * 100},
{"done": 122, "done%": 122 / 169 * 100},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"status": "done", "message": "2 done"},
{"done": 169, "done%": 100.0, "status": "done", "message": "2 done"},
],
),
( # 4
Expand Down Expand Up @@ -730,10 +735,14 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 20, "done%": 20 / 169 * 100},
{"done": 60, "done%": 60 / 169 * 100},
{"done": 80, "done%": 80 / 169 * 100},
{"message": "1 errored"},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 40, "done%": 40 / 169 * 100, "message": "1 errored"},
{"done": 42, "done%": 42 / 169 * 100},
{"status": "error", "message": "1 done, 1 errored"},
{
"done": 42,
"done%": 42 / 169 * 100,
"status": "error",
"message": "1 done, 1 errored",
},
],
),
( # 5
Expand Down Expand Up @@ -762,14 +771,18 @@ def test_download_zarr_subdir_has_only_subdirs(
],
[
{"size": 169},
{"message": "1 skipped"},
{"done": 127, "done%": (127 + 0) / 169 * 100},
{"done": 127, "done%": (127 + 0) / 169 * 100, "message": "1 skipped"},
{"status": "downloading"},
{"done": 127 + 0, "done%": (127 + 0) / 169 * 100},
{"done": 127 + 20, "done%": (127 + 20) / 169 * 100},
{"done": 127 + 40, "done%": (127 + 40) / 169 * 100},
{"done": 127 + 42, "done%": 100.0},
{"status": "done", "message": "1 done, 1 skipped"},
{
"done": 127 + 42,
"done%": 100.0,
"status": "done",
"message": "1 done, 1 skipped",
},
],
),
( # 7
Expand Down Expand Up @@ -813,8 +826,13 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 122, "done%": 122 / 169 * 100},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"message": "1 errored"},
{"status": "error", "message": "1 done, 1 errored"},
{"done": 169, "done%": 100.0, "message": "1 errored"},
{
"done": 169,
"done%": 100.0,
"status": "error",
"message": "1 done, 1 errored",
},
],
),
( # 8
Expand Down Expand Up @@ -853,19 +871,27 @@ def test_download_zarr_subdir_has_only_subdirs(
[
{"size": 179},
{"status": "downloading"},
{"message": "1 skipped"},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10, "done%": 10 / 179 * 100, "message": "1 skipped"},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10 + 20, "done%": (10 + 20) / 179 * 100},
{"done": 10 + 60, "done%": (10 + 60) / 179 * 100},
{"done": 10 + 80, "done%": (10 + 80) / 179 * 100},
{"done": 10 + 120, "done%": (10 + 120) / 179 * 100},
{"done": 10 + 122, "done%": (10 + 122) / 179 * 100},
{"message": "1 errored, 1 skipped"},
{
"done": 10 + 122,
"done%": (10 + 122) / 179 * 100,
"message": "1 errored, 1 skipped",
},
{"done": 10 + 162, "done%": (10 + 162) / 179 * 100},
{"done": 179, "done%": 100.0},
{"status": "error", "message": "1 done, 1 errored, 1 skipped"},
{
"done": 179,
"done%": 100.0,
"status": "error",
"message": "1 done, 1 errored, 1 skipped",
},
],
),
],
Expand Down

0 comments on commit 03af701

Please sign in to comment.