diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index a8480564bdb4..87da09bc43b5 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -249,14 +249,6 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) -py_test( - name = "test_pipeline", - size = "medium", - srcs = ["tests/test_pipeline.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - py_test( name = "test_tensor", size = "small", @@ -386,22 +378,6 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) -py_test( - name = "test_pipeline_incremental_take", - size = "small", - srcs = ["tests/test_pipeline_incremental_take.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - -py_test( - name = "test_pipeline_nohang", - size = "medium", - srcs = ["tests/test_pipeline_nohang.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - py_test( name = "test_progress_bar", size = "small", diff --git a/python/ray/data/tests/preprocessors/test_chain.py b/python/ray/data/tests/preprocessors/test_chain.py index c93cd2c4282d..4328690a1bdb 100644 --- a/python/ray/data/tests/preprocessors/test_chain.py +++ b/python/ray/data/tests/preprocessors/test_chain.py @@ -83,37 +83,6 @@ def udf(df): assert pred_out_df.equals(pred_expected_df) -def test_chain_pipeline(): - """Tests Chain functionality with DatasetPipeline.""" - - col_a = [-1, -1, -1, -1] - col_b = [1, 1, 1, 1] - in_df = pd.DataFrame.from_dict({"A": col_a, "B": col_b}) - ds = ray.data.from_pandas(in_df).repeat(1) - - def udf(df): - df["A"] *= 2 - return df - - def udf2(df): - df["B"] *= 2 - return df - - batch_mapper = BatchMapper(fn=udf, batch_format="pandas") - batch_mapper2 = BatchMapper(fn=udf2, batch_format="pandas") - chain = Chain(batch_mapper, batch_mapper2) - - transformed = chain._transform_pipeline(ds) - out_df = next(transformed.iter_batches(batch_format="pandas", batch_size=4)) - - processed_col_a = [-2, -2, -2, -2] - processed_col_b = [2, 2, 2, 2] - - expected_df = pd.DataFrame({"A": processed_col_a, "B": processed_col_b}) - - assert out_df.equals(expected_df) - - def test_nested_chain_state(): col_a = [-1, -1, 1, 1] col_b = [1, 1, 1, None] diff --git a/python/ray/data/tests/preprocessors/test_preprocessors.py b/python/ray/data/tests/preprocessors/test_preprocessors.py index efda54907d24..c5b1cc3a932c 100644 --- a/python/ray/data/tests/preprocessors/test_preprocessors.py +++ b/python/ray/data/tests/preprocessors/test_preprocessors.py @@ -167,15 +167,7 @@ def test_fit_twice(mocked_warn): mocked_warn.assert_called_once_with(msg) -def _apply_transform(preprocessor, ds): - if isinstance(ds, ray.data.DatasetPipeline): - return preprocessor._transform_pipeline(ds) - else: - return preprocessor.transform(ds) - - -@pytest.mark.parametrize("pipeline", [True, False]) -def test_transform_config(pipeline): +def test_transform_config(): """Tests that the transform_config of the Preprocessor is respected during transform.""" @@ -201,36 +193,11 @@ def _determine_transform_to_use(self): prep = DummyPreprocessor() ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(4))})) - if pipeline: - ds = ds.window(blocks_per_window=1).repeat() - _apply_transform(prep, ds) - - -def test_pipeline_fail(): - ds = ray.data.range(5).window(blocks_per_window=1).repeat(1) - - class FittablePreprocessor(Preprocessor): - _is_fittable = True + prep.transform(ds) - def _fit(self, dataset): - self.fitted_ = True - return self - def _transform_numpy(data): - return data - - prep = FittablePreprocessor() - with 
pytest.raises(RuntimeError): - _apply_transform(prep, ds) - - # Does not fail if preprocessor is already fitted. - fitted_prep = prep.fit(ds) - _apply_transform(fitted_prep, ds) - - -@pytest.mark.parametrize("pipeline", [True, False]) @pytest.mark.parametrize("dataset_format", ["simple", "pandas", "arrow"]) -def test_transform_all_formats(create_dummy_preprocessors, pipeline, dataset_format): +def test_transform_all_formats(create_dummy_preprocessors, dataset_format): ( with_nothing, with_pandas, @@ -250,38 +217,32 @@ def test_transform_all_formats(create_dummy_preprocessors, pipeline, dataset_for else: raise ValueError(f"Untested dataset_format configuration: {dataset_format}.") - if pipeline: - ds = ds.window(blocks_per_window=1).repeat(1) - with pytest.raises(NotImplementedError): - _apply_transform(with_nothing, ds) + with_nothing.transform(ds) - if pipeline: - patcher = patch.object(ray.data.dataset_pipeline.DatasetPipeline, "map_batches") - else: - patcher = patch.object(ray.data.dataset.Dataset, "map_batches") + patcher = patch.object(ray.data.dataset.Dataset, "map_batches") with patcher as mock_map_batches: - _apply_transform(with_pandas, ds) + with_pandas.transform(ds) mock_map_batches.assert_called_once_with( with_pandas._transform_pandas, batch_format=BatchFormat.PANDAS ) with patcher as mock_map_batches: - _apply_transform(with_numpy, ds) + with_numpy.transform(ds) mock_map_batches.assert_called_once_with( with_numpy._transform_numpy, batch_format=BatchFormat.NUMPY ) # Pandas preferred by default. with patcher as mock_map_batches: - _apply_transform(with_pandas_and_numpy, ds) + with_pandas_and_numpy.transform(ds) mock_map_batches.assert_called_once_with( with_pandas_and_numpy._transform_pandas, batch_format=BatchFormat.PANDAS ) with patcher as mock_map_batches: - _apply_transform(with_pandas_and_numpy_preferred, ds) + with_pandas_and_numpy_preferred.transform(ds) mock_map_batches.assert_called_once_with( with_pandas_and_numpy_preferred._transform_numpy, batch_format=BatchFormat.NUMPY ) diff --git a/python/ray/data/tests/test_context_propagation.py b/python/ray/data/tests/test_context_propagation.py index 41231261e3c1..03761d239f6c 100644 --- a/python/ray/data/tests/test_context_propagation.py +++ b/python/ray/data/tests/test_context_propagation.py @@ -6,7 +6,6 @@ from ray.data.block import BlockMetadata from ray.data.context import DataContext from ray.data.datasource import Datasource, ReadTask -from ray.data.tests.util import extract_values from ray.tests.conftest import * # noqa @@ -64,20 +63,6 @@ def test_map(ray_start_regular_shared): assert ds.take_all()[0]["id"] == 70001 -def test_map_pipeline(ray_start_regular_shared): - context = DataContext.get_current() - context.foo = 8 - pipe = ray.data.range(2).repeat(2) - pipe = pipe.map(lambda x: {"id": DataContext.get_current().foo}) - [a, b] = pipe.split(2) - - @ray.remote - def fetch(shard): - return extract_values("id", shard.take_all()) - - assert ray.get([fetch.remote(a), fetch.remote(b)]) == [[8, 8], [8, 8]] - - def test_flat_map(ray_start_regular_shared): context = DataContext.get_current() context.foo = 70002 @@ -123,10 +108,8 @@ def test_context_placement_group(): ) ray.get(placement_group.ready()) context.scheduling_strategy = PlacementGroupSchedulingStrategy(placement_group) -pipe = ray.data.range(100, parallelism=2) \ - .window(blocks_per_window=1) \ - .map(lambda x: {"id": x["id"] + 1}) -assert pipe.take_all() == [{"id": x} for x in range(1, 101)] +ds = ray.data.range(100, parallelism=2).map(lambda x: {"id": 
x["id"] + 1}) +assert ds.take_all() == [{"id": x} for x in range(1, 101)] placement_group_assert_no_leak([placement_group]) ray.shutdown() """ diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py index c910507a54bf..a943066e397e 100644 --- a/python/ray/data/tests/test_dynamic_block_split.py +++ b/python/ray/data/tests/test_dynamic_block_split.py @@ -175,35 +175,6 @@ def test_dataset( assert len(batch["one"]) == 10 -def test_dataset_pipeline(ray_start_regular_shared, target_max_block_size): - # Test 10 tasks, each task returning 10 blocks, each block has 1 row and each - # row has 1024 bytes. - num_blocks_per_task = 10 - block_size = 1024 - num_tasks = 10 - - ds = ray.data.read_datasource( - RandomBytesDatasource(), - parallelism=num_tasks, - num_blocks_per_task=num_blocks_per_task, - block_size=block_size, - ) - dsp = ds.window(blocks_per_window=2) - assert dsp._length == num_tasks / 2 - - dsp = dsp.map_batches(lambda x: x) - result_batches = list(ds.iter_batches(batch_size=5)) - for batch in result_batches: - assert len(batch["one"]) == 5 - assert len(result_batches) == num_blocks_per_task * num_tasks / 5 - - dsp = ds.window(blocks_per_window=2) - assert dsp._length == num_tasks / 2 - - dsp = ds.repeat().map_batches(lambda x: x) - assert len(dsp.take(5)) == 5 - - def test_filter(ray_start_regular_shared, target_max_block_size): # Test 10 tasks, each task returning 10 blocks, each block has 1 row and each # row has 1024 bytes. diff --git a/python/ray/data/tests/test_iterator.py b/python/ray/data/tests/test_iterator.py index 5313e720d86e..8106ee11539a 100644 --- a/python/ray/data/tests/test_iterator.py +++ b/python/ray/data/tests/test_iterator.py @@ -85,32 +85,6 @@ def test_basic_dataset_iter_rows(ray_start_regular_shared): # assert it.stats() == ds.stats() -def test_basic_dataset_pipeline(ray_start_regular_shared): - ds = ray.data.range(100).window(bytes_per_window=1).repeat() - it = ds.iterator() - for _ in range(2): - result = [] - for batch in it.iter_batches(): - batch = batch["id"].tolist() - result += batch - assert result == list(range(100)) - - assert it.stats() == ds.stats() - - -def test_basic_dataset_pipeline_iter_rows(ray_start_regular_shared): - ds = ray.data.range(100).window(bytes_per_window=1).repeat() - it = ds.iterator() - for _ in range(2): - result = [] - for row in it.iter_rows(): - row = row["id"] - result.append(row) - assert result == list(range(100)) - - assert it.stats() == ds.stats() - - def test_tf_conversion(ray_start_regular_shared): ds = ray.data.range(5) it = ds.iterator() @@ -129,45 +103,6 @@ def test_tf_e2e(ray_start_regular_shared): model.fit(it.to_tf("id", "id"), epochs=3) -def test_tf_e2e_pipeline(ray_start_regular_shared): - ds = ray.data.range(5).repeat(2) - it = ds.iterator() - model = build_model() - model.fit(it.to_tf("id", "id"), epochs=2) - - ds = ray.data.range(5).repeat(2) - it = ds.iterator() - model = build_model() - # 3 epochs fails since we only repeated twice. - with pytest.raises(Exception, match=r"generator raised StopIteration"): - model.fit(it.to_tf("id", "id"), epochs=3) - - -def test_tf_conversion_pipeline(ray_start_regular_shared): - ds = ray.data.range(5).repeat(2) - it = ds.iterator() - tf_dataset = it.to_tf("id", "id") - for i, row in enumerate(tf_dataset): - assert all(row[0] == i) - assert all(row[1] == i) - assert isinstance(row[0], tf.Tensor) - assert isinstance(row[1], tf.Tensor) - - # Repeated twice. 
- tf_dataset = it.to_tf("id", "id") - for i, row in enumerate(tf_dataset): - assert all(row[0] == i) - assert all(row[1] == i) - assert isinstance(row[0], tf.Tensor) - assert isinstance(row[1], tf.Tensor) - - # Fails on third try. - with pytest.raises(Exception, match=r"generator raised StopIteration"): - tf_dataset = it.to_tf("id", "id") - for _ in tf_dataset: - pass - - def test_torch_conversion(ray_start_regular_shared): ds = ray.data.range(5) it = ds.iterator() @@ -200,26 +135,6 @@ def test_torch_multi_use_iterator(ray_start_regular_shared): assert batch["id"].tolist() == list(range(5)) -def test_torch_conversion_pipeline(ray_start_regular_shared): - ds = ray.data.range(5).repeat(2) - it = ds.iterator() - - # First epoch. - for batch in it.iter_torch_batches(): - assert isinstance(batch["id"], torch.Tensor) - assert batch["id"].tolist() == list(range(5)) - - # Second epoch. - for batch in it.iter_torch_batches(): - assert isinstance(batch["id"], torch.Tensor) - assert batch["id"].tolist() == list(range(5)) - - # Fails on third iteration. - with pytest.raises(Exception, match=r"generator raised StopIteration"): - for batch in it.iter_torch_batches(): - pass - - def test_torch_conversion_collate_fn(ray_start_regular_shared): def collate_fn(batch: Dict[str, np.ndarray]): return torch.as_tensor(batch["id"] + 5) diff --git a/python/ray/data/tests/test_object_gc.py b/python/ray/data/tests/test_object_gc.py index 754c727b733f..6146d689e0d0 100644 --- a/python/ray/data/tests/test_object_gc.py +++ b/python/ray/data/tests/test_object_gc.py @@ -8,12 +8,12 @@ from ray.tests.conftest import * # noqa -def check_no_spill(ctx, pipe): - # Run up to 10 epochs of the pipeline to stress test that +def check_no_spill(ctx, dataset): + # Iterate over the dataset for 10 epochs to stress test that # no spilling will happen. max_epoch = 10 - for p in pipe.iter_epochs(max_epoch): - for _ in p.iter_batches(batch_size=None): + for _ in range(max_epoch): + for _ in dataset.iter_batches(batch_size=None): pass meminfo = memory_summary(ctx.address_info["address"], stats_only=True) assert "Spilled" not in meminfo, meminfo @@ -31,34 +31,34 @@ def _all_executor_threads_exited(): wait_for_condition(_all_executor_threads_exited, timeout=10, retry_interval_ms=1000) -def check_to_torch_no_spill(ctx, pipe): - # Run up to 10 epochs of the pipeline to stress test that +def check_to_torch_no_spill(ctx, dataset): + # Iterate over the dataset for 10 epochs to stress test that # no spilling will happen. max_epoch = 10 - for p in pipe.iter_epochs(max_epoch): - for _ in p.to_torch(batch_size=None): + for _ in range(max_epoch): + for _ in dataset.to_torch(batch_size=None): pass meminfo = memory_summary(ctx.address_info["address"], stats_only=True) assert "Spilled" not in meminfo, meminfo -def check_iter_torch_batches_no_spill(ctx, pipe): - # Run up to 10 epochs of the pipeline to stress test that +def check_iter_torch_batches_no_spill(ctx, dataset): + # Iterate over the dataset for 10 epochs to stress test that # no spilling will happen. 
max_epoch = 10 - for p in pipe.iter_epochs(max_epoch): - for _ in p.iter_torch_batches(batch_size=None): + for _ in range(max_epoch): + for _ in dataset.iter_torch_batches(batch_size=None): pass meminfo = memory_summary(ctx.address_info["address"], stats_only=True) assert "Spilled" not in meminfo, meminfo -def check_to_tf_no_spill(ctx, pipe): - # Run up to 10 epochs of the pipeline to stress test that +def check_to_tf_no_spill(ctx, dataset): + # Iterate over the dataset for 10 epochs to stress test that # no spilling will happen. max_epoch = 10 - for p in pipe.iter_epochs(max_epoch): - for _ in p.to_tf( + for _ in range(max_epoch): + for _ in dataset.to_tf( feature_columns="data", label_columns="label", batch_size=None ): pass @@ -66,12 +66,12 @@ def check_to_tf_no_spill(ctx, pipe): assert "Spilled" not in meminfo, meminfo -def check_iter_tf_batches_no_spill(ctx, pipe): - # Run up to 10 epochs of the pipeline to stress test that +def check_iter_tf_batches_no_spill(ctx, dataset): + # Iterate over the dataset for 10 epochs to stress test that # no spilling will happen. max_epoch = 10 - for p in pipe.iter_epochs(max_epoch): - for _ in p.iter_tf_batches(): + for _ in range(max_epoch): + for _ in dataset.iter_tf_batches(): pass meminfo = memory_summary(ctx.address_info["address"], stats_only=True) assert "Spilled" not in meminfo, meminfo @@ -82,8 +82,7 @@ def test_iter_batches_no_spilling_upon_no_transformation(shutdown_only): ctx = ray.init(num_cpus=1, object_store_memory=300e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) - check_no_spill(ctx, ds.repeat()) - check_no_spill(ctx, ds.window(blocks_per_window=20)) + check_no_spill(ctx, ds) def test_torch_iteration(shutdown_only): @@ -93,11 +92,9 @@ def test_torch_iteration(shutdown_only): ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) # to_torch - check_to_torch_no_spill(ctx, ds.repeat()) - check_to_torch_no_spill(ctx, ds.window(blocks_per_window=20)) + check_to_torch_no_spill(ctx, ds) # iter_torch_batches - check_iter_torch_batches_no_spill(ctx, ds.repeat()) - check_iter_torch_batches_no_spill(ctx, ds.window(blocks_per_window=20)) + check_iter_torch_batches_no_spill(ctx, ds) def test_tf_iteration(shutdown_only): @@ -109,24 +106,9 @@ def test_tf_iteration(shutdown_only): ) # to_tf - check_to_tf_no_spill(ctx, ds.repeat().map(lambda x: x)) - check_to_tf_no_spill(ctx, ds.window(blocks_per_window=20).map(lambda x: x)) + check_to_tf_no_spill(ctx, ds.map(lambda x: x)) # iter_tf_batches - check_iter_tf_batches_no_spill(ctx, ds.repeat().map(lambda x: x)) - check_iter_tf_batches_no_spill( - ctx, ds.window(blocks_per_window=20).map(lambda x: x) - ) - - -def test_iter_batches_no_spilling_upon_rewindow(shutdown_only): - # The object store is about 300MB. - ctx = ray.init(num_cpus=1, object_store_memory=300e6) - # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) - - check_no_spill( - ctx, ds.window(blocks_per_window=20).repeat().rewindow(blocks_per_window=10) - ) + check_iter_tf_batches_no_spill(ctx, ds.map(lambda x: x)) def test_iter_batches_no_spilling_upon_prior_transformation(shutdown_only): @@ -135,10 +117,7 @@ def test_iter_batches_no_spilling_upon_prior_transformation(shutdown_only): # The size of dataset is 500*(80*80*4)*8B, about 100MB. ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) - # Repeat, with transformation prior to the pipeline. 
- check_no_spill(ctx, ds.map_batches(lambda x: x).repeat()) - # Window, with transformation prior to the pipeline. - check_no_spill(ctx, ds.map_batches(lambda x: x).window(blocks_per_window=20)) + check_no_spill(ctx, ds.map_batches(lambda x: x)) def test_iter_batches_no_spilling_upon_post_transformation(shutdown_only): @@ -147,10 +126,7 @@ def test_iter_batches_no_spilling_upon_post_transformation(shutdown_only): # The size of dataset is 500*(80*80*4)*8B, about 100MB. ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) - # Repeat, with transformation post the pipeline creation. - check_no_spill(ctx, ds.repeat().map_batches(lambda x: x, batch_size=5)) - # Window, with transformation post the pipeline creation. - check_no_spill(ctx, ds.window(blocks_per_window=20).map_batches(lambda x: x)) + check_no_spill(ctx, ds.map_batches(lambda x: x, batch_size=5)) def test_iter_batches_no_spilling_upon_transformations(shutdown_only): @@ -159,79 +135,10 @@ def test_iter_batches_no_spilling_upon_transformations(shutdown_only): # The size of dataset is 500*(80*80*4)*8B, about 100MB. ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) - # Repeat, with transformation before and post the pipeline. check_no_spill( ctx, - ds.map_batches(lambda x: x, batch_size=5) - .repeat() - .map_batches(lambda x: x, batch_size=5), + ds.map_batches(lambda x: x).map_batches(lambda x: x), ) - # Window, with transformation before and post the pipeline. - check_no_spill( - ctx, - ds.map_batches(lambda x: x) - .window(blocks_per_window=20) - .map_batches(lambda x: x), - ) - - -def test_iter_batches_no_spilling_upon_shuffle(shutdown_only): - # The object store is about 500MB. - ctx = ray.init(num_cpus=1, object_store_memory=500e6) - # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) - - check_no_spill(ctx, ds.repeat().random_shuffle_each_window()) - check_no_spill(ctx, ds.window(blocks_per_window=20).random_shuffle_each_window()) - - -def test_pipeline_splitting_has_no_spilling(shutdown_only): - # The object store is about 800MiB. - ctx = ray.init(num_cpus=1, object_store_memory=1200e6) - # The size of dataset is 50000*(80*80*4)*8B, about 10GiB, 50MiB/block. - ds = ray.data.range_tensor(5000, shape=(80, 80, 4), parallelism=20) - - # 2 blocks/window. - p = ds.window(bytes_per_window=100 * 1024 * 1024).repeat(2) - p1, p2 = p.split(2) - - @ray.remote - def consume(p): - for batch in p.iter_batches(batch_size=None): - pass - print(p.stats()) - - tasks = [consume.remote(p1), consume.remote(p2)] - ray.get(tasks) - meminfo = memory_summary(ctx.address_info["address"], stats_only=True) - assert "Spilled" not in meminfo, meminfo - - -def test_pipeline_splitting_has_no_spilling_with_equal_splitting(shutdown_only): - # The object store is about 1200MiB. - ctx = ray.init(num_cpus=1, object_store_memory=1200e6) - # The size of dataset is 50000*(80*80*4)*8B, about 10GiB, 50MiB/block. - ds = ray.data.range_tensor(50000, shape=(80, 80, 4), parallelism=200) - - # 150Mib/window, which is 3 blocks/window, which means equal splitting - # will need to split one block. - p = ds.window(bytes_per_window=150 * 1024 * 1024).repeat() - p1, p2 = p.split(2, equal=True) - - @ray.remote - def consume(p): - for batch in p.iter_batches(): - pass - - tasks = [consume.remote(p1), consume.remote(p2)] - try: - # Run it for 20 seconds. 
- ray.get(tasks, timeout=20) - except Exception: - for t in tasks: - ray.cancel(t, force=True) - meminfo = memory_summary(ctx.address_info["address"], stats_only=True) - assert "Spilled" not in meminfo, meminfo def test_global_bytes_spilled(shutdown_only): @@ -241,7 +148,8 @@ def test_global_bytes_spilled(shutdown_only): ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100).materialize() with pytest.raises(AssertionError): - check_no_spill(ctx, ds.repeat()) + check_no_spill(ctx, ds) + print(ds._get_stats_summary()) assert ds._get_stats_summary().global_bytes_spilled > 0 assert ds._get_stats_summary().global_bytes_restored > 0 @@ -254,7 +162,7 @@ def test_no_global_bytes_spilled(shutdown_only): # The size of dataset is 500*(80*80*4)*8B, about 100MB. ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100).materialize() - check_no_spill(ctx, ds.repeat()) + check_no_spill(ctx, ds) assert ds._get_stats_summary().global_bytes_spilled == 0 assert ds._get_stats_summary().global_bytes_restored == 0 diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index 936638181870..f3b0b1286a14 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -3,8 +3,6 @@ import numpy as np import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq import pytest import ray @@ -96,67 +94,6 @@ def prepare_read( return read_tasks -@pytest.mark.skip(reason="Flaky, see https://github.com/ray-project/ray/issues/24757") -@pytest.mark.parametrize("lazy_input", [True, False]) -def test_memory_release_pipeline(shutdown_only, lazy_input): - context = DataContext.get_current() - # Disable stage fusion so we can keep reads and maps from being fused together, - # since we're trying to test multi-stage memory releasing here. - context.optimize_fuse_stages = False - # This object store allocation can hold at most 1 copy of the transformed dataset. - if lazy_input: - object_store_memory = 3000e6 - else: - object_store_memory = 3000e6 - - n = 10 - info = ray.init(num_cpus=n, object_store_memory=object_store_memory) - if lazy_input: - ds = ray.data.read_datasource( - OnesSource(), - parallelism=n, - n_per_block=100 * 1024 * 1024, - ) - else: - ds = ray.data.from_items(list(range(n)), parallelism=n) - - # Create a single-window pipeline. - pipe = ds.window(blocks_per_window=n) - - # Round 1. - def gen(x): - import time - - # TODO(Clark): Remove this sleep once we have fixed memory pressure handling. - time.sleep(2) - if isinstance(x, np.ndarray): - return x - else: - return np.ones(100 * 1024 * 1024, dtype=np.uint8) - - pipe = pipe.map(gen) - - def inc(x): - import time - - # TODO(Clark): Remove this sleep once we have fixed memory pressure handling. - time.sleep(2) - return {"id": x["id"] + 1} - - num_rounds = 10 - for _ in range(num_rounds): - pipe = pipe.map(inc) - - for block in pipe.iter_batches(batch_size=None): - for arr in block: - np.testing.assert_equal( - arr, - np.ones(100 * 1024 * 1024, dtype=np.uint8) + num_rounds, - ) - meminfo = memory_summary(info["address"], stats_only=True) - assert "Spilled" not in meminfo, meminfo - - def test_memory_release_lazy(shutdown_only): context = DataContext.get_current() # Ensure that stage fusion is enabled. 
@@ -341,18 +278,6 @@ def test_optimize_reorder(ray_start_regular_shared): ) -def test_window_randomize_fusion(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = True - context.optimize_reorder_stages = True - - pipe = ray.data.range(100).randomize_block_order().window().map_batches(dummy_map) - pipe.take() - stats = pipe.stats() - assert "ReadRange->RandomizeBlockOrder->MapBatches(dummy_map)" in stats, stats - - def test_write_fusion(ray_start_regular_shared, tmp_path): context = DataContext.get_current() context.optimize_fuse_stages = True @@ -391,289 +316,6 @@ def test_write_doesnt_reorder_randomize_block(ray_start_regular_shared, tmp_path assert "Write" in stats, stats -def test_optimize_fuse(ray_start_regular_shared): - context = DataContext.get_current() - - def build_pipe(): - pipe = ray.data.range(3).window(blocks_per_window=1).repeat(2) - pipe = pipe.map_batches(dummy_map) - pipe = pipe.map_batches(dummy_map) - pipe = pipe.random_shuffle_each_window() - results = [] - for p in pipe.iter_epochs(): - result = sorted(extract_values("id", p.take())) - results.append(result) - assert results == [[0, 1, 2], [0, 1, 2]], results - return pipe - - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = True - context.optimize_fuse_shuffle_stages = True - expect_stages( - build_pipe(), - 1, - [ - "ReadRange->MapBatches(dummy_map)->MapBatches(dummy_map)->RandomShuffleMap", # noqa: E501 - "RandomShuffleReduce", - ], - ) - - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = False - context.optimize_fuse_shuffle_stages = True - expect_stages( - build_pipe(), - 1, - [ - "Read", - "MapBatches(dummy_map)->MapBatches(dummy_map)->RandomShuffleMap", - "RandomShuffleReduce", - ], - ) - - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = False - context.optimize_fuse_shuffle_stages = False - expect_stages( - build_pipe(), - 2, - [ - "Read", - "MapBatches(dummy_map)->MapBatches(dummy_map)", - "RandomShuffleMap", - "RandomShuffleReduce", - ], - ) - - context.optimize_fuse_stages = False - context.optimize_fuse_read_stages = False - context.optimize_fuse_shuffle_stages = False - expect_stages( - build_pipe(), - 3, - [ - "Read", - "MapBatches(dummy_map)", - "MapBatches(dummy_map)", - "RandomShuffleMap", - "RandomShuffleReduce", - ], - ) - - -def test_optimize_equivalent_remote_args(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = True - context.optimize_fuse_shuffle_stages = True - - equivalent_kwargs = [ - {}, - {"resources": {"blah": 0}}, - {"resources": {"blah": None}}, - {"num_cpus": None}, - {"num_cpus": 1}, - {"num_cpus": 1, "num_gpus": 0}, - {"num_cpus": 1, "num_gpus": None}, - ] - - for kwa in equivalent_kwargs: - for kwb in equivalent_kwargs: - print("CHECKING", kwa, kwb) - pipe = ray.data.range(3).repeat(2) - pipe = pipe.map_batches(dummy_map, batch_size=64, **kwa) - pipe = pipe.map_batches(dummy_map, batch_size=64, **kwb) - pipe.take() - expect_stages( - pipe, - 1, - [ - "ReadRange->MapBatches(dummy_map)->MapBatches(dummy_map)", - ], - ) - - for kwa in equivalent_kwargs: - for kwb in equivalent_kwargs: - print("CHECKING", kwa, kwb) - pipe = ray.data.range(3).repeat(2) - pipe = pipe.map_batches(dummy_map, batch_size=64, **kwa) - pipe = pipe.random_shuffle_each_window(**kwb) - pipe.take() - expect_stages( - pipe, - 1, - [ - 
"ReadRange->MapBatches(dummy_map)->RandomShuffleMap", - "RandomShuffleReduce", - ], - ) - - -def test_optimize_incompatible_stages(shutdown_only): - ray.shutdown() - ray.init(num_cpus=2) - context = DataContext.get_current() - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = True - context.optimize_fuse_shuffle_stages = True - - pipe = ray.data.range(3).repeat(2) - # Should get fused as long as their resource types are compatible. - pipe = pipe.map_batches(dummy_map, compute=ray.data.ActorPoolStrategy()) - # Cannot fuse actors->tasks. - pipe = pipe.map_batches(dummy_map) - pipe = pipe.random_shuffle_each_window() - pipe.take() - expect_stages( - pipe, - 2, - [ - "ReadRange->MapBatches(dummy_map)", - "MapBatches(dummy_map)->RandomShuffleMap", - "RandomShuffleReduce", - ], - ) - - pipe = ray.data.range(3).repeat(2) - pipe = pipe.map_batches(dummy_map) - pipe = pipe.map_batches(dummy_map, num_cpus=0.75) - pipe = pipe.random_shuffle_each_window() - pipe.take() - expect_stages( - pipe, - 3, - [ - "ReadRange->MapBatches(dummy_map)", - "MapBatches(dummy_map)", - "RandomShuffleMap", - "RandomShuffleReduce", - ], - ) - - -def test_optimize_callable_classes(shutdown_only, tmp_path): - ray.shutdown() - ray.init(num_cpus=2) - context = DataContext.get_current() - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = True - context.optimize_fuse_shuffle_stages = True - - def put(x): - # We only support automatic deref in the legacy backend. - if DataContext.get_current().new_execution_backend: - return x - else: - return ray.put(x) - - # Set up. - df = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]}) - table = pa.Table.from_pandas(df) - pq.write_table(table, os.path.join(tmp_path, "test1.parquet")) - - class CallableFn: - def __init__(self, a, b=None): - assert a == 1 - assert b == 2 - self.a = a - self.b = b - - def __call__(self, x): - return self.b * x + self.a - - # Test callable chain. - fn_constructor_args = (put(1),) - fn_constructor_kwargs = {"b": put(2)} - pipe = ( - ray.data.read_parquet(str(tmp_path)) - .repeat(2) - .map_batches( - CallableFn, - batch_size=1, - batch_format="pandas", - compute=ray.data.ActorPoolStrategy(), - fn_constructor_args=fn_constructor_args, - fn_constructor_kwargs=fn_constructor_kwargs, - ) - .map_batches( - CallableFn, - batch_size=1, - batch_format="pandas", - compute=ray.data.ActorPoolStrategy(), - fn_constructor_args=fn_constructor_args, - fn_constructor_kwargs=fn_constructor_kwargs, - ) - ) - ds_list = pipe.take() - values = [s["one"] for s in ds_list] - assert values == [7, 11, 15, 7, 11, 15] - values = [s["two"] for s in ds_list] - assert values == [11, 15, 19, 11, 15, 19] - expect_stages( - pipe, - 1, - [ - "ReadParquet->MapBatches(CallableFn)->MapBatches(CallableFn)", - ], - ) - - # Test function + callable chain. 
- fn_constructor_args = (put(1),) - fn_constructor_kwargs = {"b": put(2)} - pipe = ( - ray.data.read_parquet(str(tmp_path)) - .repeat(2) - .map_batches( - lambda df, a, b=None: b * df + a, - batch_size=1, - batch_format="pandas", - compute=ray.data.ActorPoolStrategy(), - fn_args=(put(1),), - fn_kwargs={"b": put(2)}, - ) - .map_batches( - CallableFn, - batch_size=1, - batch_format="pandas", - compute=ray.data.ActorPoolStrategy(), - fn_constructor_args=fn_constructor_args, - fn_constructor_kwargs=fn_constructor_kwargs, - ) - ) - ds_list = pipe.take() - values = [s["one"] for s in ds_list] - assert values == [7, 11, 15, 7, 11, 15] - values = [s["two"] for s in ds_list] - assert values == [11, 15, 19, 11, 15, 19] - expect_stages( - pipe, - 1, - [ - "ReadParquet->MapBatches()->MapBatches(CallableFn)", - ], - ) - - -def test_optimize_reread_base_data(ray_start_regular_shared, local_path): - context = DataContext.get_current() - context.optimize_fuse_stages = True - context.optimize_fuse_read_stages = True - context.optimize_fuse_shuffle_stages = True - - # Re-read on. - N = 4 - df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - path1 = os.path.join(local_path, "test1.csv") - df1.to_csv(path1, index=False, storage_options={}) - counter = Counter.remote() - source = MySource(counter) - ds1 = ray.data.read_datasource(source, parallelism=1, paths=path1) - pipe = ds1.repeat(N) - pipe.take() - num_reads = ray.get(counter.get.remote()) - assert num_reads == N, num_reads - - @pytest.mark.skip(reason="reusing base data not enabled") @pytest.mark.parametrize("with_shuffle", [True, False]) def test_optimize_lazy_reuse_base_data( diff --git a/python/ray/data/tests/test_pipeline.py b/python/ray/data/tests/test_pipeline.py deleted file mode 100644 index 226ca183e577..000000000000 --- a/python/ray/data/tests/test_pipeline.py +++ /dev/null @@ -1,860 +0,0 @@ -import os -import time -from typing import Tuple - -import numpy as np -import pandas as pd -import pytest - -import ray -from ray.data import dataset -from ray.data.context import OK_PREFIX, WARN_PREFIX, DataContext -from ray.data.dataset import Dataset -from ray.data.dataset_pipeline import DatasetPipeline -from ray.data.tests.util import column_udf, extract_values -from ray.tests.conftest import * # noqa - - -class MockLogger: - def __init__(self): - self.warnings = [] - self.infos = [] - - def warning(self, msg): - if "STRICT_MODE" in msg: - return - self.warnings.append(msg) - print("warning:", msg) - - def info(self, msg): - if "STRICT_MODE" in msg: - return - self.infos.append(msg) - print("info:", msg) - - def debug(self, msg): - if "STRICT_MODE" in msg: - return - print("debug:", msg) - - -def test_warnings(shutdown_only): - ray.init(num_cpus=2) - - # Test parallelism warning. - dataset.logger = MockLogger() - ray.data.range(10, parallelism=10).window(blocks_per_window=1) - print(dataset.logger.warnings) - print(dataset.logger.infos) - assert dataset.logger.warnings == [ - f"{WARN_PREFIX} This pipeline's parallelism is limited by its blocks per " - "window to " - "~1 concurrent tasks per window. To maximize " - "performance, increase the blocks per window to at least 2. This " - "may require increasing the base dataset's parallelism and/or " - "adjusting the windowing parameters." 
- ] - assert dataset.logger.infos == [ - "Created DatasetPipeline with 10 windows: 8b min, 8b max, 8b mean", - "Blocks per window: 1 min, 1 max, 1 mean", - f"{OK_PREFIX} This pipeline's windows likely fit in object store memory " - "without spilling.", - ] - - try: - res_dict = ray.cluster_resources() - res_dict["object_store_memory"] = 1000 - old = ray.cluster_resources - ray.cluster_resources = lambda: res_dict - - # Test window memory warning. - dataset.logger = MockLogger() - ray.data.range(100000, parallelism=100).window(blocks_per_window=10) - print(dataset.logger.warnings) - print(dataset.logger.infos) - assert dataset.logger.warnings == [ - f"{WARN_PREFIX} This pipeline's windows are ~0.08MiB in size each and " - "may not fit in " - "object store memory without spilling. To improve performance, " - "consider reducing the size of each window to 250b or less." - ] - assert dataset.logger.infos == [ - "Created DatasetPipeline with 10 windows: 0.08MiB min, 0.08MiB max, " - "0.08MiB mean", - "Blocks per window: 10 min, 10 max, 10 mean", - f"{OK_PREFIX} This pipeline's per-window parallelism is high enough " - "to fully " - "utilize the cluster.", - ] - - # Test warning on both. - dataset.logger = MockLogger() - ray.data.range(100000, parallelism=1).window(bytes_per_window=100000) - print(dataset.logger.warnings) - print(dataset.logger.infos) - assert dataset.logger.warnings == [ - f"{WARN_PREFIX} This pipeline's parallelism is limited by its blocks " - "per window " - "to ~1 concurrent tasks per window. To maximize performance, increase " - "the blocks per window to at least 2. This may require increasing the " - "base dataset's parallelism and/or adjusting the windowing parameters.", - f"{WARN_PREFIX} This pipeline's windows are ~0.76MiB in size each and may " - "not fit " - "in object store memory without spilling. To improve performance, " - "consider reducing the size of each window to 250b or less.", - ] - assert dataset.logger.infos == [ - "Created DatasetPipeline with 1 windows: 0.76MiB min, 0.76MiB max, " - "0.76MiB mean", - "Blocks per window: 1 min, 1 max, 1 mean", - ] - finally: - ray.cluster_resources = old - - # Test no warning. 
- dataset.logger = MockLogger() - ray.data.range(10, parallelism=10).window(blocks_per_window=10) - print(dataset.logger.warnings) - print(dataset.logger.infos) - assert dataset.logger.warnings == [] - assert dataset.logger.infos == [ - "Created DatasetPipeline with 1 windows: 80b min, 80b max, 80b mean", - "Blocks per window: 10 min, 10 max, 10 mean", - f"{OK_PREFIX} This pipeline's per-window parallelism is high enough to fully " - "utilize the cluster.", - f"{OK_PREFIX} This pipeline's windows likely fit in object store memory " - "without spilling.", - ] - - -def test_pipeline_actors(shutdown_only): - ray.init(num_cpus=2, num_gpus=1) - pipe = ( - ray.data.range(3) - .repeat(10) - .map(column_udf("id", lambda x: x + 1)) - .map( - column_udf("id", lambda x: x + 1), - compute=ray.data.ActorPoolStrategy(), - num_gpus=1, - ) - ) - - assert sorted(extract_values("id", pipe.take(999))) == sorted([2, 3, 4] * 10) - - -def test_pipeline_is_parallel(shutdown_only): - ray.init(num_cpus=4) - ds = ray.data.range(10) - - @ray.remote(num_cpus=0) - class ParallelismTracker: - def __init__(self): - self.in_progress = 0 - self.max_in_progress = 0 - - def inc(self): - self.in_progress += 1 - if self.in_progress > self.max_in_progress: - self.max_in_progress = self.in_progress - - def dec(self): - self.in_progress = 0 - - def get_max(self): - return self.max_in_progress - - tracker = ParallelismTracker.remote() - - def sleep(x): - ray.get(tracker.inc.remote()) - time.sleep(0.1) - ray.get(tracker.dec.remote()) - return x - - pipe = ds.window(blocks_per_window=1) - # Shuffle in between to prevent fusion. - pipe = pipe.map(sleep).random_shuffle_each_window().map(sleep) - for i, v in enumerate(pipe.iter_rows()): - print(i, v) - assert ray.get(tracker.get_max.remote()) > 1 - - -def test_window_by_bytes(ray_start_regular_shared): - with pytest.raises(ValueError): - ray.data.range(10).window(blocks_per_window=2, bytes_per_window=2) - - pipe = ray.data.range(10000000, parallelism=100).window(blocks_per_window=2) - assert str(pipe) == "DatasetPipeline(num_windows=50, num_stages=2)" - - pipe = ray.data.range(10000000, parallelism=100).window( - bytes_per_window=10 * 1024 * 1024 - ) - assert str(pipe) == "DatasetPipeline(num_windows=8, num_stages=2)" - dss = list(pipe.iter_datasets()) - assert len(dss) == 8, dss - for ds in dss[:-1]: - assert ds.num_blocks() in [12, 13] - - pipe = ray.data.range(10000000, parallelism=100).window(bytes_per_window=1) - assert str(pipe) == "DatasetPipeline(num_windows=100, num_stages=2)" - for ds in pipe.iter_datasets(): - assert ds.num_blocks() == 1 - - pipe = ray.data.range(10000000, parallelism=100).window(bytes_per_window=1e9) - assert str(pipe) == "DatasetPipeline(num_windows=1, num_stages=2)" - for ds in pipe.iter_datasets(): - assert ds.num_blocks() == 100 - - # Test creating from non-lazy BlockList. - pipe = ( - ray.data.range(10000000, parallelism=100) - .map_batches(lambda x: x) - .window(bytes_per_window=10 * 1024 * 1024) - ) - assert str(pipe) == "DatasetPipeline(num_windows=8, num_stages=1)" - - context = DataContext.get_current() - old = context.optimize_fuse_read_stages - try: - context.optimize_fuse_read_stages = False - dataset = ray.data.range(10).window(bytes_per_window=1) - assert extract_values("id", dataset.take(10)) == list(range(10)) - finally: - context.optimize_fuse_read_stages = old - - -def test_epoch(ray_start_regular_shared): - # Test dataset repeat. 
- pipe = ( - ray.data.range(5) - .map(column_udf("id", lambda x: x * 2)) - .repeat(3) - .map(column_udf("id", lambda x: x * 2)) - ) - results = [extract_values("id", p.take()) for p in pipe.iter_epochs()] - assert results == [[0, 4, 8, 12, 16], [0, 4, 8, 12, 16], [0, 4, 8, 12, 16]] - - # Test dataset pipeline repeat. - pipe = ray.data.range(3).window(blocks_per_window=2).repeat(3) - results = [extract_values("id", p.take()) for p in pipe.iter_epochs()] - assert results == [[0, 1, 2], [0, 1, 2], [0, 1, 2]] - - # Test max epochs. - pipe = ray.data.range(3).window(blocks_per_window=2).repeat(3) - results = [extract_values("id", p.take()) for p in pipe.iter_epochs(2)] - assert results == [[0, 1, 2], [0, 1, 2]] - - # Test nested repeat. - pipe = ray.data.range(5).repeat(2).repeat(2) - results = [extract_values("id", p.take()) for p in pipe.iter_epochs()] - assert results == [[0, 1, 2, 3, 4, 0, 1, 2, 3, 4], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] - - # Test preserve_epoch=True. - pipe = ray.data.range(5).repeat(2).rewindow(blocks_per_window=2) - results = [extract_values("id", p.take()) for p in pipe.iter_epochs()] - assert results == [[0, 1, 2, 3, 4], [0, 1, 2, 3, 4]] - - # Test preserve_epoch=False. - pipe = ( - ray.data.range(5).repeat(2).rewindow(blocks_per_window=2, preserve_epoch=False) - ) - results = [extract_values("id", p.take()) for p in pipe.iter_epochs()] - assert results == [[0, 1, 2, 3], [4, 0, 1, 2, 3, 4]] - - -# https://github.com/ray-project/ray/issues/20394 -def test_unused_epoch(ray_start_regular_shared): - pipe = ray.data.from_items([0, 1, 2, 3, 4]).repeat(3).random_shuffle_each_window() - - for i, epoch in enumerate(pipe.iter_epochs()): - print("Epoch", i) - - -def test_cannot_read_twice(ray_start_regular_shared): - ds = ray.data.range(10) - pipe = ds.window(blocks_per_window=1) - assert pipe.count() == 10 - with pytest.raises(RuntimeError): - pipe.count() - with pytest.raises(RuntimeError): - next(pipe.iter_batches()) - with pytest.raises(RuntimeError): - pipe.map(lambda x: x).count() - with pytest.raises(RuntimeError): - pipe.split(2) - - -def test_basic_pipeline(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = True - ds = ray.data.range(10, parallelism=10) - - pipe = ds.window(blocks_per_window=1) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)" - assert pipe.count() == 10 - pipe = ds.window(blocks_per_window=1) - pipe.show() - - pipe = ds.window(blocks_per_window=1).map(lambda x: x).map(lambda x: x) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=4)" - assert extract_values("id", pipe.take()) == list(range(10)) - - pipe = ( - ds.window(blocks_per_window=1) - .map(lambda x: x) - .flat_map(lambda x: [{"id": x["id"]}, {"id": x["id"] + 1}]) - ) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=4)" - assert pipe.count() == 20 - - pipe = ds.window(blocks_per_window=1).filter(lambda x: x["id"] % 2 == 0) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=3)" - assert pipe.count() == 5 - - pipe = ds.window(blocks_per_window=999) - assert str(pipe) == "DatasetPipeline(num_windows=1, num_stages=2)" - assert pipe.count() == 10 - - pipe = ds.repeat(10) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)" - assert pipe.count() == 100 - pipe = ds.repeat(10) - assert pipe.sum() == 450 - pipe = ds.repeat(10) - assert len(pipe.take_all()) == 100 - - -def test_window(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = 
True - ds = ray.data.range(10, parallelism=10) - pipe = ds.window(blocks_per_window=1) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)" - pipe = pipe.rewindow(blocks_per_window=3) - assert str(pipe) == "DatasetPipeline(num_windows=None, num_stages=1)" - datasets = list(pipe.iter_datasets()) - assert len(datasets) == 4 - assert extract_values("id", datasets[0].take()) == [0, 1, 2] - assert extract_values("id", datasets[1].take()) == [3, 4, 5] - assert extract_values("id", datasets[2].take()) == [6, 7, 8] - assert extract_values("id", datasets[3].take()) == [9] - - ds = ray.data.range(10, parallelism=10) - pipe = ds.window(blocks_per_window=5) - assert str(pipe) == "DatasetPipeline(num_windows=2, num_stages=2)" - pipe = pipe.rewindow(blocks_per_window=3) - assert str(pipe) == "DatasetPipeline(num_windows=None, num_stages=1)" - datasets = list(pipe.iter_datasets()) - assert len(datasets) == 4 - assert extract_values("id", datasets[0].take()) == [0, 1, 2] - assert extract_values("id", datasets[1].take()) == [3, 4, 5] - assert extract_values("id", datasets[2].take()) == [6, 7, 8] - assert extract_values("id", datasets[3].take()) == [9] - - -def test_repeat(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = True - ds = ray.data.range(5, parallelism=5) - pipe = ds.window(blocks_per_window=1) - assert str(pipe) == "DatasetPipeline(num_windows=5, num_stages=2)" - pipe = pipe.repeat(2) - assert str(pipe) == "DatasetPipeline(num_windows=10, num_stages=2)" - assert extract_values("id", pipe.take()) == (list(range(5)) + list(range(5))) - - ds = ray.data.range(5) - pipe = ds.window(blocks_per_window=1) - pipe = pipe.repeat() - assert str(pipe) == "DatasetPipeline(num_windows=inf, num_stages=2)" - assert len(pipe.take(99)) == 99 - - pipe = ray.data.range(5).repeat() - with pytest.raises(ValueError): - pipe.repeat() - - -def test_from_iterable(ray_start_regular_shared): - pipe = DatasetPipeline.from_iterable( - [lambda: ray.data.range(3), lambda: ray.data.range(2)] - ) - assert extract_values("id", pipe.take()) == [0, 1, 2, 0, 1] - - -def test_repeat_forever(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = True - ds = ray.data.range(10) - pipe = ds.repeat() - assert str(pipe) == "DatasetPipeline(num_windows=inf, num_stages=2)" - for i, v in enumerate(pipe.iter_rows()): - v = v["id"] - assert v == i % 10, (v, i, i % 10) - if i > 1000: - break - - -def test_repartition(ray_start_regular_shared): - pipe = ray.data.range(10).repeat(10) - assert pipe.repartition_each_window(1).sum() == 450 - pipe = ray.data.range(10).repeat(10) - assert pipe.repartition_each_window(10).sum() == 450 - pipe = ray.data.range(10).repeat(10) - assert pipe.repartition_each_window(100).sum() == 450 - - -def test_iter_batches_basic(ray_start_regular_shared): - pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2) - batches = list(pipe.iter_batches(batch_size=None)) - assert len(batches) == 10 - assert all(len(e) == 1 for e in batches) - - -def test_to_torch(ray_start_regular_shared): - pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2).repeat(2) - batches = list(pipe.to_torch(batch_size=None)) - assert len(batches) == 20 - - -def test_to_tf(ray_start_regular_shared): - ds = ray.data.range_tensor(10, shape=(1, 1, 1), parallelism=10) - ds = ds.add_column("label", lambda x: 1) - pipe = ds.window(blocks_per_window=2).repeat(2) - batches = list( - pipe.to_tf(feature_columns="data", 
label_columns="label", batch_size=None) - ) - assert len(batches) == 20 - - -def test_iter_torch_batches(ray_start_regular_shared): - pipe = ray.data.range(10).repeat(2) - batches = list(pipe.iter_torch_batches(batch_size=1)) - assert len(batches) == 20 - - -def test_iter_tf_batches(ray_start_regular_shared): - pipe = ray.data.range(10).repeat(2) - batches = list(pipe.iter_tf_batches(batch_size=1)) - assert len(batches) == 20 - - -def test_iter_batches_batch_across_windows(ray_start_regular_shared): - # 3 windows, each containing 3 blocks, each containing 3 rows. - pipe = ray.data.range(27, parallelism=9).window(blocks_per_window=3) - # 4-row batches, with batches spanning both blocks and windows. - batches = list(pipe.iter_batches(batch_size=4, batch_format="pandas")) - assert len(batches) == 7, batches - assert all(len(e) == 4 for e in batches[:-1]) - assert len(batches[-1]) == 3 - - -def test_iter_datasets(ray_start_regular_shared): - pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2) - ds = list(pipe.iter_datasets()) - assert len(ds) == 5 - - pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=5) - ds = list(pipe.iter_datasets()) - assert len(ds) == 2 - - -def test_foreach_window(ray_start_regular_shared): - pipe = ray.data.range(5).window(blocks_per_window=2) - pipe = pipe.foreach_window(lambda ds: ds.map(column_udf("id", lambda x: x * 2))) - assert extract_values("id", pipe.take()) == [0, 2, 4, 6, 8] - - -def test_schema(ray_start_regular_shared): - pipe = ray.data.range(5).window(blocks_per_window=2) - assert pipe.schema().names == ["id"] - - -def test_schema_peek(ray_start_regular_shared): - # Multiple datasets - pipe = ray.data.range(6, parallelism=6).window(blocks_per_window=2) - assert pipe.schema().names == ["id"] - assert pipe._first_dataset is not None - dss = list(pipe.iter_datasets()) - assert len(dss) == 3, dss - assert pipe._first_dataset is None - assert pipe.schema().names == ["id"] - - # Only 1 dataset - pipe = ray.data.range(1).window(blocks_per_window=2) - assert pipe.schema().names == ["id"] - assert pipe._first_dataset is not None - dss = list(pipe.iter_datasets()) - assert len(dss) == 1, dss - assert pipe._first_dataset is None - assert pipe.schema().names == ["id"] - - # Empty datasets - pipe = ( - ray.data.range(6, parallelism=6) - .filter(lambda x: x["id"] < 0) - .window(blocks_per_window=2) - ) - assert pipe.schema() is None - assert pipe._first_dataset is not None - dss = list(pipe.iter_datasets()) - assert len(dss) == 3, dss - assert pipe._first_dataset is None - assert pipe.schema() is None - - -def test_schema_after_repeat(ray_start_regular_shared): - pipe = ray.data.range(6, parallelism=6).window(blocks_per_window=2).repeat(2) - assert pipe.schema().names == ["id"] - output = [] - for ds in pipe.iter_datasets(): - output.extend(extract_values("id", ds.take())) - assert sorted(output) == sorted(list(range(6)) * 2) - - pipe = ray.data.range(6, parallelism=6).window(blocks_per_window=2).repeat(2) - assert pipe.schema().names == ["id"] - # Test that operations still work after peek. 
- pipe = pipe.map_batches(lambda batch: batch) - output = [] - for ds in pipe.iter_datasets(): - output.extend(extract_values("id", ds.take())) - assert sorted(output) == sorted(list(range(6)) * 2) - - -def test_split(ray_start_regular_shared): - pipe = ray.data.range(3).map(column_udf("id", lambda x: x + 1)).repeat(10) - - @ray.remote(num_cpus=0) - def consume(shard, i): - total = 0 - for row in shard.iter_rows(): - row = row["id"] - total += 1 - assert row == i + 1, row - assert total == 10, total - - shards = pipe.split(3) - refs = [consume.remote(s, i) for i, s in enumerate(shards)] - ray.get(refs) - - -def test_split_at_indices(ray_start_regular_shared): - indices = [2, 5] - n = 8 - pipe = ray.data.range(n).map(column_udf("id", lambda x: x + 1)).repeat(2) - - @ray.remote(num_cpus=0) - def consume(shard, i): - total = 0 - out = [] - for row in shard.iter_rows(): - row = row["id"] - total += 1 - out.append(row) - if i == 0: - assert total == 2 * indices[i] - elif i < len(indices): - assert total == 2 * (indices[i] - indices[i - 1]) - else: - assert total == 2 * (n - indices[i - 1]) - return out - - shards = pipe.split_at_indices(indices) - refs = [consume.remote(s, i) for i, s in enumerate(shards)] - outs = ray.get(refs) - np.testing.assert_equal( - np.array(outs, dtype=object), - np.array([[1, 2, 1, 2], [3, 4, 5, 3, 4, 5], [6, 7, 8, 6, 7, 8]], dtype=object), - ) - - -def _prepare_dataset_to_write(tmp_dir: str) -> Tuple[Dataset, pd.DataFrame]: - df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) - df = pd.concat([df1, df2]) - ds = ray.data.from_pandas([df1, df2]) - ds = ds.window(blocks_per_window=1) - os.mkdir(tmp_dir) - ds._set_uuid("data") - return (ds, df) - - -def test_json_write(ray_start_regular_shared, tmp_path): - path = os.path.join(tmp_path, "test_json_dir") - ds, df = _prepare_dataset_to_write(path) - ds.write_json(path) - path1 = os.path.join(path, "data_000000_000000_000000.json") - path2 = os.path.join(path, "data_000001_000000_000000.json") - dfds = pd.concat([pd.read_json(path1, lines=True), pd.read_json(path2, lines=True)]) - assert df.equals(dfds) - - -def test_csv_write(ray_start_regular_shared, tmp_path): - path = os.path.join(tmp_path, "test_csv_dir") - ds, df = _prepare_dataset_to_write(path) - ds.write_csv(path) - path1 = os.path.join(path, "data_000000_000000_000000.csv") - path2 = os.path.join(path, "data_000001_000000_000000.csv") - dfds = pd.concat([pd.read_csv(path1), pd.read_csv(path2)]) - assert df.equals(dfds) - - -def test_parquet_write(ray_start_regular_shared, tmp_path): - path = os.path.join(tmp_path, "test_parquet_dir") - ds, df = _prepare_dataset_to_write(path) - ds.write_parquet(path) - path1 = os.path.join(path, "data_000000_000000_000000.parquet") - path2 = os.path.join(path, "data_000001_000000_000000.parquet") - dfds = pd.concat([pd.read_parquet(path1), pd.read_parquet(path2)]) - assert df.equals(dfds) - - -def test_infinity_of_pipeline(ray_start_regular_shared): - ds = ray.data.range(3) - pipe = ds.repeat() - assert float("inf") == pipe._length - pipe = ds.window(blocks_per_window=2) - assert float("inf") != pipe._length - pipe = ds.repeat(3) - assert float("inf") != pipe._length - assert float("inf") == pipe.repeat()._length - - # ensure infinite length is transitive - pipe = ds.repeat().rewindow(blocks_per_window=2) - assert float("inf") == pipe._length - pipe = ds.repeat().split(2)[0] - assert float("inf") == pipe._length - pipe = ds.repeat().foreach_window(lambda 
x: x) - assert float("inf") == pipe._length - - -def test_count_sum_on_infinite_pipeline(ray_start_regular_shared): - ds = ray.data.range(3) - - pipe = ds.repeat() - assert float("inf") == pipe._length - with pytest.raises(ValueError): - pipe.count() - - pipe = ds.repeat() - assert float("inf") == pipe._length - with pytest.raises(ValueError): - pipe.sum() - - pipe = ds.repeat(3) - assert 9 == pipe.count() - - pipe = ds.repeat(3) - assert 9 == pipe.sum() - - -def test_sort_each_window(ray_start_regular_shared): - pipe = ( - ray.data.range(12, parallelism=12) - .window(blocks_per_window=3) - .sort_each_window("id") - ) - assert extract_values("id", pipe.take()) == list(range(12)) - - pipe = ( - ray.data.range(12, parallelism=12) - .window(blocks_per_window=3) - .sort_each_window("id", descending=True) - ) - assert extract_values("id", pipe.take()) == [2, 1, 0, 5, 4, 3, 8, 7, 6, 11, 10, 9] - - -def test_randomize_block_order_each_window(ray_start_regular_shared): - pipe = ray.data.range(12).repartition(6).window(blocks_per_window=3) - pipe = pipe.randomize_block_order_each_window(seed=0) - assert extract_values("id", pipe.take()) == [0, 1, 4, 5, 2, 3, 6, 7, 10, 11, 8, 9] - - -def test_add_column(ray_start_regular_shared): - df = pd.DataFrame({"col1": [1, 2, 3]}) - ds = ray.data.from_pandas(df) - pipe = ds.repeat() - assert pipe.add_column("col2", lambda x: x["col1"] + 1).take(1) == [ - {"col1": 1, "col2": 2} - ] - - -def test_select_columns(ray_start_regular_shared): - df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) - ds = ray.data.from_pandas(df) - pipe = ds.repeat() - assert pipe.select_columns(["col2", "col3"]).take(1) == [{"col2": 2, "col3": 3}] - - -def test_drop_columns(ray_start_regular_shared): - df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) - ds = ray.data.from_pandas(df) - pipe = ds.repeat() - assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] - - -def test_random_shuffle_each_window_with_custom_resource(ray_start_cluster): - ray.shutdown() - cluster = ray_start_cluster - # Create two nodes which have different custom resources. - cluster.add_node( - resources={"foo": 100}, - num_cpus=1, - ) - cluster.add_node(resources={"bar": 100}, num_cpus=1) - - ray.init(cluster.address) - - # Run pipeline in "bar" nodes. - pipe = ray.data.read_datasource( - ray.data.datasource.RangeDatasource(), - parallelism=10, - n=1000, - block_format="arrow", - ray_remote_args={"resources": {"bar": 1}}, - ).repeat(3) - pipe = pipe.random_shuffle_each_window(resources={"bar": 1}) - for batch in pipe.iter_batches(): - pass - assert "1 nodes used" in pipe.stats() - assert "2 nodes used" not in pipe.stats() - - -def test_in_place_transformation_doesnt_clear_objects(ray_start_regular_shared): - ds = ray.data.from_items([1, 2, 3, 4, 5, 6]) - - def verify_integrity(p): - # The pipeline's output blocks are from original dataset (i.e. not created - # by the pipeline itself), so those blocks must not be cleared -- verified - # below by re-reading the dataset. - for b in p.iter_batches(): - pass - # Verify the integrity of the blocks of original dataset. - assert extract_values("item", ds.take_all()) == [1, 2, 3, 4, 5, 6] - - verify_integrity(ds.repeat(10).randomize_block_order_each_window()) - verify_integrity( - ds.repeat(10) - .randomize_block_order_each_window() - .randomize_block_order_each_window() - ) - # Mix in-place and non-in place transforms. 
- verify_integrity( - ds.repeat(10).map_batches(lambda x: x).randomize_block_order_each_window() - ) - verify_integrity( - ds.repeat(10).randomize_block_order_each_window().map_batches(lambda x: x) - ) - - -def test_in_place_transformation_split_doesnt_clear_objects(ray_start_regular_shared): - ds = ray.data.from_items([1, 2, 3, 4, 5, 6], parallelism=3) - - @ray.remote - def consume(p): - for batch in p.iter_batches(): - pass - - def verify_integrity(p): - # Divide 3 blocks ([1, 2], [3, 4] and [5, 6]) into 2 splits equally must - # have one block got splitted. Since the blocks are not created by the - # pipeline (randomize_block_order_each_window() didn't create new - # blocks since it's in-place), so the block splitting will not clear - # the input block -- verified below by re-reading the dataset. - splits = p.split(2, equal=True) - ray.get([consume.remote(p) for p in splits]) - # Verify the integrity of the blocks of original dataset - assert extract_values("item", ds.take_all()) == [1, 2, 3, 4, 5, 6] - - verify_integrity(ds.repeat(10).randomize_block_order_each_window()) - verify_integrity( - ds.repeat(10) - .randomize_block_order_each_window() - .randomize_block_order_each_window() - ) - verify_integrity( - ds.repeat(10).randomize_block_order_each_window().rewindow(blocks_per_window=1) - ) - # Mix in-place and non-in place transforms. - verify_integrity( - ds.repeat(10) - .randomize_block_order_each_window() - .randomize_block_order_each_window() - .map_batches(lambda x: x) - ) - verify_integrity( - ds.repeat(10) - .map_batches(lambda x: x) - .randomize_block_order_each_window() - .randomize_block_order_each_window() - ) - - -def test_pipeline_executor_cannot_serialize_once_started(ray_start_regular_shared): - class Iterable: - def __init__(self, iter): - self._iter = iter - - def __next__(self): - ds = next(self._iter) - return lambda: ds - - p1 = ray.data.range(10).repeat() - # Start the pipeline. 
- data_iter = p1.iter_datasets() - next(data_iter) - - p2 = DatasetPipeline.from_iterable(Iterable(data_iter)) - with pytest.raises(RuntimeError) as error: - p2.split(2) - assert "PipelineExecutor is not serializable once it has started" in str(error) - - -def test_if_blocks_owned_by_consumer(ray_start_regular_shared): - ds = ray.data.from_items([1, 2, 3, 4, 5, 6], parallelism=3) - assert not ds._plan.execute()._owned_by_consumer - assert not ds.randomize_block_order()._plan.execute()._owned_by_consumer - assert not ds.map_batches(lambda x: x)._plan.execute()._owned_by_consumer - - def verify_blocks(pipe, owned_by_consumer): - for ds in pipe.iter_datasets(): - assert ds._plan.execute()._owned_by_consumer == owned_by_consumer - - verify_blocks(ds.repeat(1), False) - verify_blocks(ds.repeat(1).randomize_block_order_each_window(), False) - verify_blocks(ds.repeat(1).randomize_block_order_each_window().repeat(2), False) - verify_blocks( - ds.repeat(1).randomize_block_order_each_window().map_batches(lambda x: x), True - ) - verify_blocks(ds.repeat(1).map_batches(lambda x: x), True) - verify_blocks(ds.repeat(1).map(column_udf("item", lambda x: x)), True) - verify_blocks(ds.repeat(1).filter(lambda x: x["item"] > 3), True) - verify_blocks(ds.repeat(1).sort_each_window("item"), True) - verify_blocks(ds.repeat(1).random_shuffle_each_window(), True) - verify_blocks(ds.repeat(1).repartition_each_window(2), True) - verify_blocks(ds.repeat(1).rewindow(blocks_per_window=1), False) - verify_blocks(ds.repeat(1).rewindow(blocks_per_window=1).repeat(2), False) - verify_blocks( - ds.repeat(1).map_batches(lambda x: x).rewindow(blocks_per_window=1), True - ) - verify_blocks( - ds.repeat(1).rewindow(blocks_per_window=1).map_batches(lambda x: x), True - ) - - @ray.remote - def consume(pipe, owned_by_consumer): - verify_blocks(pipe, owned_by_consumer) - - splits = ds.repeat(1).split(2) - ray.get([consume.remote(splits[0], False), consume.remote(splits[1], False)]) - - splits = ds.repeat(1).randomize_block_order_each_window().split(2) - ray.get([consume.remote(splits[0], False), consume.remote(splits[1], False)]) - - splits = ds.repeat(1).map_batches(lambda x: x).split(2) - ray.get([consume.remote(splits[0], True), consume.remote(splits[1], True)]) - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_pipeline_incremental_take.py b/python/ray/data/tests/test_pipeline_incremental_take.py deleted file mode 100644 index d67b8d83787d..000000000000 --- a/python/ray/data/tests/test_pipeline_incremental_take.py +++ /dev/null @@ -1,32 +0,0 @@ -import time - -import pytest - -import ray -from ray.data.context import DataContext -from ray.tests.conftest import * # noqa - - -def test_incremental_take(shutdown_only): - # TODO(https://github.com/ray-project/ray/issues/31145): re-enable - # after the segfault bug is fixed. - if DataContext.get_current().new_execution_backend: - return - - ray.init(num_cpus=2) - - # Can read incrementally even if future results are delayed. 
- def block_on_ones(x: int) -> int: - if x == 1: - time.sleep(999999) - return x - - pipe = ray.data.range(2).window(blocks_per_window=1) - pipe = pipe.map(block_on_ones) - assert pipe.take(1) == [0] - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_pipeline_nohang.py b/python/ray/data/tests/test_pipeline_nohang.py deleted file mode 100644 index 875207e00aba..000000000000 --- a/python/ray/data/tests/test_pipeline_nohang.py +++ /dev/null @@ -1,32 +0,0 @@ -import pytest - -import ray -from ray.data.tests.util import column_udf, extract_values -from ray.tests.conftest import * # noqa - -NUM_REPEATS = 10 -NUM_TASKS = 10 - - -# This test can be flaky if there is a resource deadlock between the pipeline -# stages. Run it many times to guard against regressions. -def test_basic_actors(shutdown_only): - ray.init(num_cpus=2) - for _ in range(NUM_REPEATS): - ds = ray.data.range(NUM_TASKS) - ds = ds.window(blocks_per_window=1) - assert sorted( - extract_values( - "id", - ds.map( - column_udf("id", lambda x: x + 1), - compute=ray.data.ActorPoolStrategy(), - ).take(), - ) - ) == list(range(1, NUM_TASKS + 1)) - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 66411cb2fa3e..bff134c36b2c 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -817,354 +817,6 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path): ) - -def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_stats): - context = DataContext.get_current() - context.optimize_fuse_stages = True - - if context.new_execution_backend: - if context.use_streaming_executor: - logger = DatasetLogger( - "ray.data._internal.execution.streaming_executor" - ).get_logger( - log_to_stdout=enable_auto_log_stats, - ) - else: - logger = DatasetLogger( - "ray.data._internal.execution.bulk_executor" - ).get_logger( - log_to_stdout=enable_auto_log_stats, - ) - else: - logger = DatasetLogger("ray.data._internal.plan").get_logger( - log_to_stdout=enable_auto_log_stats, - ) - - with patch.object(logger, "info") as mock_logger: - ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(dummy_map_batches).materialize() - - if enable_auto_log_stats: - logger_args, logger_kwargs = mock_logger.call_args - - if context.new_execution_backend: - assert ( - canonicalize(logger_args[0]) - == f"""Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} -""" - ) - else: - assert ( - canonicalize(logger_args[0]) - == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -""" - ) - - pipe = ds.repeat(5) - pipe = pipe.map(dummy_map_batches) - if enable_auto_log_stats: - # Stats only include the first stage, and not the
pipelined map - logger_args, logger_kwargs = mock_logger.call_args - if context.new_execution_backend: - assert ( - canonicalize(logger_args[0]) - == f"""Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} -""" - ) - else: - assert ( - canonicalize(logger_args[0]) - == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -""" - ) - - stats = canonicalize(pipe.stats()) - assert "No stats available" in stats, stats - for batch in pipe.iter_batches(): - pass - - if enable_auto_log_stats: - # Now stats include the pipelined map stage - logger_args, logger_kwargs = mock_logger.call_args - if context.new_execution_backend: - assert ( - canonicalize(logger_args[0]) - == f"""Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} -""" - ) - else: - assert ( - canonicalize(logger_args[0]) - == """Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -""" - ) - - stats = canonicalize(pipe.stats()) - if context.new_execution_backend: - assert ( - stats - == f"""== Pipeline Window N == -Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} - -Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} - -== Pipeline Window N == -Stage N ReadRange->MapBatches(dummy_map_batches): [execution cached] -* Extra metrics: {STANDARD_EXTRA_METRICS} - -Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size 
bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} - -== Pipeline Window N == -Stage N ReadRange->MapBatches(dummy_map_batches): [execution cached] -* Extra metrics: {STANDARD_EXTRA_METRICS} - -Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} - -##### Overall Pipeline Time Breakdown ##### -* Time stalled waiting for next dataset: T min, T max, T mean, T total - -DatasetPipeline iterator time breakdown: -* Waiting for next dataset: T -* In ray.wait(): T -* In ray.get(): T -* In next_batch(): T -* In format_batch(): T -* In user code: T -* Total time: T -""" - ) - else: - assert ( - stats - == """== Pipeline Window N == -Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used - -Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used - -== Pipeline Window N == -Stage N Read->MapBatches(dummy_map_batches): [execution cached] - -Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used - -== Pipeline Window N == -Stage N Read->MapBatches(dummy_map_batches): [execution cached] - -Stage N Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used - -##### Overall Pipeline Time Breakdown ##### -* Time stalled waiting for next dataset: T min, T max, T mean, T total - -DatasetPipeline iterator time breakdown: -* Waiting for next dataset: T -* In ray.wait(): T -* In ray.get(): T -* In next_batch(): T -* In format_batch(): T -* In user code: T -* Total time: T -""" - ) - - -def test_dataset_pipeline_cache_cases(ray_start_regular_shared): - # NOT CACHED (lazy read stage). - ds = ray.data.range(10).repeat(2).map_batches(lambda x: x) - ds.take(999) - stats = ds.stats() - assert "[execution cached]" not in stats - - # CACHED (called cache()). - ds = ray.data.range(10).materialize().repeat(2).map_batches(lambda x: x) - ds.take(999) - stats = ds.stats() - print("STATS", stats) - assert "[execution cached]" in stats - - # CACHED (eager map stage). 
- ds = ray.data.range(10).map_batches(dummy_map_batches).repeat(2) - ds.take(999) - stats = ds.stats() - assert "[execution cached]" in stats - assert "ReadRange->MapBatches(dummy_map_batches)" in stats - - -def test_dataset_pipeline_split_stats_basic(ray_start_regular_shared): - context = DataContext.get_current() - context.optimize_fuse_stages = True - ds = ray.data.range(1000, parallelism=10) - pipe = ds.repeat(2) - - @ray.remote - def consume(split): - for batch in split.iter_batches(): - pass - return split.stats() - - s0, s1 = pipe.split(2) - stats = ray.get([consume.remote(s0), consume.remote(s1)]) - if context.new_execution_backend: - print("XXX stats:", canonicalize(stats[0])) - assert ( - canonicalize(stats[0]) - == f"""== Pipeline Window Z == -Stage N ReadRange: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} - -== Pipeline Window N == -Stage N ReadRange: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {STANDARD_EXTRA_METRICS} - -##### Overall Pipeline Time Breakdown ##### -* Time stalled waiting for next dataset: T min, T max, T mean, T total - -DatasetPipeline iterator time breakdown: -* Waiting for next dataset: T -* In ray.wait(): T -* In ray.get(): T -* In next_batch(): T -* In format_batch(): T -* In user code: T -* Total time: T -""" - ) - else: - assert ( - canonicalize(stats[0]) - == """== Pipeline Window Z == -Stage N Read: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used - -== Pipeline Window N == -Stage N Read: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used - -##### Overall Pipeline Time Breakdown ##### -* Time stalled waiting for next dataset: T min, T max, T mean, T total - -DatasetPipeline iterator time breakdown: -* Waiting for next dataset: T -* In ray.wait(): T -* In ray.get(): T -* In next_batch(): T -* In format_batch(): T -* In user code: T -* Total time: T -""" - ) - - def test_calculate_blocks_stats(ray_start_regular_shared, stage_two_block): context = DataContext.get_current() context.optimize_fuse_stages = True diff --git a/python/ray/data/tests/test_tf.py b/python/ray/data/tests/test_tf.py index 5f02a30edee5..39b744586119 100644 --- a/python/ray/data/tests/test_tf.py +++ b/python/ray/data/tests/test_tf.py @@ -118,21 +118,6 @@ def test_element_spec_shape_with_tensors(self): assert tuple(features.shape) == (4, 3, 32, 32) assert tuple(labels.shape) == (4,) - def 
test_element_spec_pipeline(self): - ds = ray.data.from_items( - 8 * [{"spam": np.zeros([3, 32, 32]), "ham": 0}] - ).repeat(2) - - dataset = ds.to_tf(feature_columns="spam", label_columns="ham", batch_size=4) - - feature_spec, label_spec = dataset.element_spec - assert tuple(feature_spec.shape) == (None, 3, 32, 32) - assert tuple(label_spec.shape) == (None,) - - features, labels = next(iter(dataset)) - assert tuple(features.shape) == (4, 3, 32, 32) - assert tuple(labels.shape) == (4,) - @pytest.mark.parametrize("batch_size", [1, 2]) def test_element_spec_shape_with_ragged_tensors(self, batch_size): df = pd.DataFrame(