Skip to content

Commit

Permalink
[data] emit block generation time as data metric (ray-project#40610)
Browse files Browse the repository at this point in the history
Emits block generation time as a metric to the Ray Data dashboard.

Signed-off-by: Andrew Xue <[email protected]>
  • Loading branch information
Zandew authored Oct 30, 2023
1 parent f352b5c commit afdcdd2
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
8 changes: 6 additions & 2 deletions dashboard/client/src/pages/metrics/Metrics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,17 @@ const DATA_METRICS_CONFIG: MetricsSectionConfig[] = [
pathParams: "orgId=1&theme=light&panelId=7",
},
{
title: "Iteration Blocked Time",
title: "Block Generation Time",
pathParams: "orgId=1&theme=light&panelId=8",
},
{
title: "Iteration User Time",
title: "Iteration Blocked Time",
pathParams: "orgId=1&theme=light&panelId=9",
},
{
title: "Iteration User Time",
pathParams: "orgId=1&theme=light&panelId=10",
},
],
},
];
Expand Down
14 changes: 13 additions & 1 deletion dashboard/modules/metrics/dashboards/data_dashboard_panels.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@
),
Panel(
id=8,
title="Block Generation Time",
description="Time spent generating blocks.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_block_generation_seconds{{{global_filters}}}) by (dataset)",
legend="Block Generation Time: {{dataset}}",
)
],
),
Panel(
id=9,
title="Iteration Blocked Time",
description="Seconds user thread is blocked by iter_batches()",
unit="seconds",
Expand All @@ -104,7 +116,7 @@
],
),
Panel(
id=9,
id=10,
title="Iteration User Time",
description="Seconds spent in user code",
unit="seconds",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ class OpRuntimeMetrics:
default=0, metadata={"map_only": True, "export_metric": True}
)

# === Miscellaneous metrics ===

# Time spent generating blocks.
block_generation_time: float = field(
default=0, metadata={"map_only": True, "export_metric": True}
)

def __init__(self, op: "PhysicalOperator"):
from ray.data._internal.execution.operators.map_operator import MapOperator

Expand Down Expand Up @@ -208,7 +215,9 @@ def on_output_generated(self, task_index: int, output: RefBundle):
if self.obj_store_mem_cur > self.obj_store_mem_peak:
self.obj_store_mem_peak = self.obj_store_mem_cur

for block_ref, _ in output.blocks:
for block_ref, meta in output.blocks:
assert meta.exec_stats and meta.exec_stats.wall_time_s
self.block_generation_time += meta.exec_stats.wall_time_s
trace_allocation(block_ref, "operator_output")

def on_task_finished(self, task_index: int):
Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ def __init__(self, max_stats=1000):
description="Bytes outputted by dataset operators",
tag_keys=tags_keys,
)
self.block_generation_time = Gauge(
"data_block_generation_seconds",
description="Time spent generating blocks.",
tag_keys=tags_keys,
)

self.iter_total_blocked_s = Gauge(
"data_iter_total_blocked_seconds",
description="Seconds user thread is blocked by iter_batches()",
Expand Down Expand Up @@ -247,6 +253,7 @@ def update_metrics(self, stats: Dict[str, Union[int, float]], tags: Dict[str, st
self.bytes_outputted.set(stats["bytes_outputs_generated"], tags)
self.cpu_usage.set(stats["cpu_usage"], tags)
self.gpu_usage.set(stats["gpu_usage"], tags)
self.block_generation_time.set(stats["block_generation_time"], tags)

def update_iter_metrics(self, stats: "DatasetStats", tags):
self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags)
Expand All @@ -260,6 +267,7 @@ def clear_metrics(self, tags: Dict[str, str]):
self.bytes_outputted.set(0, tags)
self.cpu_usage.set(0, tags)
self.gpu_usage.set(0, tags)
self.block_generation_time.set(0, tags)

def clear_iter_metrics(self, tags: Dict[str, str]):
self.iter_total_blocked_s.set(0, tags)
Expand Down
32 changes: 24 additions & 8 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def gen_expected_metrics(
"'obj_store_mem_cur': Z",
"'obj_store_mem_peak': N",
f"""'obj_store_mem_spilled': {"N" if spilled else "Z"}""",
"'block_generation_time': N",
"'cpu_usage': Z",
"'gpu_usage': Z",
]
Expand Down Expand Up @@ -73,7 +74,7 @@ def gen_expected_metrics(
is_map=True,
spilled=False,
extra_metrics=[
"'ray_remote_args': {'num_cpus': Z.N, 'scheduling_strategy': 'DEFAULT'}"
"'ray_remote_args': {'num_cpus': N, 'scheduling_strategy': 'DEFAULT'}"
],
)

Expand Down Expand Up @@ -106,17 +107,19 @@ def canonicalize(stats: str, filter_global_stats: bool = True) -> str:
s1 = re.sub("[0-9\.]+(ms|us|s)", "T", s0)
# Memory expressions.
s2 = re.sub("[0-9\.]+(B|MB|GB)", "M", s1)
# Handle floats in (0, 1)
s3 = re.sub(" (0\.0*[1-9][0-9]*)", " N", s2)
# Handle zero values specially so we can check for missing values.
s3 = re.sub(" [0]+(\.[0]+)?", " Z", s2)
s4 = re.sub(" [0]+(\.[0])?", " Z", s3)
# Other numerics.
s4 = re.sub("[0-9]+(\.[0-9]+)?", "N", s3)
s5 = re.sub("[0-9]+(\.[0-9]+)?", "N", s4)
# Replace tabs with spaces.
s5 = re.sub("\t", " ", s4)
s6 = re.sub("\t", " ", s5)
if filter_global_stats:
s6 = s5.replace(CLUSTER_MEMORY_STATS, "")
s7 = s6.replace(DATASET_MEMORY_STATS, "")
return s7
return s5
s7 = s6.replace(CLUSTER_MEMORY_STATS, "")
s8 = s7.replace(DATASET_MEMORY_STATS, "")
return s8
return s6


def dummy_map_batches(x):
Expand Down Expand Up @@ -570,6 +573,7 @@ def check_stats():
" obj_store_mem_cur: Z,\n"
" obj_store_mem_peak: N,\n"
" obj_store_mem_spilled: Z,\n"
" block_generation_time: N,\n"
" cpu_usage: Z,\n"
" gpu_usage: Z,\n"
" ray_remote_args: {'num_cpus': N, 'scheduling_strategy': 'SPREAD'},\n"
Expand Down Expand Up @@ -1190,6 +1194,18 @@ def test_stats_actor_metrics():

assert "dataset" + ds._uuid == update_fn.call_args_list[-1].args[1]["dataset"]

def sleep_three(x):
import time

time.sleep(3)
return x

with patch_update_stats_actor() as update_fn:
ds = ray.data.range(3).map_batches(sleep_three, batch_size=1).materialize()

final_metric = update_fn.call_args_list[-1].args[0][-1]
assert final_metric.block_generation_time >= 9


def test_stats_actor_iter_metrics():
ds = ray.data.range(1e6).map_batches(lambda x: x)
Expand Down

0 comments on commit afdcdd2

Please sign in to comment.