Skip to content

Commit

Permalink
focus on task resources
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Aug 2, 2024
1 parent 308111d commit e494e88
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
6 changes: 3 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3313,7 +3313,7 @@ def _graph_to_futures(
retries=None,
fifo_timeout=0,
actors=None,
annotations_by_type=None,
task_resources=None,
):
with self._refcount_lock:
if actors is not None and actors is not True and actors is not False:
Expand Down Expand Up @@ -3381,7 +3381,7 @@ def _graph_to_futures(
"actors": actors,
"code": ToPickle(computations),
"annotations": ToPickle(annotations),
"annotations_by_type": ToPickle(annotations_by_type or {}),
"task_resources": ToPickle(task_resources),
"span_metadata": ToPickle(span_metadata),
}
)
Expand Down Expand Up @@ -3471,7 +3471,7 @@ def get(
user_priority=priority,
actors=actors,
span_metadata=SpanMetadata(collections=[{"type": "low-level-graph"}]),
annotations_by_type=kwargs.get("annotations_by_type", None),
task_resources=kwargs.get("task_resources"),
)
packed = pack_data(keys, futures)
if sync:
Expand Down
10 changes: 6 additions & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4753,7 +4753,7 @@ async def update_graph(
fifo_timeout: float = 0.0,
code: tuple[SourceCode, ...] = (),
annotations: dict | None = None,
annotations_by_type: dict | None = None,
task_resources: dict | None = None,
stimulus_id: str | None = None,
) -> None:
start = time()
Expand All @@ -4772,15 +4772,17 @@ async def update_graph(
(
dsk,
dependencies,
_annotations_by_type,
annotations_by_type,
) = await offload(
_materialize_graph,
graph=graph,
global_annotations=annotations or {},
)
del graph
annotations_by_type = _annotations_by_type.update(annotations_by_type or {})
del _annotations_by_type

if task_resources:
annotations_by_type["resources"].update(task_resources)

if not internal_priority:
# Removing all non-local keys before calling order()
dsk_keys = set(
Expand Down

0 comments on commit e494e88

Please sign in to comment.