From e7e9dba545085c55237e911a98bf1e4dff793b6b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 16 Sep 2024 17:47:15 +0200 Subject: [PATCH] [Backport release-2.26] Dense reader fails early when tile offsets are too large. (#5310) (#5311) Backport of #5310 to release-2.26. --- TYPE: IMPROVEMENT DESC: Dense reader fails early when tile offsets are too large. --------- Co-authored-by: Luc Rancourt Co-authored-by: KiterLuc <67824247+KiterLuc@users.noreply.github.com> --- test/src/unit-dense-reader.cc | 40 +++++- tiledb/sm/query/readers/dense_reader.cc | 22 +++ tiledb/sm/query/readers/reader_base.cc | 130 ++++++++++++++++++ tiledb/sm/query/readers/reader_base.h | 19 +++ .../query/readers/sparse_index_reader_base.cc | 124 ----------------- .../query/readers/sparse_index_reader_base.h | 19 --- 6 files changed, 209 insertions(+), 145 deletions(-) diff --git a/test/src/unit-dense-reader.cc b/test/src/unit-dense-reader.cc index f72e45564a7..9d96882179d 100644 --- a/test/src/unit-dense-reader.cc +++ b/test/src/unit-dense-reader.cc @@ -889,6 +889,42 @@ TEST_CASE_METHOD( "DenseReader: Memory budget is too small to open array"); } +TEST_CASE_METHOD( + CDenseFx, + "Dense reader: budget too small for tile offsets", + "[dense-reader][budget-too-small-tile-offsets]") { + // Create default array. + reset_config(); + create_default_array_1d(); + + uint64_t num_frags = GENERATE(1, 2); + + // Write some fragments. + int subarray[] = {1, NUM_CELLS}; + std::vector data(NUM_CELLS); + std::iota(data.begin(), data.end(), 1); + uint64_t data_size = data.size() * sizeof(int); + for (uint64_t f = 0; f < num_frags; f++) { + write_1d_fragment(subarray, data.data(), &data_size); + } + + // Footer for a fragment is 390 bytes. + // Tile offsets for a fragment are 400 bytes. + // Tile upper memory limit is more than enough to load 40 bytes tiles. + total_budget_ = std::to_string(390 * num_frags + 200); + update_config(); + + // Try to read. + int data_r[NUM_CELLS] = {0}; + uint64_t data_r_size = sizeof(data_r); + read( + subarray, + data_r, + &data_r_size, + 0, + "DenseReader: Cannot load tile offsets, increase memory budget"); +} + TEST_CASE_METHOD( CDenseFx, "Dense reader: tile budget exceeded, fixed attribute", @@ -942,7 +978,7 @@ TEST_CASE_METHOD( uint64_t data_size = data.size() * sizeof(int); write_1d_fragment(subarray, data.data(), &data_size); - total_budget_ = "420"; + total_budget_ = "804"; tile_upper_memory_limit_ = "50"; update_config(); @@ -1025,7 +1061,7 @@ TEST_CASE_METHOD( // Each tiles are 91 and 100 bytes respectively, this will only allow to // load one as the budget is split across two potential reads. - total_budget_ = "460"; + total_budget_ = "840"; tile_upper_memory_limit_ = "210"; update_config(); diff --git a/tiledb/sm/query/readers/dense_reader.cc b/tiledb/sm/query/readers/dense_reader.cc index c916e65c717..fc0d678d012 100644 --- a/tiledb/sm/query/readers/dense_reader.cc +++ b/tiledb/sm/query/readers/dense_reader.cc @@ -98,6 +98,9 @@ DenseReader::DenseReader( // Check the validity buffer sizes. check_validity_buffer_sizes(); + + // No dense dimensions can be var sized. + is_dim_var_size_.resize(array_schema_.dim_num(), false); } /* ****************************** */ @@ -310,6 +313,25 @@ Status DenseReader::dense_read() { if (condition_.has_value()) { qc_loaded_attr_names_set_ = condition_->field_names(); } + qc_loaded_attr_names_.clear(); + qc_loaded_attr_names_.reserve(qc_loaded_attr_names_set_.size()); + for (auto& name : qc_loaded_attr_names_set_) { + qc_loaded_attr_names_.emplace_back(name); + } + + // Load per fragment tile offsets memory usage. + per_frag_tile_offsets_usage_ = tile_offset_sizes(); + + // Compute total tile offsets sizes. + auto total_tile_offsets_sizes = std::accumulate( + per_frag_tile_offsets_usage_.begin(), + per_frag_tile_offsets_usage_.end(), + static_cast(0)); + if (total_tile_offsets_sizes > + memory_budget_ - array_memory_tracker_->get_memory_usage()) { + throw DenseReaderException( + "Cannot load tile offsets, increase memory budget"); + } auto&& [names, var_names] = field_names_to_process(qc_loaded_attr_names_set_); diff --git a/tiledb/sm/query/readers/reader_base.cc b/tiledb/sm/query/readers/reader_base.cc index 268571ce6f6..083d202fa15 100644 --- a/tiledb/sm/query/readers/reader_base.cc +++ b/tiledb/sm/query/readers/reader_base.cc @@ -77,6 +77,8 @@ ReaderBase::ReaderBase( , memory_tracker_(params.query_memory_tracker()) , condition_(params.condition()) , user_requested_timestamps_(false) + , deletes_consolidation_no_purge_( + buffers_.count(constants::delete_timestamps) != 0) , use_timestamps_(false) , initial_data_loaded_(false) , max_batch_size_(config_.get("vfs.max_batch_size").value()) @@ -145,6 +147,134 @@ bool ReaderBase::skip_field( /* PROTECTED METHODS */ /* ****************************** */ +std::vector ReaderBase::tile_offset_sizes() { + auto timer_se = stats_->start_timer("tile_offset_sizes"); + + // For easy reference. + std::vector ret(fragment_metadata_.size()); + const auto dim_num = array_schema_.dim_num(); + + // Compute the size of tile offsets per fragments. + const auto relevant_fragments = subarray_.relevant_fragments(); + throw_if_not_ok(parallel_for( + &resources_.compute_tp(), 0, relevant_fragments.size(), [&](uint64_t i) { + // For easy reference. + auto frag_idx = relevant_fragments[i]; + auto& fragment = fragment_metadata_[frag_idx]; + const auto& schema = fragment->array_schema(); + const auto tile_num = fragment->tile_num(); + const auto dense = schema->dense(); + + // Compute the number of dimensions/attributes requiring offsets. + uint64_t num = 0; + + // For fragments with version smaller than 5 we have zipped coords. + // Otherwise we load each dimensions independently. + if (!dense) { + if (fragment->version() < 5) { + num = 1; + } else { + for (unsigned d = 0; d < dim_num; ++d) { + // Fixed tile (offsets or fixed data). + num++; + + // If var size, we load var offsets and var tile sizes. + if (is_dim_var_size_[d]) { + num += 2; + } + } + } + } + + // Process everything loaded for query condition. + for (auto& name : qc_loaded_attr_names_) { + // Not a member of array schema, this field was added in array + // schema evolution, ignore for this fragment's tile offsets. + // Also skip dimensions. + if (!schema->is_field(name) || schema->is_dim(name)) { + continue; + } + + // Fixed tile (offsets or fixed data). + num++; + + // If var size, we load var offsets and var tile sizes. + const auto attr = schema->attribute(name); + num += 2 * attr->var_size(); + + // If nullable, we load nullable offsets. + num += attr->nullable(); + } + + // Process everything loaded for user requested data. + for (auto& it : buffers_) { + const auto& name = it.first; + + // Skip dimensions and attributes loaded by query condition as they + // are processed above. Special attributes (timestamps, delete + // timestamps, etc.) are processed below. + if (array_schema_.is_dim(name) || !schema->is_field(name) || + qc_loaded_attr_names_set_.count(name) != 0 || + schema->is_special_attribute(name)) { + continue; + } + + // Fixed tile (offsets or fixed data). + num++; + + // If var size, we load var offsets and var tile sizes. + const auto attr = schema->attribute(name); + num += 2 * attr->var_size(); + + // If nullable, we load nullable offsets. + num += attr->nullable(); + } + + if (!dense) { + // Add timestamps if required. + if (!timestamps_not_present(constants::timestamps, frag_idx)) { + num++; + } + + // Add delete metadata if required. + if (!delete_meta_not_present( + constants::delete_timestamps, frag_idx)) { + num++; + num += deletes_consolidation_no_purge_; + } + } + + // Finally set the size of the loaded data. + + // The expected size of the tile offsets + unsigned offsets_size = num * tile_num * sizeof(uint64_t); + + // Other than the offsets themselves, there is also memory used for the + // initialization of the vectors that hold them. This initialization + // takes place in LoadedFragmentMetadata::resize_offsets() + + // Calculate the number of fields + unsigned num_fields = schema->attribute_num() + 1 + + fragment->has_timestamps() + + fragment->has_delete_meta() * 2; + + // If version < 5 we use zipped coordinates, otherwise separate + num_fields += (fragment->version() >= 5) ? schema->dim_num() : 0; + + // The additional memory required for the vectors to + // store the tile offsets. The number of fields is calculated above. + // Each vector requires 32 bytes. Each field requires 4 vectors. These + // are: tile_offsets_, tile_var_offsets_, tile_var_sizes_, + // tile_validity_offsets_ and are located in loaded_fragment_metadata.h + unsigned offsets_init_size = num_fields * 4 * 32; + + ret[frag_idx] = offsets_size + offsets_init_size; + return Status::Ok(); + })); + + return ret; +} + bool ReaderBase::process_partial_timestamps(FragmentMetadata& frag_meta) const { return frag_meta.has_timestamps() && frag_meta.partial_time_overlap( diff --git a/tiledb/sm/query/readers/reader_base.h b/tiledb/sm/query/readers/reader_base.h index 0dc0fbefa82..ae06525cfed 100644 --- a/tiledb/sm/query/readers/reader_base.h +++ b/tiledb/sm/query/readers/reader_base.h @@ -269,6 +269,9 @@ class ReaderBase : public StrategyBase { /** If the user requested timestamps attribute in the query */ bool user_requested_timestamps_; + /** Are we doing deletes consolidation (without purge option). */ + bool deletes_consolidation_no_purge_; + /** * If the special timestamps attribute should be loaded to memory for * this query @@ -281,6 +284,12 @@ class ReaderBase : public StrategyBase { */ std::vector timestamps_needed_for_deletes_and_updates_; + /** Are dimensions var sized. */ + std::vector is_dim_var_size_; + + /** Names of dim/attr loaded for query condition. */ + std::vector qc_loaded_attr_names_; + /** Names of dim/attr loaded for query condition. */ std::unordered_set qc_loaded_attr_names_set_; @@ -305,6 +314,9 @@ class ReaderBase : public StrategyBase { * */ std::unordered_map& aggregate_buffers_; + /** Per fragment tile offsets memory usage. */ + std::vector per_frag_tile_offsets_usage_; + /* ********************************* */ /* PROTECTED METHODS */ /* ********************************* */ @@ -338,6 +350,13 @@ class ReaderBase : public StrategyBase { return true; } + /** + * Computes the required size for loading tile offsets, per fragments. + * + * @return Required memory for loading tile offsets, per fragments. + */ + std::vector tile_offset_sizes(); + /** * Returns if we need to process partial timestamp condition for this * fragment. diff --git a/tiledb/sm/query/readers/sparse_index_reader_base.cc b/tiledb/sm/query/readers/sparse_index_reader_base.cc index 542b797688d..1956529a140 100644 --- a/tiledb/sm/query/readers/sparse_index_reader_base.cc +++ b/tiledb/sm/query/readers/sparse_index_reader_base.cc @@ -75,8 +75,6 @@ SparseIndexReaderBase::SparseIndexReaderBase( , memory_budget_(config_, reader_string, params.memory_budget()) , include_coords_(include_coords) , memory_used_for_coords_total_(0) - , deletes_consolidation_no_purge_( - buffers_.count(constants::delete_timestamps) != 0) , partial_tile_offsets_loading_(false) { // Sanity checks if (!params.skip_checks_serialization() && buffers_.empty() && @@ -141,128 +139,6 @@ uint64_t SparseIndexReaderBase::available_memory() { array_memory_tracker_->get_memory_usage(); } -std::vector SparseIndexReaderBase::tile_offset_sizes() { - auto timer_se = stats_->start_timer("tile_offset_sizes"); - - // For easy reference. - std::vector ret(fragment_metadata_.size()); - const auto dim_num = array_schema_.dim_num(); - - // Compute the size of tile offsets per fragments. - const auto relevant_fragments = subarray_.relevant_fragments(); - throw_if_not_ok(parallel_for( - &resources_.compute_tp(), 0, relevant_fragments.size(), [&](uint64_t i) { - // For easy reference. - auto frag_idx = relevant_fragments[i]; - auto& fragment = fragment_metadata_[frag_idx]; - const auto& schema = fragment->array_schema(); - const auto tile_num = fragment->tile_num(); - - // Compute the number of dimensions/attributes requiring offsets. - uint64_t num = 0; - - // For fragments with version smaller than 5 we have zipped coords. - // Otherwise we load each dimensions independently. - if (fragment->version() < 5) { - num = 1; - } else { - for (unsigned d = 0; d < dim_num; ++d) { - // Fixed tile (offsets or fixed data). - num++; - - // If var size, we load var offsets and var tile sizes. - if (is_dim_var_size_[d]) { - num += 2; - } - } - } - - // Process everything loaded for query condition. - for (auto& name : qc_loaded_attr_names_) { - // Not a member of array schema, this field was added in array - // schema evolution, ignore for this fragment's tile offsets. - // Also skip dimensions. - if (!schema->is_field(name) || schema->is_dim(name)) { - continue; - } - - // Fixed tile (offsets or fixed data). - num++; - - // If var size, we load var offsets and var tile sizes. - const auto attr = schema->attribute(name); - num += 2 * attr->var_size(); - - // If nullable, we load nullable offsets. - num += attr->nullable(); - } - - // Process everything loaded for user requested data. - for (auto& it : buffers_) { - const auto& name = it.first; - - // Skip dimensions and attributes loaded by query condition as they - // are processed above. Special attributes (timestamps, delete - // timestamps, etc.) are processed below. - if (array_schema_.is_dim(name) || !schema->is_field(name) || - qc_loaded_attr_names_set_.count(name) != 0 || - schema->is_special_attribute(name)) { - continue; - } - - // Fixed tile (offsets or fixed data). - num++; - - // If var size, we load var offsets and var tile sizes. - const auto attr = schema->attribute(name); - num += 2 * attr->var_size(); - - // If nullable, we load nullable offsets. - num += attr->nullable(); - } - - // Add timestamps if required. - if (!timestamps_not_present(constants::timestamps, frag_idx)) { - num++; - } - - // Add delete metadata if required. - if (!delete_meta_not_present(constants::delete_timestamps, frag_idx)) { - num++; - num += deletes_consolidation_no_purge_; - } - - // Finally set the size of the loaded data. - - // The expected size of the tile offsets - unsigned offsets_size = num * tile_num * sizeof(uint64_t); - - // Other than the offsets themselves, there is also memory used for the - // initialization of the vectors that hold them. This initialization - // takes place in LoadedFragmentMetadata::resize_offsets() - - // Calculate the number of fields - unsigned num_fields = schema->attribute_num() + 1 + - fragment->has_timestamps() + - fragment->has_delete_meta() * 2; - - // If version < 5 we use zipped coordinates, otherwise separate - num_fields += (fragment->version() >= 5) ? schema->dim_num() : 0; - - // The additional memory required for the vectors to - // store the tile offsets. The number of fields is calculated above. - // Each vector requires 32 bytes. Each field requires 4 vectors. These - // are: tile_offsets_, tile_var_offsets_, tile_var_sizes_, - // tile_validity_offsets_ and are located in loaded_fragment_metadata.h - unsigned offsets_init_size = num_fields * 4 * 32; - - ret[frag_idx] = offsets_size + offsets_init_size; - return Status::Ok(); - })); - - return ret; -} - bool SparseIndexReaderBase::has_post_deduplication_conditions( FragmentMetadata& frag_meta) { return frag_meta.has_delete_meta() || condition_.has_value() || diff --git a/tiledb/sm/query/readers/sparse_index_reader_base.h b/tiledb/sm/query/readers/sparse_index_reader_base.h index 88e291b8aa8..8d45e8c6f59 100644 --- a/tiledb/sm/query/readers/sparse_index_reader_base.h +++ b/tiledb/sm/query/readers/sparse_index_reader_base.h @@ -613,21 +613,12 @@ class SparseIndexReaderBase : public ReaderBase { /** Dimension names. */ std::vector dim_names_; - /** Are dimensions var sized. */ - std::vector is_dim_var_size_; - /** Memory used for coordinates tiles. */ std::atomic memory_used_for_coords_total_; /** Are we in elements mode. */ bool elements_mode_; - /** Names of dim/attr loaded for query condition. */ - std::vector qc_loaded_attr_names_; - - /** Are we doing deletes consolidation (without purge option). */ - bool deletes_consolidation_no_purge_; - /** Do we allow partial tile offset loading for this query? */ bool partial_tile_offsets_loading_; @@ -637,9 +628,6 @@ class SparseIndexReaderBase : public ReaderBase { /** Attributes for which to load tile offsets. */ std::vector attr_tile_offsets_to_load_; - /** Per fragment tile offsets memory usage. */ - std::vector per_frag_tile_offsets_usage_; - /* ********************************* */ /* PROTECTED METHODS */ /* ********************************* */ @@ -647,13 +635,6 @@ class SparseIndexReaderBase : public ReaderBase { /** @return Available memory. */ uint64_t available_memory(); - /** - * Computes the required size for loading tile offsets, per fragments. - * - * @return Required memory for loading tile offsets, per fragments. - */ - std::vector tile_offset_sizes(); - /** * Returns if there is any condition to be applied post deduplication. This * will return true if we have: