[data] add interval for iter metrics reporting (ray-project#40750)
After [iteration metrics](ray-project#40541) were added, the `test_batcher` runtime on premerge went from `20-25s` to `45-50s`. It also timed out on this [build](https://buildkite.com/ray-project/premerge/builds/10085#018b71c4-fcbc-45c8-a732-fe61ab8c7467).

We currently report metrics to the StatsActor after every batch. When batches are small, this results in too many remote calls.

This PR adds a minimum time interval of 30s between iteration metrics reports.
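The throttling idea described above can be sketched in isolation as follows. This is a minimal illustration rather than the Ray implementation: `iter_with_throttled_report`, `report`, `interval_s`, and `clock` are hypothetical names introduced here, and the injectable clock exists only to make the behavior easy to test.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")

# Hypothetical default mirroring the 30s interval described above.
REPORT_INTERVAL_SECONDS = 30.0


def iter_with_throttled_report(
    items: Iterable[T],
    report: Callable[[], None],
    interval_s: float = REPORT_INTERVAL_SECONDS,
    clock: Callable[[], float] = time.time,
) -> Iterator[T]:
    """Yield items, calling report() at most once per interval_s seconds."""
    # With a wall clock such as time.time(), 0.0 is far in the past,
    # so the first yielded item triggers a report.
    last_report_time = 0.0
    for item in items:
        yield item
        now = clock()
        if now - last_report_time >= interval_s:
            report()
            last_report_time = now
```

With many small batches, the cost of `report` is then bounded by elapsed time rather than by the number of batches. Note that in this sketch, metrics accumulated after the last report are not flushed when iteration ends.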

Signed-off-by: Andrew Xue <[email protected]>
Zandew authored Oct 30, 2023
1 parent 1ee167d commit f352b5c
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion python/ray/data/_internal/block_batching/iter_batches.py
@@ -1,4 +1,5 @@
 import collections
+import time
 from contextlib import nullcontext
 from typing import Any, Callable, Dict, Iterator, Optional, Tuple

@@ -25,6 +26,9 @@
 from ray.data.context import DataContext
 from ray.types import ObjectRef

+# Interval for metrics update remote calls to _StatsActor during iteration.
+STATS_UPDATE_INTERVAL_SECONDS = 30
+

 def iter_batches(
     block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
@@ -176,6 +180,7 @@ def _async_iter_batches(
     async_batch_iter = make_async_gen(block_refs, fn=_async_iter_batches, num_workers=1)
     metrics_tag = {"dataset": dataset_tag}

+    last_stats_update_time = 0
     while True:
         with stats.iter_total_blocked_s.timer() if stats else nullcontext():
             try:
@@ -184,7 +189,10 @@ def _async_iter_batches(
                 break
         with stats.iter_user_s.timer() if stats else nullcontext():
             yield next_batch
-        update_stats_actor_iter_metrics(stats, metrics_tag)
+
+        if time.time() - last_stats_update_time >= STATS_UPDATE_INTERVAL_SECONDS:
+            update_stats_actor_iter_metrics(stats, metrics_tag)
+            last_stats_update_time = time.time()
     clear_stats_actor_iter_metrics(metrics_tag)

