From e494e8883bf49c6921497e16f3dc99eec1440208 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 1 Aug 2024 17:28:40 -0700 Subject: [PATCH] focus on task resources --- distributed/client.py | 6 +++--- distributed/scheduler.py | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index d7fe19f91c..c9f239e14b 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3313,7 +3313,7 @@ def _graph_to_futures( retries=None, fifo_timeout=0, actors=None, - annotations_by_type=None, + task_resources=None, ): with self._refcount_lock: if actors is not None and actors is not True and actors is not False: @@ -3381,7 +3381,7 @@ def _graph_to_futures( "actors": actors, "code": ToPickle(computations), "annotations": ToPickle(annotations), - "annotations_by_type": ToPickle(annotations_by_type or {}), + "task_resources": ToPickle(task_resources), "span_metadata": ToPickle(span_metadata), } ) @@ -3471,7 +3471,7 @@ def get( user_priority=priority, actors=actors, span_metadata=SpanMetadata(collections=[{"type": "low-level-graph"}]), - annotations_by_type=kwargs.get("annotations_by_type", None), + task_resources=kwargs.get("task_resources"), ) packed = pack_data(keys, futures) if sync: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2e4c276773..d0e7f12860 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4753,7 +4753,7 @@ async def update_graph( fifo_timeout: float = 0.0, code: tuple[SourceCode, ...] = (), annotations: dict | None = None, - annotations_by_type: dict | None = None, + task_resources: dict | None = None, stimulus_id: str | None = None, ) -> None: start = time() @@ -4772,15 +4772,17 @@ async def update_graph( ( dsk, dependencies, - _annotations_by_type, + annotations_by_type, ) = await offload( _materialize_graph, graph=graph, global_annotations=annotations or {}, ) del graph - annotations_by_type = _annotations_by_type.update(annotations_by_type or {}) - del _annotations_by_type + + if task_resources: + annotations_by_type["resources"].update(task_resources) + if not internal_priority: # Removing all non-local keys before calling order() dsk_keys = set(