Skip to content

Commit

Permalink
[data] add backpressure reason
Browse files Browse the repository at this point in the history
Signed-off-by: jukejian <[email protected]>
  • Loading branch information
Jay-ju committed Nov 28, 2024
1 parent d2e37b0 commit 1a86bbd
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,28 @@ def satisfies_limit(self, limit: "ExecutionResources") -> bool:
and self.object_store_memory <= limit.object_store_memory
)

def satisfies_limit_with_reason(self, limit: "ExecutionResources"):
    """Check this resource struct against ``limit`` and name what exceeds it.

    A field is compared only when it is set (not None) on both ``self`` and
    ``limit``; None on either side means "no limit" for that field.

    Args:
        limit: The resource limits to compare against.

    Returns:
        ``True`` when no compared field exceeds its limit. Otherwise a
        non-empty list with one label per exceeded field, in the fixed order
        ``"CPU"``, ``"GPU"``, ``"MEM"`` (``"MEM"`` stands for
        ``object_store_memory``). A returned list is truthy as well, so
        callers must test ``result is True`` rather than ``if result``.
    """
    checks = (
        ("CPU", self.cpu, limit.cpu),
        ("GPU", self.gpu, limit.gpu),
        ("MEM", self.object_store_memory, limit.object_store_memory),
    )
    not_satisfied = [
        label
        for label, used, allowed in checks
        if used is not None and allowed is not None and used > allowed
    ]

    if not not_satisfied:
        return True
    return not_satisfied

def scale(self, f: float) -> "ExecutionResources":
"""Return copy with all set values scaled by `f`."""
if f < 0:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional, Union

import ray
from .ref_bundle import RefBundle
Expand All @@ -16,6 +16,10 @@
from ray.data._internal.stats import StatsDict
from ray.data.context import DataContext

if TYPE_CHECKING:
from ray.data._internal.execution.operators.map_operator import MapTaskStats


# TODO(hchen): Ray Core should have a common interface for these two types.
Waitable = Union[ray.ObjectRef, ObjectRefGenerator]

