Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] reduce the number of fd hold by hash join spilling (backport #52020) #52083

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions be/src/exec/spill/block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

namespace starrocks::spill {

// Identifier of a block affinity group. Blocks, block groups and block acquire options each
// carry one of these ids.
using BlockAffinityGroup = uint64_t;
// Sentinel meaning "no affinity group assigned"; it is the default value of every
// affinity-group field in this module.
// Declared `inline constexpr` so that this header contributes exactly one definition of the
// constant to the program instead of one internal-linkage copy per translation unit.
inline constexpr BlockAffinityGroup kDefaultBlockAffinityGroup = UINT64_MAX;

class BlockReader;
class BlockReaderOptions;
// Block represents a contiguous storage space and is the smallest storage unit of flush and restore in a spill task.
Expand Down Expand Up @@ -52,16 +55,16 @@ class Block {

virtual bool preallocate(size_t write_size) = 0;

bool exclusive() const { return _exclusive; }
void set_exclusive(bool exclusive) { _exclusive = exclusive; }

void inc_num_rows(size_t num_rows) { _num_rows += num_rows; }

void set_affinity_group(BlockAffinityGroup affinity_group) { _affinity_group = affinity_group; }
BlockAffinityGroup affinity_group() const { return _affinity_group; }

protected:
size_t _num_rows{};
size_t _size{};
bool _is_remote = false;
bool _exclusive{};
BlockAffinityGroup _affinity_group = kDefaultBlockAffinityGroup;
};

using BlockPtr = std::shared_ptr<Block>;
Expand Down Expand Up @@ -110,6 +113,7 @@ struct AcquireBlockOptions {
// The block will occupy the entire container, making it easier to remove the block.
bool exclusive = false;
size_t block_size = 0;
BlockAffinityGroup affinity_group = kDefaultBlockAffinityGroup;
};

// BlockManager is used to manage the life cycle of the Block.
Expand All @@ -121,9 +125,16 @@ class BlockManager {
virtual ~BlockManager() = default;
virtual Status open() = 0;
virtual void close() = 0;

// acquire a block from BlockManager, return error if BlockManager can't allocate one.
virtual StatusOr<BlockPtr> acquire_block(const AcquireBlockOptions& opts) = 0;
// return Block to BlockManager
virtual Status release_block(BlockPtr block) = 0;

// Hands out a fresh affinity group id. Ids come from an atomic counter, so every call
// observes a distinct value and concurrent callers never receive the same id.
BlockAffinityGroup acquire_affinity_group() {
    return _next_affinity_group.fetch_add(1);
}
// Tells the manager that `affinity_group` is no longer in use.
// This default implementation keeps no per-group state, so it does nothing and always
// returns Status::OK(); subclasses that track state per affinity group override it.
virtual Status release_affinity_group(const BlockAffinityGroup affinity_group) { return Status::OK(); }

protected:
std::atomic<BlockAffinityGroup> _next_affinity_group = 0;
};
} // namespace starrocks::spill
6 changes: 3 additions & 3 deletions be/src/exec/spill/data_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ Status BlockSpillOutputDataStream::_prepare_block(RuntimeState* state, size_t wr
opts.plan_node_id = _spiller->options().plan_node_id;
opts.name = _spiller->options().name;
opts.block_size = write_size;
opts.exclusive = _spiller->options().init_partition_nums > 0 || !_spiller->options().is_unordered;
opts.affinity_group = _block_group->get_affinity_group();
ASSIGN_OR_RETURN(auto block, _block_manager->acquire_block(opts));
// update metrics
auto block_count = GET_METRICS(block->is_remote(), _spiller->metrics(), block_count);
COUNTER_UPDATE(block_count, 1);
TRACE_SPILL_LOG << fmt::format("allocate block [{}]", block->debug_string());
TRACE_SPILL_LOG << fmt::format("allocate block [{}], affinity group[{}]", block->debug_string(),
opts.affinity_group);
_cur_block = std::move(block);
_block_group->append(_cur_block);
}
Expand Down Expand Up @@ -105,7 +106,6 @@ Status BlockSpillOutputDataStream::flush() {
TRACE_SPILL_LOG << fmt::format("flush block[{}]", _cur_block->debug_string());
}

// release block if not exclusive
RETURN_IF_ERROR(_block_manager->release_block(std::move(_cur_block)));
DCHECK(_cur_block == nullptr);

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/spill/input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <mutex>
#include <utility>

#include "block_manager.h"
#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "exec/sort_exec_exprs.h"
Expand Down Expand Up @@ -81,6 +82,7 @@ class YieldableRestoreTask {
class BlockGroup {
public:
BlockGroup() = default;
BlockGroup(BlockAffinityGroup affinity_group) : _affinity_group(affinity_group) {}

void append(BlockPtr block) {
DCHECK(block != nullptr);
Expand Down Expand Up @@ -108,11 +110,13 @@ class BlockGroup {
}
return num_rows;
}
BlockAffinityGroup get_affinity_group() const { return _affinity_group; }

private:
// used to cache data_size in blocks
mutable std::optional<size_t> _data_size;
std::vector<BlockPtr> _blocks;
BlockAffinityGroup _affinity_group = kDefaultBlockAffinityGroup;
};
using BlockGroupPtr = std::shared_ptr<BlockGroup>;

Expand Down
55 changes: 29 additions & 26 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

#include "exec/spill/log_block_manager.h"

#include <fmt/core.h>

#include <memory>
#include <mutex>
#include <optional>
#include <string_view>
#include <unordered_map>
#include <utility>

#include "block_manager.h"
#include "common/config.h"
#include "common/status.h"
#include "exec/spill/block_manager.h"
Expand All @@ -33,6 +36,7 @@
#include "storage/options.h"
#include "util/defer_op.h"
#include "util/raw_container.h"
#include "util/stack_util.h"
#include "util/uid_util.h"

namespace starrocks::spill {
Expand Down Expand Up @@ -204,8 +208,8 @@ class LogBlock : public Block {

std::string debug_string() const override {
#ifndef BE_TEST
return fmt::format("LogBlock:{}[container={}, offset={}, len={}]", (void*)this, _container->path(), _offset,
_size);
return fmt::format("LogBlock:{}[container={}, offset={}, len={}, affinity_group={}]", (void*)this,
_container->path(), _offset, _size, _affinity_group);
#else
return fmt::format("LogBlock[container={}]", _container->path());
#endif
Expand Down Expand Up @@ -252,16 +256,17 @@ StatusOr<BlockPtr> LogBlockManager::acquire_block(const AcquireBlockOptions& opt
#endif

ASSIGN_OR_RETURN(auto block_container, get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id,
opts.name, opts.direct_io));
opts.name, opts.direct_io, opts.affinity_group));
auto res = std::make_shared<LogBlock>(block_container, block_container->size());
res->set_is_remote(dir->is_remote());
res->set_exclusive(opts.exclusive);
res->set_affinity_group(opts.affinity_group);
return res;
}

Status LogBlockManager::release_block(BlockPtr block) {
auto log_block = down_cast<LogBlock*>(block.get());
auto container = log_block->container();
auto affinity_group = block->affinity_group();
TRACE_SPILL_LOG << "release block: " << block->debug_string();
bool is_full = container->size() >= _max_container_bytes;
if (is_full) {
Expand All @@ -272,40 +277,38 @@ Status LogBlockManager::release_block(BlockPtr block) {
if (is_full) {
TRACE_SPILL_LOG << "mark container as full: " << container->path();
_full_containers.emplace_back(container);
} else if (!log_block->exclusive()) {
TRACE_SPILL_LOG << "return container to the pool: " << container->path();
} else {
auto dir = container->dir();
int32_t plan_node_id = container->plan_node_id();

auto iter = _available_containers.find(dir);
auto iter = _available_containers.find(affinity_group);
CHECK(iter != _available_containers.end());
auto sub_iter = iter->second->find(plan_node_id);
sub_iter->second->push(container);
iter->second->find(dir)->second->find(plan_node_id)->second->push(container);
}
return Status::OK();
}

StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(const DirPtr& dir,
const TUniqueId& fragment_instance_id,
int32_t plan_node_id,
const std::string& plan_node_name,
bool direct_io) {
// Forgets every reusable container that was pooled under `affinity_group`.
//
// Takes the manager mutex, removes the group's entry from `_available_containers` and
// reports whether such an entry existed:
//   - Status::OK() when exactly one entry was removed;
//   - Status::InternalError when the group was unknown (this also trips a DCHECK in
//     builds where DCHECK is active).
Status LogBlockManager::release_affinity_group(const BlockAffinityGroup affinity_group) {
    std::lock_guard<std::mutex> l(_mutex);

    // The map is keyed by affinity group, so erase() yields either 0 or 1.
    size_t count = _available_containers.erase(affinity_group);
    DCHECK(count == 1) << "can't find affinity_group: " << affinity_group;

    if (count != 1) {
        return Status::InternalError(fmt::format("can't find affinity_group {}", affinity_group));
    }
    return Status::OK();
}

StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(
const DirPtr& dir, const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, bool direct_io, BlockAffinityGroup affinity_group) {
TRACE_SPILL_LOG << "get_or_create_container at dir: " << dir->dir()
<< ". fragment instance: " << print_id(fragment_instance_id) << ", plan node:" << plan_node_id
<< ", " << plan_node_name;

std::lock_guard<std::mutex> l(_mutex);
auto iter = _available_containers.find(dir.get());
if (iter == _available_containers.end()) {
_available_containers.insert({dir.get(), std::make_shared<PlanNodeContainerMap>()});
iter = _available_containers.find(dir.get());
}
auto sub_iter = iter->second->find(plan_node_id);
if (sub_iter == iter->second->end()) {
iter->second->insert({plan_node_id, std::make_shared<ContainerQueue>()});
sub_iter = iter->second->find(plan_node_id);
}
auto& q = sub_iter->second;

auto avaiable_containers =
_available_containers.try_emplace(affinity_group, std::make_shared<DirContainerMap>()).first->second;
auto dir_container_map =
avaiable_containers->try_emplace(dir.get(), std::make_shared<PlanNodeContainerMap>()).first->second;
auto q = dir_container_map->try_emplace(plan_node_id, std::make_shared<ContainerQueue>()).first->second;
if (!q->empty()) {
auto container = q->front();
TRACE_SPILL_LOG << "return an existed container: " << container->path();
Expand Down
14 changes: 8 additions & 6 deletions be/src/exec/spill/log_block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <queue>
#include <unordered_map>

#include "block_manager.h"
#include "exec/spill/block_manager.h"
#include "exec/spill/dir_manager.h"
#include "util/phmap/phmap.h"

namespace starrocks::spill {

Expand Down Expand Up @@ -53,6 +55,7 @@ class LogBlockManager : public BlockManager {

StatusOr<BlockPtr> acquire_block(const AcquireBlockOptions& opts) override;
Status release_block(BlockPtr block) override;
Status release_affinity_group(const BlockAffinityGroup affinity_group) override;

#ifdef BE_TEST
void set_dir_manager(DirManager* dir_mgr) { _dir_mgr = dir_mgr; }
Expand All @@ -61,10 +64,10 @@ class LogBlockManager : public BlockManager {
private:
StatusOr<LogBlockContainerPtr> get_or_create_container(const DirPtr& dir, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, const std::string& plan_node_name,
bool direct_io);
bool direct_io, BlockAffinityGroup affinity_group);

private:
typedef std::unordered_map<uint64_t, LogBlockContainerPtr> ContainerMap;
typedef phmap::flat_hash_map<uint64_t, LogBlockContainerPtr> ContainerMap;
typedef std::queue<LogBlockContainerPtr> ContainerQueue;
typedef std::shared_ptr<ContainerQueue> ContainerQueuePtr;

Expand All @@ -74,11 +77,10 @@ class LogBlockManager : public BlockManager {
std::atomic<uint64_t> _next_container_id = 0;
std::mutex _mutex;

typedef std::unordered_map<int32_t, ContainerQueuePtr> PlanNodeContainerMap;
typedef std::unordered_map<TUniqueId, std::shared_ptr<PlanNodeContainerMap>> QueryContainerMap;
typedef std::unordered_map<std::string, std::shared_ptr<QueryContainerMap>> DirContainerMap;
typedef phmap::flat_hash_map<int32_t, ContainerQueuePtr> PlanNodeContainerMap;
typedef phmap::flat_hash_map<Dir*, std::shared_ptr<PlanNodeContainerMap>> DirContainerMap;

std::unordered_map<Dir*, std::shared_ptr<PlanNodeContainerMap>> _available_containers;
phmap::flat_hash_map<BlockAffinityGroup, std::shared_ptr<DirContainerMap>> _available_containers;

std::vector<LogBlockContainerPtr> _full_containers;
DirManager* _dir_mgr = nullptr;
Expand Down
16 changes: 12 additions & 4 deletions be/src/exec/spill/spill_components.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <numeric>

#include "block_manager.h"
#include "column/vectorized_fwd.h"
#include "common/config.h"
#include "exec/spill/common.h"
Expand Down Expand Up @@ -271,7 +272,6 @@ void PartitionedSpillerWriter::reset_partition(RuntimeState* state, size_t num_p
num_partitions = BitUtil::next_power_of_two(num_partitions);
num_partitions = std::min<size_t>(num_partitions, 1 << config::spill_max_partition_level);
num_partitions = std::max<size_t>(num_partitions, _spiller->options().init_partition_nums);

_level_to_partitions.clear();
_id_to_partitions.clear();
std::fill(_partition_set.begin(), _partition_set.end(), false);
Expand Down Expand Up @@ -319,21 +319,28 @@ void PartitionedSpillerWriter::_add_partition(SpilledPartitionPtr&& partition_pt
std::sort(partitions.begin(), partitions.end(),
[](const auto& left, const auto& right) { return left->partition_id < right->partition_id; });
_partition_set[partition->partition_id] = true;
_total_partition_num += 1;
}

void PartitionedSpillerWriter::_remove_partition(const SpilledPartition* partition) {
auto affinity_group = partition->block_group->get_affinity_group();
DCHECK(affinity_group != kDefaultBlockAffinityGroup);
_id_to_partitions.erase(partition->partition_id);
size_t level = partition->level;
auto& partitions = _level_to_partitions[level];
_partition_set[partition->partition_id] = false;
partitions.erase(std::find_if(partitions.begin(), partitions.end(),
[partition](auto& val) { return val->partition_id == partition->partition_id; }));
auto iter = std::find_if(partitions.begin(), partitions.end(),
[partition](auto& val) { return val->partition_id == partition->partition_id; });
_total_partition_num -= (iter != partitions.end());
partitions.erase(iter);
if (partitions.empty()) {
_level_to_partitions.erase(level);
if (_min_level == level) {
_min_level = level + 1;
}
}
WARN_IF_ERROR(_spiller->block_manager()->release_affinity_group(affinity_group),
fmt::format("release affinity group {} error", affinity_group));
}

Status PartitionedSpillerWriter::_choose_partitions_to_flush(bool is_final_flush,
Expand Down Expand Up @@ -466,7 +473,8 @@ Status PartitionedSpillerWriter::spill_partition(workgroup::YieldContext& yield_
auto mem_table = partition->spill_writer->mem_table();
auto mem_table_mem_usage = mem_table->mem_usage();
if (partition->spill_output_stream == nullptr) {
auto block_group = std::make_shared<BlockGroup>();
BlockAffinityGroup affinity_group = _spiller->block_manager()->acquire_affinity_group();
auto block_group = std::make_shared<BlockGroup>(affinity_group);
partition->block_group = block_group;
partition->spill_writer->add_block_group(std::move(block_group));
auto output = create_spill_output_stream(_spiller, partition->block_group.get(), _spiller->block_manager());
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/spill/spill_components.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ class PartitionedSpillerWriter final : public SpillerWriter {

// level to partition
std::map<int, std::vector<SpilledPartitionPtr>> _level_to_partitions;
size_t _total_partition_num = 0;

std::unordered_map<int, SpilledPartition*> _id_to_partitions;

Expand Down
Loading