Skip to content

Commit

Permalink
work with multiple global_write() calls
Browse files Browse the repository at this point in the history
  • Loading branch information
DimitrisStaratzis committed Aug 27, 2024
1 parent 73a7fda commit 737d8e8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
39 changes: 29 additions & 10 deletions tiledb/sm/query/writers/global_order_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ GlobalOrderWriter::GlobalOrderWriter(
, fragment_size_(fragment_size)
, current_fragment_size_(0)
, rows_written_(0)
, start_(0) {
, tiles_in_current_row_(0)
, start_(0)
, end_(0)
, nd_if_dense_split_{} {
// Check the layout is global order.
if (layout_ != Layout::GLOBAL_ORDER) {
throw GlobalOrderWriterException(
Expand Down Expand Up @@ -786,11 +789,16 @@ Status GlobalOrderWriter::global_write() {
// Compute the number of tiles that will fit in this fragment.
auto num = num_tiles_to_write(idx, tile_num, tiles);

if (tile_num != num && array_schema_.array_type() == ArrayType::DENSE) {
if (array_schema_.array_type() ==
ArrayType::DENSE) { //&& this is consolidation
// if it is a dense array and not all tiles can fit in the current
// fragment then we need to split the domain
NDRange new_nd = ndranges_after_split(num);
frag_meta->init_domain(new_nd);
// fragment then we need to split the domain, otherwise if all tiles can
// fit it means that we are in the middle of a write
nd_if_dense_split_ = ndranges_after_split(num, tile_num != num);
}

if (tile_num != num && !nd_if_dense_split_.empty()) {
frag_meta->init_domain(nd_if_dense_split_);
}

// If we're resuming a fragment write and the first tile doesn't fit into
Expand Down Expand Up @@ -1454,7 +1462,8 @@ uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) {
return ret;
}

NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
NDRange GlobalOrderWriter::ndranges_after_split(
uint64_t num, bool reached_end_of_fragment) {
// Expand domain to full tiles
auto& domain{array_schema_.domain()};
if (disable_checks_consolidation_) {
Expand All @@ -1465,16 +1474,21 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
// Calculate how many tiles each row can hold
uint64_t tiles_per_row = num_tiles_per_row(domain);

if (num % tiles_per_row != 0) {
// Calculate how many rows we will write in the current fragment
uint64_t rows_of_tiles_to_write =
(num - tiles_in_current_row_) / tiles_per_row;
uint64_t remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row;
tiles_in_current_row_ = remainder_of_tiles;

// If we have not written a full row and we have reached the end of the
// fragment abort
if (tiles_in_current_row_ != 0 && reached_end_of_fragment) {
throw GlobalOrderWriterException(
"The target fragment size cannot be achieved. Please try using a "
"different size, or there might be a misconfiguration in the array "
"schema.");
}

// Calculate how many rows we will write in the current fragment
uint64_t rows_of_tiles_to_write = num / tiles_per_row;

// Create NDRange object and reserve for dims
auto dim_num = domain.dim_num();
NDRange nd;
Expand All @@ -1499,9 +1513,14 @@ NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
};

start_ = apply_with_type(ll, dim->type());
end_ = apply_with_type(ll, dim->type());
}
uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1;

if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_) {
end++;
}

// Add range
Range range(&start_, &end, sizeof(int));
nd.emplace_back(range);
Expand Down
21 changes: 20 additions & 1 deletion tiledb/sm/query/writers/global_order_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,29 @@ class GlobalOrderWriter : public WriterBase {
*/
uint64_t rows_written_;

/**
* Counter for the number of tiles in the current row written. This is used
* only when the consolidation produces more than one fragment in Dense arrays
*/
uint64_t tiles_in_current_row_;

/**
* This is the start for the dim range in case we need to split in multiple
* fragments in Dense arrays
*/
uint64_t start_;

/**
* This is the start for the dim range in case we need to split in multiple
* fragments in Dense arrays
*/
uint64_t end_;

/**
* NDRange in case we have a dense consolidation with split
*/
NDRange nd_if_dense_split_;

/* ********************************* */
/* PRIVATE METHODS */
/* ********************************* */
Expand Down Expand Up @@ -401,9 +418,11 @@ class GlobalOrderWriter : public WriterBase {
* Create new ndranges by splitting the first dimension based on the number of
* tiles we need to write
* @param num The number of tiles we need to write.
* @param reached_end_of_fragment True if we have reached the end of the
* current frag
*
*/
NDRange ndranges_after_split(uint64_t num);
NDRange ndranges_after_split(uint64_t num, bool reached_end_of_fragment);

/**
* Return the number of tiles a single row can hold. More specifically, the
Expand Down

0 comments on commit 737d8e8

Please sign in to comment.