From f352b5cb8f01ad9f0f590176e3166565286ffe59 Mon Sep 17 00:00:00 2001 From: Andrew Xue Date: Mon, 30 Oct 2023 09:27:15 -0700 Subject: [PATCH] [data] add interval for iter metrics reporting (#40750) After adding [iteration metrics](https://github.com/ray-project/ray/pull/40541) `test_batcher` runtime on premerge went from `20-25s` to `45-50s`. It also timed out on this [build](https://buildkite.com/ray-project/premerge/builds/10085#018b71c4-fcbc-45c8-a732-fe61ab8c7467). We currently report metrics to the StatsActor after every batch, if batches are small, then there are too many calls. This pr adds an minimum time interval for iteration metrics reporting of 30s. Signed-off-by: Andrew Xue --- .../ray/data/_internal/block_batching/iter_batches.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index 55be53918d693..0a87c9b3e949d 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -1,4 +1,5 @@ import collections +import time from contextlib import nullcontext from typing import Any, Callable, Dict, Iterator, Optional, Tuple @@ -25,6 +26,9 @@ from ray.data.context import DataContext from ray.types import ObjectRef +# Interval for metrics update remote calls to _StatsActor during iteration. +STATS_UPDATE_INTERVAL_SECONDS = 30 + def iter_batches( block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]], @@ -176,6 +180,7 @@ def _async_iter_batches( async_batch_iter = make_async_gen(block_refs, fn=_async_iter_batches, num_workers=1) metrics_tag = {"dataset": dataset_tag} + last_stats_update_time = 0 while True: with stats.iter_total_blocked_s.timer() if stats else nullcontext(): try: @@ -184,7 +189,10 @@ def _async_iter_batches( break with stats.iter_user_s.timer() if stats else nullcontext(): yield next_batch - update_stats_actor_iter_metrics(stats, metrics_tag) + + if time.time() - last_stats_update_time >= STATS_UPDATE_INTERVAL_SECONDS: + update_stats_actor_iter_metrics(stats, metrics_tag) + last_stats_update_time = time.time() clear_stats_actor_iter_metrics(metrics_tag)