Expand Down Expand Up @@ -47,7 +51,9 @@ def __init__(
task_index: int,
streaming_gen: ObjectRefGenerator,
output_ready_callback: Callable[[RefBundle], None],
task_done_callback: Callable[[Optional[Exception]], None],
task_done_callback: Callable[
[Optional[Exception], Optional["MapTaskStats"]], None
],
):
"""
Args:
Expand Down Expand Up @@ -136,6 +142,49 @@ def on_task_finished(self):
self._task_done_callback()


class TaskBackPressureState:
    """Holds the backpressure flags of one operator together with their reasons.

    Two independent kinds of backpressure are tracked: task submission and
    task output. Each kind is stored as a boolean flag plus a free-form
    reason string. Both flags start as False with an empty reason.
    """

    def __init__(self) -> None:
        self._in_task_submission_backpressure = False
        self._in_task_submission_backpressure_reason = ""
        self._in_task_output_backpressure = False
        self._in_task_output_backpressure_reason = ""

    def is_changing_task_submission_backpressure(
        self, in_backpressure: bool, reason: str
    ) -> bool:
        """Return True if the given flag or reason differs from the stored
        task submission backpressure state."""
        return (
            self._in_task_submission_backpressure != in_backpressure
            or self._in_task_submission_backpressure_reason != reason
        )

    def set_in_task_submission_backpressure(
        self, in_backpressure: bool, reason: str
    ) -> None:
        """Overwrite the task submission backpressure flag and its reason."""
        self._in_task_submission_backpressure = in_backpressure
        self._in_task_submission_backpressure_reason = reason

    def is_changing_task_output_backpressure(
        self, in_backpressure: bool, reason: str
    ) -> bool:
        """Return True if the given flag or reason differs from the stored
        task output backpressure state."""
        return (
            self._in_task_output_backpressure != in_backpressure
            or self._in_task_output_backpressure_reason != reason
        )

    def set_in_task_output_backpressure(
        self, in_backpressure: bool, reason: str
    ) -> None:
        """Overwrite the task output backpressure flag and its reason."""
        self._in_task_output_backpressure = in_backpressure
        self._in_task_output_backpressure_reason = reason

    def is_in_backpressure(self) -> bool:
        """Return True if either kind of backpressure is currently set."""
        return (
            self._in_task_output_backpressure
            or self._in_task_submission_backpressure
        )

    def is_in_task_output_backpressure(self) -> bool:
        """Return the stored task output backpressure flag."""
        return self._in_task_output_backpressure

    def is_in_task_submission_backpressure(self) -> bool:
        """Return the stored task submission backpressure flag."""
        return self._in_task_submission_backpressure


class PhysicalOperator(Operator):
"""Abstract class for physical operators.
Expand Down Expand Up @@ -182,9 +231,8 @@ def __init__(
self._inputs_complete = not input_dependencies
self._target_max_block_size = target_max_block_size
self._started = False
self._in_task_submission_backpressure = False
self._in_task_output_backpressure = False
self._metrics = OpRuntimeMetrics(self)
self._backpressure_state = TaskBackPressureState()
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_completed = False
Expand Down Expand Up @@ -475,17 +523,35 @@ def incremental_resource_usage(self) -> ExecutionResources:
"""
return ExecutionResources()

def notify_in_task_submission_backpressure(
    self, in_backpressure: bool, reason: str = ""
) -> None:
    """Called periodically from the executor to update internal in backpressure
    status for stats collection purposes.

    Args:
        in_backpressure: Value this operator's in_backpressure should be set to.
        reason: Reason for the current backpressure status. Defaults to an
            empty string so callers that pass only ``in_backpressure`` keep
            working.
    """
    state = self._backpressure_state
    # Report a toggle to the metrics only when the boolean flag actually
    # flips. A change of reason alone is not a toggle and must not be
    # reported as one.
    if state.is_in_task_submission_backpressure() != in_backpressure:
        self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
    # Store the new state when either the flag or its reason changed.
    if state.is_changing_task_submission_backpressure(in_backpressure, reason):
        state.set_in_task_submission_backpressure(in_backpressure, reason)

def notify_in_task_output_backpressure(
    self, in_backpressure: bool, reason: str = ""
) -> None:
    """Update this operator's task output backpressure status and its reason.

    Args:
        in_backpressure: Value the task output backpressure flag should be
            set to.
        reason: Reason for the current backpressure status. Defaults to an
            empty string so the reason may be omitted.
    """
    state = self._backpressure_state
    # Only write the state when the flag or its reason actually changes.
    if state.is_changing_task_output_backpressure(in_backpressure, reason):
        state.set_in_task_output_backpressure(in_backpressure, reason)

def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
"""Return a list of `AutoscalingActorPool`s managed by this operator."""
Expand Down
24 changes: 20 additions & 4 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ def can_submit_new_task(self, op: PhysicalOperator) -> bool:
"""Return whether the given operator can submit a new task."""
...

@abstractmethod
def can_submit_new_task_with_reason(self, op: PhysicalOperator) -> dict:
    """Return whether the given operator can submit a new task with reason.

    Args:
        op: The operator that wants to submit a new task.

    Returns:
        A dict describing the decision. The implementation visible in this
        file always sets the key ``"can_submit"`` (bool) and, when
        submission is refused, additionally sets ``"usage"`` to the list of
        resource labels that exceeded the operator's budget.
    """
    ...

@abstractmethod
def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
"""Return the maximum bytes of pending task outputs can be read for
Expand Down Expand Up @@ -487,12 +492,23 @@ def _update_reservation(self):

self._total_shared = self._total_shared.max(ExecutionResources.zero())

def can_submit_new_task_with_reason(self, op: PhysicalOperator) -> dict:
    """Decide whether ``op`` may submit a new task and say why not.

    Args:
        op: The operator that wants to submit a new task.

    Returns:
        ``{"can_submit": True}`` when ``op`` has no entry in
        ``self._op_budgets`` or when its incremental resource usage fits
        within its budget. Otherwise
        ``{"can_submit": False, "usage": labels}``, where ``labels`` is the
        list returned by ``satisfies_limit_with_reason`` for the resources
        that exceed the budget.
    """
    if op not in self._op_budgets:
        return {"can_submit": True}
    budget = self._op_budgets[op]
    incremental_usage = op.incremental_resource_usage()
    res = incremental_usage.satisfies_limit_with_reason(budget)
    # `res` is either the literal True or a (truthy) list of labels, so an
    # identity check is required to tell the two apart.
    if res is True:
        return {"can_submit": True}

    return {
        "can_submit": False,
        "usage": res,
    }

def can_submit_new_task(self, op: PhysicalOperator) -> bool:
    """Return whether the given operator can submit a new task.

    Delegates to ``can_submit_new_task_with_reason`` and returns only its
    ``"can_submit"`` entry, so both methods always agree on the decision.
    """
    return self.can_submit_new_task_with_reason(op)["can_submit"]

def _should_unblock_streaming_output_backpressure(
self, op: PhysicalOperator
Expand Down
Loading

0 comments on commit 1a86bbd

Please sign in to comment.