diff --git a/distributed/client.py b/distributed/client.py index ad283a352a..d7fe19f91c 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3313,6 +3313,7 @@ def _graph_to_futures( retries=None, fifo_timeout=0, actors=None, + annotations_by_type=None, ): with self._refcount_lock: if actors is not None and actors is not True and actors is not False: @@ -3380,6 +3381,7 @@ def _graph_to_futures( "actors": actors, "code": ToPickle(computations), "annotations": ToPickle(annotations), + "annotations_by_type": ToPickle(annotations_by_type or {}), "span_metadata": ToPickle(span_metadata), } ) @@ -3469,6 +3471,7 @@ def get( user_priority=priority, actors=actors, span_metadata=SpanMetadata(collections=[{"type": "low-level-graph"}]), + annotations_by_type=kwargs.get("annotations_by_type", None), ) packed = pack_data(keys, futures) if sync: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7dc30b8893..2e4c276773 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4753,6 +4753,7 @@ async def update_graph( fifo_timeout: float = 0.0, code: tuple[SourceCode, ...] = (), annotations: dict | None = None, + annotations_by_type: dict | None = None, stimulus_id: str | None = None, ) -> None: start = time() @@ -4771,13 +4772,15 @@ async def update_graph( ( dsk, dependencies, - annotations_by_type, + _annotations_by_type, ) = await offload( _materialize_graph, graph=graph, global_annotations=annotations or {}, ) del graph + annotations_by_type = _annotations_by_type.update(annotations_by_type or {}) + del _annotations_by_type if not internal_priority: # Removing all non-local keys before calling order() dsk_keys = set(