[data] assign dataset ids using statsactor (ray-project#40597)
UUIDs are too long and not user-friendly, and we use them for tagging data metrics.
This PR removes `uuid4()` and instead assigns `Dataset._uuid` from a global counter stored in `_StatsActor`, giving each dataset a shorter id.

Signed-off-by: Andrew Xue <[email protected]>
Zandew authored Oct 25, 2023
1 parent 1ed5e85 commit 728f2f4
Showing 4 changed files with 35 additions and 9 deletions.
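
For illustration, a minimal sketch of the counter pattern this change relies on; `IdCounter` is a hypothetical stand-in for `_StatsActor`, which also tracks execution stats and dashboard metrics:

```python
# Minimal sketch of counter-based id assignment via a Ray actor.
# Because actor tasks execute one at a time, concurrent callers
# still receive unique ids.
import ray


@ray.remote(num_cpus=0)
class IdCounter:
    def __init__(self):
        self.next_dataset_id = 0

    def get_dataset_id(self) -> str:
        dataset_id = str(self.next_dataset_id)
        self.next_dataset_id += 1
        return dataset_id


ray.init()
counter = IdCounter.remote()
print(ray.get([counter.get_dataset_id.remote() for _ in range(3)]))  # ['0', '1', '2']
```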
2 changes: 1 addition & 1 deletion python/ray/data/_internal/plan.py
@@ -591,7 +591,7 @@ def execute(
             StreamingExecutor,
         )
 
-        metrics_tag = (self._dataset_name or "") + self._dataset_uuid
+        metrics_tag = (self._dataset_name or "dataset") + self._dataset_uuid
         executor = StreamingExecutor(
             copy.deepcopy(context.execution_options),
             metrics_tag,
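
The `metrics_tag` tweak above matters because counter ids are short: without a fallback name, an unnamed dataset's tag would be a bare number. A small illustration with hypothetical values:

```python
# Hypothetical values for an unnamed dataset whose counter-assigned id is "0".
dataset_name = None
dataset_uuid = "0"

old_tag = (dataset_name or "") + dataset_uuid         # "0" -- ambiguous as a metric tag
new_tag = (dataset_name or "dataset") + dataset_uuid  # "dataset0"
assert new_tag == "dataset0"
```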
22 changes: 22 additions & 0 deletions python/ray/data/_internal/stats.py
@@ -3,6 +3,7 @@
 from contextlib import contextmanager
 from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Set, Union
+from uuid import uuid4
 
 import numpy as np
@@ -143,6 +144,9 @@ def __init__(self, max_stats=1000):
         self.max_stats = max_stats
         self.fifo_queue = []
 
+        # Assign dataset uuids with a global counter.
+        self.next_dataset_id = 0
+
         # Ray Data dashboard metrics
         # Everything is a gauge because we need to reset all of
         # a dataset's metrics to 0 after each finishes execution.
@@ -220,6 +224,11 @@ def get(self, stats_uuid):
     def _get_stats_dict_size(self):
         return len(self.start_time), len(self.last_time), len(self.metadata)
 
+    def get_dataset_id(self):
+        dataset_id = str(self.next_dataset_id)
+        self.next_dataset_id += 1
+        return dataset_id
+
     def update_metrics(self, stats: Dict[str, Union[int, float]], tags: Dict[str, str]):
         self.bytes_spilled.set(stats["obj_store_mem_spilled"], tags)
         self.bytes_allocated.set(stats["obj_store_mem_alloc"], tags)
@@ -269,6 +278,8 @@ def _check_cluster_stats_actor():
     # Checks if global _stats_actor belongs to current cluster,
     # if not, creates a new one on the current cluster.
     global _stats_actor, _stats_actor_cluster_id
+    if ray._private.worker._global_node is None:
+        raise RuntimeError("Global node is not initialized.")
     current_cluster_id = ray._private.worker._global_node.cluster_id
     if _stats_actor is None or _stats_actor_cluster_id != current_cluster_id:
         _stats_actor = _get_or_create_stats_actor()
@@ -297,6 +308,17 @@ def clear_stats_actor_metrics(tags: Dict[str, str]):
         _stats_actor.clear_metrics.remote(tags)
 
 
+def get_dataset_id_from_stats_actor() -> str:
+    global _stats_actor
+    try:
+        _check_cluster_stats_actor()
+        return ray.get(_stats_actor.get_dataset_id.remote())
+    except Exception:
+        # Getting dataset id from _StatsActor may fail, in this case
+        # fall back to uuid4
+        return uuid4().hex
+
+
 class DatasetStats:
     """Holds the execution times for a given Dataset.
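
Note the failure mode of the new `get_dataset_id_from_stats_actor` helper: any exception while reaching the stats actor falls back to a `uuid4` hex, so ids stay unique even when the actor is unavailable. A standalone sketch of the same pattern, where `fetch_counter_id` is a hypothetical stand-in for the remote actor call:

```python
# Standalone sketch of "short counter id with uuid4 fallback".
from typing import Callable
from uuid import uuid4


def dataset_id_with_fallback(fetch_counter_id: Callable[[], str]) -> str:
    try:
        return fetch_counter_id()
    except Exception:
        # The stats actor may be unreachable (e.g. Ray not initialized);
        # fall back to a uuid4 hex so the id is still unique, just longer.
        return uuid4().hex


def broken_fetch() -> str:
    raise RuntimeError("stats actor unavailable")


print(dataset_id_with_fallback(lambda: "7"))   # "7"
print(dataset_id_with_fallback(broken_fetch))  # 32-char uuid4 hex
```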
10 changes: 7 additions & 3 deletions python/ray/data/dataset.py
@@ -20,7 +20,6 @@
     TypeVar,
     Union,
 )
-from uuid import uuid4
 
 import numpy as np
@@ -83,7 +82,11 @@
     SortStage,
     ZipStage,
 )
-from ray.data._internal.stats import DatasetStats, DatasetStatsSummary
+from ray.data._internal.stats import (
+    DatasetStats,
+    DatasetStatsSummary,
+    get_dataset_id_from_stats_actor,
+)
 from ray.data._internal.util import ConsumptionAPI, _is_local_scheme, validate_compute
 from ray.data.aggregate import AggregateFn, Max, Mean, Min, Std, Sum
 from ray.data.block import (
@@ -236,7 +239,6 @@ def __init__(
         usage_lib.record_library_usage("dataset")  # Legacy telemetry name.
 
         self._plan = plan
-        self._set_uuid(uuid4().hex)
         self._logical_plan = logical_plan
         if logical_plan is not None:
             self._plan.link_logical_plan(logical_plan)
@@ -245,6 +247,8 @@
         self._current_executor: Optional["Executor"] = None
         self._write_ds = None
 
+        self._set_uuid(get_dataset_id_from_stats_actor())
+
     @staticmethod
     def copy(
         ds: "Dataset", _deep_copy: bool = False, _as: Optional[type] = None
10 changes: 5 additions & 5 deletions python/ray/data/tests/test_stats.py
@@ -483,7 +483,7 @@ def test_dataset__repr__(ray_start_regular_shared):
 
     expected_stats = (
         "DatasetStatsSummary(\n"
-        "   dataset_uuid=U,\n"
+        "   dataset_uuid=N,\n"
         "   base_name=None,\n"
         "   number=N,\n"
         "   extra_metrics={},\n"
@@ -539,7 +539,7 @@ def check_stats():
     assert len(ds2.take_all()) == n
     expected_stats2 = (
         "DatasetStatsSummary(\n"
-        "   dataset_uuid=U,\n"
+        "   dataset_uuid=N,\n"
         "   base_name=MapBatches(<lambda>),\n"
         "   number=N,\n"
         "   extra_metrics={\n"
@@ -596,7 +596,7 @@ def check_stats():
         "   dataset_bytes_spilled=M,\n"
         "   parents=[\n"
         "      DatasetStatsSummary(\n"
-        "         dataset_uuid=U,\n"
+        "         dataset_uuid=N,\n"
         "         base_name=None,\n"
         "         number=N,\n"
        "         extra_metrics={},\n"
@@ -1180,7 +1180,7 @@ def test_stats_actor_metrics():
     # There should be nothing in object store at the end of execution.
     assert final_metric.obj_store_mem_cur == 0
 
-    assert ds._uuid == update_fn.call_args_list[-1].args[1]["dataset"]
+    assert "dataset" + ds._uuid == update_fn.call_args_list[-1].args[1]["dataset"]
@@ -1219,7 +1219,7 @@ def test_dataset_name():
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == mds._uuid
+    assert update_fn.call_args_list[-1].args[1]["dataset"] == "dataset" + mds._uuid
 
     ds = ray.data.range(100, parallelism=20)
     ds._set_name("very_loooooooong_name")
