From 82ff9fc5624c953ce84cab13b2bfe0b394d0c609 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 29 Aug 2024 18:04:46 +0200 Subject: [PATCH] dask order no longer needs dependencies copy --- distributed/scheduler.py | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 35f2aecb22..ca5d961a09 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4615,7 +4615,7 @@ async def add_nanny(self, comm: Comm, address: str) -> None: def _find_lost_dependencies( self, dsk: dict[Key, T_runspec], - dependencies: dict[Key, set[Key]], + dependencies: DependenciesMapping, keys: set[Key], ) -> set[Key]: lost_keys = set() @@ -4646,7 +4646,7 @@ def _create_taskstate_from_graph( *, start: float, dsk: dict[Key, T_runspec], - dependencies: dict, + dependencies: DependenciesMapping, keys: set[Key], ordered: dict[Key, int], client: str, @@ -4795,7 +4795,7 @@ def _create_taskstate_from_graph( def _remove_done_tasks_from_dsk( self, dsk: dict[Key, T_runspec], - dependencies: dict[Key, set[Key]], + dependencies: DependenciesMapping, ) -> None: # Avoid computation that is already finished done = set() # tasks that are already done @@ -4826,7 +4826,6 @@ def _remove_done_tasks_from_dsk( stack.append(dep) for anc in done: dsk.pop(anc, None) - dependencies.pop(anc, None) @log_errors async def update_graph( @@ -4886,19 +4885,7 @@ async def update_graph( dsk = _cull(dsk, keys) if not internal_priority: - # Removing all non-local keys before calling order() - dsk_keys = set( - dsk - ) # intersection() of sets is much faster than dict_keys - stripped_deps = { - k: v.intersection(dsk_keys) - for k, v in dependencies.items() - if k in dsk_keys - } - - internal_priority = await offload( - dask.order.order, dsk=dsk, dependencies=stripped_deps - ) + internal_priority = await offload(dask.order.order, dsk=dsk) ordering_done = time() logger.debug("Ordering done.") @@ -9389,7 +9376,7 @@ def _materialize_graph( global_annotations: dict[str, Any], validate: bool, keys: set[Key], -) -> tuple[dict[Key, T_runspec], dict[Key, set[Key]], dict[str, dict[Key, Any]]]: +) -> tuple[dict[Key, T_runspec], DependenciesMapping, dict[str, dict[Key, Any]]]: dsk: dict = ensure_dict(graph) if validate: for k in dsk: @@ -9418,9 +9405,7 @@ def _materialize_graph( logger.debug( "Removing aliases. Started with %i and got %i left", len(dsk2), len(dsk3) ) - # FIXME: There should be no need to fully materialize and copy this but some - # sections in the scheduler are mutating it. - dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()} + dependencies = DependenciesMapping(dsk3) return dsk3, dependencies, annotations_by_type