From 1a86bbda8a05d763cf6b09fcfbc28e6d1703853b Mon Sep 17 00:00:00 2001 From: jukejian Date: Tue, 10 Sep 2024 19:24:57 +0800 Subject: [PATCH] [data] add backpressure reason Signed-off-by: jukejian --- .../execution/interfaces/execution_options.py | 22 ++++ .../execution/interfaces/physical_operator.py | 80 ++++++++++-- .../_internal/execution/resource_manager.py | 24 +++- .../execution/streaming_executor_state.py | 117 +++++++++++++++--- python/ray/data/context.py | 1 + python/ray/data/tests/test_webdataset.py | 1 - 6 files changed, 219 insertions(+), 26 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/execution_options.py b/python/ray/data/_internal/execution/interfaces/execution_options.py index 8000901992bc..bdff5070ee4d 100644 --- a/python/ray/data/_internal/execution/interfaces/execution_options.py +++ b/python/ray/data/_internal/execution/interfaces/execution_options.py @@ -192,6 +192,28 @@ def satisfies_limit(self, limit: "ExecutionResources") -> bool: and self.object_store_memory <= limit.object_store_memory ) + def satisfies_limit_with_reason(self, limit: "ExecutionResources"): + """Return if this resource struct meets the specified limits. + + Note that None for a field means no limit. + """ + not_satisfied = [] + + if self.cpu is not None and limit.cpu is not None and self.cpu > limit.cpu: + not_satisfied.append("CPU") + if self.gpu is not None and limit.gpu is not None and self.gpu > limit.gpu: + not_satisfied.append("GPU") + if ( + self.object_store_memory is not None + and limit.object_store_memory is not None + and self.object_store_memory > limit.object_store_memory + ): + not_satisfied.append("MEM") + + if not not_satisfied: + return True + return not_satisfied + def scale(self, f: float) -> "ExecutionResources": """Return copy with all set values scaled by `f`.""" if f < 0: diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 361b18babf6c..6f26c8fa5275 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, Iterator, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional, Union import ray from .ref_bundle import RefBundle @@ -16,6 +16,10 @@ from ray.data._internal.stats import StatsDict from ray.data.context import DataContext +if TYPE_CHECKING: + from ray.data._internal.execution.operators.map_operator import MapTaskStats + + # TODO(hchen): Ray Core should have a common interface for these two types. Waitable = Union[ray.ObjectRef, ObjectRefGenerator] @@ -47,7 +51,9 @@ def __init__( task_index: int, streaming_gen: ObjectRefGenerator, output_ready_callback: Callable[[RefBundle], None], - task_done_callback: Callable[[Optional[Exception]], None], + task_done_callback: Callable[ + [Optional[Exception], Optional["MapTaskStats"]], None + ], ): """ Args: @@ -136,6 +142,49 @@ def on_task_finished(self): self._task_done_callback() +class TaskBackPressureState: + def __init__(self): + self._in_task_submission_backpressure = False + self._in_task_submission_backpressure_reason = "" + self._in_task_output_backpressure = False + self._in_task_output_backpressure_reason = "" + + def is_changing_task_submission_backpressure( + self, in_backpressure: bool, reason: str + ) -> bool: + return ( + self._in_task_submission_backpressure != in_backpressure + or self._in_task_submission_backpressure_reason != reason + ) + + def set_in_task_submission_backpressure(self, in_backpressure: bool, reason: str): + self._in_task_submission_backpressure = in_backpressure + self._in_task_submission_backpressure_reason = reason + + def is_changing_task_output_backpressure( + self, in_backpressure: bool, reason: str + ) -> bool: + return ( + self._in_task_output_backpressure != in_backpressure + or self._in_task_output_backpressure_reason != reason + ) + + def set_in_task_output_backpressure(self, in_backpressure: bool, reason: str): + self._in_task_output_backpressure = in_backpressure + self._in_task_output_backpressure_reason = reason + + def is_in_backpressure(self): + return ( + self._in_task_output_backpressure or self._in_task_submission_backpressure + ) + + def is_in_task_output_backpressure(self): + return self._in_task_output_backpressure + + def is_in_task_submission_backpressure(self): + return self._in_task_submission_backpressure + + class PhysicalOperator(Operator): """Abstract class for physical operators. @@ -182,9 +231,8 @@ def __init__( self._inputs_complete = not input_dependencies self._target_max_block_size = target_max_block_size self._started = False - self._in_task_submission_backpressure = False - self._in_task_output_backpressure = False self._metrics = OpRuntimeMetrics(self) + self._backpressure_state = TaskBackPressureState() self._estimated_num_output_bundles = None self._estimated_output_num_rows = None self._execution_completed = False @@ -475,17 +523,35 @@ def incremental_resource_usage(self) -> ExecutionResources: """ return ExecutionResources() - def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None: + def notify_in_task_submission_backpressure( + self, in_backpressure: bool, reason: str + ) -> None: """Called periodically from the executor to update internal in backpressure status for stats collection purposes. Args: in_backpressure: Value this operator's in_backpressure should be set to. + reason: reason for in_backpressure """ # only update on change to in_backpressure - if self._in_task_submission_backpressure != in_backpressure: + if self._backpressure_state.is_changing_task_submission_backpressure( + in_backpressure, reason + ): self._metrics.on_toggle_task_submission_backpressure(in_backpressure) - self._in_task_submission_backpressure = in_backpressure + self._backpressure_state.set_in_task_submission_backpressure( + in_backpressure, reason + ) + + def notify_in_task_output_backpressure( + self, in_backpressure: bool, reason: str + ) -> None: + # only update on change to in_backpressure + if self._backpressure_state.is_changing_task_output_backpressure( + in_backpressure, reason + ): + self._backpressure_state.set_in_task_output_backpressure( + in_backpressure, reason + ) def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]: """Return a list of `AutoscalingActorPool`s managed by this operator.""" diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 32b23443c7ed..4e5916855077 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -275,6 +275,11 @@ def can_submit_new_task(self, op: PhysicalOperator) -> bool: """Return whether the given operator can submit a new task.""" ... + @abstractmethod + def can_submit_new_task_with_reason(self, op: PhysicalOperator) -> dict: + """Return whether the given operator can submit a new task with reason.""" + ... + @abstractmethod def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: """Return the maximum bytes of pending task outputs can be read for @@ -487,12 +492,23 @@ def _update_reservation(self): self._total_shared = self._total_shared.max(ExecutionResources.zero()) - def can_submit_new_task(self, op: PhysicalOperator) -> bool: + def can_submit_new_task_with_reason(self, op: PhysicalOperator) -> dict: if op not in self._op_budgets: - return True + return {"can_submit": True} budget = self._op_budgets[op] - res = op.incremental_resource_usage().satisfies_limit(budget) - return res + incremental_usage = op.incremental_resource_usage() + res = incremental_usage.satisfies_limit_with_reason(budget) + if res is True: + return {"can_submit": True} + + return { + "can_submit": False, + "usage": res, + } + + def can_submit_new_task(self, op: PhysicalOperator) -> bool: + can_submit_with_reason = self.can_submit_new_task_with_reason(op) + return can_submit_with_reason["can_submit"] def _should_unblock_streaming_output_backpressure( self, op: PhysicalOperator diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 8ca0247dd393..10f9ccda35fc 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -32,6 +32,7 @@ ) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.resource_manager import ResourceManager +from ray.data._internal.execution.util import memory_string from ray.data._internal.progress_bar import ProgressBar from ray.data.context import DataContext @@ -185,6 +186,7 @@ def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]): self.outqueue: OpBufferQueue = OpBufferQueue() self.op = op self.progress_bar = None + self.enable_show_backpressure_reason = True self.num_completed_tasks = 0 self.inputs_done_called = False # Tracks whether `input_done` is called for each input op. @@ -218,6 +220,9 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: position=index, enabled=progress_bar_enabled, ) + self.enable_show_backpressure_reason = ( + progress_bar_enabled and ctx.enable_show_backpressure_reason + ) num_progress_bars = 1 if is_all_to_all: # Initialize must be called for sub progress bars, even the @@ -260,11 +265,26 @@ def summary_str(self, resource_manager: ResourceManager) -> str: # Active tasks active = self.op.num_active_tasks() desc = f"- {self.op.name}: Tasks: {active}" + backpressure_state = self.op._backpressure_state if ( - self.op._in_task_submission_backpressure - or self.op._in_task_output_backpressure + self.enable_show_backpressure_reason + and backpressure_state.is_in_backpressure() ): + desc += " [🚧 BACKPRESSURED 🚧]" + reasons = [] + if backpressure_state.is_in_task_submission_backpressure(): + reasons.append( + f"Reason(S): {backpressure_state._in_task_submission_backpressure_reason}" + ) + if backpressure_state.is_in_task_output_backpressure(): + reasons.append( + f"Reason(O): {backpressure_state._in_task_output_backpressure_reason}" + ) + if reasons: + desc += f" [{', '.join(reasons)} 🚧]" + elif backpressure_state.is_in_backpressure(): desc += " [backpressured]" + desc += f", [{resource_manager.get_op_usage_str(self.op)}]" # Actors info desc += self.op.actor_info_progress_str() @@ -403,6 +423,21 @@ def process_completed_tasks( The number of errored blocks. """ + def op_output_backpressure( + opstate: OpState, max_bytes_to_read: Optional[float] + ) -> bool: + op = opstate.op + in_backpressure = max_bytes_to_read is not None and max_bytes_to_read <= 0 + in_backpressure_reason = ( + f"max_bytes_to_read({memory_string(max_bytes_to_read)})" + if in_backpressure + else "" + ) + + # TODO(jkj add some output backpressures) + op.notify_in_task_output_backpressure(in_backpressure, in_backpressure_reason) + return in_backpressure + # All active tasks, keyed by their waitables. active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {} for op, state in topology.items(): @@ -444,11 +479,18 @@ def process_completed_tasks( for task in ready_tasks: if isinstance(task, DataOpTask): try: + # TODO(jkj Add logic to skip data reading when output + # backpressure occurs.) bytes_read = task.on_data_ready( max_bytes_to_read_per_op.get(state, None) ) if state in max_bytes_to_read_per_op: max_bytes_to_read_per_op[state] -= bytes_read + op_output_backpressure( + state, max_bytes_to_read_per_op[state] + ) + else: + op_output_backpressure(state, None) except Exception as e: num_errored_blocks += 1 should_ignore = ( @@ -548,14 +590,41 @@ def select_operator_to_run( ops = [] for op, state in topology.items(): if resource_manager.op_resource_allocator_enabled(): - under_resource_limits = ( - resource_manager.op_resource_allocator.can_submit_new_task(op) + can_submit_with_reason = ( + resource_manager.op_resource_allocator.can_submit_new_task_with_reason( + op + ) + ) + under_resource_limits = can_submit_with_reason["can_submit"] + limit_reason = ( + f"(new tasks: {''.join(can_submit_with_reason['usage'])})" + if not under_resource_limits + else "" ) else: - under_resource_limits = _execution_allowed(op, resource_manager) - in_backpressure = not under_resource_limits or any( - not p.can_add_input(op) for p in backpressure_policies + allowed_execution = _execution_allowed_with_reason(op, resource_manager) + under_resource_limits = allowed_execution["allowed_exec"] + limit_reason = ( + f"(exec allowed: {''.join(allowed_execution['usage'])})" + if not under_resource_limits + else "" + ) + in_backpressure = not under_resource_limits + in_backpressure_reason = ( + f"under_resource_limits-> {limit_reason}" if in_backpressure else "" ) + # backpressure policies + if not in_backpressure: + for p in backpressure_policies: + if not p.can_add_input(op): + in_backpressure = True + in_backpressure_reason = p.__class__.__name__ + break + # should add input + if not in_backpressure and not op.should_add_input(): + in_backpressure = True + in_backpressure_reason = "op.should_add_input" + op_runnable = False if ( not in_backpressure @@ -573,7 +642,13 @@ def select_operator_to_run( ) # Signal whether op in backpressure for stats collections - op.notify_in_task_submission_backpressure(in_backpressure) + if op.completed(): + op.notify_in_task_submission_backpressure(False, "") + else: + # Signal whether op in backpressure for stats collections + op.notify_in_task_submission_backpressure( + in_backpressure, in_backpressure_reason + ) # To ensure liveness, allow at least 1 op to run regardless of limits. This is # gated on `ensure_at_least_one_running`, which is set if the consumer is blocked. @@ -606,6 +681,13 @@ def select_operator_to_run( def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) -> bool: + allowed_with_reason = _execution_allowed_with_reason(op, resource_manager) + return allowed_with_reason["allowed_exec"] + + +def _execution_allowed_with_reason( + op: PhysicalOperator, resource_manager: ResourceManager +): """Return whether an operator is allowed to execute given resource usage. Operators are throttled globally based on CPU and GPU limits for the stream. @@ -624,7 +706,7 @@ def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) Whether the op is allowed to run. """ if op.throttling_disabled(): - return True + return {"allowed_exec": True} global_usage = resource_manager.get_global_usage() global_limits = resource_manager.get_global_limits() @@ -656,7 +738,7 @@ def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) # Under global limits; always allow. new_usage = global_floored.add(inc_indicator) if new_usage.satisfies_limit(global_limits): - return True + return {"allowed_exec": True} # We're over global limits, but execution may still be allowed if memory is the # only bottleneck and this wouldn't impact downstream memory limits. This avoids @@ -665,11 +747,18 @@ def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) global_limits_sans_memory = ExecutionResources.for_limits( cpu=global_limits.cpu, gpu=global_limits.gpu ) - global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory) + global_ok_sans_memory = new_usage.satisfies_limit_with_reason( + global_limits_sans_memory + ) downstream_memory = resource_manager.get_downstream_object_store_memory(op) downstream_limit = global_limits.scale(resource_manager.get_downstream_fraction(op)) downstream_memory_ok = ExecutionResources( object_store_memory=downstream_memory - ).satisfies_limit(downstream_limit) - - return global_ok_sans_memory and downstream_memory_ok + ).satisfies_limit_with_reason(downstream_limit) + + if global_ok_sans_memory is True and downstream_memory_ok is True: + return {"allowed_exec": True} + return { + "allowed_exec": False, + "usage": f"G({global_ok_sans_memory}/D{downstream_memory_ok}", + } diff --git a/python/ray/data/context.py b/python/ray/data/context.py index f43fc2a50246..823d8fb2c766 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -308,6 +308,7 @@ class DataContext: enable_progress_bar_name_truncation: bool = ( DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATION ) + enable_show_backpressure_reason: bool = True enable_get_object_locations_for_metrics: bool = ( DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS ) diff --git a/python/ray/data/tests/test_webdataset.py b/python/ray/data/tests/test_webdataset.py index 3f509f812baf..e98b667bce27 100644 --- a/python/ray/data/tests/test_webdataset.py +++ b/python/ray/data/tests/test_webdataset.py @@ -125,7 +125,6 @@ def test_webdataset_write(ray_start_2_cpus, tmp_path): assert tf.extractfile(f"{i}.a").read().decode("utf-8") == str(i) assert tf.extractfile(f"{i}.b").read().decode("utf-8") == str(i**2) - def custom_decoder(sample): for key, value in sample.items(): if key == "png":