Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log key collision count in update_graph log event #8692

Merged
merged 7 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4590,11 +4590,6 @@ def _create_taskstate_from_graph(

lost_keys = self._match_graph_with_tasks(dsk, dependencies, keys)

if len(dsk) > 1:
self.log_event(
["all", client], {"action": "update_graph", "count": len(dsk)}
)

if lost_keys:
self.report({"op": "cancelled-keys", "keys": lost_keys}, client=client)
self.client_releases_keys(
Expand All @@ -4616,13 +4611,28 @@ def _create_taskstate_from_graph(
computation.annotations.update(global_annotations)
del global_annotations

runnable, touched_tasks, new_tasks = self._generate_taskstates(
(
runnable,
touched_tasks,
new_tasks,
colliding_task_count,
) = self._generate_taskstates(
keys=keys,
dsk=dsk,
dependencies=dependencies,
computation=computation,
)

if len(dsk) > 1 or colliding_task_count:
self.log_event(
["all", client],
{
"action": "update_graph",
"count": len(dsk),
"key-collisions": colliding_task_count,
},
)

keys_with_annotations = self._apply_annotations(
tasks=new_tasks,
annotations_by_type=annotations_by_type,
Expand Down Expand Up @@ -4815,6 +4825,7 @@ def _generate_taskstates(
touched_keys = set()
touched_tasks = []
tgs_with_bad_run_spec = set()
colliding_task_count = 0
while stack:
k = stack.pop()
if k in touched_keys:
Expand Down Expand Up @@ -4860,6 +4871,7 @@ def _generate_taskstates(
# dask/dask#9888.
dependencies[k] = deps_lhs

colliding_task_count += 1
if ts.group not in tgs_with_bad_run_spec:
tgs_with_bad_run_spec.add(ts.group)
logger.warning(
Expand Down Expand Up @@ -4912,7 +4924,7 @@ def _generate_taskstates(
len(touched_tasks),
len(keys),
)
return runnable, touched_tasks, new_tasks
return runnable, touched_tasks, new_tasks, colliding_task_count

def _apply_annotations(
self,
Expand Down
19 changes: 19 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4789,6 +4789,23 @@ async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, de
For a real world example where this can trigger, see
https://github.com/dask/dask/issues/9888
"""
seen = False

def _match(event):
_, msg = event
return (
isinstance(msg, dict)
and msg.get("action", None) == "update_graph"
and msg["key-collisions"] > 0
)

def handler(ev):
if _match(ev):
nonlocal seen
seen = True

c.subscribe_topic("all", handler)

x1 = c.submit(inc, 1, key="x1")
y_old = c.submit(inc, x1, key="y")

Expand All @@ -4803,6 +4820,8 @@ async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, de

assert "Detected different `run_spec` for key 'y'" in log.getvalue()

await async_poll_for(lambda: seen, timeout=5)

async with Worker(s.address):
# Used old run_spec
assert await y_old == 3
Expand Down
Loading