Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce messaging to pyout on progress of downloads #1554

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 53 additions & 49 deletions dandi/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,37 @@
lock=lock,
)

def _progress_filter(gen):
"""To reduce load on pyout etc, make progress reports only if enough time
from prior report has passed (over 2 seconds) or we are done (got 100%).

Note that it requires "awareness" from the code below to issue other messages
with bundling with done% reporting if reporting on progress of some kind (e.g.,
adjusting "message").
"""
prior_time = 0
warned = False
for rec in gen:
current_time = time.time()
if done_perc := rec.get("done%", 0):
if isinstance(done_perc, (int, float)):
if current_time - prior_time < 2 and done_perc != 100:
continue
elif not warned:
yarikoptic marked this conversation as resolved.
Show resolved Hide resolved
warned = True
lgr.warning(

Check warning on line 379 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L377-L379

Added lines #L377 - L379 were not covered by tests
"Received non numeric done%%: %r", done_perc
)
prior_time = current_time
yield rec

# If exception is raised we might just raise it, or yield
# an error record
gen = {
"raise": _download_generator,
"yield": _download_generator_guard(path, _download_generator),
}[self.on_error]

gen = _progress_filter(gen)
if self.yield_generator_for_fields:
yield {"path": path, self.yield_generator_for_fields: gen}
else:
Expand Down Expand Up @@ -779,14 +803,9 @@
attempts_allowed=attempts_allowed,
downloaded_in_attempt=downloaded_in_attempt,
)
if not attempts_allowed:
if not isinstance(attempts_allowed_or_not, int) or not attempts_allowed:

Check warning on line 806 in dandi/download.py

View check run for this annotation

Codecov / codecov/patch

dandi/download.py#L806

Added line #L806 was not covered by tests
yield {"status": "error", "message": str(exc)}
return
# for clear(er) typing, here we get only with int
assert isinstance(attempts_allowed_or_not, int), (
f"attempts_allowed_or_not is {attempts_allowed_or_not!r} "
f"of type {type(attempts_allowed_or_not)}"
)
attempts_allowed = attempts_allowed_or_not
else:
lgr.warning("downloader logic: We should not be here!")
Expand Down Expand Up @@ -1169,27 +1188,6 @@
prev_status: str = ""
yielded_size: bool = False

@property
def message(self) -> str:
done = 0
errored = 0
skipped = 0
for s in self.files.values():
if s.state is DLState.DONE:
done += 1
elif s.state in (DLState.ERROR, DLState.CHECKSUM_ERROR):
errored += 1
elif s.state is DLState.SKIPPED:
skipped += 1
parts = []
if done:
parts.append(f"{done} done")
if errored:
parts.append(f"{errored} errored")
if skipped:
parts.append(f"{skipped} skipped")
return ", ".join(parts)

def get_done(self) -> dict:
total_downloaded = sum(
s.downloaded
Expand All @@ -1207,7 +1205,8 @@
"done%": total_downloaded / self.zarr_size * 100 if self.zarr_size else 0,
}

def set_status(self, statusdict: dict) -> None:
def get_status(self, report_done: bool = True) -> dict:

