[Backport release-2.26] Dense reader fails early when tile offsets are too large. (#5310) (#5311)

Backport of #5310 to release-2.26.

---
TYPE: IMPROVEMENT
DESC: Dense reader fails early when tile offsets are too large.

---------

Co-authored-by: Luc Rancourt <[email protected]>
Co-authored-by: KiterLuc <[email protected]>
3 people authored Sep 16, 2024
1 parent 983b716 commit e7e9dba
Showing 6 changed files with 209 additions and 145 deletions.
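
At a glance, the change adds a per-fragment estimate of tile-offset memory (ReaderBase::tile_offset_sizes()) and an up-front check in DenseReader::dense_read() that throws before any offsets are loaded if the total estimate exceeds the remaining budget. A minimal sketch of the pattern, with simplified names that are not the actual TileDB API:

#include <cstdint>
#include <numeric>
#include <stdexcept>
#include <vector>

// Hypothetical stand-ins for reader state; the real fields live on
// DenseReader/ReaderBase and are shown in the diff below.
void fail_early_if_offsets_too_large(
    const std::vector<uint64_t>& per_frag_usage,  // tile_offset_sizes()
    uint64_t memory_budget,
    uint64_t memory_already_used) {
  // Sum the per-fragment estimates; the uint64_t init keeps the sum 64-bit.
  const uint64_t total = std::accumulate(
      per_frag_usage.begin(), per_frag_usage.end(), uint64_t{0});
  if (total > memory_budget - memory_already_used) {
    throw std::runtime_error(
        "DenseReader: Cannot load tile offsets, increase memory budget");
  }
}

The point of the check is to fail before any tile offsets are materialized, rather than partway through the read.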
40 changes: 38 additions & 2 deletions test/src/unit-dense-reader.cc
@@ -889,6 +889,42 @@ TEST_CASE_METHOD(
"DenseReader: Memory budget is too small to open array");
}

TEST_CASE_METHOD(
CDenseFx,
"Dense reader: budget too small for tile offsets",
"[dense-reader][budget-too-small-tile-offsets]") {
// Create default array.
reset_config();
create_default_array_1d();

uint64_t num_frags = GENERATE(1, 2);

// Write some fragments.
int subarray[] = {1, NUM_CELLS};
std::vector<int> data(NUM_CELLS);
std::iota(data.begin(), data.end(), 1);
uint64_t data_size = data.size() * sizeof(int);
for (uint64_t f = 0; f < num_frags; f++) {
write_1d_fragment(subarray, data.data(), &data_size);
}

// The footer for a fragment is 390 bytes.
// The tile offsets for a fragment are 400 bytes.
// The tile upper memory limit is more than enough to load 40-byte tiles.
total_budget_ = std::to_string(390 * num_frags + 200);
update_config();

// Try to read.
int data_r[NUM_CELLS] = {0};
uint64_t data_r_size = sizeof(data_r);
read(
subarray,
data_r,
&data_r_size,
0,
"DenseReader: Cannot load tile offsets, increase memory budget");
}
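
The arithmetic behind the chosen budget, assuming the fixture defaults in this file (hypothetically, a 1D array of 20 cells with tile extent 10, i.e. 2 tiles and one fixed-size attribute per fragment; these constants are assumptions, not taken from the diff):

#include <cassert>
#include <cstdint>

int main() {
  // Assumed fixture: 2 tiles per fragment, 1 fixed-size attribute.
  const uint64_t tile_num = 2;
  const uint64_t num = 1;  // offset vectors needed for the one attribute

  // Raw offsets: num * tile_num * sizeof(uint64_t) = 16 bytes.
  const uint64_t offsets_size = num * tile_num * sizeof(uint64_t);

  // Vector bookkeeping per tile_offset_sizes(): 3 fields (attribute,
  // the "+ 1" slot, dimension) * 4 vectors * 32 bytes = 384 bytes.
  const uint64_t num_fields = 1 + 1 + 1;
  const uint64_t offsets_init_size = num_fields * 4 * 32;

  // Total matches the "400 bytes" cited in the test comment.
  assert(offsets_size + offsets_init_size == 400);
  return 0;
}

With total_budget_ = 390 * num_frags + 200, the footers alone consume 390 bytes per fragment, leaving roughly 200 bytes, which cannot hold even one fragment's 400 bytes of tile offsets, so the read must throw.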

TEST_CASE_METHOD(
CDenseFx,
"Dense reader: tile budget exceeded, fixed attribute",
@@ -942,7 +978,7 @@ TEST_CASE_METHOD(
uint64_t data_size = data.size() * sizeof(int);
write_1d_fragment(subarray, data.data(), &data_size);

total_budget_ = "420";
total_budget_ = "804";
tile_upper_memory_limit_ = "50";
update_config();

@@ -1025,7 +1061,7 @@ TEST_CASE_METHOD(

// The two tiles are 91 and 100 bytes respectively; this will only allow
// loading one, as the budget is split across two potential reads.
total_budget_ = "460";
total_budget_ = "840";
tile_upper_memory_limit_ = "210";
update_config();

22 changes: 22 additions & 0 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -98,6 +98,9 @@ DenseReader::DenseReader(

// Check the validity buffer sizes.
check_validity_buffer_sizes();

// No dense dimension can be var-sized.
is_dim_var_size_.resize(array_schema_.dim_num(), false);
}

/* ****************************** */
@@ -310,6 +313,25 @@ Status DenseReader::dense_read() {
if (condition_.has_value()) {
qc_loaded_attr_names_set_ = condition_->field_names();
}
qc_loaded_attr_names_.clear();
qc_loaded_attr_names_.reserve(qc_loaded_attr_names_set_.size());
for (auto& name : qc_loaded_attr_names_set_) {
qc_loaded_attr_names_.emplace_back(name);
}

// Load per fragment tile offsets memory usage.
per_frag_tile_offsets_usage_ = tile_offset_sizes();

// Compute total tile offsets sizes.
auto total_tile_offsets_sizes = std::accumulate(
per_frag_tile_offsets_usage_.begin(),
per_frag_tile_offsets_usage_.end(),
static_cast<uint64_t>(0));
if (total_tile_offsets_sizes >
memory_budget_ - array_memory_tracker_->get_memory_usage()) {
throw DenseReaderException(
"Cannot load tile offsets, increase memory budget");
}

auto&& [names, var_names] = field_names_to_process(qc_loaded_attr_names_set_);

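A small detail in the hunk above: std::accumulate deduces its accumulator type from the initial value, which is why the code passes static_cast<uint64_t>(0) rather than a plain 0. A self-contained illustration with made-up sizes:

#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

int main() {
  // Two fragments whose combined usage does not fit in a 32-bit int.
  std::vector<uint64_t> usage = {3'000'000'000ULL, 3'000'000'000ULL};

  // Correct: the uint64_t init value keeps every partial sum 64-bit.
  uint64_t ok = std::accumulate(usage.begin(), usage.end(), uint64_t{0});
  assert(ok == 6'000'000'000ULL);

  // With a plain 0 the accumulator would be int, and each partial sum
  // would be narrowed back to int, truncating the result:
  // int bad = std::accumulate(usage.begin(), usage.end(), 0);
  return 0;
}
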
130 changes: 130 additions & 0 deletions tiledb/sm/query/readers/reader_base.cc
@@ -77,6 +77,8 @@ ReaderBase::ReaderBase(
, memory_tracker_(params.query_memory_tracker())
, condition_(params.condition())
, user_requested_timestamps_(false)
, deletes_consolidation_no_purge_(
buffers_.count(constants::delete_timestamps) != 0)
, use_timestamps_(false)
, initial_data_loaded_(false)
, max_batch_size_(config_.get<uint64_t>("vfs.max_batch_size").value())
@@ -145,6 +147,134 @@ bool ReaderBase::skip_field(
/* PROTECTED METHODS */
/* ****************************** */

std::vector<uint64_t> ReaderBase::tile_offset_sizes() {
auto timer_se = stats_->start_timer("tile_offset_sizes");

// For easy reference.
std::vector<uint64_t> ret(fragment_metadata_.size());
const auto dim_num = array_schema_.dim_num();

// Compute the size of the tile offsets per fragment.
const auto relevant_fragments = subarray_.relevant_fragments();
throw_if_not_ok(parallel_for(
&resources_.compute_tp(), 0, relevant_fragments.size(), [&](uint64_t i) {
// For easy reference.
auto frag_idx = relevant_fragments[i];
auto& fragment = fragment_metadata_[frag_idx];
const auto& schema = fragment->array_schema();
const auto tile_num = fragment->tile_num();
const auto dense = schema->dense();

// Compute the number of dimensions/attributes requiring offsets.
uint64_t num = 0;

// For fragments with version smaller than 5, we have zipped coords.
// Otherwise, we load each dimension independently.
if (!dense) {
if (fragment->version() < 5) {
num = 1;
} else {
for (unsigned d = 0; d < dim_num; ++d) {
// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
if (is_dim_var_size_[d]) {
num += 2;
}
}
}
}

// Process everything loaded for query condition.
for (auto& name : qc_loaded_attr_names_) {
// If the field is not a member of the array schema, it was added by
// array schema evolution; ignore it for this fragment's tile offsets.
// Also skip dimensions.
if (!schema->is_field(name) || schema->is_dim(name)) {
continue;
}

// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
const auto attr = schema->attribute(name);
num += 2 * attr->var_size();

// If nullable, we load nullable offsets.
num += attr->nullable();
}

// Process everything loaded for user requested data.
for (auto& it : buffers_) {
const auto& name = it.first;

// Skip dimensions and attributes loaded by query condition as they
// are processed above. Special attributes (timestamps, delete
// timestamps, etc.) are processed below.
if (array_schema_.is_dim(name) || !schema->is_field(name) ||
qc_loaded_attr_names_set_.count(name) != 0 ||
schema->is_special_attribute(name)) {
continue;
}

// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
const auto attr = schema->attribute(name);
num += 2 * attr->var_size();

// If nullable, we load nullable offsets.
num += attr->nullable();
}

if (!dense) {
// Add timestamps if required.
if (!timestamps_not_present(constants::timestamps, frag_idx)) {
num++;
}

// Add delete metadata if required.
if (!delete_meta_not_present(
constants::delete_timestamps, frag_idx)) {
num++;
num += deletes_consolidation_no_purge_;
}
}

// Finally set the size of the loaded data.

// The expected size of the tile offsets in bytes.
unsigned offsets_size = num * tile_num * sizeof(uint64_t);

// Other than the offsets themselves, there is also memory used for the
// initialization of the vectors that hold them. This initialization
// takes place in LoadedFragmentMetadata::resize_offsets()

// Calculate the number of fields
unsigned num_fields = schema->attribute_num() + 1 +
fragment->has_timestamps() +
fragment->has_delete_meta() * 2;

// If version < 5 we use zipped coordinates, otherwise separate
num_fields += (fragment->version() >= 5) ? schema->dim_num() : 0;

// The additional memory required for the vectors that store the tile
// offsets. The number of fields is calculated above. Each vector
// requires 32 bytes, and each field requires 4 vectors:
// tile_offsets_, tile_var_offsets_, tile_var_sizes_, and
// tile_validity_offsets_, all declared in loaded_fragment_metadata.h.
unsigned offsets_init_size = num_fields * 4 * 32;

ret[frag_idx] = offsets_size + offsets_init_size;
return Status::Ok();
}));

return ret;
}

bool ReaderBase::process_partial_timestamps(FragmentMetadata& frag_meta) const {
return frag_meta.has_timestamps() &&
frag_meta.partial_time_overlap(
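Restating the estimate computed by tile_offset_sizes() in one place (a simplified sketch, not the actual function, which also derives num and num_fields from the schema and fragment):

#include <cstdint>

// Per-fragment memory estimate for tile offsets: 'num' offset vectors of
// 'tile_num' uint64_t entries each, plus fixed bookkeeping of 4 vectors *
// 32 bytes for every field in the fragment.
uint64_t per_fragment_estimate(
    uint64_t num,         // offset vectors needed (dims, attrs, QC fields)
    uint64_t tile_num,    // tiles in the fragment
    uint64_t num_fields)  // attributes + the coords slot + dims + specials
{
  const uint64_t offsets_size = num * tile_num * sizeof(uint64_t);
  const uint64_t offsets_init_size = num_fields * 4 * 32;
  return offsets_size + offsets_init_size;
}

DenseReader::dense_read() then sums these per-fragment figures and compares the total against what is left of the memory budget, as shown in the dense_reader.cc hunk above.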
19 changes: 19 additions & 0 deletions tiledb/sm/query/readers/reader_base.h
Original file line number Diff line number Diff line change
@@ -269,6 +269,9 @@ class ReaderBase : public StrategyBase {
/** If the user requested timestamps attribute in the query */
bool user_requested_timestamps_;

/** Are we doing deletes consolidation (without purge option). */
bool deletes_consolidation_no_purge_;

/**
* If the special timestamps attribute should be loaded to memory for
* this query
@@ -281,6 +284,12 @@
*/
std::vector<bool> timestamps_needed_for_deletes_and_updates_;

/** Whether each dimension is var-sized. */
std::vector<bool> is_dim_var_size_;

/** Names of dim/attr loaded for query condition. */
std::vector<std::string> qc_loaded_attr_names_;

/** Set of names of dim/attr loaded for query condition, for fast lookups. */
std::unordered_set<std::string> qc_loaded_attr_names_set_;

Expand All @@ -305,6 +314,9 @@ class ReaderBase : public StrategyBase {
* */
std::unordered_map<std::string, QueryBuffer>& aggregate_buffers_;

/** Per fragment tile offsets memory usage. */
std::vector<uint64_t> per_frag_tile_offsets_usage_;

/* ********************************* */
/* PROTECTED METHODS */
/* ********************************* */
@@ -338,6 +350,13 @@
return true;
}

/**
* Computes the required size for loading tile offsets, per fragment.
*
* @return Required memory for loading tile offsets, per fragment.
*/
std::vector<uint64_t> tile_offset_sizes();

/**
* Returns if we need to process partial timestamp condition for this
* fragment.