[Data] Remove DatasetPipeline unit tests (ray-project#40133)
We're deprecating DatasetPipeline in Ray 2.8. This PR removes its unit tests.

---------

Signed-off-by: Balaji Veeramani <[email protected]>
bveeramani authored Oct 12, 2023
1 parent 01e786f commit c8e2fde
Showing 13 changed files with 42 additions and 2,004 deletions.
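
Note: the deleted tests below all exercise the deprecated pipeline pattern — building a DatasetPipeline via Dataset.window()/Dataset.repeat() and consuming it with iter_batches() — while the surviving code paths use plain Datasets. A minimal sketch of the before/after, assuming the Ray 2.7-era APIs shown in the hunks (epoch counts and batch sizes here are arbitrary, illustrative values):

import ray

# Deprecated pattern covered by the removed tests: a DatasetPipeline built
# with window()/repeat() and consumed through iter_batches(). Slated for
# removal as part of the Ray 2.8 deprecation.
pipe = ray.data.range(100).window(blocks_per_window=1).repeat(2)
for batch in pipe.iter_batches(batch_size=10):
    pass  # two passes over the data, streamed window by window

# Surviving plain-Dataset pattern: loop over epochs explicitly and pull
# batches from the Dataset's DataIterator instead of repeating a pipeline.
ds = ray.data.range(100)
it = ds.iterator()
for _ in range(2):  # one loop iteration per epoch replaces repeat(2)
    for batch in it.iter_batches(batch_size=10):
        pass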
python/ray/data/BUILD (24 changes: 0 additions & 24 deletions)
@@ -249,14 +249,6 @@ py_test(
    deps = ["//:ray_lib", ":conftest"],
)

-py_test(
-    name = "test_pipeline",
-    size = "medium",
-    srcs = ["tests/test_pipeline.py"],
-    tags = ["team:data", "exclusive"],
-    deps = ["//:ray_lib", ":conftest"],
-)
-
py_test(
    name = "test_tensor",
    size = "small",
@@ -386,22 +378,6 @@ py_test(
    deps = ["//:ray_lib", ":conftest"],
)

-py_test(
-    name = "test_pipeline_incremental_take",
-    size = "small",
-    srcs = ["tests/test_pipeline_incremental_take.py"],
-    tags = ["team:data", "exclusive"],
-    deps = ["//:ray_lib", ":conftest"],
-)
-
-py_test(
-    name = "test_pipeline_nohang",
-    size = "medium",
-    srcs = ["tests/test_pipeline_nohang.py"],
-    tags = ["team:data", "exclusive"],
-    deps = ["//:ray_lib", ":conftest"],
-)
-
py_test(
    name = "test_progress_bar",
    size = "small",
python/ray/data/tests/preprocessors/test_chain.py (31 changes: 0 additions & 31 deletions)
@@ -83,37 +83,6 @@ def udf(df):
    assert pred_out_df.equals(pred_expected_df)


-def test_chain_pipeline():
-    """Tests Chain functionality with DatasetPipeline."""
-
-    col_a = [-1, -1, -1, -1]
-    col_b = [1, 1, 1, 1]
-    in_df = pd.DataFrame.from_dict({"A": col_a, "B": col_b})
-    ds = ray.data.from_pandas(in_df).repeat(1)
-
-    def udf(df):
-        df["A"] *= 2
-        return df
-
-    def udf2(df):
-        df["B"] *= 2
-        return df
-
-    batch_mapper = BatchMapper(fn=udf, batch_format="pandas")
-    batch_mapper2 = BatchMapper(fn=udf2, batch_format="pandas")
-    chain = Chain(batch_mapper, batch_mapper2)
-
-    transformed = chain._transform_pipeline(ds)
-    out_df = next(transformed.iter_batches(batch_format="pandas", batch_size=4))
-
-    processed_col_a = [-2, -2, -2, -2]
-    processed_col_b = [2, 2, 2, 2]
-
-    expected_df = pd.DataFrame({"A": processed_col_a, "B": processed_col_b})
-
-    assert out_df.equals(expected_df)
-
-
def test_nested_chain_state():
    col_a = [-1, -1, 1, 1]
    col_b = [1, 1, 1, None]
python/ray/data/tests/preprocessors/test_preprocessors.py (57 changes: 9 additions & 48 deletions)
@@ -167,15 +167,7 @@ def test_fit_twice(mocked_warn):
    mocked_warn.assert_called_once_with(msg)


-def _apply_transform(preprocessor, ds):
-    if isinstance(ds, ray.data.DatasetPipeline):
-        return preprocessor._transform_pipeline(ds)
-    else:
-        return preprocessor.transform(ds)
-
-
-@pytest.mark.parametrize("pipeline", [True, False])
-def test_transform_config(pipeline):
+def test_transform_config():
    """Tests that the transform_config of
    the Preprocessor is respected during transform."""

@@ -201,36 +193,11 @@ def _determine_transform_to_use(self):

    prep = DummyPreprocessor()
    ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(4))}))
-    if pipeline:
-        ds = ds.window(blocks_per_window=1).repeat()
-    _apply_transform(prep, ds)
+    prep.transform(ds)


-def test_pipeline_fail():
-    ds = ray.data.range(5).window(blocks_per_window=1).repeat(1)
-
-    class FittablePreprocessor(Preprocessor):
-        _is_fittable = True
-
-        def _fit(self, dataset):
-            self.fitted_ = True
-            return self
-
-        def _transform_numpy(data):
-            return data
-
-    prep = FittablePreprocessor()
-    with pytest.raises(RuntimeError):
-        _apply_transform(prep, ds)
-
-    # Does not fail if preprocessor is already fitted.
-    fitted_prep = prep.fit(ds)
-    _apply_transform(fitted_prep, ds)
-
-
-@pytest.mark.parametrize("pipeline", [True, False])
@pytest.mark.parametrize("dataset_format", ["simple", "pandas", "arrow"])
-def test_transform_all_formats(create_dummy_preprocessors, pipeline, dataset_format):
+def test_transform_all_formats(create_dummy_preprocessors, dataset_format):
    (
        with_nothing,
        with_pandas,
@@ -250,38 +217,32 @@ def test_transform_all_formats(create_dummy_preprocessors, dataset_format):
    else:
        raise ValueError(f"Untested dataset_format configuration: {dataset_format}.")

-    if pipeline:
-        ds = ds.window(blocks_per_window=1).repeat(1)
-
    with pytest.raises(NotImplementedError):
-        _apply_transform(with_nothing, ds)
+        with_nothing.transform(ds)

-    if pipeline:
-        patcher = patch.object(ray.data.dataset_pipeline.DatasetPipeline, "map_batches")
-    else:
-        patcher = patch.object(ray.data.dataset.Dataset, "map_batches")
+    patcher = patch.object(ray.data.dataset.Dataset, "map_batches")

    with patcher as mock_map_batches:
-        _apply_transform(with_pandas, ds)
+        with_pandas.transform(ds)
    mock_map_batches.assert_called_once_with(
        with_pandas._transform_pandas, batch_format=BatchFormat.PANDAS
    )

    with patcher as mock_map_batches:
-        _apply_transform(with_numpy, ds)
+        with_numpy.transform(ds)
    mock_map_batches.assert_called_once_with(
        with_numpy._transform_numpy, batch_format=BatchFormat.NUMPY
    )

    # Pandas preferred by default.
    with patcher as mock_map_batches:
-        _apply_transform(with_pandas_and_numpy, ds)
+        with_pandas_and_numpy.transform(ds)
    mock_map_batches.assert_called_once_with(
        with_pandas_and_numpy._transform_pandas, batch_format=BatchFormat.PANDAS
    )

    with patcher as mock_map_batches:
-        _apply_transform(with_pandas_and_numpy_preferred, ds)
+        with_pandas_and_numpy_preferred.transform(ds)
    mock_map_batches.assert_called_once_with(
        with_pandas_and_numpy_preferred._transform_numpy, batch_format=BatchFormat.NUMPY
    )
python/ray/data/tests/test_context_propagation.py (21 changes: 2 additions & 19 deletions)
@@ -6,7 +6,6 @@
from ray.data.block import BlockMetadata
from ray.data.context import DataContext
from ray.data.datasource import Datasource, ReadTask
-from ray.data.tests.util import extract_values
from ray.tests.conftest import *  # noqa


@@ -64,20 +63,6 @@ def test_map(ray_start_regular_shared):
    assert ds.take_all()[0]["id"] == 70001


-def test_map_pipeline(ray_start_regular_shared):
-    context = DataContext.get_current()
-    context.foo = 8
-    pipe = ray.data.range(2).repeat(2)
-    pipe = pipe.map(lambda x: {"id": DataContext.get_current().foo})
-    [a, b] = pipe.split(2)
-
-    @ray.remote
-    def fetch(shard):
-        return extract_values("id", shard.take_all())
-
-    assert ray.get([fetch.remote(a), fetch.remote(b)]) == [[8, 8], [8, 8]]
-
-
def test_flat_map(ray_start_regular_shared):
    context = DataContext.get_current()
    context.foo = 70002
@@ -123,10 +108,8 @@ def test_context_placement_group():
)
ray.get(placement_group.ready())
context.scheduling_strategy = PlacementGroupSchedulingStrategy(placement_group)
-pipe = ray.data.range(100, parallelism=2) \
-    .window(blocks_per_window=1) \
-    .map(lambda x: {"id": x["id"] + 1})
-assert pipe.take_all() == [{"id": x} for x in range(1, 101)]
+ds = ray.data.range(100, parallelism=2).map(lambda x: {"id": x["id"] + 1})
+assert ds.take_all() == [{"id": x} for x in range(1, 101)]
placement_group_assert_no_leak([placement_group])
ray.shutdown()
"""
python/ray/data/tests/test_dynamic_block_split.py (29 changes: 0 additions & 29 deletions)
@@ -175,35 +175,6 @@ def test_dataset(
        assert len(batch["one"]) == 10


-def test_dataset_pipeline(ray_start_regular_shared, target_max_block_size):
-    # Test 10 tasks, each task returning 10 blocks, each block has 1 row and each
-    # row has 1024 bytes.
-    num_blocks_per_task = 10
-    block_size = 1024
-    num_tasks = 10
-
-    ds = ray.data.read_datasource(
-        RandomBytesDatasource(),
-        parallelism=num_tasks,
-        num_blocks_per_task=num_blocks_per_task,
-        block_size=block_size,
-    )
-    dsp = ds.window(blocks_per_window=2)
-    assert dsp._length == num_tasks / 2
-
-    dsp = dsp.map_batches(lambda x: x)
-    result_batches = list(ds.iter_batches(batch_size=5))
-    for batch in result_batches:
-        assert len(batch["one"]) == 5
-    assert len(result_batches) == num_blocks_per_task * num_tasks / 5
-
-    dsp = ds.window(blocks_per_window=2)
-    assert dsp._length == num_tasks / 2
-
-    dsp = ds.repeat().map_batches(lambda x: x)
-    assert len(dsp.take(5)) == 5
-
-
def test_filter(ray_start_regular_shared, target_max_block_size):
    # Test 10 tasks, each task returning 10 blocks, each block has 1 row and each
    # row has 1024 bytes.
python/ray/data/tests/test_iterator.py (85 changes: 0 additions & 85 deletions)
@@ -85,32 +85,6 @@ def test_basic_dataset_iter_rows(ray_start_regular_shared):
    # assert it.stats() == ds.stats()


-def test_basic_dataset_pipeline(ray_start_regular_shared):
-    ds = ray.data.range(100).window(bytes_per_window=1).repeat()
-    it = ds.iterator()
-    for _ in range(2):
-        result = []
-        for batch in it.iter_batches():
-            batch = batch["id"].tolist()
-            result += batch
-        assert result == list(range(100))
-
-    assert it.stats() == ds.stats()
-
-
-def test_basic_dataset_pipeline_iter_rows(ray_start_regular_shared):
-    ds = ray.data.range(100).window(bytes_per_window=1).repeat()
-    it = ds.iterator()
-    for _ in range(2):
-        result = []
-        for row in it.iter_rows():
-            row = row["id"]
-            result.append(row)
-        assert result == list(range(100))
-
-    assert it.stats() == ds.stats()
-
-
def test_tf_conversion(ray_start_regular_shared):
    ds = ray.data.range(5)
    it = ds.iterator()
@@ -129,45 +103,6 @@ def test_tf_e2e(ray_start_regular_shared):
    model.fit(it.to_tf("id", "id"), epochs=3)


-def test_tf_e2e_pipeline(ray_start_regular_shared):
-    ds = ray.data.range(5).repeat(2)
-    it = ds.iterator()
-    model = build_model()
-    model.fit(it.to_tf("id", "id"), epochs=2)
-
-    ds = ray.data.range(5).repeat(2)
-    it = ds.iterator()
-    model = build_model()
-    # 3 epochs fails since we only repeated twice.
-    with pytest.raises(Exception, match=r"generator raised StopIteration"):
-        model.fit(it.to_tf("id", "id"), epochs=3)
-
-
-def test_tf_conversion_pipeline(ray_start_regular_shared):
-    ds = ray.data.range(5).repeat(2)
-    it = ds.iterator()
-    tf_dataset = it.to_tf("id", "id")
-    for i, row in enumerate(tf_dataset):
-        assert all(row[0] == i)
-        assert all(row[1] == i)
-        assert isinstance(row[0], tf.Tensor)
-        assert isinstance(row[1], tf.Tensor)
-
-    # Repeated twice.
-    tf_dataset = it.to_tf("id", "id")
-    for i, row in enumerate(tf_dataset):
-        assert all(row[0] == i)
-        assert all(row[1] == i)
-        assert isinstance(row[0], tf.Tensor)
-        assert isinstance(row[1], tf.Tensor)
-
-    # Fails on third try.
-    with pytest.raises(Exception, match=r"generator raised StopIteration"):
-        tf_dataset = it.to_tf("id", "id")
-        for _ in tf_dataset:
-            pass
-
-
def test_torch_conversion(ray_start_regular_shared):
    ds = ray.data.range(5)
    it = ds.iterator()
@@ -200,26 +135,6 @@ def test_torch_multi_use_iterator(ray_start_regular_shared):
        assert batch["id"].tolist() == list(range(5))


-def test_torch_conversion_pipeline(ray_start_regular_shared):
-    ds = ray.data.range(5).repeat(2)
-    it = ds.iterator()
-
-    # First epoch.
-    for batch in it.iter_torch_batches():
-        assert isinstance(batch["id"], torch.Tensor)
-        assert batch["id"].tolist() == list(range(5))
-
-    # Second epoch.
-    for batch in it.iter_torch_batches():
-        assert isinstance(batch["id"], torch.Tensor)
-        assert batch["id"].tolist() == list(range(5))
-
-    # Fails on third iteration.
-    with pytest.raises(Exception, match=r"generator raised StopIteration"):
-        for batch in it.iter_torch_batches():
-            pass
-
-
def test_torch_conversion_collate_fn(ray_start_regular_shared):
    def collate_fn(batch: Dict[str, np.ndarray]):
        return torch.as_tensor(batch["id"] + 5)