Skip to content

Commit

Permalink
Revert "Revert "[Data] Add BundleQueue abstraction (ray-project#48503
Browse files Browse the repository at this point in the history
…)" (ray-project#48612)" (ray-project#48686)

This reverts commit 4a9c424.

---------

Signed-off-by: Balaji Veeramani <[email protected]>
  • Loading branch information
bveeramani authored Nov 13, 2024
1 parent dcf4892 commit 4b4cdcd
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 75 deletions.
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_bundle_queue",
size = "small",
srcs = ["tests/test_bundle_queue.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_autoscaler",
size = "small",
Expand Down
9 changes: 9 additions & 0 deletions python/ray/data/_internal/execution/bundle_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .bundle_queue import BundleQueue
from .fifo_bundle_queue import FIFOBundleQueue


def create_bundle_queue() -> BundleQueue:
return FIFOBundleQueue()


__all__ = ["BundleQueue", "create_bundle_queue"]
62 changes: 62 additions & 0 deletions python/ray/data/_internal/execution/bundle_queue/bundle_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import abc
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import RefBundle


class BundleQueue(abc.ABC):
@abc.abstractmethod
def __len__(self) -> int:
"""Return the number of bundles in the queue."""
...

@abc.abstractmethod
def __contains__(self, bundle: "RefBundle") -> bool:
"""Return whether the bundle is in the queue."""
...

@abc.abstractmethod
def add(self, bundle: "RefBundle") -> None:
"""Add a bundle to the queue."""
...

@abc.abstractmethod
def pop(self) -> "RefBundle":
"""Remove and return the head of the queue.
Raises:
IndexError: If the queue is empty.
"""
...

@abc.abstractmethod
def peek(self) -> Optional["RefBundle"]:
"""Return the head of the queue without removing it.
If the queue is empty, return `None`.
"""
...

@abc.abstractmethod
def remove(self, bundle: "RefBundle"):
"""Remove a bundle from the queue."""
...

@abc.abstractmethod
def clear(self):
"""Remove all bundles from the queue."""
...

@abc.abstractmethod
def estimate_size_bytes(self) -> int:
"""Return an estimate of the total size of objects in the queue."""
...

@abc.abstractmethod
def is_empty(self):
"""Return whether this queue and all of its internal data structures are empty.
This method is used for testing.
"""
...
129 changes: 129 additions & 0 deletions python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Optional

from .bundle_queue import BundleQueue

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import RefBundle


@dataclass
class _Node:
value: "RefBundle"
next: Optional["_Node"] = None
prev: Optional["_Node"] = None


class FIFOBundleQueue(BundleQueue):
"""A bundle queue that follows a first-in-first-out policy."""

def __init__(self):
# We manually implement a linked list because we need to remove elements
# efficiently, and Python's built-in data structures have O(n) removal time.
self._head: Optional[_Node] = None
self._tail: Optional[_Node] = None
# We use a dictionary to keep track of the nodes corresponding to each bundle.
# This allows us to remove a bundle from the queue in O(1) time. We need a list
# because a bundle can be added to the queue multiple times. Nodes in each list
# are insertion-ordered.
self._bundle_to_nodes: Dict["RefBundle", List[_Node]] = defaultdict(deque)

self._nbytes = 0
self._num_bundles = 0

def __len__(self) -> int:
return self._num_bundles

def __contains__(self, bundle: "RefBundle") -> bool:
return bundle in self._bundle_to_nodes

def add(self, bundle: "RefBundle") -> None:
"""Add a bundle to the end (right) of the queue."""
new_node = _Node(value=bundle, next=None, prev=self._tail)
# Case 1: The queue is empty.
if self._head is None:
assert self._tail is None
self._head = new_node
self._tail = new_node
# Case 2: The queue has at least one element.
else:
self._tail.next = new_node
self._tail = new_node

self._bundle_to_nodes[bundle].append(new_node)

self._nbytes += bundle.size_bytes()
self._num_bundles += 1

def pop(self) -> "RefBundle":
"""Return the first (left) bundle in the queue."""
# Case 1: The queue is empty.
if not self._head:
raise IndexError("You can't pop from an empty queue")

bundle = self._head.value
self.remove(bundle)

return bundle

def peek(self) -> Optional["RefBundle"]:
"""Return the first (left) bundle in the queue without removing it."""
if self._head is None:
return None

return self._head.value

def remove(self, bundle: "RefBundle"):
"""Remove a bundle from the queue.
If there are multiple instances of the bundle in the queue, this method only
removes the first one.
"""
# Case 1: The queue is empty.
if bundle not in self._bundle_to_nodes:
raise ValueError(f"The bundle {bundle} is not in the queue.")

node = self._bundle_to_nodes[bundle].popleft()
if not self._bundle_to_nodes[bundle]:
del self._bundle_to_nodes[bundle]

# Case 2: The bundle is the only element in the queue.
if self._head is self._tail:
self._head = None
self._tail = None
# Case 3: The bundle is the first element in the queue.
elif node is self._head:
self._head = node.next
self._head.prev = None
# Case 4: The bundle is the last element in the queue.
elif node is self._tail:
self._tail = node.prev
self._tail.next = None
# Case 5: The bundle is in the middle of the queue.
else:
node.prev.next = node.next
node.next.prev = node.prev

self._nbytes -= bundle.size_bytes()
assert self._nbytes >= 0, (
"Expected the total size of objects in the queue to be non-negative, but "
f"got {self._nbytes} bytes instead."
)

self._num_bundles -= 1

return node.value

def clear(self):
self._head = None
self._tail = None
self._bundle_to_nodes.clear()
self._nbytes = 0
self._num_bundles = 0

def estimate_size_bytes(self) -> int:
return self._nbytes

def is_empty(self):
return not self._bundle_to_nodes and self._head is None and self._tail is None
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import ray
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation

Expand Down Expand Up @@ -267,31 +268,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
description="Number of blocks in operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_inqueue: int = metric_field(
default=0,
description=(
"Byte size of input blocks in the operator's internal input queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_outqueue_blocks: int = metric_field(
default=0,
description="Number of blocks in the operator's internal output queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_outqueue: int = metric_field(
default=0,
description=(
"Byte size of output blocks in the operator's internal output queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_pending_task_inputs: int = metric_field(
default=0,
description="Byte size of input blocks used by pending tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
map_only=True,
)
obj_store_mem_freed: int = metric_field(
default=0,
description="Byte size of freed memory in object store.",
Expand Down Expand Up @@ -323,6 +304,10 @@ def __init__(self, op: "PhysicalOperator"):
# Start time of current pause due to task submission backpressure
self._task_submission_backpressure_start_time = -1

self._internal_inqueue = create_bundle_queue()
self._internal_outqueue = create_bundle_queue()
self._pending_task_inputs = create_bundle_queue()

@property
def extra_metrics(self) -> Dict[str, Any]:
"""Return a dict of extra metrics."""
Expand Down Expand Up @@ -377,6 +362,30 @@ def average_bytes_per_output(self) -> Optional[float]:
else:
return self.bytes_task_outputs_generated / self.num_task_outputs_generated

@metric_property(
description="Byte size of input blocks in the operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_internal_inqueue(self) -> int:
return self._internal_inqueue.estimate_size_bytes()

@metric_property(
description=(
"Byte size of output blocks in the operator's internal output queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_internal_outqueue(self) -> int:
return self._internal_outqueue.estimate_size_bytes()

@metric_property(
description="Byte size of input blocks used by pending tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
map_only=True,
)
def obj_store_mem_pending_task_inputs(self) -> int:
return self._pending_task_inputs.estimate_size_bytes()

@property
def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
"""Estimated size in bytes of output blocks in Ray generator buffers.
Expand Down Expand Up @@ -454,13 +463,13 @@ def on_input_received(self, input: RefBundle):
def on_input_queued(self, input: RefBundle):
"""Callback when the operator queues an input."""
self.obj_store_mem_internal_inqueue_blocks += len(input.blocks)
self.obj_store_mem_internal_inqueue += input.size_bytes()
self._internal_inqueue.add(input)

def on_input_dequeued(self, input: RefBundle):
"""Callback when the operator dequeues an input."""
self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks)
input_size = input.size_bytes()
self.obj_store_mem_internal_inqueue -= input_size
self._internal_inqueue.remove(input)
assert self.obj_store_mem_internal_inqueue >= 0, (
self._op,
self.obj_store_mem_internal_inqueue,
Expand All @@ -470,13 +479,13 @@ def on_input_dequeued(self, input: RefBundle):
def on_output_queued(self, output: RefBundle):
"""Callback when an output is queued by the operator."""
self.obj_store_mem_internal_outqueue_blocks += len(output.blocks)
self.obj_store_mem_internal_outqueue += output.size_bytes()
self._internal_outqueue.add(output)

def on_output_dequeued(self, output: RefBundle):
"""Callback when an output is dequeued by the operator."""
self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks)
output_size = output.size_bytes()
self.obj_store_mem_internal_outqueue -= output_size
self._internal_outqueue.remove(output)
assert self.obj_store_mem_internal_outqueue >= 0, (
self._op,
self.obj_store_mem_internal_outqueue,
Expand Down Expand Up @@ -504,7 +513,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
self.num_tasks_submitted += 1
self.num_tasks_running += 1
self.bytes_inputs_of_submitted_tasks += inputs.size_bytes()
self.obj_store_mem_pending_task_inputs += inputs.size_bytes()
self._pending_task_inputs.add(inputs)
self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0)

def on_task_output_generated(self, task_index: int, output: RefBundle):
Expand Down Expand Up @@ -544,7 +553,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
total_input_size = inputs.size_bytes()
self.bytes_task_inputs_processed += total_input_size
input_size = inputs.size_bytes()
self.obj_store_mem_pending_task_inputs -= input_size
self._pending_task_inputs.remove(inputs)
assert self.obj_store_mem_pending_task_inputs >= 0, (
self._op,
self.obj_store_mem_pending_task_inputs,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import collections
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
Expand All @@ -8,6 +7,7 @@
from ray.core.generated import gcs_pb2
from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.execution.autoscaler import AutoscalingActorPool
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
Expand Down Expand Up @@ -109,7 +109,7 @@ def __init__(

self._actor_pool = _ActorPool(compute_strategy, self._start_actor)
# A queue of bundles awaiting dispatch to actors.
self._bundle_queue = collections.deque()
self._bundle_queue = create_bundle_queue()
# Cached actor class.
self._cls = None
# Whether no more submittable bundles will be added.
Expand Down Expand Up @@ -175,7 +175,7 @@ def _task_done_callback(res_ref):
return actor, res_ref

def _add_bundled_input(self, bundle: RefBundle):
self._bundle_queue.append(bundle)
self._bundle_queue.add(bundle)
self._metrics.on_input_queued(bundle)
# Try to dispatch all bundles in the queue, including this new bundle.
self._dispatch_tasks()
Expand All @@ -191,14 +191,14 @@ def _dispatch_tasks(self):
while self._bundle_queue:
# Pick an actor from the pool.
if self._actor_locality_enabled:
actor = self._actor_pool.pick_actor(self._bundle_queue[0])
actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
else:
actor = self._actor_pool.pick_actor()
if actor is None:
# No actors available for executing the next task.
break
# Submit the map task.
bundle = self._bundle_queue.popleft()
bundle = self._bundle_queue.pop()
self._metrics.on_input_dequeued(bundle)
input_blocks = [block for block, _ in bundle.blocks]
ctx = TaskContext(
Expand Down
Loading

0 comments on commit 4b4cdcd

Please sign in to comment.