state_qtys = Counter(s.state for s in self.files.values())
total = len(self.files)
if (
Expand All @@ -1226,9 +1225,28 @@
new_status = "downloading"
else:
new_status = ""

statusdict = {}

if report_done:
msg_comps = []
for msg_label, states in {
"done": (DLState.DONE,),
"errored": (DLState.ERROR, DLState.CHECKSUM_ERROR),
"skipped": (DLState.SKIPPED,),
}.items():
if count := sum(state_qtys.get(state, 0) for state in states):
msg_comps.append(f"{count} {msg_label}")
if msg_comps:
statusdict["message"] = ", ".join(msg_comps)

if new_status != self.prev_status:
statusdict["status"] = new_status
self.prev_status = new_status
self.prev_status = statusdict["status"] = new_status

if report_done and self.zarr_size:
statusdict.update(self.get_done())

return statusdict

def feed(self, path: str, status: dict) -> Iterator[dict]:
keys = list(status.keys())
Expand All @@ -1241,53 +1259,39 @@
yield {"size": self.zarr_size}
if status.get("status") == "skipped":
self.files[path].state = DLState.SKIPPED
out = {"message": self.message}
# Treat skipped as "downloaded" for the matter of accounting
if size is not None:
self.files[path].downloaded = size
self.maxsize += size
self.set_status(out)
yield out
if self.zarr_size:
yield self.get_done()
yield self.get_status()
elif keys == ["size"]:
self.files[path].size = size
self.maxsize += status["size"]
if any(s.state is DLState.DOWNLOADING for s in self.files.values()):
yield self.get_done()
elif status == {"status": "downloading"}:
self.files[path].state = DLState.DOWNLOADING
out = {}
self.set_status(out)
if out:
if out := self.get_status(report_done=False):
yield out
elif "done" in status:
self.files[path].downloaded = status["done"]
yield self.get_done()
elif status.get("status") == "error":
if "checksum" in status:
self.files[path].state = DLState.CHECKSUM_ERROR
out = {"message": self.message}
self.set_status(out)
yield out
else:
self.files[path].state = DLState.ERROR
out = {"message": self.message}
self.set_status(out)
yield out
sz = self.files[path].size
if sz is not None:
self.maxsize -= sz
yield self.get_done()
yield self.get_status()
elif keys == ["checksum"]:
pass
elif status == {"status": "setting mtime"}:
pass
elif status == {"status": "done"}:
self.files[path].state = DLState.DONE
out = {"message": self.message}
self.set_status(out)
yield out
yield self.get_status()
else:
lgr.warning(
"Unexpected download status dict for %r received: %r", path, status
Expand Down
10 changes: 8 additions & 2 deletions dandi/support/pyout.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import Counter
import datetime
import logging
import re
import sys
import time

Expand Down Expand Up @@ -54,7 +55,11 @@


def counts(values):
return [f"{v:d} {k}" for k, v in Counter(values).items()]
vals = [f"{v:d} {k}" for k, v in Counter(values).items()]
# limit to 5 flines
if len(vals) > 5:
vals = vals[:4] + ["+%d more" % (len(vals) - 4)]

Check warning on line 61 in dandi/support/pyout.py

View check run for this annotation

Codecov / codecov/patch

dandi/support/pyout.py#L61

Added line #L61 was not covered by tests
return vals


# class mapped_counts(object):
Expand Down Expand Up @@ -158,7 +163,8 @@
color=dict(
re_lookup=[["^exists", "yellow"], ["^(failed|error|ERROR)", "red"]]
),
aggregate=counts,
# replace numbers since we do not want unique among them
aggregate=lambda values: counts(re.sub(r"\d+", "some", v) for v in values),
),
"progress": progress_style,
"done%": progress_style,
Expand Down
64 changes: 45 additions & 19 deletions dandi/tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 20, "done%": 20 / 42 * 100},
{"done": 40, "done%": 40 / 42 * 100},
{"done": 42, "done%": 100.0},
{"status": "done", "message": "1 done"},
{"done": 42, "done%": 100.0, "status": "done", "message": "1 done"},
],
),
( # 1
Expand Down Expand Up @@ -618,8 +618,8 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 122, "done%": 122 / 169 * 100},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"message": "1 done"},
{"status": "done", "message": "2 done"},
{"done": 169, "done%": 100.0, "message": "1 done"},
{"done": 169, "done%": 100.0, "status": "done", "message": "2 done"},
],
),
( # 2
Expand Down Expand Up @@ -657,10 +657,15 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 42, "done%": 42 / 169 * 100},
{"done": 82, "done%": 82 / 169 * 100},
{"done": 122, "done%": 122 / 169 * 100},
{"message": "1 done"},
{"done": 122, "done%": 122 / 169 * 100, "message": "1 done"},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 169 / 169 * 100},
{"status": "done", "message": "2 done"},
{
"done": 169,
"done%": 169 / 169 * 100,
"status": "done",
"message": "2 done",
},
],
),
( # 3
Expand Down Expand Up @@ -694,13 +699,13 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 20, "done%": 20 / 169 * 100},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 42, "done%": 42 / 169 * 100},
{"message": "1 done"},
{"done": 42, "done%": 42 / 169 * 100, "message": "1 done"},
{"done": 42, "done%": 42 / 169 * 100},
{"done": 82, "done%": 82 / 169 * 100},
{"done": 122, "done%": 122 / 169 * 100},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"status": "done", "message": "2 done"},
{"done": 169, "done%": 100.0, "status": "done", "message": "2 done"},
],
),
( # 4
Expand Down Expand Up @@ -730,10 +735,14 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 20, "done%": 20 / 169 * 100},
{"done": 60, "done%": 60 / 169 * 100},
{"done": 80, "done%": 80 / 169 * 100},
{"message": "1 errored"},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 40, "done%": 40 / 169 * 100, "message": "1 errored"},
{"done": 42, "done%": 42 / 169 * 100},
{"status": "error", "message": "1 done, 1 errored"},
{
"done": 42,
"done%": 42 / 169 * 100,
"status": "error",
"message": "1 done, 1 errored",
},
],
),
( # 5
Expand Down Expand Up @@ -762,14 +771,18 @@ def test_download_zarr_subdir_has_only_subdirs(
],
[
{"size": 169},
{"message": "1 skipped"},
{"done": 127, "done%": (127 + 0) / 169 * 100},
{"done": 127, "done%": (127 + 0) / 169 * 100, "message": "1 skipped"},
{"status": "downloading"},
{"done": 127 + 0, "done%": (127 + 0) / 169 * 100},
{"done": 127 + 20, "done%": (127 + 20) / 169 * 100},
{"done": 127 + 40, "done%": (127 + 40) / 169 * 100},
{"done": 127 + 42, "done%": 100.0},
{"status": "done", "message": "1 done, 1 skipped"},
{
"done": 127 + 42,
"done%": 100.0,
"status": "done",
"message": "1 done, 1 skipped",
},
],
),
( # 7
Expand Down Expand Up @@ -813,8 +826,13 @@ def test_download_zarr_subdir_has_only_subdirs(
{"done": 122, "done%": 122 / 169 * 100},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"message": "1 errored"},
{"status": "error", "message": "1 done, 1 errored"},
{"done": 169, "done%": 100.0, "message": "1 errored"},
{
"done": 169,
"done%": 100.0,
"status": "error",
"message": "1 done, 1 errored",
},
],
),
( # 8
Expand Down Expand Up @@ -853,19 +871,27 @@ def test_download_zarr_subdir_has_only_subdirs(
[
{"size": 179},
{"status": "downloading"},
{"message": "1 skipped"},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10, "done%": 10 / 179 * 100, "message": "1 skipped"},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10 + 20, "done%": (10 + 20) / 179 * 100},
{"done": 10 + 60, "done%": (10 + 60) / 179 * 100},
{"done": 10 + 80, "done%": (10 + 80) / 179 * 100},
{"done": 10 + 120, "done%": (10 + 120) / 179 * 100},
{"done": 10 + 122, "done%": (10 + 122) / 179 * 100},
{"message": "1 errored, 1 skipped"},
{
"done": 10 + 122,
"done%": (10 + 122) / 179 * 100,
"message": "1 errored, 1 skipped",
},
{"done": 10 + 162, "done%": (10 + 162) / 179 * 100},
{"done": 179, "done%": 100.0},
{"status": "error", "message": "1 done, 1 errored, 1 skipped"},
{
"done": 179,
"done%": 100.0,
"status": "error",
"message": "1 done, 1 errored, 1 skipped",
},
],
),
],
Expand Down
Loading