From c73b5234a60137a0cf030a173b7c19e7cd088297 Mon Sep 17 00:00:00 2001 From: Theodore Tsirpanis Date: Thu, 16 May 2024 09:01:29 +0300 Subject: [PATCH] Add constructor argument to Query to override the memory budget. (#4968) [SC-45187](https://app.shortcut.com/tiledb-inc/story/45187/enable-to-set-reader-budget-on-consolidation) This PR adds an optional argument to the internal `Query` class, that overrides the total memory budget. This will be useful for better memory management during consolidation. --- TYPE: NO_HISTORY --- test/src/unit-Reader.cc | 1 + tiledb/sm/query/query.cc | 5 ++++- tiledb/sm/query/query.h | 11 ++++++++++- tiledb/sm/query/readers/dense_reader.cc | 12 ++++++++---- tiledb/sm/query/readers/dense_reader.h | 3 +++ .../sm/query/readers/sparse_index_reader_base.cc | 2 +- .../sm/query/readers/sparse_index_reader_base.h | 16 +++++++++++++--- tiledb/sm/query/strategy_base.h | 13 +++++++++++++ 8 files changed, 53 insertions(+), 10 deletions(-) diff --git a/test/src/unit-Reader.cc b/test/src/unit-Reader.cc index 8f21d9f0e9a..2b68f1350b7 100644 --- a/test/src/unit-Reader.cc +++ b/test/src/unit-Reader.cc @@ -175,6 +175,7 @@ TEST_CASE_METHOD( context.storage_manager(), array.opened_array(), config, + nullopt, buffers, aggregate_buffers, subarray, diff --git a/tiledb/sm/query/query.cc b/tiledb/sm/query/query.cc index 2374a0a2b6b..8e785f49ef3 100644 --- a/tiledb/sm/query/query.cc +++ b/tiledb/sm/query/query.cc @@ -74,7 +74,8 @@ namespace sm { Query::Query( StorageManager* storage_manager, shared_ptr array, - optional fragment_name) + optional fragment_name, + optional memory_budget) : query_memory_tracker_( storage_manager->resources().create_memory_tracker()) , array_shared_(array) @@ -105,6 +106,7 @@ Query::Query( , is_dimension_label_ordered_read_(false) , dimension_label_increasing_(true) , fragment_size_(std::numeric_limits::max()) + , memory_budget_(memory_budget) , query_remote_buffer_storage_(std::nullopt) , default_channel_{make_shared(HERE(), *this, 0)} { assert(array->is_open()); @@ -1756,6 +1758,7 @@ Status Query::create_strategy(bool skip_checks_serialization) { storage_manager_, opened_array_, config_, + memory_budget_, buffers_, aggregate_buffers_, subarray_, diff --git a/tiledb/sm/query/query.h b/tiledb/sm/query/query.h index b8782b0b53a..b4e1db07d19 100644 --- a/tiledb/sm/query/query.h +++ b/tiledb/sm/query/query.h @@ -155,11 +155,14 @@ class Query { * writes. * @param fragment_base_uri Optional base name for new fragment. Only used for * writes and only if fragment_uri is empty. + * @param memory_budget Total memory budget. If set to nullopt, the value + * will be obtained from the sm.mem.total_budget config option. */ Query( StorageManager* storage_manager, shared_ptr array, - optional fragment_name = nullopt); + optional fragment_name = nullopt, + optional memory_budget = nullopt); /** Destructor. */ virtual ~Query(); @@ -1089,6 +1092,12 @@ class Query { */ uint64_t fragment_size_; + /** + * Memory budget. If set to nullopt, the value will be obtained from the + * sm.mem.total_budget config option. + */ + optional memory_budget_; + /** Already written buffers. */ std::unordered_set written_buffers_; diff --git a/tiledb/sm/query/readers/dense_reader.cc b/tiledb/sm/query/readers/dense_reader.cc index eae579ad953..b8a13d4d2eb 100644 --- a/tiledb/sm/query/readers/dense_reader.cc +++ b/tiledb/sm/query/readers/dense_reader.cc @@ -72,7 +72,9 @@ DenseReader::DenseReader( shared_ptr logger, StrategyParams& params, bool remote_query) - : ReaderBase(stats, logger->clone("DenseReader", ++logger_id_), params) { + : ReaderBase(stats, logger->clone("DenseReader", ++logger_id_), params) + , memory_budget_(params.memory_budget().value_or(0)) + , memory_budget_from_query_(params.memory_budget()) { elements_mode_ = false; // Sanity checks. @@ -121,9 +123,11 @@ QueryStatusDetailsReason DenseReader::status_incomplete_reason() const { void DenseReader::refresh_config() { // Get config values. bool found = false; - throw_if_not_ok( - config_.get("sm.mem.total_budget", &memory_budget_, &found)); - assert(found); + if (!memory_budget_from_query_.has_value()) { + throw_if_not_ok( + config_.get("sm.mem.total_budget", &memory_budget_, &found)); + assert(found); + } throw_if_not_ok(config_.get( "sm.mem.tile_upper_memory_limit", &tile_upper_memory_limit_, &found)); assert(found); diff --git a/tiledb/sm/query/readers/dense_reader.h b/tiledb/sm/query/readers/dense_reader.h index e44c7903fdc..5d54af46124 100644 --- a/tiledb/sm/query/readers/dense_reader.h +++ b/tiledb/sm/query/readers/dense_reader.h @@ -158,6 +158,9 @@ class DenseReader : public ReaderBase, public IQueryStrategy { /** Total memory budget. */ uint64_t memory_budget_; + /** Total memory budget if overridden by the query. */ + optional memory_budget_from_query_; + /** Target upper memory limit for tiles. */ uint64_t tile_upper_memory_limit_; diff --git a/tiledb/sm/query/readers/sparse_index_reader_base.cc b/tiledb/sm/query/readers/sparse_index_reader_base.cc index 1f62acee4c1..b20a46ce4ff 100644 --- a/tiledb/sm/query/readers/sparse_index_reader_base.cc +++ b/tiledb/sm/query/readers/sparse_index_reader_base.cc @@ -72,7 +72,7 @@ SparseIndexReaderBase::SparseIndexReaderBase( : ReaderBase(stats, logger, params) , read_state_(array_->fragment_metadata().size()) , tmp_read_state_(array_->fragment_metadata().size()) - , memory_budget_(config_, reader_string) + , memory_budget_(config_, reader_string, params.memory_budget()) , include_coords_(include_coords) , memory_used_for_coords_total_(0) , deletes_consolidation_no_purge_( diff --git a/tiledb/sm/query/readers/sparse_index_reader_base.h b/tiledb/sm/query/readers/sparse_index_reader_base.h index c9303391001..3ca26a1c18c 100644 --- a/tiledb/sm/query/readers/sparse_index_reader_base.h +++ b/tiledb/sm/query/readers/sparse_index_reader_base.h @@ -110,7 +110,12 @@ class MemoryBudget { MemoryBudget() = delete; - MemoryBudget(Config& config, std::string reader_string) { + MemoryBudget( + Config& config, + std::string reader_string, + optional total_budget) + : total_budget_(total_budget.value_or(0)) + , memory_budget_from_query_(total_budget) { refresh_config(config, reader_string); } @@ -128,8 +133,10 @@ class MemoryBudget { * @param reader_string String to identify the reader settings to load. */ void refresh_config(Config& config, std::string reader_string) { - total_budget_ = - config.get("sm.mem.total_budget", Config::must_find); + if (!memory_budget_from_query_.has_value()) { + total_budget_ = + config.get("sm.mem.total_budget", Config::must_find); + } ratio_coords_ = config.get( "sm.mem.reader." + reader_string + ".ratio_coords", Config::must_find); @@ -192,6 +199,9 @@ class MemoryBudget { /** Total memory budget. */ uint64_t total_budget_; + /** Total memory budget if overridden by the query. */ + optional memory_budget_from_query_; + /** How much of the memory budget is reserved for coords. */ double ratio_coords_; diff --git a/tiledb/sm/query/strategy_base.h b/tiledb/sm/query/strategy_base.h index 8aa91b6301a..9d31a053a83 100644 --- a/tiledb/sm/query/strategy_base.h +++ b/tiledb/sm/query/strategy_base.h @@ -70,6 +70,7 @@ class StrategyParams { StorageManager* storage_manager, shared_ptr array, Config& config, + optional memory_budget, std::unordered_map& buffers, std::unordered_map& aggregate_buffers, Subarray& subarray, @@ -82,6 +83,7 @@ class StrategyParams { , storage_manager_(storage_manager) , array_(array) , config_(config) + , memory_budget_(memory_budget) , buffers_(buffers) , aggregate_buffers_(aggregate_buffers) , subarray_(subarray) @@ -119,6 +121,11 @@ class StrategyParams { return config_; }; + /** Return the memory budget, if set. */ + inline optional memory_budget() { + return memory_budget_; + } + /** Return the buffers. */ inline std::unordered_map& buffers() { return buffers_; @@ -174,6 +181,12 @@ class StrategyParams { /** Config for query-level parameters only. */ Config& config_; + /** + * Memory budget for the query. If set to nullopt, the value will be obtained + * from the sm.mem.total_budget config option. + */ + optional memory_budget_; + /** Buffers. */ std::unordered_map& buffers_;