Skip to content

Commit

Permalink
Remove serialization non C.41 constructors from index_read_state_from…
Browse files Browse the repository at this point in the history
…_capnp. (#4670)

Made index_read_state_from_capnp C41 compliant. 
---
TYPE: NO_HISTORY
DESC: Remove serialization non C.41 constructors from
index_read_state_from_capnp

---------

Co-authored-by: KiterLuc <[email protected]>
  • Loading branch information
abigalekim and KiterLuc authored Feb 6, 2024
1 parent 4c6d530 commit 47b3eb8
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 74 deletions.
38 changes: 21 additions & 17 deletions tiledb/sm/query/readers/sparse_global_order_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ SparseGlobalOrderReader<BitmapType>::SparseGlobalOrderReader(

template <class BitmapType>
bool SparseGlobalOrderReader<BitmapType>::incomplete() const {
return !read_state_.done_adding_result_tiles_ ||
return !read_state_.done_adding_result_tiles() ||
memory_used_for_coords_total_ != 0;
}

Expand Down Expand Up @@ -131,7 +131,7 @@ Status SparseGlobalOrderReader<BitmapType>::dowork() {

// Handle empty array.
if (fragment_metadata_.empty()) {
read_state_.done_adding_result_tiles_ = true;
read_state_.set_done_adding_result_tiles(true);
return Status::Ok();
}

Expand Down Expand Up @@ -416,7 +416,7 @@ SparseGlobalOrderReader<BitmapType>::create_result_tiles(
auto tile_num = fragment_metadata_[f]->tile_num();

// Figure out the start index.
auto start = read_state_.frag_idx_[f].tile_idx_;
auto start = read_state_.frag_idx()[f].tile_idx_;
if (!result_tiles[f].empty()) {
start = std::max(start, result_tiles[f].back().tile_idx() + 1);
}
Expand Down Expand Up @@ -470,7 +470,7 @@ SparseGlobalOrderReader<BitmapType>::create_result_tiles(
logger_->debug("All result tiles loaded");
}

read_state_.done_adding_result_tiles_ = done_adding_result_tiles;
read_state_.set_done_adding_result_tiles(done_adding_result_tiles);

// Return the list of tiles added.
std::vector<ResultTile*> created_tiles;
Expand Down Expand Up @@ -789,8 +789,8 @@ bool SparseGlobalOrderReader<BitmapType>::add_next_cell_to_queue(
// Increment the tile index, which should clear all tiles in
// end_iteration.
if (!result_tiles[frag_idx].empty()) {
read_state_.frag_idx_[frag_idx].tile_idx_++;
read_state_.frag_idx_[frag_idx].cell_idx_ = 0;
uint64_t new_tile_idx = read_state_.frag_idx()[frag_idx].tile_idx_ + 1;
read_state_.set_frag_idx(frag_idx, FragIdx(new_tile_idx, 0));
}

// This fragment has more tiles potentially.
Expand Down Expand Up @@ -876,11 +876,11 @@ void SparseGlobalOrderReader<BitmapType>::compute_hilbert_values(
template <class BitmapType>
void SparseGlobalOrderReader<BitmapType>::update_frag_idx(
GlobalOrderResultTile<BitmapType>* tile, uint64_t c) {
auto& frag_idx = read_state_.frag_idx_[tile->frag_idx()];
auto& frag_idx = read_state_.frag_idx()[tile->frag_idx()];
auto t = tile->tile_idx();
if ((t == frag_idx.tile_idx_ && c > frag_idx.cell_idx_) ||
t > frag_idx.tile_idx_) {
frag_idx = FragIdx(t, c);
read_state_.set_frag_idx(tile->frag_idx(), FragIdx(t, c));
}
}

Expand Down Expand Up @@ -932,8 +932,8 @@ SparseGlobalOrderReader<BitmapType>::merge_result_cell_slabs(

// Add the tile to the queue.
uint64_t cell_idx =
read_state_.frag_idx_[f].tile_idx_ == rt_it[f]->tile_idx() ?
read_state_.frag_idx_[f].cell_idx_ :
read_state_.frag_idx()[f].tile_idx_ == rt_it[f]->tile_idx() ?
read_state_.frag_idx()[f].cell_idx_ :
0;
GlobalOrderResultCoords rc(&*(rt_it[f]), cell_idx);
bool res = add_next_cell_to_queue(
Expand Down Expand Up @@ -1716,8 +1716,9 @@ SparseGlobalOrderReader<BitmapType>::respect_copy_memory_budget(
while (result_cell_slabs.size() > max_cs_idx) {
// Revert progress for this slab in read state, and pop it.
auto& last_rcs = result_cell_slabs.back();
read_state_.frag_idx_[last_rcs.tile_->frag_idx()] =
FragIdx(last_rcs.tile_->tile_idx(), last_rcs.start_);
read_state_.set_frag_idx(
last_rcs.tile_->frag_idx(),
FragIdx(last_rcs.tile_->tile_idx(), last_rcs.start_));
result_cell_slabs.pop_back();
}

Expand Down Expand Up @@ -1758,8 +1759,9 @@ SparseGlobalOrderReader<BitmapType>::compute_var_size_offsets(
while (query_buffer.original_buffer_var_size_ < new_var_buffer_size) {
// Revert progress for this slab in read state, and pop it.
auto& last_rcs = result_cell_slabs.back();
read_state_.frag_idx_[last_rcs.tile_->frag_idx()] =
FragIdx(last_rcs.tile_->tile_idx(), last_rcs.start_);
read_state_.set_frag_idx(
last_rcs.tile_->frag_idx(),
FragIdx(last_rcs.tile_->tile_idx(), last_rcs.start_));
result_cell_slabs.pop_back();

// Update the new var buffer size.
Expand Down Expand Up @@ -1787,8 +1789,10 @@ SparseGlobalOrderReader<BitmapType>::compute_var_size_offsets(
new_var_buffer_size = ((OffType*)query_buffer.buffer_)[total_cells];

// Update the cell progress.
read_state_.frag_idx_[last_rcs.tile_->frag_idx()] =
FragIdx(last_rcs.tile_->tile_idx(), last_rcs.start_ + last_rcs.length_);
read_state_.set_frag_idx(
last_rcs.tile_->frag_idx(),
FragIdx(
last_rcs.tile_->tile_idx(), last_rcs.start_ + last_rcs.length_));

// Remove empty cell slab.
if (last_rcs.length_ == 0) {
Expand Down Expand Up @@ -2192,7 +2196,7 @@ void SparseGlobalOrderReader<BitmapType>::end_iteration(
storage_manager_->compute_tp(), 0, fragment_num, [&](uint64_t f) {
while (!result_tiles[f].empty() &&
result_tiles[f].front().tile_idx() <
read_state_.frag_idx_[f].tile_idx_) {
read_state_.frag_idx()[f].tile_idx_) {
remove_result_tile(f, result_tiles[f].begin(), result_tiles);
}

Expand Down
17 changes: 7 additions & 10 deletions tiledb/sm/query/readers/sparse_index_reader_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ SparseIndexReaderBase::SparseIndexReaderBase(
StrategyParams& params,
bool include_coords)
: ReaderBase(stats, logger, params)
, read_state_(array_->fragment_metadata().size())
, tmp_read_state_(array_->fragment_metadata().size())
, memory_budget_(config_, reader_string)
, include_coords_(include_coords)
Expand Down Expand Up @@ -131,13 +132,13 @@ SparseIndexReaderBase::SparseIndexReaderBase(
/* PROTECTED METHODS */
/* ****************************** */

const typename SparseIndexReaderBase::ReadState*
const typename SparseIndexReaderBase::ReadState&
SparseIndexReaderBase::read_state() const {
return &read_state_;
return read_state_;
}

typename SparseIndexReaderBase::ReadState* SparseIndexReaderBase::read_state() {
return &read_state_;
void SparseIndexReaderBase::set_read_state(ReadState read_state) {
read_state_ = std::move(read_state);
}

uint64_t SparseIndexReaderBase::available_memory() {
Expand Down Expand Up @@ -319,11 +320,10 @@ Status SparseIndexReaderBase::load_initial_data() {
}

auto timer_se = stats_->start_timer("load_initial_data");
read_state_.done_adding_result_tiles_ = false;
read_state_.set_done_adding_result_tiles(false);

// For easy reference.
const auto dim_num = array_schema_.dim_num();
auto fragment_num = fragment_metadata_.size();

// Load delete conditions.
auto&& [st, conditions, update_values] =
Expand Down Expand Up @@ -366,9 +366,6 @@ Status SparseIndexReaderBase::load_initial_data() {
}
}

// Make sure there is enough space for tiles data.
read_state_.frag_idx_.resize(fragment_num);

// Calculate ranges of tiles in the subarray, if set.
if (subarray_.is_set()) {
// At this point, full memory budget is available.
Expand All @@ -385,7 +382,7 @@ Status SparseIndexReaderBase::load_initial_data() {
// below.
RETURN_NOT_OK(subarray_.precompute_all_ranges_tile_overlap(
storage_manager_->compute_tp(),
read_state_.frag_idx_,
read_state_.frag_idx(),
&tmp_read_state_));

if (tmp_read_state_.memory_used_tile_ranges() >
Expand Down
82 changes: 75 additions & 7 deletions tiledb/sm/query/readers/sparse_index_reader_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,79 @@ class SparseIndexReaderBase : public ReaderBase {
* it is really required to determine if a query is incomplete from the client
* side of a cloud request.
*/
struct ReadState {
class ReadState {
public:
/* ********************************* */
/* CONSTRUCTORS & DESTRUCTORS */
/* ********************************* */

/** Delete default constructor. */
ReadState() = delete;

/** Constructor.
* @param frag_idxs_len The length of the fragment index vector.
*/
ReadState(size_t frag_idxs_len)
: frag_idx_(frag_idxs_len) {
}

/** Constructor used in deserialization. */
ReadState(std::vector<FragIdx>&& frag_idx, bool done_adding_result_tiles)
: frag_idx_(std::move(frag_idx))
, done_adding_result_tiles_(done_adding_result_tiles) {
}

/* ********************************* */
/* API */
/* ********************************* */

/**
* Return whether the tiles that will be processed are loaded in memory.
* @return Done adding result tiles.
*/
inline bool done_adding_result_tiles() const {
return done_adding_result_tiles_;
}

/**
* Sets the flag that determines whether the tiles that will be processed
* are loaded in memory.
* @param done_adding_result_tiles Done adding result tiles.
*/
inline void set_done_adding_result_tiles(bool done_adding_result_tiles) {
done_adding_result_tiles_ = done_adding_result_tiles;
}

/**
* Sets a value in the fragment index vector.
* @param idx The index of the vector.
* @param val The value to set frag_idx[idx] to.
*/
inline void set_frag_idx(uint64_t idx, FragIdx val) {
if (idx >= frag_idx_.size()) {
throw std::runtime_error(
"ReadState::set_frag_idx: idx greater than frag_idx_'s size.");
}
frag_idx_[idx] = std::move(val);
}

/**
* Returns a read-only version of the fragment index vector.
* @return The fragment index vector.
*/
const std::vector<FragIdx>& frag_idx() const {
return frag_idx_;
}

/* ********************************* */
/* PRIVATE ATTRIBUTES */
/* ********************************* */

private:
/** The tile index inside of each fragments. */
std::vector<FragIdx> frag_idx_;

/** Is the reader done with the query. */
/** Have all tiles to be processed been loaded in memory? */
bool done_adding_result_tiles_;
};

Expand Down Expand Up @@ -506,16 +574,16 @@ class SparseIndexReaderBase : public ReaderBase {
/**
* Returns the current read state.
*
* @return pointer to the read state.
* @return const reference to the read state.
*/
const ReadState* read_state() const;
const ReadState& read_state() const;

/**
* Returns the current read state.
* Sets the new read state. Used only for deserialization.
*
* @return pointer to the read state.
* @param read_state New read_state value.
*/
ReadState* read_state();
void set_read_state(ReadState read_state);

protected:
/* ********************************* */
Expand Down
Loading

0 comments on commit 47b3eb8

Please sign in to comment.