diff --git a/test/src/unit-filter-pipeline.cc b/test/src/unit-filter-pipeline.cc index 6e20ce2cb97..107385645c5 100644 --- a/test/src/unit-filter-pipeline.cc +++ b/test/src/unit-filter-pipeline.cc @@ -136,489 +136,6 @@ void run_reverse( .ok()); } -/** - * Simple filter that modifies the input stream by adding 1 to every input - * element. - */ -class Add1InPlace : public tiledb::sm::Filter { - public: - // Just use a dummy filter type - Add1InPlace(Datatype filter_data_type) - : Filter(FilterType::FILTER_NONE, filter_data_type) { - } - - void dump(FILE* out) const override { - (void)out; - } - - Status run_forward( - const WriterTile&, - WriterTile* const, - FilterBuffer* input_metadata, - FilterBuffer* input, - FilterBuffer* output_metadata, - FilterBuffer* output) const override { - auto input_size = input->size(); - RETURN_NOT_OK(output->append_view(input)); - output->reset_offset(); - - uint64_t nelts = input_size / sizeof(uint64_t); - for (uint64_t i = 0; i < nelts; i++) { - uint64_t* val = output->value_ptr(); - *val += 1; - output->advance_offset(sizeof(uint64_t)); - } - - // Metadata not modified by this filter. - RETURN_NOT_OK(output_metadata->append_view(input_metadata)); - - return Status::Ok(); - } - - Status run_reverse( - const Tile&, - Tile*, - FilterBuffer* input_metadata, - FilterBuffer* input, - FilterBuffer* output_metadata, - FilterBuffer* output, - const tiledb::sm::Config& config) const override { - (void)config; - - auto input_size = input->size(); - RETURN_NOT_OK(output->append_view(input)); - output->reset_offset(); - - uint64_t nelts = input_size / sizeof(uint64_t); - for (uint64_t i = 0; i < nelts; i++) { - uint64_t* val = output->value_ptr(); - *val -= 1; - output->advance_offset(sizeof(uint64_t)); - } - - // Metadata not modified by this filter. 
- RETURN_NOT_OK(output_metadata->append_view(input_metadata)); - - return Status::Ok(); - } - - Add1InPlace* clone_impl() const override { - return new Add1InPlace(filter_data_type_); - } -}; - -/** - * Simple filter that increments every element of the input stream, writing the - * output to a new buffer. Does not modify the input stream. - */ -class Add1OutOfPlace : public tiledb::sm::Filter { - public: - // Just use a dummy filter type - Add1OutOfPlace(Datatype filter_data_type) - : Filter(FilterType::FILTER_NONE, filter_data_type) { - } - - void dump(FILE* out) const override { - (void)out; - } - - Status run_forward( - const WriterTile&, - WriterTile* const, - FilterBuffer* input_metadata, - FilterBuffer* input, - FilterBuffer* output_metadata, - FilterBuffer* output) const override { - auto input_size = input->size(); - auto nelts = input_size / sizeof(uint64_t); - - // Add a new output buffer. - RETURN_NOT_OK(output->prepend_buffer(input_size)); - output->reset_offset(); - - for (uint64_t i = 0; i < nelts; i++) { - uint64_t inc; - RETURN_NOT_OK(input->read(&inc, sizeof(uint64_t))); - inc++; - RETURN_NOT_OK(output->write(&inc, sizeof(uint64_t))); - } - - // Finish any remaining bytes to ensure no data loss. - auto rem = input_size % sizeof(uint64_t); - for (unsigned i = 0; i < rem; i++) { - char byte; - RETURN_NOT_OK(input->read(&byte, sizeof(char))); - RETURN_NOT_OK(output->write(&byte, sizeof(char))); - } - - // Metadata not modified by this filter. - RETURN_NOT_OK(output_metadata->append_view(input_metadata)); - - return Status::Ok(); - } - - Status run_reverse( - const Tile&, - Tile*, - FilterBuffer* input_metadata, - FilterBuffer* input, - FilterBuffer* output_metadata, - FilterBuffer* output, - const tiledb::sm::Config& config) const override { - (void)config; - - auto input_size = input->size(); - auto nelts = input->size() / sizeof(uint64_t); - - // Add a new output buffer. 
- RETURN_NOT_OK(output->prepend_buffer(input->size())); - output->reset_offset(); - - for (uint64_t i = 0; i < nelts; i++) { - uint64_t inc; - RETURN_NOT_OK(input->read(&inc, sizeof(uint64_t))); - inc--; - RETURN_NOT_OK(output->write(&inc, sizeof(uint64_t))); - } - - auto rem = input_size % sizeof(uint64_t); - for (unsigned i = 0; i < rem; i++) { - char byte; - RETURN_NOT_OK(input->read(&byte, sizeof(char))); - RETURN_NOT_OK(output->write(&byte, sizeof(char))); - } - - // Metadata not modified by this filter. - RETURN_NOT_OK(output_metadata->append_view(input_metadata)); - - return Status::Ok(); - } - - Add1OutOfPlace* clone_impl() const override { - return new Add1OutOfPlace(filter_data_type_); - } -}; - -/** - * Simple filter which computes the sum of its input and prepends the sum - * to the output. In reverse execute, checks that the sum is correct. - */ -class PseudoChecksumFilter : public tiledb::sm::Filter { - public: - // Just use a dummy filter type - PseudoChecksumFilter(Datatype filter_data_type) - : Filter(FilterType::FILTER_NONE, filter_data_type) { - } - - void dump(FILE* out) const override { - (void)out; - } - - Status run_forward( - const WriterTile&, - WriterTile* const, - FilterBuffer* input_metadata, - FilterBuffer* input, - FilterBuffer* output_metadata, - FilterBuffer* output) const override { - auto input_size = input->size(); - auto nelts = input_size / sizeof(uint64_t); - - // The input is unmodified by this filter. - RETURN_NOT_OK(output->append_view(input)); - - // Forward the existing metadata and prepend a metadata buffer for the - // checksum. 
- RETURN_NOT_OK(output_metadata->append_view(input_metadata)); - RETURN_NOT_OK(output_metadata->prepend_buffer(sizeof(uint64_t))); - output_metadata->reset_offset(); - - uint64_t sum = 0; - for (uint64_t i = 0; i < nelts; i++) { - uint64_t val; - RETURN_NOT_OK(input->read(&val, sizeof(uint64_t))); - sum += val; - } - - RETURN_NOT_OK(output_metadata->write(&sum, sizeof(uint64_t))); - - return Status::Ok(); - } - - Status run_reverse( - const Tile&, - Tile*, - FilterBuffer* input_metadata, - FilterBuffer* input, - FilterBuffer* output_metadata, - FilterBuffer* output, - const tiledb::sm::Config& config) const override { - (void)config; - - auto input_size = input->size(); - auto nelts = input_size / sizeof(uint64_t); - - uint64_t input_sum; - RETURN_NOT_OK(input_metadata->read(&input_sum, sizeof(uint64_t))); - - uint64_t sum = 0; - for (uint64_t i = 0; i < nelts; i++) { - uint64_t val; - RETURN_NOT_OK(input->read(&val, sizeof(uint64_t))); - sum += val; - } - - if (sum != input_sum) - return Status_FilterError("Filter error; sum does not match."); - - // The output metadata is just a view on the input metadata, skipping the - // checksum bytes. - RETURN_NOT_OK(output_metadata->append_view( - input_metadata, - sizeof(uint64_t), - input_metadata->size() - sizeof(uint64_t))); - - // The output data is just a view on the unmodified input. - RETURN_NOT_OK(output->append_view(input)); - - return Status::Ok(); - } - - PseudoChecksumFilter* clone_impl() const override { - return new PseudoChecksumFilter(filter_data_type_); - } -}; - -TEST_CASE("Filter: Test compression", "[filter][compression]") { - tiledb::sm::Config config; - - const uint64_t nelts = 100; - auto tile = make_increasing_tile(nelts); - - // Set up dummy array schema (needed by compressor filter for cell size, etc). 
- uint32_t dim_dom[] = {1, 10}; - auto dim{make_shared(HERE(), "", Datatype::INT32)}; - CHECK(dim->set_domain(dim_dom).ok()); - auto domain{make_shared(HERE())}; - CHECK(domain->add_dimension(dim).ok()); - tiledb::sm::ArraySchema schema; - tiledb::sm::Attribute attr("attr", Datatype::UINT64); - CHECK(schema.add_attribute(make_shared(HERE(), attr)) - .ok()); - CHECK(schema.set_domain(domain).ok()); - - FilterPipeline pipeline; - ThreadPool tp(4); - - SECTION("- Simple") { - pipeline.add_filter(Add1InPlace(Datatype::UINT64)); - pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); - pipeline.add_filter( - CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); - - CHECK( - pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); - // Check compression worked - CHECK(tile.size() == 0); - CHECK(tile.filtered_buffer().size() < nelts * sizeof(uint64_t)); - - auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile); - run_reverse(config, tp, unfiltered_tile, pipeline); - - // Check all elements original values. - for (uint64_t i = 0; i < nelts; i++) { - uint64_t elt = 0; - CHECK_NOTHROW( - unfiltered_tile.read(&elt, i * sizeof(uint64_t), sizeof(uint64_t))); - CHECK(elt == i); - } - } - - SECTION("- With checksum stage") { - pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); - pipeline.add_filter( - CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); - - CHECK( - pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); - // Check compression worked - CHECK(tile.size() == 0); - CHECK(tile.filtered_buffer().size() < nelts * sizeof(uint64_t)); - - auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile); - run_reverse(config, tp, unfiltered_tile, pipeline); - - // Check all elements original values. 
- for (uint64_t i = 0; i < nelts; i++) { - uint64_t elt = 0; - CHECK_NOTHROW( - unfiltered_tile.read(&elt, i * sizeof(uint64_t), sizeof(uint64_t))); - CHECK(elt == i); - } - } - - SECTION("- With multiple stages") { - pipeline.add_filter(Add1InPlace(Datatype::UINT64)); - pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); - pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); - pipeline.add_filter( - CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); - - CHECK( - pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok()); - // Check compression worked - CHECK(tile.size() == 0); - CHECK(tile.filtered_buffer().size() < nelts * sizeof(uint64_t)); - - auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile); - run_reverse(config, tp, unfiltered_tile, pipeline); - - // Check all elements original values. - for (uint64_t i = 0; i < nelts; i++) { - uint64_t elt = 0; - CHECK_NOTHROW( - unfiltered_tile.read(&elt, i * sizeof(uint64_t), sizeof(uint64_t))); - CHECK(elt == i); - } - } -} - -TEST_CASE("Filter: Test compression var", "[filter][compression][var]") { - tiledb::sm::Config config; - - const uint64_t nelts = 100; - auto tile = make_increasing_tile(nelts); - - // Set up test data - std::vector sizes{ - 0, - 32, // Chunk0: 4 cells. - 80, // 10 cells, still makes it into this chunk as current size < 50%. - 48, // Chunk1: 6 cells. - 88, // Chunk2: 11 cells, new size > 50% and > than 10 cells. - 56, // Chunk3: 7 cells. - 72, // Chunk4: 9 cells, new size > 50%. - 8, // Chunk4: 10 cell, full. - 80, // Chunk5: 10 cells. - 160, // Chunk6: 20 cells. - 16, // Chunk7: 2 cells. - 16, // Chunk7: 4 cells. - 16, // Chunk7: 6 cells. - 16, // Chunk7: 8 cells. - 16, // Chunk7: 10 cells. - }; // Chunk8: 12 cells. 
- - std::vector out_sizes{112, 48, 88, 56, 80, 80, 160, 80, 96}; - - std::vector offsets(sizes.size()); - uint64_t offset = 0; - for (uint64_t i = 0; i < offsets.size() - 1; i++) { - offsets[i] = offset; - offset += sizes[i + 1]; - } - offsets[offsets.size() - 1] = offset; - - auto offsets_tile = make_offsets_tile(offsets); - - // Set up dummy array schema (needed by compressor filter for cell size, etc). - uint32_t dim_dom[] = {1, 10}; - auto dim{make_shared(HERE(), "", Datatype::INT32)}; - CHECK(dim->set_domain(dim_dom).ok()); - auto domain{make_shared(HERE())}; - CHECK(domain->add_dimension(dim).ok()); - tiledb::sm::ArraySchema schema; - tiledb::sm::Attribute attr("attr", Datatype::UINT64); - CHECK(schema.add_attribute(make_shared(HERE(), attr)) - .ok()); - CHECK(schema.set_domain(domain).ok()); - - FilterPipeline pipeline; - ThreadPool tp(4); - - SECTION("- Simple") { - WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1InPlace(Datatype::UINT64)); - pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); - pipeline.add_filter( - CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); - - CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) - .ok()); - // Check number of chunks - CHECK(tile.size() == 0); - CHECK( - tile.filtered_buffer().value_at_as(0) == - 9); // Number of chunks - - auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile); - run_reverse(config, tp, unfiltered_tile, pipeline); - - // Check all elements original values. 
- for (uint64_t i = 0; i < nelts; i++) { - uint64_t elt = 0; - CHECK_NOTHROW( - unfiltered_tile.read(&elt, i * sizeof(uint64_t), sizeof(uint64_t))); - CHECK(elt == i); - } - } - - SECTION("- With checksum stage") { - WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); - pipeline.add_filter( - CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); - - CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) - .ok()); - // Check number of chunks - CHECK(tile.size() == 0); - CHECK( - tile.filtered_buffer().value_at_as(0) == - 9); // Number of chunks - - auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile); - run_reverse(config, tp, unfiltered_tile, pipeline); - - // Check all elements original values. - for (uint64_t i = 0; i < nelts; i++) { - uint64_t elt = 0; - CHECK_NOTHROW( - unfiltered_tile.read(&elt, i * sizeof(uint64_t), sizeof(uint64_t))); - CHECK(elt == i); - } - } - - SECTION("- With multiple stages") { - WriterTile::set_max_tile_chunk_size(80); - pipeline.add_filter(Add1InPlace(Datatype::UINT64)); - pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); - pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); - pipeline.add_filter( - CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); - - CHECK(pipeline.run_forward(&test::g_helper_stats, &tile, &offsets_tile, &tp) - .ok()); - // Check number of chunks - CHECK(tile.size() == 0); - CHECK( - tile.filtered_buffer().value_at_as(0) == - 9); // Number of chunks - - auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile); - run_reverse(config, tp, unfiltered_tile, pipeline); - - // Check all elements original values. 
- for (uint64_t i = 0; i < nelts; i++) { - uint64_t elt = 0; - CHECK_NOTHROW( - unfiltered_tile.read(&elt, i * sizeof(uint64_t), sizeof(uint64_t))); - CHECK(elt == i); - } - } - - WriterTile::set_max_tile_chunk_size(constants::max_tile_chunk_size); -} - TEST_CASE( "Filter: Test skip checksum validation", "[filter][skip-checksum-validation]") { diff --git a/tiledb/sm/filter/test/unit_run_filter_pipeline.cc b/tiledb/sm/filter/test/unit_run_filter_pipeline.cc index c068d61c833..39b3ac17185 100644 --- a/tiledb/sm/filter/test/unit_run_filter_pipeline.cc +++ b/tiledb/sm/filter/test/unit_run_filter_pipeline.cc @@ -220,7 +220,7 @@ void check_run_pipeline_roundtrip( WriterTile& tile, std::optional& offsets_tile, FilterPipeline& pipeline, - TileDataGenerator* test_data) { + const TileDataGenerator* test_data) { // Run the pipeline forward. CHECK(pipeline .run_forward( @@ -983,3 +983,100 @@ TEST_CASE("Filter: Test random pipeline", "[filter][random]") { config, tp, tile, offsets_tile, pipeline, &tile_data_generator); } } + +TEST_CASE("Filter: Test compression", "[filter][compression]") { + // Create resources for running pipeline tests. + Config config; + ThreadPool tp(4); + FilterPipeline pipeline; + + // Set up test data. + IncrementTileDataGenerator tile_data_generator( + 100); + auto&& [tile, offsets_tile] = tile_data_generator.create_writer_tiles(); + + SECTION("- Simple") { + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); + + // Check that the pipeline runs forward and backward without error and + // returns the input data. 
+ check_run_pipeline_roundtrip( + config, tp, tile, offsets_tile, pipeline, &tile_data_generator); + } + + SECTION("- With checksum stage") { + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); + + // Check that the pipeline runs forward and backward without error and + // returns the input data. + check_run_pipeline_roundtrip( + config, tp, tile, offsets_tile, pipeline, &tile_data_generator); + } + + SECTION("- With multiple stages") { + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); + + // Check that the pipeline runs forward and backward without error and + // returns the input data. + check_run_pipeline_roundtrip( + config, tp, tile, offsets_tile, pipeline, &tile_data_generator); + } +} + +TEST_CASE("Filter: Test compression var", "[filter][compression][var]") { + // Create TileDB resources for running the filter pipeline. + Config config; + ThreadPool tp(4); + + // Set up test data. + SimpleVariableTestData test_data{}; + const auto& tile_data_generator = test_data.tile_data_generator(); + auto&& [tile, offsets_tile] = tile_data_generator.create_writer_tiles(); + + FilterPipeline pipeline; + + SECTION("- Simple") { + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); + + // Check that the pipeline runs forward and backward without error and + // returns the input data. 
+ check_run_pipeline_roundtrip( + config, tp, tile, offsets_tile, pipeline, &tile_data_generator); + } + + SECTION("- With checksum stage") { + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); + + // Check that the pipeline runs forward and backward without error and + // returns the input data. + check_run_pipeline_roundtrip( + config, tp, tile, offsets_tile, pipeline, &tile_data_generator); + } + + SECTION("- With multiple stages") { + pipeline.add_filter(Add1InPlace(Datatype::UINT64)); + pipeline.add_filter(PseudoChecksumFilter(Datatype::UINT64)); + pipeline.add_filter(Add1OutOfPlace(Datatype::UINT64)); + pipeline.add_filter( + CompressionFilter(tiledb::sm::Compressor::LZ4, 5, Datatype::UINT64)); + + // Check that the pipeline runs forward and backward without error and + // returns the input data. + check_run_pipeline_roundtrip( + config, tp, tile, offsets_tile, pipeline, &tile_data_generator); + } +}