Skip to content

Commit

Permalink
Revert "[Data] Add BundleQueue abstraction (ray-project#48503)" (ra…
Browse files Browse the repository at this point in the history
…y-project#48612)

Signed-off-by: Balaji Veeramani <[email protected]>
  • Loading branch information
bveeramani authored Nov 7, 2024
1 parent 218bdd7 commit 4a9c424
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 443 deletions.
8 changes: 0 additions & 8 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,6 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_bundle_queue",
size = "small",
srcs = ["tests/test_bundle_queue.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_autoscaler",
size = "small",
Expand Down
9 changes: 0 additions & 9 deletions python/ray/data/_internal/execution/bundle_queue/__init__.py

This file was deleted.

62 changes: 0 additions & 62 deletions python/ray/data/_internal/execution/bundle_queue/bundle_queue.py

This file was deleted.

129 changes: 0 additions & 129 deletions python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import ray
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation

Expand Down Expand Up @@ -268,11 +267,31 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
description="Number of blocks in operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_inqueue: int = metric_field(
default=0,
description=(
"Byte size of input blocks in the operator's internal input queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_outqueue_blocks: int = metric_field(
default=0,
description="Number of blocks in the operator's internal output queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_outqueue: int = metric_field(
default=0,
description=(
"Byte size of output blocks in the operator's internal output queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_pending_task_inputs: int = metric_field(
default=0,
description="Byte size of input blocks used by pending tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
map_only=True,
)
obj_store_mem_freed: int = metric_field(
default=0,
description="Byte size of freed memory in object store.",
Expand Down Expand Up @@ -304,10 +323,6 @@ def __init__(self, op: "PhysicalOperator"):
# Start time of current pause due to task submission backpressure
self._task_submission_backpressure_start_time = -1

self._internal_inqueue = create_bundle_queue()
self._internal_outqueue = create_bundle_queue()
self._pending_task_inputs = create_bundle_queue()

@property
def extra_metrics(self) -> Dict[str, Any]:
"""Return a dict of extra metrics."""
Expand Down Expand Up @@ -362,30 +377,6 @@ def average_bytes_per_output(self) -> Optional[float]:
else:
return self.bytes_task_outputs_generated / self.num_task_outputs_generated

@metric_property(
description="Byte size of input blocks in the operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_internal_inqueue(self) -> int:
return self._internal_inqueue.estimate_size_bytes()

@metric_property(
description=(
"Byte size of output blocks in the operator's internal output queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_internal_outqueue(self) -> int:
return self._internal_outqueue.estimate_size_bytes()

@metric_property(
description="Byte size of input blocks used by pending tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
map_only=True,
)
def obj_store_mem_pending_task_inputs(self) -> int:
return self._pending_task_inputs.estimate_size_bytes()

@property
def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
"""Estimated size in bytes of output blocks in Ray generator buffers.
Expand Down Expand Up @@ -463,13 +454,13 @@ def on_input_received(self, input: RefBundle):
def on_input_queued(self, input: RefBundle):
"""Callback when the operator queues an input."""
self.obj_store_mem_internal_inqueue_blocks += len(input.blocks)
self._internal_inqueue.add(input)
self.obj_store_mem_internal_inqueue += input.size_bytes()

def on_input_dequeued(self, input: RefBundle):
"""Callback when the operator dequeues an input."""
self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks)
input_size = input.size_bytes()
self._internal_inqueue.remove(input)
self.obj_store_mem_internal_inqueue -= input_size
assert self.obj_store_mem_internal_inqueue >= 0, (
self._op,
self.obj_store_mem_internal_inqueue,
Expand All @@ -479,13 +470,13 @@ def on_input_dequeued(self, input: RefBundle):
def on_output_queued(self, output: RefBundle):
"""Callback when an output is queued by the operator."""
self.obj_store_mem_internal_outqueue_blocks += len(output.blocks)
self._internal_outqueue.add(output)
self.obj_store_mem_internal_outqueue += output.size_bytes()

def on_output_dequeued(self, output: RefBundle):
"""Callback when an output is dequeued by the operator."""
self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks)
output_size = output.size_bytes()
self._internal_outqueue.remove(output)
self.obj_store_mem_internal_outqueue -= output_size
assert self.obj_store_mem_internal_outqueue >= 0, (
self._op,
self.obj_store_mem_internal_outqueue,
Expand Down Expand Up @@ -513,7 +504,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
self.num_tasks_submitted += 1
self.num_tasks_running += 1
self.bytes_inputs_of_submitted_tasks += inputs.size_bytes()
self._pending_task_inputs.add(inputs)
self.obj_store_mem_pending_task_inputs += inputs.size_bytes()
self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0)

def on_task_output_generated(self, task_index: int, output: RefBundle):
Expand Down Expand Up @@ -553,7 +544,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
total_input_size = inputs.size_bytes()
self.bytes_task_inputs_processed += total_input_size
input_size = inputs.size_bytes()
self._pending_task_inputs.remove(inputs)
self.obj_store_mem_pending_task_inputs -= input_size
assert self.obj_store_mem_pending_task_inputs >= 0, (
self._op,
self.obj_store_mem_pending_task_inputs,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
Expand All @@ -7,7 +8,6 @@
from ray.core.generated import gcs_pb2
from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.execution.autoscaler import AutoscalingActorPool
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
Expand Down Expand Up @@ -109,7 +109,7 @@ def __init__(

self._actor_pool = _ActorPool(compute_strategy, self._start_actor)
# A queue of bundles awaiting dispatch to actors.
self._bundle_queue = create_bundle_queue()
self._bundle_queue = collections.deque()
# Cached actor class.
self._cls = None
# Whether no more submittable bundles will be added.
Expand Down Expand Up @@ -175,7 +175,7 @@ def _task_done_callback(res_ref):
return actor, res_ref

def _add_bundled_input(self, bundle: RefBundle):
self._bundle_queue.add(bundle)
self._bundle_queue.append(bundle)
self._metrics.on_input_queued(bundle)
# Try to dispatch all bundles in the queue, including this new bundle.
self._dispatch_tasks()
Expand All @@ -191,14 +191,14 @@ def _dispatch_tasks(self):
while self._bundle_queue:
# Pick an actor from the pool.
if self._actor_locality_enabled:
actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
actor = self._actor_pool.pick_actor(self._bundle_queue[0])
else:
actor = self._actor_pool.pick_actor()
if actor is None:
# No actors available for executing the next task.
break
# Submit the map task.
bundle = self._bundle_queue.pop()
bundle = self._bundle_queue.popleft()
self._metrics.on_input_dequeued(bundle)
input_blocks = [block for block, _ in bundle.blocks]
ctx = TaskContext(
Expand Down
Loading

0 comments on commit 4a9c424

Please sign in to comment.