Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask order uses task_spec #8842

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4615,7 +4615,7 @@ async def add_nanny(self, comm: Comm, address: str) -> None:
def _find_lost_dependencies(
self,
dsk: dict[Key, T_runspec],
dependencies: dict[Key, set[Key]],
dependencies: DependenciesMapping,
keys: set[Key],
) -> set[Key]:
lost_keys = set()
Expand Down Expand Up @@ -4646,7 +4646,7 @@ def _create_taskstate_from_graph(
*,
start: float,
dsk: dict[Key, T_runspec],
dependencies: dict,
dependencies: DependenciesMapping,
keys: set[Key],
ordered: dict[Key, int],
client: str,
Expand Down Expand Up @@ -4795,7 +4795,7 @@ def _create_taskstate_from_graph(
def _remove_done_tasks_from_dsk(
self,
dsk: dict[Key, T_runspec],
dependencies: dict[Key, set[Key]],
dependencies: DependenciesMapping,
) -> None:
# Avoid computation that is already finished
done = set() # tasks that are already done
Expand Down Expand Up @@ -4826,7 +4826,6 @@ def _remove_done_tasks_from_dsk(
stack.append(dep)
for anc in done:
dsk.pop(anc, None)
dependencies.pop(anc, None)

@log_errors
async def update_graph(
Expand Down Expand Up @@ -4886,19 +4885,7 @@ async def update_graph(
dsk = _cull(dsk, keys)

if not internal_priority:
# Removing all non-local keys before calling order()
dsk_keys = set(
dsk
) # intersection() of sets is much faster than dict_keys
stripped_deps = {
k: v.intersection(dsk_keys)
for k, v in dependencies.items()
if k in dsk_keys
}

internal_priority = await offload(
dask.order.order, dsk=dsk, dependencies=stripped_deps
)
internal_priority = await offload(dask.order.order, dsk=dsk)
ordering_done = time()
logger.debug("Ordering done.")

Expand Down Expand Up @@ -9389,7 +9376,7 @@ def _materialize_graph(
global_annotations: dict[str, Any],
validate: bool,
keys: set[Key],
) -> tuple[dict[Key, T_runspec], dict[Key, set[Key]], dict[str, dict[Key, Any]]]:
) -> tuple[dict[Key, T_runspec], DependenciesMapping, dict[str, dict[Key, Any]]]:
dsk: dict = ensure_dict(graph)
if validate:
for k in dsk:
Expand Down Expand Up @@ -9418,9 +9405,7 @@ def _materialize_graph(
logger.debug(
"Removing aliases. Started with %i and got %i left", len(dsk2), len(dsk3)
)
# FIXME: There should be no need to fully materialize and copy this but some
# sections in the scheduler are mutating it.
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()}
dependencies = DependenciesMapping(dsk3)
return dsk3, dependencies, annotations_by_type


Expand Down
Loading