Dense reader fails early when tile offsets are too large. (#5310)
This change makes it possible to accurately compute the memory usage for
tile offsets before they are loaded. We can then throw an error, before
doing any work, if the tile offsets are too large for the memory budget.
This enables us to fail faster on REST when performing large queries and
to retry quickly.

Note that the memory usage for tile offsets is computed per fragment,
which will ease the implementation of partial tile offsets loading for the
dense reader.
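
In outline, the check added to DenseReader::dense_read() (see the diff
below) sums the per-fragment estimates returned by tile_offset_sizes() and
throws before any tile data is touched. A minimal, self-contained sketch
of that logic; the function name here is hypothetical and the other names
mirror the reader code:

#include <cstdint>
#include <numeric>
#include <stdexcept>
#include <vector>

// Sketch: fail before any I/O if the tile offsets cannot fit in the
// remaining memory budget.
void check_tile_offsets_budget(
    const std::vector<uint64_t>& per_frag_usage,  // from tile_offset_sizes()
    uint64_t memory_budget,
    uint64_t memory_already_used) {
  const uint64_t total = std::accumulate(
      per_frag_usage.begin(), per_frag_usage.end(), uint64_t{0});
  if (total > memory_budget - memory_already_used) {
    throw std::runtime_error(
        "Cannot load tile offsets, increase memory budget");
  }
}

Because the check runs before any tiles are fetched, a REST query that
cannot fit its tile offsets fails immediately instead of after partial
work.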

---
TYPE: IMPROVEMENT
DESC: Dense reader fails early when tile offsets are too large.
KiterLuc authored Sep 16, 2024
1 parent 892a8bb commit 7253795
Showing 6 changed files with 209 additions and 145 deletions.
40 changes: 38 additions & 2 deletions test/src/unit-dense-reader.cc
@@ -886,6 +886,42 @@ TEST_CASE_METHOD(
"DenseReader: Memory budget is too small to open array");
}

TEST_CASE_METHOD(
CDenseFx,
"Dense reader: budget too small for tile offsets",
"[dense-reader][budget-too-small-tile-offsets]") {
// Create default array.
reset_config();
create_default_array_1d();

uint64_t num_frags = GENERATE(1, 2);

// Write some fragments.
int subarray[] = {1, NUM_CELLS};
std::vector<int> data(NUM_CELLS);
std::iota(data.begin(), data.end(), 1);
uint64_t data_size = data.size() * sizeof(int);
for (uint64_t f = 0; f < num_frags; f++) {
write_1d_fragment(subarray, data.data(), &data_size);
}

// Footer for a fragment is 390 bytes.
// Tile offsets for a fragment are 400 bytes.
// The tile upper memory limit is more than enough to load the 40-byte tiles.
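// The budget below (390 * num_frags + 200) covers the footers, but only
// 200 bytes remain for the 400 * num_frags bytes of tile offsets, so the
// read must trip the new early check.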
total_budget_ = std::to_string(390 * num_frags + 200);
update_config();

// Try to read.
int data_r[NUM_CELLS] = {0};
uint64_t data_r_size = sizeof(data_r);
read(
subarray,
data_r,
&data_r_size,
0,
"DenseReader: Cannot load tile offsets, increase memory budget");
}

TEST_CASE_METHOD(
CDenseFx,
"Dense reader: tile budget exceeded, fixed attribute",
@@ -939,7 +975,7 @@ TEST_CASE_METHOD(
uint64_t data_size = data.size() * sizeof(int);
write_1d_fragment(subarray, data.data(), &data_size);

total_budget_ = "420";
total_budget_ = "804";
tile_upper_memory_limit_ = "50";
update_config();

@@ -1022,7 +1058,7 @@ TEST_CASE_METHOD(

// The two tiles are 91 and 100 bytes respectively; this will only allow
// loading one, as the budget is split across two potential reads.
total_budget_ = "460";
total_budget_ = "840";
tile_upper_memory_limit_ = "210";
update_config();

22 changes: 22 additions & 0 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -98,6 +98,9 @@ DenseReader::DenseReader(

// Check the validity buffer sizes.
check_validity_buffer_sizes();

// No dense dimension can be var-sized.
is_dim_var_size_.resize(array_schema_.dim_num(), false);
}

/* ****************************** */
@@ -310,6 +313,25 @@ Status DenseReader::dense_read() {
if (condition_.has_value()) {
qc_loaded_attr_names_set_ = condition_->field_names();
}
qc_loaded_attr_names_.clear();
qc_loaded_attr_names_.reserve(qc_loaded_attr_names_set_.size());
for (auto& name : qc_loaded_attr_names_set_) {
qc_loaded_attr_names_.emplace_back(name);
}

// Load the per-fragment tile offsets memory usage.
per_frag_tile_offsets_usage_ = tile_offset_sizes();

// Compute the total tile offsets size.
auto total_tile_offsets_sizes = std::accumulate(
per_frag_tile_offsets_usage_.begin(),
per_frag_tile_offsets_usage_.end(),
static_cast<uint64_t>(0));
if (total_tile_offsets_sizes >
memory_budget_ - array_memory_tracker_->get_memory_usage()) {
throw DenseReaderException(
"Cannot load tile offsets, increase memory budget");
}

auto&& [names, var_names] = field_names_to_process(qc_loaded_attr_names_set_);

130 changes: 130 additions & 0 deletions tiledb/sm/query/readers/reader_base.cc
@@ -77,6 +77,8 @@ ReaderBase::ReaderBase(
, memory_tracker_(params.query_memory_tracker())
, condition_(params.condition())
, user_requested_timestamps_(false)
, deletes_consolidation_no_purge_(
buffers_.count(constants::delete_timestamps) != 0)
, use_timestamps_(false)
, initial_data_loaded_(false)
, max_batch_size_(config_.get<uint64_t>("vfs.max_batch_size").value())
@@ -145,6 +147,134 @@ bool ReaderBase::skip_field(
/* PROTECTED METHODS */
/* ****************************** */

std::vector<uint64_t> ReaderBase::tile_offset_sizes() {
auto timer_se = stats_->start_timer("tile_offset_sizes");

// For easy reference.
std::vector<uint64_t> ret(fragment_metadata_.size());
const auto dim_num = array_schema_.dim_num();

// Compute the size of the tile offsets per fragment.
const auto relevant_fragments = subarray_.relevant_fragments();
throw_if_not_ok(parallel_for(
&resources_.compute_tp(), 0, relevant_fragments.size(), [&](uint64_t i) {
// For easy reference.
auto frag_idx = relevant_fragments[i];
auto& fragment = fragment_metadata_[frag_idx];
const auto& schema = fragment->array_schema();
const auto tile_num = fragment->tile_num();
const auto dense = schema->dense();

// Compute the number of dimensions/attributes requiring offsets.
uint64_t num = 0;

// For fragments with version smaller than 5 we have zipped coords.
// Otherwise we load each dimension independently.
if (!dense) {
if (fragment->version() < 5) {
num = 1;
} else {
for (unsigned d = 0; d < dim_num; ++d) {
// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
if (is_dim_var_size_[d]) {
num += 2;
}
}
}
}

// Process everything loaded for query condition.
for (auto& name : qc_loaded_attr_names_) {
// If the field is not a member of the array schema, it was added by
// array schema evolution; ignore it for this fragment's tile offsets.
// Also skip dimensions.
if (!schema->is_field(name) || schema->is_dim(name)) {
continue;
}

// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
const auto attr = schema->attribute(name);
num += 2 * attr->var_size();

// If nullable, we load nullable offsets.
num += attr->nullable();
}

// Process everything loaded for user requested data.
for (auto& it : buffers_) {
const auto& name = it.first;

// Skip dimensions and attributes loaded by query condition as they
// are processed above. Special attributes (timestamps, delete
// timestamps, etc.) are processed below.
if (array_schema_.is_dim(name) || !schema->is_field(name) ||
qc_loaded_attr_names_set_.count(name) != 0 ||
schema->is_special_attribute(name)) {
continue;
}

// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
const auto attr = schema->attribute(name);
num += 2 * attr->var_size();

// If nullable, we load nullable offsets.
num += attr->nullable();
}

if (!dense) {
// Add timestamps if required.
if (!timestamps_not_present(constants::timestamps, frag_idx)) {
num++;
}

// Add delete metadata if required.
if (!delete_meta_not_present(
constants::delete_timestamps, frag_idx)) {
num++;
num += deletes_consolidation_no_purge_;
}
}

// Finally, set the size of the loaded data.

// The expected size of the tile offsets themselves.
unsigned offsets_size = num * tile_num * sizeof(uint64_t);

// Other than the offsets themselves, there is also memory used for the
// initialization of the vectors that hold them. This initialization
// takes place in LoadedFragmentMetadata::resize_offsets().

// Calculate the number of fields
unsigned num_fields = schema->attribute_num() + 1 +
fragment->has_timestamps() +
fragment->has_delete_meta() * 2;

// If version < 5 we use zipped coordinates; otherwise dimensions are separate.
num_fields += (fragment->version() >= 5) ? schema->dim_num() : 0;

// The additional memory required for the vectors that store the tile
// offsets. The number of fields is calculated above. Each vector
// requires 32 bytes and each field requires 4 vectors. These are:
// tile_offsets_, tile_var_offsets_, tile_var_sizes_, and
// tile_validity_offsets_, located in loaded_fragment_metadata.h.
unsigned offsets_init_size = num_fields * 4 * 32;
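
// Worked example (illustrative numbers, not from this commit): a dense
// fragment of format version >= 5 with one dimension, one fixed-size,
// non-nullable attribute and 10 tiles, where only that attribute is
// read: num = 1, so offsets_size = 1 * 10 * 8 = 80 bytes; num_fields =
// 1 (attribute) + 1 + 1 (dimension) = 3, so offsets_init_size =
// 3 * 4 * 32 = 384 bytes, giving an estimate of 464 bytes in total.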

ret[frag_idx] = offsets_size + offsets_init_size;
return Status::Ok();
}));

return ret;
}

bool ReaderBase::process_partial_timestamps(FragmentMetadata& frag_meta) const {
return frag_meta.has_timestamps() &&
frag_meta.partial_time_overlap(
19 changes: 19 additions & 0 deletions tiledb/sm/query/readers/reader_base.h
@@ -269,6 +269,9 @@ class ReaderBase : public StrategyBase {
/** If the user requested timestamps attribute in the query */
bool user_requested_timestamps_;

/** Whether we are doing deletes consolidation (without the purge option). */
bool deletes_consolidation_no_purge_;

/**
* If the special timestamps attribute should be loaded to memory for
* this query
@@ -281,6 +284,12 @@
*/
std::vector<bool> timestamps_needed_for_deletes_and_updates_;

/** Whether each dimension is var-sized. */
std::vector<bool> is_dim_var_size_;

/** Names of dim/attr loaded for query condition, as a vector. */
std::vector<std::string> qc_loaded_attr_names_;

/** Names of dim/attr loaded for query condition, as a set. */
std::unordered_set<std::string> qc_loaded_attr_names_set_;

@@ -305,6 +314,9 @@
* */
std::unordered_map<std::string, QueryBuffer>& aggregate_buffers_;

/** Per-fragment tile offsets memory usage. */
std::vector<uint64_t> per_frag_tile_offsets_usage_;

/* ********************************* */
/* PROTECTED METHODS */
/* ********************************* */
@@ -338,6 +350,13 @@
return true;
}

/**
* Computes the required size for loading tile offsets, per fragment.
*
* @return Required memory for loading tile offsets, per fragment.
*/
std::vector<uint64_t> tile_offset_sizes();

/**
* Returns if we need to process partial timestamp condition for this
* fragment.
