diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index 55be53918d69..0a87c9b3e949 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -1,4 +1,5 @@ import collections +import time from contextlib import nullcontext from typing import Any, Callable, Dict, Iterator, Optional, Tuple @@ -25,6 +26,9 @@ from ray.data.context import DataContext from ray.types import ObjectRef +# Interval for metrics update remote calls to _StatsActor during iteration. +STATS_UPDATE_INTERVAL_SECONDS = 30 + def iter_batches( block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]], @@ -176,6 +180,7 @@ def _async_iter_batches( async_batch_iter = make_async_gen(block_refs, fn=_async_iter_batches, num_workers=1) metrics_tag = {"dataset": dataset_tag} + last_stats_update_time = 0 while True: with stats.iter_total_blocked_s.timer() if stats else nullcontext(): try: @@ -184,7 +189,10 @@ def _async_iter_batches( break with stats.iter_user_s.timer() if stats else nullcontext(): yield next_batch - update_stats_actor_iter_metrics(stats, metrics_tag) + + if time.time() - last_stats_update_time >= STATS_UPDATE_INTERVAL_SECONDS: + update_stats_actor_iter_metrics(stats, metrics_tag) + last_stats_update_time = time.time() clear_stats_actor_iter_metrics(metrics_tag)