From 1e5c7507d2d04f584b18cc2b15c22fc3f9e4a9c9 Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Tue, 16 Jul 2024 21:46:14 +0300
Subject: [PATCH 01/14] support consolidation with max_frag_size in Dense

---
 test/src/unit-cppapi-max-fragment-size.cc     | 92 +++++++++++++++++++
 tiledb/sm/fragment/fragment_metadata.cc       |  4 +-
 .../sm/query/writers/global_order_writer.cc   | 57 +++++++++++-
 tiledb/sm/query/writers/global_order_writer.h | 15 +++
 4 files changed, 165 insertions(+), 3 deletions(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index dd79e638fe4..bfbd796bee3 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -503,3 +503,95 @@ TEST_CASE(
 
   array.close();
 }
+
+TEST_CASE(
+    "Setting max_fragment_size in Dense consolidation",
+    "[global-order-writer]") {
+  std::string array_name = "cpp_max_fragment_size_bug";
+  Context ctx;
+
+  auto cleanup = [&]() {
+    auto obj = Object::object(ctx, array_name);
+    if (obj.type() == Object::Type::Array) {
+      Object::remove(ctx, array_name);
+    }
+  };
+
+  cleanup();
+
+  // Remove the array at the end of this test.
+  ScopedExecutor deferred(cleanup);
+
+  // Create an array with exactly 9 tiles and tile extent 1
+  Domain domain(ctx);
+  ArraySchema schema(ctx, TILEDB_DENSE);
+  auto d1 = tiledb::Dimension::create<int>(ctx, "d1", {{0, 2}}, 1);
+  auto d2 = tiledb::Dimension::create<int>(ctx, "d2", {{0, 2}}, 1);
+  domain.add_dimension(d1);
+  domain.add_dimension(d2);
+
+  auto a1 = tiledb::Attribute::create<int>(ctx, "a");
+  schema.add_attribute(a1);
+
+  schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
+  schema.set_domain(domain);
+
+  Array::create(array_name, schema);
+
+  // Populate array with data from 1 to 9
+  int value = 0;
+  for (int i = 0; i < 3; i++) {
+    Array array(ctx, array_name, TILEDB_WRITE);
+    Query query(ctx, array);
+    query.set_layout(TILEDB_ROW_MAJOR);
+    tiledb::Subarray sub(ctx, array);
+    sub.set_subarray({i, i, 0, 2});
+    query.set_subarray(sub);
+    std::vector<int> data = {++value, ++value, ++value};
+    query.set_data_buffer("a", data);
+    query.submit();
+    array.close();
+  }
+
+  // Read data to validate write and num of fragments.
+  CHECK(tiledb::test::num_fragments(array_name) == 3);
+  Array array(ctx, array_name, TILEDB_READ);
+  const std::vector<int> subarray = {0, 2, 0, 2};
+  std::vector<int> a(9);
+  Query query(ctx, array, TILEDB_READ);
+  query.set_subarray(subarray)
+      .set_layout(TILEDB_ROW_MAJOR)
+      .set_data_buffer("a", a);
+  query.submit();
+  array.close();
+
+  for (int i = 0; i < 9; i++) {
+    CHECK(a[i] == i + 1);
+  }
+
+  // Consolidate with a size limitation for the fragment. This will result in
+  // the creation of two new fragments.
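+  // Note: the 150-byte budget below is assumed to be compared against the
+  // estimated on-disk fragment size, tile metadata included, so the nine
+  // single-cell tiles cannot all land in one fragment and the writer has to
+  // split the consolidated domain.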
+  tiledb::Config cfg;
+  cfg.set("sm.consolidation.max_fragment_size", "150");
+  ctx = Context(cfg);
+  Array::consolidate(ctx, array_name);
+  Array::vacuum(ctx, array_name);
+
+  // Check that we now have 2 fragments instead of 3
+  CHECK(tiledb::test::num_fragments(array_name) == 2);
+
+  // Read data to validate correctness
+  Array array2(ctx, array_name, TILEDB_READ);
+  const std::vector<int> subarray2 = {0, 2, 0, 2};
+  std::vector<int> a2(9);
+  Query query2(ctx, array2, TILEDB_READ);
+  query2.set_subarray(subarray2)
+      .set_layout(TILEDB_ROW_MAJOR)
+      .set_data_buffer("a", a2);
+  query2.submit();
+  array2.close();
+
+  for (int i = 0; i < 9; i++) {
+    CHECK(a2[i] == i + 1);
+  }
+}
diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc
index 236cb1e8de4..1e8fa05ae21 100644
--- a/tiledb/sm/fragment/fragment_metadata.cc
+++ b/tiledb/sm/fragment/fragment_metadata.cc
@@ -695,8 +695,8 @@ void FragmentMetadata::init_domain(const NDRange& non_empty_domain) {
 
   // Sanity check
   assert(!non_empty_domain.empty());
-  assert(non_empty_domain_.empty());
-  assert(domain_.empty());
+  // assert(non_empty_domain_.empty()); todo, this might cause problems
+  // assert(domain_.empty());
 
   // Set non-empty domain for dense arrays (for sparse it will be calculated
   // via the MBRs)
diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index fccf6754494..3a097bbf0d3 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -38,6 +38,7 @@
 #include "tiledb/sm/array_schema/array_schema.h"
 #include "tiledb/sm/array_schema/dimension.h"
 #include "tiledb/sm/consolidator/consolidator.h"
+#include "tiledb/sm/enums/array_type.h"
 #include "tiledb/sm/fragment/fragment_metadata.h"
 #include "tiledb/sm/misc/comparators.h"
 #include "tiledb/sm/misc/hilbert.h"
@@ -91,7 +92,8 @@ GlobalOrderWriter::GlobalOrderWriter(
           fragment_name)
     , processed_conditions_(processed_conditions)
     , fragment_size_(fragment_size)
-    , current_fragment_size_(0) {
+    , current_fragment_size_(0)
+    , rows_written_(0) {
   // Check the layout is global order.
   if (layout_ != Layout::GLOBAL_ORDER) {
     throw GlobalOrderWriterException(
@@ -781,6 +783,13 @@ Status GlobalOrderWriter::global_write() {
     // Compute the number of tiles that will fit in this fragment.
     auto num = num_tiles_to_write(idx, tile_num, tiles);
 
+    if (tile_num != num && array_schema_.array_type() == ArrayType::DENSE) {
+      // if it is a dense array and not all tiles can fit in the current
+      // fragment then we need to split the domain
+      NDRange new_nd = ndranges_after_split(num);
+      frag_meta->init_domain(new_nd);
+    }
+
     // If we're resuming a fragment write and the first tile doesn't fit into
     // the previous fragment, we need to start a new fragment and recalculate
     // the number of tiles to write.
@@ -1422,6 +1431,52 @@ uint64_t GlobalOrderWriter::num_tiles_to_write(
   return tile_num - start;
 }
 
+uint64_t GlobalOrderWriter::num_tiles_per_row() {
+  auto dim_num = array_schema_.dim_num();
+  uint64_t ret = 1;
+  for (unsigned d = 1; d < dim_num; ++d) {
+    // skip first dim. todo Explain
+    auto dim{array_schema_.dimension_ptr(d)};
+    auto dim_dom = dim->domain();
+    ret *= dim->domain_range(dim_dom);
+  }
+  return ret;
+}
+
+NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
+  uint64_t tiles_per_row = num_tiles_per_row();
+  auto dim_num = array_schema_.dim_num();
+  NDRange nd;
+  nd.reserve(dim_num);
+
+  if (num % tiles_per_row != 0) {
+    throw GlobalOrderWriterException(
+        "This fragment target size is not possible please try something else ");  // todo fix
+  }
+
+  // Calculate how many rows we will write in the current fragment
+  uint64_t rows_to_write = num / tiles_per_row;
+
+  // Create the range for the index dim (first).
+  int start = rows_written_;
+  int end = start + rows_to_write - 1;
+  Range range(&start, &end, sizeof(int));
+  nd.emplace_back(range);
+
+  // Use the domain as ranges for the rest of the dims
+  for (unsigned d = 1; d < dim_num; ++d) {
+    // begin from second dim
+    auto dim{array_schema_.dimension_ptr(d)};
+    auto dim_dom = dim->domain();
+    nd.emplace_back(dim_dom);
+  }
+
+  // add rows written to the cache
+  rows_written_ += rows_to_write;
+
+  return nd;
+}
+
 Status GlobalOrderWriter::start_new_fragment() {
   auto frag_meta = global_write_state_->frag_meta_;
   auto& uri = frag_meta->fragment_uri();
diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h
index c15b81f67c8..cf6cc417e6c 100644
--- a/tiledb/sm/query/writers/global_order_writer.h
+++ b/tiledb/sm/query/writers/global_order_writer.h
@@ -215,6 +215,12 @@ class GlobalOrderWriter : public WriterBase {
    */
   uint64_t current_fragment_size_;
 
+  /**
+   * Counter for the number of rows written. This is used only when the
+   * consolidation produces more than one fragment in Dense arrays.
+   */
+  uint64_t rows_written_;
+
   /* ********************************* */
   /*         PRIVATE METHODS           */
   /* ********************************* */
@@ -385,6 +391,15 @@ class GlobalOrderWriter : public WriterBase {
       uint64_t tile_num,
       tdb::pmr::unordered_map<std::string, WriterTileTupleVector>& tiles);
 
+  /**
+   * Return the number of tiles a single row can hold
+   *
+   * @return Number of tiles.
+   */
+  NDRange ndranges_after_split(uint64_t num);
+
+  uint64_t num_tiles_per_row();
+
   /**
    * Close the current fragment and start a new one. The closed fragment will
   * be added to `frag_uris_to_commit_` so that all fragments in progress can

From 14cc0b22e285fac58f1fb82557a8deec2a4d2572 Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Tue, 16 Jul 2024 23:18:51 +0300
Subject: [PATCH 02/14] fix build failures in test

---
 test/src/unit-cppapi-max-fragment-size.cc | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index bfbd796bee3..b865d7cc422 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -555,13 +555,13 @@ TEST_CASE(
 
   // Read data to validate write and num of fragments.
   CHECK(tiledb::test::num_fragments(array_name) == 3);
+
   Array array(ctx, array_name, TILEDB_READ);
-  const std::vector<int> subarray = {0, 2, 0, 2};
+  tiledb::Subarray sub(ctx, array);
+  sub.set_subarray({0, 2, 0, 2});
   std::vector<int> a(9);
   Query query(ctx, array, TILEDB_READ);
-  query.set_subarray(subarray)
-      .set_layout(TILEDB_ROW_MAJOR)
-      .set_data_buffer("a", a);
+  query.set_subarray(sub).set_layout(TILEDB_ROW_MAJOR).set_data_buffer("a", a);
   query.submit();
   array.close();
 
@@ -582,10 +582,11 @@ TEST_CASE(
 
   // Read data to validate correctness
   Array array2(ctx, array_name, TILEDB_READ);
-  const std::vector<int> subarray2 = {0, 2, 0, 2};
+  tiledb::Subarray sub2(ctx, array2);
+  sub2.set_subarray({0, 2, 0, 2});
   std::vector<int> a2(9);
   Query query2(ctx, array2, TILEDB_READ);
-  query2.set_subarray(subarray2)
+  query2.set_subarray(sub2)
       .set_layout(TILEDB_ROW_MAJOR)
       .set_data_buffer("a", a2);
   query2.submit();

From 94de2d8117f78c46ccf5f4baba1999f380a4f46b Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Wed, 17 Jul 2024 00:38:02 +0300
Subject: [PATCH 03/14] improve tiles_per_row calculation

---
 tiledb/sm/query/writers/global_order_writer.cc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index 3a097bbf0d3..cdba00d7bf0 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -1438,7 +1438,9 @@ uint64_t GlobalOrderWriter::num_tiles_per_row() {
     // skip first dim. todo Explain
     auto dim{array_schema_.dimension_ptr(d)};
     auto dim_dom = dim->domain();
-    ret *= dim->domain_range(dim_dom);
+    ret *= dim->domain_range(dim_dom) / dim->tile_extent().rvalue_as<int>();
+    // todo consider cases where the above calculation has a remainder. Also
+    // consider other types
   }
   return ret;
 }

From 4857d609b620e1a2a6e05e1cc0f8da7785c4f542 Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Thu, 18 Jul 2024 19:15:45 +0300
Subject: [PATCH 04/14] fix support for multiple cells per tile

---
 test/src/unit-cppapi-max-fragment-size.cc     | 91 +++++++++++++++++++
 .../sm/query/writers/global_order_writer.cc   | 19 ++--
 tiledb/sm/query/writers/global_order_writer.h | 11 ++-
 3 files changed, 113 insertions(+), 8 deletions(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index b865d7cc422..81f3e62a882 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -596,3 +596,94 @@ TEST_CASE(
     CHECK(a2[i] == i + 1);
   }
 }
+
+TEST_CASE(
+    "Setting max_fragment_size in Dense consolidation one dim",
+    "[global-order-writer]") {
+  std::string array_name = "cpp_max_fragment_size_bug";
+  Context ctx;
+
+  auto cleanup = [&]() {
+    auto obj = Object::object(ctx, array_name);
+    if (obj.type() == Object::Type::Array) {
+      Object::remove(ctx, array_name);
+    }
+  };
+
+  cleanup();
+
+  // Remove the array at the end of this test.
+  // ScopedExecutor deferred(cleanup);
+
+  // Create a 1D array with 9 cells and tile extent 3
+  Domain domain(ctx);
+  ArraySchema schema(ctx, TILEDB_DENSE);
+  auto d1 = tiledb::Dimension::create<int>(ctx, "d1", {{0, 8}}, 3);
+  domain.add_dimension(d1);
+
+  auto a1 = tiledb::Attribute::create<int>(ctx, "a");
+  schema.add_attribute(a1);
+
+  schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
+  schema.set_domain(domain);
+
+  Array::create(array_name, schema);
+
+  // Populate array with data from 1 to 9
+  int value = 0;
+  for (int i = 0; i < 9; i+=3) {
+    Array array(ctx, array_name, TILEDB_WRITE);
+    Query query(ctx, array);
+    query.set_layout(TILEDB_ROW_MAJOR);
+    tiledb::Subarray sub(ctx, array);
+    sub.set_subarray({i, i + 2});
+    query.set_subarray(sub);
+    std::vector<int> data = {++value, ++value, ++value};
+    query.set_data_buffer("a", data);
+    query.submit();
+    array.close();
+  }
+
+  // Read data to validate write and num of fragments.
+  // CHECK(tiledb::test::num_fragments(array_name) == 3);
+
+  Array array(ctx, array_name, TILEDB_READ);
+  tiledb::Subarray sub(ctx, array);
+  sub.set_subarray({0, 8});
+  std::vector<int> a(9);
+  Query query(ctx, array, TILEDB_READ);
+  query.set_subarray(sub).set_layout(TILEDB_ROW_MAJOR).set_data_buffer("a", a);
+  query.submit();
+  array.close();
+
+  for (int i = 0; i < 9; i++) {
+    CHECK(a[i] == i + 1);
+  }
+
+  // Consolidate with a size limitation for the fragment. This will result in
+  // the creation of two new fragments.
+  tiledb::Config cfg;
+  cfg.set("sm.consolidation.max_fragment_size", "80");
+  ctx = Context(cfg);
+  Array::consolidate(ctx, array_name);
+  Array::vacuum(ctx, array_name);
+
+  // Check that we now have 2 fragments instead of 3
+  CHECK(tiledb::test::num_fragments(array_name) == 2);
+
+  // Read data to validate correctness
+  Array array2(ctx, array_name, TILEDB_READ);
+  tiledb::Subarray sub2(ctx, array2);
+  sub2.set_subarray({0, 8});
+  std::vector<int> a2(9);
+  Query query2(ctx, array2, TILEDB_READ);
+  query2.set_subarray(sub2)
+      .set_layout(TILEDB_ROW_MAJOR)
+      .set_data_buffer("a", a2);
+  query2.submit();
+  array2.close();
+
+  for (int i = 0; i < 9; i++) {
+    CHECK(a2[i] == i + 1);
+  }
+}
diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index cdba00d7bf0..f1fafc7ab88 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -1434,8 +1434,11 @@ uint64_t GlobalOrderWriter::num_tiles_per_row() {
   auto dim_num = array_schema_.dim_num();
   uint64_t ret = 1;
+
   for (unsigned d = 1; d < dim_num; ++d) {
-    // skip first dim. todo Explain
+    // Skip first dim. We want to calculate how many tiles can fit in one row.
+    // To do that we skip the first dim and multiply the range / extent of the
+    // other dimensions
     auto dim{array_schema_.dimension_ptr(d)};
     auto dim_dom = dim->domain();
     ret *= dim->domain_range(dim_dom) / dim->tile_extent().rvalue_as<int>();
@@ -1448,20 +1451,24 @@ uint64_t GlobalOrderWriter::num_tiles_per_row() {
 
 NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   uint64_t tiles_per_row = num_tiles_per_row();
   auto dim_num = array_schema_.dim_num();
+  uint64_t cells_in_tile = array_schema_.domain().cell_num_per_tile();
   NDRange nd;
   nd.reserve(dim_num);
 
   if (num % tiles_per_row != 0) {
     throw GlobalOrderWriterException(
-        "This fragment target size is not possible please try something else ");  // todo fix
+        "The target fragment size cannot be achieved. Please try using a "
+        "different size, or there might be a misconfiguration in the array "
+        "schema.");
   }
 
   // Calculate how many rows we will write in the current fragment
-  uint64_t rows_to_write = num / tiles_per_row;
+  uint64_t rows_of_tiles_to_write = num / tiles_per_row;
 
   // Create the range for the index dim (first).
-  int start = rows_written_;
-  int end = start + rows_to_write - 1;
+  int start =
+      rows_written_ * cells_in_tile;  // todo start from the dim_dom start
+  int end = start + (rows_of_tiles_to_write * cells_in_tile) - 1;
   Range range(&start, &end, sizeof(int));
   nd.emplace_back(range);
 
@@ -1474,7 +1481,7 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   }
 
   // add rows written to the cache
-  rows_written_ += rows_to_write;
+  rows_written_ += rows_of_tiles_to_write;
 
   return nd;
 }
diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h
index cf6cc417e6c..0a7937de01f 100644
--- a/tiledb/sm/query/writers/global_order_writer.h
+++ b/tiledb/sm/query/writers/global_order_writer.h
@@ -392,12 +392,19 @@ class GlobalOrderWriter : public WriterBase {
       tdb::pmr::unordered_map<std::string, WriterTileTupleVector>& tiles);
 
   /**
-   * Return the number of tiles a single row can hold
+   * Create new ndranges by splitting the first dimension based on the number
+   * of tiles we need to write.
+   * @param num The number of tiles we need to write.
    *
-   * @return Number of tiles.
    */
   NDRange ndranges_after_split(uint64_t num);
 
+  /**
+   * Return the number of tiles a single row can hold. More specifically, the
+   * number of tiles all dimensions except the first can hold.
+   *
+   * @return Number of tiles.
+   */
   uint64_t num_tiles_per_row();
 
   /**

From e8a26c85cd7e3e1bc4ada02e3764329a70b1f5f0 Mon Sep 17 00:00:00 2001
From: dimitrisstaratzis
Date: Thu, 18 Jul 2024 19:53:08 +0300
Subject: [PATCH 05/14] fix format

---
 test/src/unit-cppapi-max-fragment-size.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index 81f3e62a882..d78e03912ac 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -631,7 +631,7 @@ TEST_CASE(
 
   // Populate array with data from 1 to 9
   int value = 0;
-  for (int i = 0; i < 9; i+=3) {
+  for (int i = 0; i < 9; i += 3) {
     Array array(ctx, array_name, TILEDB_WRITE);
     Query query(ctx, array);
     query.set_layout(TILEDB_ROW_MAJOR);

From 86fc717e5322f9d73ee8742a3629dc98c900f2eb Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Mon, 22 Jul 2024 18:37:37 +0300
Subject: [PATCH 06/14] add support for multi typed dimensions

---
 .../sm/query/writers/global_order_writer.cc | 25 +++++++++++++++----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index f1fafc7ab88..4da413c5c14 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -52,6 +52,7 @@
 #include "tiledb/sm/tile/tile_metadata_generator.h"
 #include "tiledb/sm/tile/writer_tile_tuple.h"
 #include "tiledb/storage_format/uri/generate_uri.h"
+#include "tiledb/type/apply_with_type.h"
 
 using namespace tiledb;
 using namespace tiledb::common;
@@ -1441,7 +1442,11 @@ uint64_t GlobalOrderWriter::num_tiles_per_row() {
     // other dimensions
     auto dim{array_schema_.dimension_ptr(d)};
     auto dim_dom = dim->domain();
-    ret *= dim->domain_range(dim_dom) / dim->tile_extent().rvalue_as<int>();
+    auto l = [&](auto T) {
+      return static_cast<uint64_t>(dim->tile_extent().rvalue_as<decltype(T)>());
+    };
+
+    ret *= dim->domain_range(dim_dom) / apply_with_type(l, dim->type());
     // todo consider cases where the above calculation has a remainder. Also
     // consider other types
   }
   return ret;
 }
@@ -1449,9 +1454,20 @@ uint64_t GlobalOrderWriter::num_tiles_per_row() {
 
 NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
+  // if (disable_checks_consolidation_) {
+  //   auto expanded_subarray = subarray_.ndrange(0);
+  //   domain.expand_to_tiles(&expanded_subarray);
+  // }
+
   uint64_t tiles_per_row = num_tiles_per_row();
   auto dim_num = array_schema_.dim_num();
-  uint64_t cells_in_tile = array_schema_.domain().cell_num_per_tile();
+  auto dim{array_schema_.dimension_ptr(0)};
+
+  auto l = [&](auto T) {
+    return static_cast<uint64_t>(dim->tile_extent().rvalue_as<decltype(T)>());
+  };
+
+  uint64_t tile_extent = apply_with_type(l, dim->type());
   NDRange nd;
   nd.reserve(dim_num);
 
@@ -1466,9 +1482,8 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   uint64_t rows_of_tiles_to_write = num / tiles_per_row;
 
   // Create the range for the index dim (first).
-  int start =
-      rows_written_ * cells_in_tile;  // todo start from the dim_dom start
-  int end = start + (rows_of_tiles_to_write * cells_in_tile) - 1;
+  int start = rows_written_ * tile_extent;  // todo start from the dim_dom start
+  int end = start + (rows_of_tiles_to_write * tile_extent) - 1;
   Range range(&start, &end, sizeof(int));
   nd.emplace_back(range);
 

From 35b0e9d1e97289052e5e075d1f5342e6a0f40ab4 Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Tue, 23 Jul 2024 16:17:50 +0300
Subject: [PATCH 07/14] use domain expanded to the closest tiles

---
 .../sm/query/writers/global_order_writer.cc   | 25 +++++++++++--------
 tiledb/sm/query/writers/global_order_writer.h |  4 ++-
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index 4da413c5c14..6ca73c70704 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -1432,15 +1432,15 @@ uint64_t GlobalOrderWriter::num_tiles_to_write(
   return tile_num - start;
 }
 
-uint64_t GlobalOrderWriter::num_tiles_per_row() {
-  auto dim_num = array_schema_.dim_num();
+uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) {
+  auto dim_num = domain.dim_num();
   uint64_t ret = 1;
 
   for (unsigned d = 1; d < dim_num; ++d) {
     // Skip first dim. We want to calculate how many tiles can fit in one row.
     // To do that we skip the first dim and multiply the range / extent of the
     // other dimensions
-    auto dim{array_schema_.dimension_ptr(d)};
+    auto dim{domain.dimension_ptr(d)};
     auto dim_dom = dim->domain();
     auto l = [&](auto T) {
       return static_cast<uint64_t>(dim->tile_extent().rvalue_as<decltype(T)>());
     };
 
     ret *= dim->domain_range(dim_dom) / apply_with_type(l, dim->type());
@@ -1454,14 +1454,17 @@ uint64_t GlobalOrderWriter::num_tiles_per_row() {
 }
 
 NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
-  // if (disable_checks_consolidation_) {
-  //   auto expanded_subarray = subarray_.ndrange(0);
-  //   domain.expand_to_tiles(&expanded_subarray);
-  // }
+  // Expand domain to full tiles
+  auto& domain{array_schema_.domain()};
+  if (disable_checks_consolidation_) {
+    auto expanded_subarray = subarray_.ndrange(0);
+    domain.expand_to_tiles(&expanded_subarray);
+  }
 
-  uint64_t tiles_per_row = num_tiles_per_row();
-  auto dim_num = array_schema_.dim_num();
-  auto dim{array_schema_.dimension_ptr(0)};
+  // Calculate how many tiles each row can hold
+  uint64_t tiles_per_row = num_tiles_per_row(domain);
+  auto dim_num = domain.dim_num();
+  auto dim{domain.dimension_ptr(0)};
 
   auto l = [&](auto T) {
     return static_cast<uint64_t>(dim->tile_extent().rvalue_as<decltype(T)>());
   };
 
   uint64_t tile_extent = apply_with_type(l, dim->type());
   NDRange nd;
   nd.reserve(dim_num);
@@ -1485,7 +1488,7 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   uint64_t rows_of_tiles_to_write = num / tiles_per_row;
 
   // Create the range for the index dim (first).
-  int start = rows_written_ * tile_extent;  // todo start from the dim_dom start
+  int start = rows_written_ * tile_extent;
   int end = start + (rows_of_tiles_to_write * tile_extent) - 1;
   Range range(&start, &end, sizeof(int));
   nd.emplace_back(range);
diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h
index 0a7937de01f..7b0d7499b79 100644
--- a/tiledb/sm/query/writers/global_order_writer.h
+++ b/tiledb/sm/query/writers/global_order_writer.h
@@ -403,9 +403,11 @@ class GlobalOrderWriter : public WriterBase {
    * Return the number of tiles a single row can hold. More specifically, the
    * number of tiles all dimensions except the first can hold.
    *
+   * @param domain The domain
+   *
    * @return Number of tiles.
    */
-  uint64_t num_tiles_per_row();
+  uint64_t num_tiles_per_row(const Domain& domain);
 
   /**
    * Close the current fragment and start a new one. The closed fragment will

From 3129b29c9b3f4ae52e4580cd535b542f02a4b4e6 Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Tue, 23 Jul 2024 18:09:06 +0300
Subject: [PATCH 08/14] support dimension domains that don't start from 0

---
 test/src/unit-cppapi-max-fragment-size.cc     | 10 ++---
 .../sm/query/writers/global_order_writer.cc   | 44 ++++++++++++-------
 tiledb/sm/query/writers/global_order_writer.h |  6 +++
 3 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index d78e03912ac..0e4d371ed8c 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -613,12 +613,12 @@ TEST_CASE(
   cleanup();
 
   // Remove the array at the end of this test.
-  // ScopedExecutor deferred(cleanup);
+  ScopedExecutor deferred(cleanup);
 
   // Create a 1D array with 9 cells and tile extent 3
   Domain domain(ctx);
   ArraySchema schema(ctx, TILEDB_DENSE);
-  auto d1 = tiledb::Dimension::create<int>(ctx, "d1", {{0, 8}}, 3);
+  auto d1 = tiledb::Dimension::create<int>(ctx, "d1", {{1, 9}}, 3);
   domain.add_dimension(d1);
 
   auto a1 = tiledb::Attribute::create<int>(ctx, "a");
@@ -631,7 +631,7 @@ TEST_CASE(
 
   // Populate array with data from 1 to 9
   int value = 0;
-  for (int i = 0; i < 9; i += 3) {
+  for (int i = 1; i < 10; i += 3) {
     Array array(ctx, array_name, TILEDB_WRITE);
     Query query(ctx, array);
     query.set_layout(TILEDB_ROW_MAJOR);
@@ -649,7 +649,7 @@ TEST_CASE(
 
   Array array(ctx, array_name, TILEDB_READ);
   tiledb::Subarray sub(ctx, array);
-  sub.set_subarray({0, 8});
+  sub.set_subarray({1, 9});
   std::vector<int> a(9);
   Query query(ctx, array, TILEDB_READ);
   query.set_subarray(sub).set_layout(TILEDB_ROW_MAJOR).set_data_buffer("a", a);
@@ -674,7 +674,7 @@ TEST_CASE(
   // Read data to validate correctness
   Array array2(ctx, array_name, TILEDB_READ);
   tiledb::Subarray sub2(ctx, array2);
-  sub2.set_subarray({0, 8});
+  sub2.set_subarray({1, 9});
   std::vector<int> a2(9);
   Query query2(ctx, array2, TILEDB_READ);
   query2.set_subarray(sub2)
diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index 6ca73c70704..a8e50afd77d 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -94,7 +94,8 @@ GlobalOrderWriter::GlobalOrderWriter(
     , processed_conditions_(processed_conditions)
     , fragment_size_(fragment_size)
     , current_fragment_size_(0)
-    , rows_written_(0) {
+    , rows_written_(0)
+    , start_(0) {
   // Check the layout is global order.
   if (layout_ != Layout::GLOBAL_ORDER) {
     throw GlobalOrderWriterException(
@@ -1463,16 +1464,6 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   // Calculate how many tiles each row can hold
   uint64_t tiles_per_row = num_tiles_per_row(domain);
-  auto dim_num = domain.dim_num();
-  auto dim{domain.dimension_ptr(0)};
-
-  auto l = [&](auto T) {
-    return static_cast<uint64_t>(dim->tile_extent().rvalue_as<decltype(T)>());
-  };
-
-  uint64_t tile_extent = apply_with_type(l, dim->type());
-  NDRange nd;
-  nd.reserve(dim_num);
 
   if (num % tiles_per_row != 0) {
     throw GlobalOrderWriterException(
@@ -1484,13 +1475,33 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   // Calculate how many rows we will write in the current fragment
   uint64_t rows_of_tiles_to_write = num / tiles_per_row;
 
-  // Create the range for the index dim (first).
-  int start = rows_written_ * tile_extent;
-  int end = start + (rows_of_tiles_to_write * tile_extent) - 1;
-  Range range(&start, &end, sizeof(int));
+  // Create NDRange object and reserve for dims
+  auto dim_num = domain.dim_num();
+  NDRange nd;
+  nd.reserve(dim_num);
+
+  // Calculate the range for the index dim (first).
+  auto dim{domain.dimension_ptr(0)};
+  auto dim_dom = dim->domain();
+  auto l = [&](auto T) {
+    return static_cast<uint64_t>(dim->tile_extent().rvalue_as<decltype(T)>());
+  };
+  uint64_t tile_extent = apply_with_type(l, dim->type());
+
+  // Calculate start and end
+  if (rows_written_ == 0) {
+    // It means that the start has not been set yet. 
Set it to the minimum value + // of the expanded domain for that dim + auto dim_dom_data = (const uint64_t*)dim_dom.data(); + start_ = dim_dom_data[0]; + } + uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1; + + // Add range + Range range(&start_, &end, sizeof(int)); nd.emplace_back(range); - // Use the domain as ranges for the rest of the dims + // For the rest of the dims, use their domains as ranges. No split there. for (unsigned d = 1; d < dim_num; ++d) { // begin from second dim auto dim{array_schema_.dimension_ptr(d)}; @@ -1500,6 +1511,7 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) { // add rows written to the cache rows_written_ += rows_of_tiles_to_write; + start_ = end + 1; return nd; } diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h index 7b0d7499b79..f9396ca531f 100644 --- a/tiledb/sm/query/writers/global_order_writer.h +++ b/tiledb/sm/query/writers/global_order_writer.h @@ -221,6 +221,12 @@ class GlobalOrderWriter : public WriterBase { */ uint64_t rows_written_; + /** + * This is the start for the dim range in case we need to split in multiple + * fragments in Dense arrays + */ + uint64_t start_; + /* ********************************* */ /* PRIVATE METHODS */ /* ********************************* */ From f24e071853961f5b8ffbf26c6abcea84acd1d7bb Mon Sep 17 00:00:00 2001 From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com> Date: Tue, 23 Jul 2024 20:33:48 +0300 Subject: [PATCH 09/14] make previous commit work for all datatypes --- tiledb/sm/query/writers/global_order_writer.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index a8e50afd77d..18bf1e27b28 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -1448,8 +1448,7 @@ uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) { }; ret *= dim->domain_range(dim_dom) / apply_with_type(l, dim->type()); - // todo consider cases where the above calculation has a remainder. Also - // consider other types + // todo consider cases where the above calculation has a remainder. } return ret; } @@ -1492,8 +1491,13 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) { if (rows_written_ == 0) { // It means that the start has not been set yet. 
Set it to the minimum value
     // of the expanded domain for that dim
-    auto dim_dom_data = (const uint64_t*)dim_dom.data();
-    start_ = dim_dom_data[0];
+    auto ll = [&](auto T) {
+      auto dim_dom_data = (const decltype(T)*)dim_dom.data();
+      // todo this should be unsigned or signed
+      return static_cast<uint64_t>(dim_dom_data[0]);
+    };
+
+    start_ = apply_with_type(ll, dim->type());
   }
   uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1;
 

From f978bbf5b3e22d5bb2c79da91caa969d2c27c7cc Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Tue, 27 Aug 2024 22:23:34 +0300
Subject: [PATCH 10/14] work with multiple global_write() calls

---
 .../sm/query/writers/global_order_writer.cc   | 39 ++++++++++++++-----
 tiledb/sm/query/writers/global_order_writer.h | 21 +++++++++-
 2 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc
index 18bf1e27b28..70f07ba0a26 100644
--- a/tiledb/sm/query/writers/global_order_writer.cc
+++ b/tiledb/sm/query/writers/global_order_writer.cc
@@ -95,7 +95,10 @@ GlobalOrderWriter::GlobalOrderWriter(
     , fragment_size_(fragment_size)
     , current_fragment_size_(0)
     , rows_written_(0)
-    , start_(0) {
+    , tiles_in_current_row_(0)
+    , start_(0)
+    , end_(0)
+    , nd_if_dense_split_{} {
   // Check the layout is global order.
   if (layout_ != Layout::GLOBAL_ORDER) {
     throw GlobalOrderWriterException(
@@ -785,11 +788,16 @@ Status GlobalOrderWriter::global_write() {
     // Compute the number of tiles that will fit in this fragment.
     auto num = num_tiles_to_write(idx, tile_num, tiles);
 
-    if (tile_num != num && array_schema_.array_type() == ArrayType::DENSE) {
+    if (array_schema_.array_type() ==
+        ArrayType::DENSE) {  //&& this is consolidation
       // if it is a dense array and not all tiles can fit in the current
-      // fragment then we need to split the domain
-      NDRange new_nd = ndranges_after_split(num);
-      frag_meta->init_domain(new_nd);
+      // fragment then we need to split the domain, otherwise if all tiles can
+      // fit it means that we are in the middle of a write
+      nd_if_dense_split_ = ndranges_after_split(num, tile_num != num);
+    }
+
+    if (tile_num != num && !nd_if_dense_split_.empty()) {
+      frag_meta->init_domain(nd_if_dense_split_);
     }
 
     // If we're resuming a fragment write and the first tile doesn't fit into
@@ -1453,7 +1461,8 @@ uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) {
   return ret;
 }
 
-NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
+NDRange GlobalOrderWriter::ndranges_after_split(
+    uint64_t num, bool reached_end_of_fragment) {
   // Expand domain to full tiles
   auto& domain{array_schema_.domain()};
   if (disable_checks_consolidation_) {
@@ -1464,16 +1473,21 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
   // Calculate how many tiles each row can hold
   uint64_t tiles_per_row = num_tiles_per_row(domain);
 
-  if (num % tiles_per_row != 0) {
+  // Calculate how many rows we will write in the current fragment
+  uint64_t rows_of_tiles_to_write =
+      (num - tiles_in_current_row_) / tiles_per_row;
+  uint64_t remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row;
+  tiles_in_current_row_ = remainder_of_tiles;
+
+  // If we have not written a full row and we have reached the end of the
+  // fragment abort
+  if (tiles_in_current_row_ != 0 && reached_end_of_fragment) {
     throw GlobalOrderWriterException(
         "The target fragment size cannot be achieved. Please try using a "
        "different size, or there might be a misconfiguration in the array "
         "schema.");
   }
 
-  // Calculate how many rows we will write in the current fragment
-  uint64_t rows_of_tiles_to_write = num / tiles_per_row;
-
   // Create NDRange object and reserve for dims
   auto dim_num = domain.dim_num();
   NDRange nd;
   nd.reserve(dim_num);
diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h
index f9396ca531f..2f0462c5362 100644
--- a/tiledb/sm/query/writers/global_order_writer.h
+++ b/tiledb/sm/query/writers/global_order_writer.h
@@ -221,12 +221,29 @@ class GlobalOrderWriter : public WriterBase {
    */
   uint64_t rows_written_;
 
+  /**
+   * Counter for the number of tiles written in the current row. This is used
+   * only when the consolidation produces more than one fragment in Dense
+   * arrays.
+   */
+  uint64_t tiles_in_current_row_;
+
   /**
    * This is the start for the dim range in case we need to split in multiple
    * fragments in Dense arrays
    */
   uint64_t start_;
+
+  /**
+   * This is the end for the dim range in case we need to split in multiple
+   * fragments in Dense arrays
+   */
+  uint64_t end_;
+
+  /**
+   * NDRange in case we have a dense consolidation with split
+   */
+  NDRange nd_if_dense_split_;
 
   /* ********************************* */
   /*         PRIVATE METHODS           */
   /* ********************************* */
@@ -422,7 +439,8 @@ class GlobalOrderWriter : public WriterBase {
    * Create new ndranges by splitting the first dimension based on the number
    * of tiles we need to write.
    * @param num The number of tiles we need to write.
+   * @param reached_end_of_fragment True if we have reached the end of the
+   * current fragment
    *
    */
-  NDRange ndranges_after_split(uint64_t num);
+  NDRange ndranges_after_split(uint64_t num, bool reached_end_of_fragment);
 
   /**
   * Return the number of tiles a single row can hold. 
More specifically, the From 15017afb09b2176b15585f7c566b1ebd2310590d Mon Sep 17 00:00:00 2001 From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com> Date: Wed, 28 Aug 2024 18:00:49 +0300 Subject: [PATCH 11/14] catch edge cases for above --- test/src/unit-cppapi-max-fragment-size.cc | 4 ++-- tiledb/sm/query/writers/global_order_writer.cc | 13 +++++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc index 0e4d371ed8c..019f3b75d71 100644 --- a/test/src/unit-cppapi-max-fragment-size.cc +++ b/test/src/unit-cppapi-max-fragment-size.cc @@ -506,7 +506,7 @@ TEST_CASE( TEST_CASE( "Setting max_fragment_size in Dense consolidation", - "[global-order-writer]") { + "[global-order-writer][max-frag-size-consolidation]") { std::string array_name = "cpp_max_fragment_size_bug"; Context ctx; @@ -599,7 +599,7 @@ TEST_CASE( TEST_CASE( "Setting max_fragment_size in Dense consolidation one dim", - "[global-order-writer]") { + "[global-order-writer][max-frag-size-consolidation]") { std::string array_name = "cpp_max_fragment_size_bug"; Context ctx; diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index 70f07ba0a26..fb0dfb0b242 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -1476,7 +1476,15 @@ NDRange GlobalOrderWriter::ndranges_after_split( // Calculate how many rows we will write in the current fragment uint64_t rows_of_tiles_to_write = (num - tiles_in_current_row_) / tiles_per_row; - uint64_t remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row; + uint64_t remainder_of_tiles = 0; + bool moved_row = true; + + if (rows_of_tiles_to_write == 0) { + remainder_of_tiles += num; + moved_row = false; + } else { + remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row; + } tiles_in_current_row_ = remainder_of_tiles; // If we have not written a full row and we have reached the end of the @@ -1516,7 +1524,8 @@ NDRange GlobalOrderWriter::ndranges_after_split( } uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1; - if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_) { + if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_ && + moved_row) { end++; } From 0184785fe0c7d554c64bd462e39c3bf96071df13 Mon Sep 17 00:00:00 2001 From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com> Date: Wed, 28 Aug 2024 18:09:24 +0300 Subject: [PATCH 12/14] add comments --- tiledb/sm/query/writers/global_order_writer.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index fb0dfb0b242..1d54c7eaf90 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -1524,6 +1524,7 @@ NDRange GlobalOrderWriter::ndranges_after_split( } uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1; + // if there is a remainder it means we need one more row if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_ && moved_row) { end++; From 86fd805950569e4ca456702f69bca4ac5128c4ed Mon Sep 17 00:00:00 2001 From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com> Date: Mon, 2 Sep 2024 20:52:38 +0300 Subject: [PATCH 13/14] fully working with multi loop writes +signed dims support --- 
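Note: a reduced, standalone sketch of the range arithmetic this commit
introduces (illustration only — the helper below is hypothetical, assumes a
signed int64 index dimension, and mirrors the new writer members by name; in
the patch the same logic operates on the cached u_start_/start_ values):

    #include <cstdint>
    #include <utility>

    // Inclusive [start, end] covered on the first (split) dimension once
    // `tiles_since_split` tiles have been buffered, given `tiles_per_row`
    // tiles per row of the expanded domain and `tile_extent` cells per row.
    std::pair<int64_t, int64_t> split_range(
        int64_t start,
        uint64_t tiles_since_split,
        uint64_t tiles_per_row,
        uint64_t tile_extent) {
      int64_t end = start +
          static_cast<int64_t>((tiles_since_split / tiles_per_row) * tile_extent);
      // On an exact row boundary, step back one cell so the range ends with
      // the last full row instead of touching the first cell of the next one.
      if (tiles_since_split % tiles_per_row == 0 && end != start) {
        end--;
      }
      return {start, end};
    }

For the 1D test array used in this series (domain {1, 9}, extent 3, so
tiles_per_row = 1 and tile_extent = 3), split_range(1, 2, 1, 3) yields
[1, 6]: the first two rows of tiles go into the first fragment.
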
test/src/unit-cppapi-max-fragment-size.cc | 15 +++ tiledb/sm/enums/datatype.h | 7 ++ .../sm/query/writers/global_order_writer.cc | 113 +++++++++++++----- tiledb/sm/query/writers/global_order_writer.h | 40 ++++++- 4 files changed, 140 insertions(+), 35 deletions(-) diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc index 019f3b75d71..d1fc02f77ea 100644 --- a/test/src/unit-cppapi-max-fragment-size.cc +++ b/test/src/unit-cppapi-max-fragment-size.cc @@ -519,6 +519,15 @@ TEST_CASE( cleanup(); + bool more_than_one_loop = false; + SECTION("More than one loop write") { + more_than_one_loop = true; + } + + SECTION("One loop write") { + more_than_one_loop = false; + } + // Remove the array at the end of this test. ScopedExecutor deferred(cleanup); @@ -573,6 +582,11 @@ TEST_CASE( // the creation of two new fragments. tiledb::Config cfg; cfg.set("sm.consolidation.max_fragment_size", "150"); + cfg.set("sm.consolidation.buffer_size", "10000"); // speed up consolidation + if (more_than_one_loop) { + cfg.set("sm.consolidation.buffer_size", "10"); + } + ctx = Context(cfg); Array::consolidate(ctx, array_name); Array::vacuum(ctx, array_name); @@ -664,6 +678,7 @@ TEST_CASE( // the creation of two new fragments. tiledb::Config cfg; cfg.set("sm.consolidation.max_fragment_size", "80"); + cfg.set("sm.consolidation.buffer_size", "10000"); // speed up consolidation ctx = Context(cfg); Array::consolidate(ctx, array_name); Array::vacuum(ctx, array_name); diff --git a/tiledb/sm/enums/datatype.h b/tiledb/sm/enums/datatype.h index 897766241da..ab05203fc05 100644 --- a/tiledb/sm/enums/datatype.h +++ b/tiledb/sm/enums/datatype.h @@ -338,6 +338,13 @@ inline bool datatype_is_string(Datatype type) { type == Datatype::STRING_UCS2 || type == Datatype::STRING_UCS4); } +/** Returns true if the input datatype is an unsigned type. */ +inline bool datatype_is_unsigned(Datatype type) { + return ( + type == Datatype::UINT8 || type == Datatype::UINT32 || + type == Datatype::UINT16 || type == Datatype::UINT64); +} + /** Returns true if the input datatype is an integer type. */ inline bool datatype_is_integer(Datatype type) { return ( diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index 1d54c7eaf90..8f4587971ca 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -96,9 +96,14 @@ GlobalOrderWriter::GlobalOrderWriter( , current_fragment_size_(0) , rows_written_(0) , tiles_in_current_row_(0) + , tiles_written_(0) + , tiles_since_last_split_(0) + , u_start_(0) , start_(0) + , u_end_(0) , end_(0) - , nd_if_dense_split_{} { + , nd_if_dense_split_{} + , dense_with_split_(false) { // Check the layout is global order. if (layout_ != Layout::GLOBAL_ORDER) { throw GlobalOrderWriterException( @@ -787,23 +792,26 @@ Status GlobalOrderWriter::global_write() { // Compute the number of tiles that will fit in this fragment. 
   auto num = num_tiles_to_write(idx, tile_num, tiles);
-
-    if (array_schema_.array_type() ==
-        ArrayType::DENSE) {  //&& this is consolidation
-      // if it is a dense array and not all tiles can fit in the current
-      // fragment then we need to split the domain, otherwise if all tiles can
-      // fit it means that we are in the middle of a write
-      nd_if_dense_split_ = ndranges_after_split(num, tile_num != num);
-    }
-
-    if (tile_num != num && !nd_if_dense_split_.empty()) {
-      frag_meta->init_domain(nd_if_dense_split_);
+    bool is_dense = array_schema_.array_type() == ArrayType::DENSE;
+    bool is_last_range = false;
+
+    if (is_dense && disable_checks_consolidation_) {
+      // if it is a dense array during consolidation and not all tiles can fit
+      // in the current fragment then we need to split the domain, otherwise if
+      // all tiles can fit it means that we are in the middle of a write
+      nd_if_dense_split_ = ndranges_after_split(num, tile_num, is_last_range);
+    }
+
+    if ((!nd_if_dense_split_.empty() && num != tile_num) ||
+        (is_last_range && dense_with_split_)) {
+      frag_meta->init_domain(nd_if_dense_split_);
+      dense_with_split_ = true;
     }
 
     // If we're resuming a fragment write and the first tile doesn't fit into
     // the previous fragment, we need to start a new fragment and recalculate
     // the number of tiles to write.
-    if (current_fragment_size_ > 0 && num == 0) {
+    if (current_fragment_size_ > 0 && num == 0 && !dense_with_split_) {
       RETURN_CANCEL_OR_ERROR(start_new_fragment());
       num = num_tiles_to_write(idx, tile_num, tiles);
     }
@@ -1462,33 +1470,47 @@ uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) {
 
 NDRange GlobalOrderWriter::ndranges_after_split(
-    uint64_t num, bool reached_end_of_fragment) {
+    uint64_t num, uint64_t tile_num, bool& is_last_range) {
   // Expand domain to full tiles
+  bool reached_end_of_fragment = tile_num != num;
   auto& domain{array_schema_.domain()};
   if (disable_checks_consolidation_) {
     auto expanded_subarray = subarray_.ndrange(0);
     domain.expand_to_tiles(&expanded_subarray);
   }
 
+  tiles_written_ += num;
+  tiles_since_last_split_ += num;
+
   // Calculate how many tiles each row can hold
   uint64_t tiles_per_row = num_tiles_per_row(domain);
 
   // Calculate how many rows we will write in the current fragment
-  uint64_t rows_of_tiles_to_write =
-      (num - tiles_in_current_row_) / tiles_per_row;
+  uint64_t rows_of_tiles_to_write = 0;
+
+  if (num != 0) {
+    rows_of_tiles_to_write = (num - tiles_in_current_row_) / tiles_per_row;
+  }
+
   uint64_t remainder_of_tiles = 0;
-  bool moved_row = true;
+  // Calculate how many tiles we have in the current row
   if (rows_of_tiles_to_write == 0) {
     remainder_of_tiles += num;
-    moved_row = false;
   } else {
     remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row;
   }
-  tiles_in_current_row_ = remainder_of_tiles;
+  tiles_in_current_row_ += remainder_of_tiles;
 
-  // If we have not written a full row and we have reached the end of the
-  // fragment abort
+  if (tiles_in_current_row_ == tiles_per_row) {
+    tiles_in_current_row_ = 0;
+  }
+
+  // If we have finished the write in the middle of the row, throw
   if (tiles_in_current_row_ != 0 && reached_end_of_fragment) {
     throw GlobalOrderWriterException(
         "The target fragment size cannot be achieved. Please try using a "
         "different size, or there might be a misconfiguration in the array "
         "schema.");
   }
@@ -1513,25 +1535,41 @@ NDRange GlobalOrderWriter::ndranges_after_split(
   if (rows_written_ == 0) {
     // It means that the start has not been set yet. 
Set it to the minimum value
     // of the expanded domain for that dim
-    auto ll = [&](auto T) {
-      auto dim_dom_data = (const decltype(T)*)dim_dom.data();
-      // todo this should be unsigned or signed
-      return static_cast<uint64_t>(dim_dom_data[0]);
+    auto ll = [&](auto T, size_t index) {
+      // Return a pair. The domain can be signed or unsigned
+      auto dim_dom_data = dim_dom.typed_data<decltype(T)>()[index];
+      int64_t ret_s = static_cast<int64_t>(dim_dom_data);
+      uint64_t ret_u = static_cast<uint64_t>(dim_dom_data);
+      return std::make_pair(ret_s, ret_u);
     };
 
-    start_ = apply_with_type(ll, dim->type());
-    end_ = apply_with_type(ll, dim->type());
+    // based on whether the dim is signed or unsigned assign the proper vars
+    if (datatype_is_unsigned(dim->type())) {
+      u_start_ = apply_with_type(ll, dim->type(), 0).second;
+      u_end_ = apply_with_type(ll, dim->type(), 1).second;
+    } else {
+      start_ = apply_with_type(ll, dim->type(), 0).first;
+      end_ = apply_with_type(ll, dim->type(), 1).first;
+    }
   }
-  uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1;
 
-  // if there is a remainder it means we need one more row
-  if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_ &&
-      moved_row) {
-    end++;
+  // Use 'auto' to temporarily use the cached signed or unsigned start_ and
+  // end_ values
+  auto start_to_use = datatype_is_unsigned(dim->type()) ? u_start_ : start_;
+  auto end_to_use = datatype_is_unsigned(dim->type()) ? u_end_ : end_;
+
+  auto end =
+      start_to_use + ((tiles_since_last_split_ / tiles_per_row) * tile_extent);
+  if (tiles_since_last_split_ % tiles_per_row == 0 && end != start_to_use) {
+    // We are at the end of the row; subtract 1 from the end so that
+    // we don't go to the next range
+    end--;
   }
 
+  rows_written_ = tiles_written_ / tiles_per_row;
+
   // Add range
-  Range range(&start_, &end, sizeof(int));
+  Range range(&start_to_use, &end, sizeof(int));
   nd.emplace_back(range);
 
   // For the rest of the dims, use their domains as ranges. No split there.
   for (unsigned d = 1; d < dim_num; ++d) {
     // begin from second dim
     auto dim{array_schema_.dimension_ptr(d)};
     auto dim_dom = dim->domain();
     nd.emplace_back(dim_dom);
   }
 
   // add rows written to the cache
-  rows_written_ += rows_of_tiles_to_write;
-  start_ = end + 1;
+  if (tile_num != num) {
+    if (datatype_is_unsigned(dim->type())) {
+      u_start_ = end + 1;
+      end = u_start_;
+    } else {
+      start_ = end + 1;
+      end = start_;
+    }
+
+    tiles_since_last_split_ = 0;
+  }
+
+  is_last_range = end == end_to_use;
 
   return nd;
 }
diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h
index 2f0462c5362..5818296da91 100644
--- a/tiledb/sm/query/writers/global_order_writer.h
+++ b/tiledb/sm/query/writers/global_order_writer.h
@@ -227,23 +227,56 @@ class GlobalOrderWriter : public WriterBase {
    */
   uint64_t tiles_in_current_row_;
 
+  /**
+   * The total number of tiles written. It is only being used when
+   * consolidating Dense arrays where the result cannot fit into a single
+   * fragment.
+   */
+  uint64_t tiles_written_;
+
+  /**
+   * The number of tiles written since the last Dense domain split. 
It is only
+   * being used when consolidating Dense arrays where the result cannot fit
+   * into a single fragment.
+   */
+  uint64_t tiles_since_last_split_;
+
+  /**
+   * This is the unsigned start for the dim range in case we need to split in
+   * multiple fragments in Dense arrays
+   */
+  uint64_t u_start_;
+
   /**
    * This is the start for the dim range in case we need to split in multiple
    * fragments in Dense arrays
    */
-  uint64_t start_;
+  int64_t start_;
+
+  /**
+   * This is the unsigned end for the dim range in case we need to split in
+   * multiple fragments in Dense arrays
+   */
+  uint64_t u_end_;
 
   /**
    * This is the end for the dim range in case we need to split in multiple
    * fragments in Dense arrays
    */
-  uint64_t end_;
+  int64_t end_;
 
   /**
    * NDRange in case we have a dense consolidation with split
    */
   NDRange nd_if_dense_split_;
 
+  /**
+   * True if we have made at least one split during Dense consolidation. By
+   * split we mean a fragment split, so consolidating n fragments produces not
+   * n+1 fragments overall but n+m, where m is the number of fragments created
+   * by the splits.
+   */
+  bool dense_with_split_;
+
   /* ********************************* */
   /*         PRIVATE METHODS           */
   /* ********************************* */
@@ -422,7 +455,8 @@ class GlobalOrderWriter : public WriterBase {
    * current fragment
    *
    */
-  NDRange ndranges_after_split(uint64_t num, bool reached_end_of_fragment);
+  NDRange ndranges_after_split(
+      uint64_t num, uint64_t tile_num, bool& is_last_range);
 
   /**
    * Return the number of tiles a single row can hold. More specifically, the

From d6dfd6e596eb2debde8e0bbb6850c8462e5dd77e Mon Sep 17 00:00:00 2001
From: Dimitris Staratzis <33267511+DimitrisStaratzis@users.noreply.github.com>
Date: Wed, 4 Sep 2024 13:42:09 +0300
Subject: [PATCH 14/14] add more tests

---
 test/src/unit-cppapi-max-fragment-size.cc | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc
index d1fc02f77ea..a4dfc5f95c5 100644
--- a/test/src/unit-cppapi-max-fragment-size.cc
+++ b/test/src/unit-cppapi-max-fragment-size.cc
@@ -518,14 +518,18 @@ TEST_CASE(
   };
 
   cleanup();
+  std::string cons_buffer_size;
 
-  bool more_than_one_loop = false;
-  SECTION("More than one loop write") {
-    more_than_one_loop = true;
+  SECTION("More than one loop write, one row or more at the time") {
+    cons_buffer_size = "30";
+  }
+
+  SECTION("More than one loop write, one or two tiles at the time") {
+    cons_buffer_size = "10";
   }
 
   SECTION("One loop write") {
-    more_than_one_loop = false;
+    cons_buffer_size = "10000";  // needed only to speed up consolidation
   }
 
   // Remove the array at the end of this test.
@@ -582,10 +586,7 @@ TEST_CASE(
   // the creation of two new fragments.
   tiledb::Config cfg;
   cfg.set("sm.consolidation.max_fragment_size", "150");
-  cfg.set("sm.consolidation.buffer_size", "10000");  // speed up consolidation
-  if (more_than_one_loop) {
-    cfg.set("sm.consolidation.buffer_size", "10");
-  }
+  cfg.set("sm.consolidation.buffer_size", cons_buffer_size);
 
   ctx = Context(cfg);
   Array::consolidate(ctx, array_name);