Skip to content

Commit

Permalink
[data] add backpressure reason
Browse files Browse the repository at this point in the history
Signed-off-by: jukejian <[email protected]>
  • Loading branch information
Jay-ju committed Nov 5, 2024
1 parent 4308825 commit 37730b1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
37 changes: 27 additions & 10 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]):
self.outqueue: OpBufferQueue = OpBufferQueue()
self.op = op
self.progress_bar = None
self.enable_show_backpressure_reason = True
self.num_completed_tasks = 0
self.inputs_done_called = False
# Tracks whether `input_done` is called for each input op.
Expand Down Expand Up @@ -219,6 +220,9 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
position=index,
enabled=progress_bar_enabled,
)
self.enable_show_backpressure_reason = (
self.progress_bar.enabled and ctx.show_backpressure_reason
)
num_progress_bars = 1
if is_all_to_all:
# Initialize must be called for sub progress bars, even the
Expand Down Expand Up @@ -261,16 +265,29 @@ def summary_str(self, resource_manager: ResourceManager) -> str:
# Active tasks
active = self.op.num_active_tasks()
desc = f"- {self.op.name}: Tasks: {active}"
if self.op._in_task_submission_backpressure:
desc += (
f" [🚧 BACKPRESSURED, Reason(S): "
f"{self.op._in_task_submission_backpressure_reason} 🚧"
)
if self.op._in_task_output_backpressure:
desc += (
f" [🚧 BACKPRESSURED, Reason(O): "
f"{self.op._in_task_output_backpressure_reason} 🚧"
)
if (
self.op._in_task_submission_backpressure
or self.op._in_task_output_backpressure
):
desc += " [🚧 BACKPRESSURED 🚧]"
if self.enable_show_backpressure_reason:
reasons = []
if self.op._in_task_submission_backpressure:
reasons.append(
f"Reason(S): {self.op._in_task_submission_backpressure_reason}"
)
if self.op._in_task_output_backpressure:
reasons.append(
f"Reason(O): {self.op._in_task_output_backpressure_reason}"
)
if reasons:
desc += f" [{', '.join(reasons)} 🚧]"
else:
if (
self.op._in_task_submission_backpressure
or self.op._in_task_output_backpressure
):
desc += " [backpressured]"
desc += f", [{resource_manager.get_op_usage_str(self.op)}]"

# Actors info
Expand Down
1 change: 1 addition & 0 deletions python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ class DataContext:
enable_progress_bar_name_truncation: bool = (
DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATION
)
enable_show_backpressure_reason: bool = True
enable_get_object_locations_for_metrics: bool = (
DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS
)
Expand Down

0 comments on commit 37730b1

Please sign in to comment.