diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index f89da0fe1067..6d931ab6ed90 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -186,6 +186,7 @@ def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]): self.outqueue: OpBufferQueue = OpBufferQueue() self.op = op self.progress_bar = None + self.enable_show_backpressure_reason = True self.num_completed_tasks = 0 self.inputs_done_called = False # Tracks whether `input_done` is called for each input op. @@ -219,6 +220,9 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: position=index, enabled=progress_bar_enabled, ) + self.enable_show_backpressure_reason = ( + self.progress_bar.enabled and ctx.show_backpressure_reason + ) num_progress_bars = 1 if is_all_to_all: # Initialize must be called for sub progress bars, even the @@ -261,16 +265,29 @@ def summary_str(self, resource_manager: ResourceManager) -> str: # Active tasks active = self.op.num_active_tasks() desc = f"- {self.op.name}: Tasks: {active}" - if self.op._in_task_submission_backpressure: - desc += ( - f" [🚧 BACKPRESSURED, Reason(S): " - f"{self.op._in_task_submission_backpressure_reason} 🚧" - ) - if self.op._in_task_output_backpressure: - desc += ( - f" [🚧 BACKPRESSURED, Reason(O): " - f"{self.op._in_task_output_backpressure_reason} 🚧" - ) + if ( + self.op._in_task_submission_backpressure + or self.op._in_task_output_backpressure + ): + desc += " [🚧 BACKPRESSURED 🚧]" + if self.enable_show_backpressure_reason: + reasons = [] + if self.op._in_task_submission_backpressure: + reasons.append( + f"Reason(S): {self.op._in_task_submission_backpressure_reason}" + ) + if self.op._in_task_output_backpressure: + reasons.append( + f"Reason(O): {self.op._in_task_output_backpressure_reason}" + ) + if reasons: + desc += f" [{', '.join(reasons)} 🚧]" + else: + if ( + self.op._in_task_submission_backpressure + or self.op._in_task_output_backpressure + ): + desc += " [backpressured]" desc += f", [{resource_manager.get_op_usage_str(self.op)}]" # Actors info diff --git a/python/ray/data/context.py b/python/ray/data/context.py index f43fc2a50246..823d8fb2c766 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -308,6 +308,7 @@ class DataContext: enable_progress_bar_name_truncation: bool = ( DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATION ) + enable_show_backpressure_reason: bool = True enable_get_object_locations_for_metrics: bool = ( DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS )