diff --git a/test/src/unit-cppapi-max-fragment-size.cc b/test/src/unit-cppapi-max-fragment-size.cc index dd79e638fe4..a4dfc5f95c5 100644 --- a/test/src/unit-cppapi-max-fragment-size.cc +++ b/test/src/unit-cppapi-max-fragment-size.cc @@ -503,3 +503,203 @@ TEST_CASE( array.close(); } + +TEST_CASE( + "Setting max_fragment_size in Dense consolidation", + "[global-order-writer][max-frag-size-consolidation]") { + std::string array_name = "cpp_max_fragment_size_bug"; + Context ctx; + + auto cleanup = [&]() { + auto obj = Object::object(ctx, array_name); + if (obj.type() == Object::Type::Array) { + Object::remove(ctx, array_name); + } + }; + + cleanup(); + std::string cons_buffer_size; + + SECTION("More than one loop write, one row or more at the time") { + cons_buffer_size = "30"; + } + + SECTION("More than one loop write, one or two tiles at the time") { + cons_buffer_size = "10"; + } + + SECTION("One loop write") { + cons_buffer_size = "10000"; // needed only to speed up consolidation + } + + // Remove the array at the end of this test. + ScopedExecutor deferred(cleanup); + + // Create an array with exactly 9 tiles and tile extend 1 + Domain domain(ctx); + ArraySchema schema(ctx, TILEDB_DENSE); + auto d1 = tiledb::Dimension::create(ctx, "d1", {{0, 2}}, 1); + auto d2 = tiledb::Dimension::create(ctx, "d2", {{0, 2}}, 1); + domain.add_dimension(d1); + domain.add_dimension(d2); + + auto a1 = tiledb::Attribute::create(ctx, "a"); + schema.add_attribute(a1); + + schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}}); + schema.set_domain(domain); + + Array::create(array_name, schema); + + // Populate array with data from 1 to 9 + int value = 0; + for (int i = 0; i < 3; i++) { + Array array(ctx, array_name, TILEDB_WRITE); + Query query(ctx, array); + query.set_layout(TILEDB_ROW_MAJOR); + tiledb::Subarray sub(ctx, array); + sub.set_subarray({i, i, 0, 2}); + query.set_subarray(sub); + std::vector data = {++value, ++value, ++value}; + query.set_data_buffer("a", data); + query.submit(); + array.close(); + } + + // Read data to validate write and num of fragments. + CHECK(tiledb::test::num_fragments(array_name) == 3); + + Array array(ctx, array_name, TILEDB_READ); + tiledb::Subarray sub(ctx, array); + sub.set_subarray({0, 2, 0, 2}); + std::vector a(9); + Query query(ctx, array, TILEDB_READ); + query.set_subarray(sub).set_layout(TILEDB_ROW_MAJOR).set_data_buffer("a", a); + query.submit(); + array.close(); + + for (int i = 0; i < 9; i++) { + CHECK(a[i] == i + 1); + } + + // Consolidate with a size limitation for the fragment. This will result in + // the creation of two new fragments. + tiledb::Config cfg; + cfg.set("sm.consolidation.max_fragment_size", "150"); + cfg.set("sm.consolidation.buffer_size", cons_buffer_size); + + ctx = Context(cfg); + Array::consolidate(ctx, array_name); + Array::vacuum(ctx, array_name); + + // Check that we now have 2 fragments instead of 3 + CHECK(tiledb::test::num_fragments(array_name) == 2); + + // Read data to validate correctness + Array array2(ctx, array_name, TILEDB_READ); + tiledb::Subarray sub2(ctx, array2); + sub2.set_subarray({0, 2, 0, 2}); + std::vector a2(9); + Query query2(ctx, array2, TILEDB_READ); + query2.set_subarray(sub2) + .set_layout(TILEDB_ROW_MAJOR) + .set_data_buffer("a", a2); + query2.submit(); + array2.close(); + + for (int i = 0; i < 9; i++) { + CHECK(a2[i] == i + 1); + } +} + +TEST_CASE( + "Setting max_fragment_size in Dense consolidation one dim", + "[global-order-writer][max-frag-size-consolidation]") { + std::string array_name = "cpp_max_fragment_size_bug"; + Context ctx; + + auto cleanup = [&]() { + auto obj = Object::object(ctx, array_name); + if (obj.type() == Object::Type::Array) { + Object::remove(ctx, array_name); + } + }; + + cleanup(); + + // Remove the array at the end of this test. + ScopedExecutor deferred(cleanup); + + // Create an array with exactly 9 tiles and tile extend 1 + Domain domain(ctx); + ArraySchema schema(ctx, TILEDB_DENSE); + auto d1 = tiledb::Dimension::create(ctx, "d1", {{1, 9}}, 3); + domain.add_dimension(d1); + + auto a1 = tiledb::Attribute::create(ctx, "a"); + schema.add_attribute(a1); + + schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}}); + schema.set_domain(domain); + + Array::create(array_name, schema); + + // Populate array with data from 1 to 9 + int value = 0; + for (int i = 1; i < 10; i += 3) { + Array array(ctx, array_name, TILEDB_WRITE); + Query query(ctx, array); + query.set_layout(TILEDB_ROW_MAJOR); + tiledb::Subarray sub(ctx, array); + sub.set_subarray({i, i + 2}); + query.set_subarray(sub); + std::vector data = {++value, ++value, ++value}; + query.set_data_buffer("a", data); + query.submit(); + array.close(); + } + + // Read data to validate write and num of fragments. + // CHECK(tiledb::test::num_fragments(array_name) == 3); + + Array array(ctx, array_name, TILEDB_READ); + tiledb::Subarray sub(ctx, array); + sub.set_subarray({1, 9}); + std::vector a(9); + Query query(ctx, array, TILEDB_READ); + query.set_subarray(sub).set_layout(TILEDB_ROW_MAJOR).set_data_buffer("a", a); + query.submit(); + array.close(); + + for (int i = 0; i < 9; i++) { + CHECK(a[i] == i + 1); + } + + // Consolidate with a size limitation for the fragment. This will result in + // the creation of two new fragments. + tiledb::Config cfg; + cfg.set("sm.consolidation.max_fragment_size", "80"); + cfg.set("sm.consolidation.buffer_size", "10000"); // speed up consolidation + ctx = Context(cfg); + Array::consolidate(ctx, array_name); + Array::vacuum(ctx, array_name); + + // Check that we now have 2 fragments instead of 3 + CHECK(tiledb::test::num_fragments(array_name) == 2); + + // Read data to validate correctness + Array array2(ctx, array_name, TILEDB_READ); + tiledb::Subarray sub2(ctx, array2); + sub2.set_subarray({1, 9}); + std::vector a2(9); + Query query2(ctx, array2, TILEDB_READ); + query2.set_subarray(sub2) + .set_layout(TILEDB_ROW_MAJOR) + .set_data_buffer("a", a2); + query2.submit(); + array2.close(); + + for (int i = 0; i < 9; i++) { + CHECK(a2[i] == i + 1); + } +} diff --git a/tiledb/sm/enums/datatype.h b/tiledb/sm/enums/datatype.h index 897766241da..ab05203fc05 100644 --- a/tiledb/sm/enums/datatype.h +++ b/tiledb/sm/enums/datatype.h @@ -338,6 +338,13 @@ inline bool datatype_is_string(Datatype type) { type == Datatype::STRING_UCS2 || type == Datatype::STRING_UCS4); } +/** Returns true if the input datatype is an unsigned type. */ +inline bool datatype_is_unsigned(Datatype type) { + return ( + type == Datatype::UINT8 || type == Datatype::UINT32 || + type == Datatype::UINT16 || type == Datatype::UINT64); +} + /** Returns true if the input datatype is an integer type. */ inline bool datatype_is_integer(Datatype type) { return ( diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index 236cb1e8de4..1e8fa05ae21 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -695,8 +695,8 @@ void FragmentMetadata::init_domain(const NDRange& non_empty_domain) { // Sanity check assert(!non_empty_domain.empty()); - assert(non_empty_domain_.empty()); - assert(domain_.empty()); + // assert(non_empty_domain_.empty()); todo, this might cause problems + // assert(domain_.empty()); // Set non-empty domain for dense arrays (for sparse it will be calculated // via the MBRs) diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index fccf6754494..8f4587971ca 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -38,6 +38,7 @@ #include "tiledb/sm/array_schema/array_schema.h" #include "tiledb/sm/array_schema/dimension.h" #include "tiledb/sm/consolidator/consolidator.h" +#include "tiledb/sm/enums/array_type.h" #include "tiledb/sm/fragment/fragment_metadata.h" #include "tiledb/sm/misc/comparators.h" #include "tiledb/sm/misc/hilbert.h" @@ -51,6 +52,7 @@ #include "tiledb/sm/tile/tile_metadata_generator.h" #include "tiledb/sm/tile/writer_tile_tuple.h" #include "tiledb/storage_format/uri/generate_uri.h" +#include "tiledb/type/apply_with_type.h" using namespace tiledb; using namespace tiledb::common; @@ -91,7 +93,17 @@ GlobalOrderWriter::GlobalOrderWriter( fragment_name) , processed_conditions_(processed_conditions) , fragment_size_(fragment_size) - , current_fragment_size_(0) { + , current_fragment_size_(0) + , rows_written_(0) + , tiles_in_current_row_(0) + , tiles_written_(0) + , tiles_since_last_split_(0) + , u_start_(0) + , start_(0) + , u_end_(0) + , end_(0) + , nd_if_dense_split_{} + , dense_with_split_(false) { // Check the layout is global order. if (layout_ != Layout::GLOBAL_ORDER) { throw GlobalOrderWriterException( @@ -780,11 +792,26 @@ Status GlobalOrderWriter::global_write() { // Compute the number of tiles that will fit in this fragment. auto num = num_tiles_to_write(idx, tile_num, tiles); + bool is_dense = array_schema_.array_type() == ArrayType::DENSE; + bool is_last_range = false; + + if (is_dense && disable_checks_consolidation_) { + // if it is a dense array during consolidation and not all tiles can fit + // in the current fragment then we need to split the domain, otherwise if + // all tiles can fit it means that we are in the middle of a write + nd_if_dense_split_ = ndranges_after_split(num, tile_num, is_last_range); + } + + if ((!nd_if_dense_split_.empty() && num != tile_num) || + (is_last_range && dense_with_split_)) { + frag_meta->init_domain(nd_if_dense_split_); + dense_with_split_ = true; + } // If we're resuming a fragment write and the first tile doesn't fit into // the previous fragment, we need to start a new fragment and recalculate // the number of tiles to write. - if (current_fragment_size_ > 0 && num == 0) { + if (current_fragment_size_ > 0 && num == 0 && !dense_with_split_) { RETURN_CANCEL_OR_ERROR(start_new_fragment()); num = num_tiles_to_write(idx, tile_num, tiles); } @@ -1422,6 +1449,155 @@ uint64_t GlobalOrderWriter::num_tiles_to_write( return tile_num - start; } +uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) { + auto dim_num = domain.dim_num(); + uint64_t ret = 1; + + for (unsigned d = 1; d < dim_num; ++d) { + // Skip first dim. We want to calculate how many tiles can fit in one row. + // To do that we skip the first dim and multiply the range / extend of the + // other dimensions + auto dim{domain.dimension_ptr(d)}; + auto dim_dom = dim->domain(); + auto l = [&](auto T) { + return static_cast(dim->tile_extent().rvalue_as()); + }; + + ret *= dim->domain_range(dim_dom) / apply_with_type(l, dim->type()); + // todo consider cases where the above calculation has a remainder. + } + return ret; +} + +NDRange GlobalOrderWriter::ndranges_after_split( + uint64_t num, uint64_t tile_num, bool& is_last_range) { + // Expand domain to full tiles + bool reached_end_of_fragment = tile_num != num; + auto& domain{array_schema_.domain()}; + if (disable_checks_consolidation_) { + auto expanded_subarray = subarray_.ndrange(0); + domain.expand_to_tiles(&expanded_subarray); + } + + tiles_written_ += num; + tiles_since_last_split_ += num; + + // Calculate how many tiles each row can hold + uint64_t tiles_per_row = num_tiles_per_row(domain); + + // Calculate how many rows we will write in the current fragment + uint64_t rows_of_tiles_to_write = 0; + + if (num != 0) { + rows_of_tiles_to_write = (num - tiles_in_current_row_) / tiles_per_row; + } + + // If we have not written a full row and we have reached the end of the + // fragment abort + + // set vars + uint64_t remainder_of_tiles = 0; + + // Calculate how many tiles we have in the current row + if (rows_of_tiles_to_write == 0) { + remainder_of_tiles += num; + } else { + remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row; + } + tiles_in_current_row_ += remainder_of_tiles; + + if (tiles_in_current_row_ == tiles_per_row) { + tiles_in_current_row_ = 0; + } + + // If we have finished the write in the middle of the row, throw + if (tiles_in_current_row_ != 0 && reached_end_of_fragment) { + throw GlobalOrderWriterException( + "The target fragment size cannot be achieved. Please try using a " + "different size, or there might be a misconfiguration in the array " + "schema."); + } + + // Create NDRange object and reserve for dims + auto dim_num = domain.dim_num(); + NDRange nd; + nd.reserve(dim_num); + + // Calculate the range for the index dim (first). + auto dim{domain.dimension_ptr(0)}; + auto dim_dom = dim->domain(); + auto l = [&](auto T) { + return static_cast(dim->tile_extent().rvalue_as()); + }; + uint64_t tile_extent = apply_with_type(l, dim->type()); + + // Calculate start and end + if (rows_written_ == 0) { + // It means that the start has not been set yet. Set it to the minimum value + // of the expanded domain for that dim + auto ll = [&](auto T, size_t index) { + // Return a pair. The domain can be signed or unsigned + auto dim_dom_data = dim_dom.typed_data()[index]; + int64_t ret_s = static_cast(dim_dom_data); + uint64_t ret_u = static_cast(dim_dom_data); + return std::make_pair(ret_s, ret_u); + }; + + // based on whether the dim is signed or unsigned assign the proper vars + if (datatype_is_unsigned(dim->type())) { + u_start_ = apply_with_type(ll, dim->type(), 0).second; + u_end_ = apply_with_type(ll, dim->type(), 1).second; + } else { + start_ = apply_with_type(ll, dim->type(), 0).first; + end_ = apply_with_type(ll, dim->type(), 1).first; + } + } + + // Use 'auto' to temporarily use the cached signed or unsigned start_ and end_ + // values + auto start_to_use = datatype_is_unsigned(dim->type()) ? u_start_ : start_; + auto end_to_use = datatype_is_unsigned(dim->type()) ? u_end_ : end_; + + auto end = + start_to_use + ((tiles_since_last_split_ / tiles_per_row) * tile_extent); + if (tiles_since_last_split_ % tiles_per_row == 0 && end != start_to_use) { + // We are at the finish of the row, subtract 1 from the end so that + // we dont go to the next range + end--; + } + + rows_written_ = tiles_written_ / tiles_per_row; + + // Add range + Range range(&start_to_use, &end, sizeof(int)); + nd.emplace_back(range); + + // For the rest of the dims, use their domains as ranges. No split there. + for (unsigned d = 1; d < dim_num; ++d) { + // begin from second dim + auto dim{array_schema_.dimension_ptr(d)}; + auto dim_dom = dim->domain(); + nd.emplace_back(dim_dom); + } + + // add rows written to the cache + if (tile_num != num) { + if (datatype_is_unsigned(dim->type())) { + u_start_ = end + 1; + end = u_start_; + } else { + start_ = end + 1; + end = start_; + } + + tiles_since_last_split_ = 0; + } + + is_last_range = end == end_to_use; + + return nd; +} + Status GlobalOrderWriter::start_new_fragment() { auto frag_meta = global_write_state_->frag_meta_; auto& uri = frag_meta->fragment_uri(); diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h index c15b81f67c8..5818296da91 100644 --- a/tiledb/sm/query/writers/global_order_writer.h +++ b/tiledb/sm/query/writers/global_order_writer.h @@ -215,6 +215,68 @@ class GlobalOrderWriter : public WriterBase { */ uint64_t current_fragment_size_; + /** + * Counter for the number of rows written. This is used only when the + * consolidation produces more than one fragment in Dense arrays + */ + uint64_t rows_written_; + + /** + * Counter for the number of tiles in the current row written. This is used + * only when the consolidation produces more than one fragment in Dense arrays + */ + uint64_t tiles_in_current_row_; + + /** + * The total number of tiles written. It is only being used when consolidating + * Dense arrays where the result can not fit into one fragment only + */ + uint64_t tiles_written_; + + /** + * The number of tiles written since the last Dense domain split. It is only + * being used when consolidating Dense arrays where the result can not fit + * into one fragment only + */ + uint64_t tiles_since_last_split_; + + /** + * This is the unsigned start for the dim range in case we need to split in + * multiple fragments in Dense arrays + */ + uint64_t u_start_; + + /** + * This is the start for the dim range in case we need to split in multiple + * fragments in Dense arrays + */ + int64_t start_; + + /** + * This is the unsigned start for the dim range in case we need to split in + * multiple fragments in Dense arrays + */ + uint64_t u_end_; + + /** + * This is the start for the dim range in case we need to split in multiple + * fragments in Dense arrays + */ + int64_t end_; + + /** + * NDRange in case we have a dense consolidation with split + */ + NDRange nd_if_dense_split_; + + /** + * True if we have made at least one split in the Dense consolidation. By + * split we mean a fragment split so the result of the consolidation if we + * have n fragments is not n+1 but n+m where m is the number of fragments + * created after consolidation + */ + bool dense_with_split_; + /* ********************************* */ /* PRIVATE METHODS */ /* ********************************* */ @@ -385,6 +447,27 @@ class GlobalOrderWriter : public WriterBase { uint64_t tile_num, tdb::pmr::unordered_map& tiles); + /** + * Create new ndranges by splitting the first dimension based on the number of + * tiles we need to write + * @param num The number of tiles we need to write. + * @param reached_end_of_fragment True if we have reached the end of the + * current frag + * + */ + NDRange ndranges_after_split( + uint64_t num, uint64_t tile_num, bool& is_last_range); + + /** + * Return the number of tiles a single row can hold. More specifically, the + * number of tiles all dimensions except the first can hold. + * + * @param domain The domain + * + * @return Number of tiles. + */ + uint64_t num_tiles_per_row(const Domain& domain); + /** * Close the current fragment and start a new one. The closed fragment will * be added to `frag_uris_to_commit_` so that all fragments in progress can