Skip to content

Commit

Permalink
Instrumentation for memory management: FilteredData and FilteredDataBlock.
Browse files Browse the repository at this point in the history
  • Loading branch information
bekadavis9 committed Mar 1, 2024
1 parent 47e70d8 commit 7c06fac
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 63 deletions.
9 changes: 4 additions & 5 deletions tiledb/common/memory_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2017-2021 TileDB, Inc.
* @copyright Copyright (c) 2017-2024 TileDB, Inc.
* @copyright Copyright (c) 2016 MIT and Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down Expand Up @@ -97,14 +97,14 @@
#include "tiledb/common/status.h"
#include "tiledb/sm/config/config.h"

namespace tiledb {
namespace sm {
namespace tiledb::sm {

/** The type of memory to track. */
enum class MemoryType {
ENUMERATION,
ENUMERATION_PATHS,
FOOTER,
FILTERED_DATA_BLOCK,
GENERIC_TILE_IO,
RTREE,
TILE_DATA,
Expand Down Expand Up @@ -431,7 +431,6 @@ class MemoryTrackerReporter {
bool stop_;
};

} // namespace sm
} // namespace tiledb
} // namespace tiledb::sm

#endif // TILEDB_OPEN_ARRAY_MEMORY_TRACKER_H
12 changes: 5 additions & 7 deletions tiledb/sm/query/readers/dense_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2017-2022 TileDB, Inc.
* @copyright Copyright (c) 2017-2024 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -54,8 +54,7 @@ using namespace tiledb;
using namespace tiledb::common;
using namespace tiledb::sm::stats;

namespace tiledb {
namespace sm {
namespace tiledb::sm {

class DenseReaderStatusException : public StatusException {
public:
Expand Down Expand Up @@ -433,7 +432,7 @@ Status DenseReader::dense_read() {
result_space_tiles,
tile_subarrays);

std::vector<FilteredData> filtered_data;
std::list<FilteredData> filtered_data;

// Read and unfilter tiles.
bool validity_only = null_count_aggregate_only(name);
Expand Down Expand Up @@ -911,7 +910,7 @@ Status DenseReader::apply_query_condition(
tile_subarrays);

// Read and unfilter query condition attributes.
std::vector<FilteredData> filtered_data = read_attribute_tiles(
std::list<FilteredData> filtered_data = read_attribute_tiles(
NameToLoad::from_string_vec(qc_names), result_tiles);

if (compute_task.valid()) {
Expand Down Expand Up @@ -2256,5 +2255,4 @@ void DenseReader::fill_dense_coords_col_slab(
}
}

} // namespace sm
} // namespace tiledb
} // namespace tiledb::sm
100 changes: 68 additions & 32 deletions tiledb/sm/query/readers/filtered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2023 TileDB, Inc.
* @copyright Copyright (c) 2023-2024 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -34,21 +34,18 @@
#define TILEDB_FILTERED_DATA_H

#include "tiledb/common/common.h"
#include "tiledb/common/memory_tracker.h"
#include "tiledb/common/status.h"
#include "tiledb/sm/storage_manager/storage_manager.h"

using namespace tiledb::common;

namespace tiledb {
namespace sm {
namespace tiledb::sm {

/**
* A filtered data block containing filtered data for multiple tiles. The block
* will contain a number of contiguous on-disk tiles and the data is identified
* by the fragment index and offset/size of the data in the on-disk file.
*
* This uses a vector for storage which will be replaced by datablocks when
* ready.
*/
class FilteredDataBlock {
public:
Expand All @@ -63,13 +60,26 @@ class FilteredDataBlock {
* coming from.
* @param offset File offset of the on-disk data for this data block.
* @param size Size of the on-disk data for this data block.
* @param resource The memory resource.
*/
FilteredDataBlock(unsigned frag_idx, uint64_t offset, uint64_t size)
: frag_idx_(frag_idx)
FilteredDataBlock(
unsigned frag_idx,
uint64_t offset,
uint64_t size,
tdb::pmr::memory_resource* resource)
: resource_(resource)
, frag_idx_(frag_idx)
, offset_(offset)
, filtered_data_(size) {
, size_(size)
, filtered_data_(tdb::pmr::make_unique<std::byte>(resource_, size)) {
if (!filtered_data_) {
throw std::bad_alloc();
}
}

DISABLE_COPY_AND_COPY_ASSIGN(FilteredDataBlock);
DISABLE_MOVE_AND_MOVE_ASSIGN(FilteredDataBlock);

/* ********************************* */
/* API */
/* ********************************* */
Expand All @@ -85,21 +95,20 @@ class FilteredDataBlock {
}

/**
* @return Pointer to the data at a particular offset in the filtered data
* file.
* @return Pointer to the data at the given offset in the filtered data file.
*/
inline void* data_at(storage_size_t offset) {
return filtered_data_.data() + offset - offset_;
return filtered_data_.get() + offset - offset_;
}

/** @return Pointer to the data inside of the filtered data block. */
inline void* data() {
return filtered_data_.data();
return filtered_data_.get();
}

/** @return Size of the data block. */
inline storage_size_t size() const {
return filtered_data_.size();
return size_;
}

/**
Expand All @@ -109,22 +118,27 @@ class FilteredDataBlock {
inline bool contains(
unsigned frag_idx, storage_size_t offset, storage_size_t size) const {
return frag_idx == frag_idx_ && offset >= offset_ &&
offset + size <= offset_ + filtered_data_.size();
offset + size <= offset_ + size_;
}

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
/* ********************************* */
/** The memory resource to use. */
tdb::pmr::memory_resource* resource_;

/** Fragment index for the data this data block contains. */
unsigned frag_idx_;

/** File offset of the on-disk data for this datablock. */
storage_size_t offset_;

/** The size of the data. */
storage_size_t size_;

/** Data for the data block. */
std::vector<char> filtered_data_;
tdb::pmr::unique_ptr<std::byte> filtered_data_;
};

/**
Expand Down Expand Up @@ -159,6 +173,7 @@ class FilteredData {
* @param validity_only Is the field read for validity only?
* @param storage_manager Storage manager.
* @param read_tasks Read tasks to queue new tasks on for new data blocks.
* @param memory_tracker Memory tracker.
*/
FilteredData(
const ReaderBase& reader,
Expand All @@ -172,8 +187,10 @@ class FilteredData {
const bool nullable,
const bool validity_only,
StorageManager* storage_manager,
std::vector<ThreadPool::Task>& read_tasks)
: name_(name)
std::vector<ThreadPool::Task>& read_tasks,
shared_ptr<MemoryTracker> memory_tracker)
: memory_tracker_(memory_tracker)
, name_(name)
, fragment_metadata_(fragment_metadata)
, var_sized_(var_sized)
, nullable_(nullable)
Expand All @@ -184,6 +201,8 @@ class FilteredData {
}

uint64_t tiles_allocated = 0;
auto* block_resource =
memory_tracker_->get_resource(MemoryType::FILTERED_DATA_BLOCK);

// Store data on the datablock in progress for fixed, var and nullable data.
std::optional<unsigned> current_frag_idx{nullopt};
Expand Down Expand Up @@ -251,19 +270,28 @@ class FilteredData {
// Finish by pushing the last in progress blocks.
if (current_fixed_size != 0) {
fixed_data_blocks_.emplace_back(
*current_frag_idx, current_fixed_offset, current_fixed_size);
*current_frag_idx,
current_fixed_offset,
current_fixed_size,
block_resource);
queue_last_block_for_read(TileType::FIXED);
}

if (current_var_size != 0) {
var_data_blocks_.emplace_back(
*current_frag_idx, current_var_offset, current_var_size);
*current_frag_idx,
current_var_offset,
current_var_size,
block_resource);
queue_last_block_for_read(TileType::VAR);
}

if (current_nullable_size != 0) {
nullable_data_blocks_.emplace_back(
*current_frag_idx, current_nullable_offset, current_nullable_size);
*current_frag_idx,
current_nullable_offset,
current_nullable_size,
block_resource);
queue_last_block_for_read(TileType::NULLABLE);
}

Expand All @@ -274,6 +302,9 @@ class FilteredData {
current_nullable_data_block_ = nullable_data_blocks_.begin();
}

DISABLE_COPY_AND_COPY_ASSIGN(FilteredData);
DISABLE_MOVE_AND_MOVE_ASSIGN(FilteredData);

/** Destructor. */
~FilteredData() = default;

Expand Down Expand Up @@ -364,7 +395,7 @@ class FilteredData {
}

/** @return Data blocks corresponding to the tile type. */
inline std::vector<FilteredDataBlock>& data_blocks(const TileType type) {
inline std::list<FilteredDataBlock>& data_blocks(const TileType type) {
switch (type) {
case TileType::FIXED:
return fixed_data_blocks_;
Expand All @@ -378,7 +409,7 @@ class FilteredData {
}

/** @return Current data block corresponding to the tile type. */
inline std::vector<FilteredDataBlock>::iterator& current_data_block(
inline std::list<FilteredDataBlock>::iterator& current_data_block(
const TileType type) {
switch (type) {
case TileType::FIXED:
Expand Down Expand Up @@ -516,7 +547,10 @@ class FilteredData {
} else {
// Push the old batch and start a new one.
data_blocks(type).emplace_back(
*current_block_frag_idx, current_block_offset, current_block_size);
*current_block_frag_idx,
current_block_offset,
current_block_size,
memory_tracker_->get_resource(MemoryType::FILTERED_DATA_BLOCK));
queue_last_block_for_read(type);
current_block_offset = offset;
current_block_size = size;
Expand Down Expand Up @@ -554,23 +588,26 @@ class FilteredData {
/* PRIVATE ATTRIBUTES */
/* ********************************* */

/** Memory tracker for the filtered data. */
shared_ptr<MemoryTracker> memory_tracker_;

/** Fixed data blocks. */
std::vector<FilteredDataBlock> fixed_data_blocks_;
std::list<FilteredDataBlock> fixed_data_blocks_;

/** Current fixed data block used when creating fixed tiles. */
std::vector<FilteredDataBlock>::iterator current_fixed_data_block_;
std::list<FilteredDataBlock>::iterator current_fixed_data_block_;

/** Var data blocks. */
std::vector<FilteredDataBlock> var_data_blocks_;
std::list<FilteredDataBlock> var_data_blocks_;

/** Current var data block used when creating var tiles. */
std::vector<FilteredDataBlock>::iterator current_var_data_block_;
std::list<FilteredDataBlock>::iterator current_var_data_block_;

/** Nullable data blocks. */
std::vector<FilteredDataBlock> nullable_data_blocks_;
std::list<FilteredDataBlock> nullable_data_blocks_;

/** Current nullable data block used when creating nullable tiles. */
std::vector<FilteredDataBlock>::iterator current_nullable_data_block_;
std::list<FilteredDataBlock>::iterator current_nullable_data_block_;

/** Name of the attribute. */
const std::string& name_;
Expand All @@ -591,7 +628,6 @@ class FilteredData {
std::vector<ThreadPool::Task>& read_tasks_;
};

} // namespace sm
} // namespace tiledb
} // namespace tiledb::sm

#endif // TILEDB_FILTERED_DATA_H
Loading

0 comments on commit 7c06fac

Please sign in to comment.