Skip to content

Commit

Permalink
[Enhancement] reduce the number of fd hold by hash join spilling (bac…
Browse files Browse the repository at this point in the history
…kport #52020) (#52083)

Co-authored-by: eyes_on_me <[email protected]>
  • Loading branch information
mergify[bot] and silverbullet233 authored Oct 18, 2024
1 parent 0a6cdeb commit ed156ea
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 43 deletions.
19 changes: 15 additions & 4 deletions be/src/exec/spill/block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

namespace starrocks::spill {

using BlockAffinityGroup = uint64_t;
static const BlockAffinityGroup kDefaultBlockAffinityGroup = UINT64_MAX;

class BlockReader;
class BlockReaderOptions;
// Block represents a continuous storage space and is the smallest storage unit of flush and restore in spill task.
Expand Down Expand Up @@ -52,16 +55,16 @@ class Block {

virtual bool preallocate(size_t write_size) = 0;

bool exclusive() const { return _exclusive; }
void set_exclusive(bool exclusive) { _exclusive = exclusive; }

void inc_num_rows(size_t num_rows) { _num_rows += num_rows; }

void set_affinity_group(BlockAffinityGroup affinity_group) { _affinity_group = affinity_group; }
BlockAffinityGroup affinity_group() const { return _affinity_group; }

protected:
size_t _num_rows{};
size_t _size{};
bool _is_remote = false;
bool _exclusive{};
BlockAffinityGroup _affinity_group = kDefaultBlockAffinityGroup;
};

using BlockPtr = std::shared_ptr<Block>;
Expand Down Expand Up @@ -110,6 +113,7 @@ struct AcquireBlockOptions {
// The block will occupy the entire container, making it easier to remove the block.
bool exclusive = false;
size_t block_size = 0;
BlockAffinityGroup affinity_group = kDefaultBlockAffinityGroup;
};

// BlockManager is used to manage the life cycle of the Block.
Expand All @@ -121,9 +125,16 @@ class BlockManager {
virtual ~BlockManager() = default;
virtual Status open() = 0;
virtual void close() = 0;

// acquire a block from BlockManager, return error if BlockManager can't allocate one.
virtual StatusOr<BlockPtr> acquire_block(const AcquireBlockOptions& opts) = 0;
// return Block to BlockManager
virtual Status release_block(BlockPtr block) = 0;

BlockAffinityGroup acquire_affinity_group() { return _next_affinity_group++; }
virtual Status release_affinity_group(const BlockAffinityGroup affinity_group) { return Status::OK(); }

protected:
std::atomic<BlockAffinityGroup> _next_affinity_group = 0;
};
} // namespace starrocks::spill
6 changes: 3 additions & 3 deletions be/src/exec/spill/data_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ Status BlockSpillOutputDataStream::_prepare_block(RuntimeState* state, size_t wr
opts.plan_node_id = _spiller->options().plan_node_id;
opts.name = _spiller->options().name;
opts.block_size = write_size;
opts.exclusive = _spiller->options().init_partition_nums > 0 || !_spiller->options().is_unordered;
opts.affinity_group = _block_group->get_affinity_group();
ASSIGN_OR_RETURN(auto block, _block_manager->acquire_block(opts));
// update metrics
auto block_count = GET_METRICS(block->is_remote(), _spiller->metrics(), block_count);
COUNTER_UPDATE(block_count, 1);
TRACE_SPILL_LOG << fmt::format("allocate block [{}]", block->debug_string());
TRACE_SPILL_LOG << fmt::format("allocate block [{}], affinity group[{}]", block->debug_string(),
opts.affinity_group);
_cur_block = std::move(block);
_block_group->append(_cur_block);
}
Expand Down Expand Up @@ -105,7 +106,6 @@ Status BlockSpillOutputDataStream::flush() {
TRACE_SPILL_LOG << fmt::format("flush block[{}]", _cur_block->debug_string());
}

// release block if not exclusive
RETURN_IF_ERROR(_block_manager->release_block(std::move(_cur_block)));
DCHECK(_cur_block == nullptr);

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/spill/input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <mutex>
#include <utility>

#include "block_manager.h"
#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "exec/sort_exec_exprs.h"
Expand Down Expand Up @@ -81,6 +82,7 @@ class YieldableRestoreTask {
class BlockGroup {
public:
BlockGroup() = default;
BlockGroup(BlockAffinityGroup affinity_group) : _affinity_group(affinity_group) {}

void append(BlockPtr block) {
DCHECK(block != nullptr);
Expand Down Expand Up @@ -108,11 +110,13 @@ class BlockGroup {
}
return num_rows;
}
BlockAffinityGroup get_affinity_group() const { return _affinity_group; }

private:
// used to cache data_size in blocks
mutable std::optional<size_t> _data_size;
std::vector<BlockPtr> _blocks;
BlockAffinityGroup _affinity_group = kDefaultBlockAffinityGroup;
};
using BlockGroupPtr = std::shared_ptr<BlockGroup>;

Expand Down
55 changes: 29 additions & 26 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

#include "exec/spill/log_block_manager.h"

#include <fmt/core.h>

#include <memory>
#include <mutex>
#include <optional>
#include <string_view>
#include <unordered_map>
#include <utility>

#include "block_manager.h"
#include "common/config.h"
#include "common/status.h"
#include "exec/spill/block_manager.h"
Expand All @@ -33,6 +36,7 @@
#include "storage/options.h"
#include "util/defer_op.h"
#include "util/raw_container.h"
#include "util/stack_util.h"
#include "util/uid_util.h"

namespace starrocks::spill {
Expand Down Expand Up @@ -204,8 +208,8 @@ class LogBlock : public Block {

std::string debug_string() const override {
#ifndef BE_TEST
return fmt::format("LogBlock:{}[container={}, offset={}, len={}]", (void*)this, _container->path(), _offset,
_size);
return fmt::format("LogBlock:{}[container={}, offset={}, len={}, affinity_group={}]", (void*)this,
_container->path(), _offset, _size, _affinity_group);
#else
return fmt::format("LogBlock[container={}]", _container->path());
#endif
Expand Down Expand Up @@ -252,16 +256,17 @@ StatusOr<BlockPtr> LogBlockManager::acquire_block(const AcquireBlockOptions& opt
#endif

ASSIGN_OR_RETURN(auto block_container, get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id,
opts.name, opts.direct_io));
opts.name, opts.direct_io, opts.affinity_group));
auto res = std::make_shared<LogBlock>(block_container, block_container->size());
res->set_is_remote(dir->is_remote());
res->set_exclusive(opts.exclusive);
res->set_affinity_group(opts.affinity_group);
return res;
}

Status LogBlockManager::release_block(BlockPtr block) {
auto log_block = down_cast<LogBlock*>(block.get());
auto container = log_block->container();
auto affinity_group = block->affinity_group();
TRACE_SPILL_LOG << "release block: " << block->debug_string();
bool is_full = container->size() >= _max_container_bytes;
if (is_full) {
Expand All @@ -272,40 +277,38 @@ Status LogBlockManager::release_block(BlockPtr block) {
if (is_full) {
TRACE_SPILL_LOG << "mark container as full: " << container->path();
_full_containers.emplace_back(container);
} else if (!log_block->exclusive()) {
TRACE_SPILL_LOG << "return container to the pool: " << container->path();
} else {
auto dir = container->dir();
int32_t plan_node_id = container->plan_node_id();

auto iter = _available_containers.find(dir);
auto iter = _available_containers.find(affinity_group);
CHECK(iter != _available_containers.end());
auto sub_iter = iter->second->find(plan_node_id);
sub_iter->second->push(container);
iter->second->find(dir)->second->find(plan_node_id)->second->push(container);
}
return Status::OK();
}

StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(const DirPtr& dir,
const TUniqueId& fragment_instance_id,
int32_t plan_node_id,
const std::string& plan_node_name,
bool direct_io) {
Status LogBlockManager::release_affinity_group(const BlockAffinityGroup affinity_group) {
std::lock_guard<std::mutex> l(_mutex);
size_t count = _available_containers.erase(affinity_group);
DCHECK(count == 1) << "can't find affinity_group: " << affinity_group;
return count == 1 ? Status::OK()
: Status::InternalError(fmt::format("can't find affinity_group {}", affinity_group));
}

StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(
const DirPtr& dir, const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, bool direct_io, BlockAffinityGroup affinity_group) {
TRACE_SPILL_LOG << "get_or_create_container at dir: " << dir->dir()
<< ". fragment instance: " << print_id(fragment_instance_id) << ", plan node:" << plan_node_id
<< ", " << plan_node_name;

std::lock_guard<std::mutex> l(_mutex);
auto iter = _available_containers.find(dir.get());
if (iter == _available_containers.end()) {
_available_containers.insert({dir.get(), std::make_shared<PlanNodeContainerMap>()});
iter = _available_containers.find(dir.get());
}
auto sub_iter = iter->second->find(plan_node_id);
if (sub_iter == iter->second->end()) {
iter->second->insert({plan_node_id, std::make_shared<ContainerQueue>()});
sub_iter = iter->second->find(plan_node_id);
}
auto& q = sub_iter->second;

auto avaiable_containers =
_available_containers.try_emplace(affinity_group, std::make_shared<DirContainerMap>()).first->second;
auto dir_container_map =
avaiable_containers->try_emplace(dir.get(), std::make_shared<PlanNodeContainerMap>()).first->second;
auto q = dir_container_map->try_emplace(plan_node_id, std::make_shared<ContainerQueue>()).first->second;
if (!q->empty()) {
auto container = q->front();
TRACE_SPILL_LOG << "return an existed container: " << container->path();
Expand Down
14 changes: 8 additions & 6 deletions be/src/exec/spill/log_block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <queue>
#include <unordered_map>

#include "block_manager.h"
#include "exec/spill/block_manager.h"
#include "exec/spill/dir_manager.h"
#include "util/phmap/phmap.h"

namespace starrocks::spill {

Expand Down Expand Up @@ -53,6 +55,7 @@ class LogBlockManager : public BlockManager {

StatusOr<BlockPtr> acquire_block(const AcquireBlockOptions& opts) override;
Status release_block(BlockPtr block) override;
Status release_affinity_group(const BlockAffinityGroup affinity_group) override;

#ifdef BE_TEST
void set_dir_manager(DirManager* dir_mgr) { _dir_mgr = dir_mgr; }
Expand All @@ -61,10 +64,10 @@ class LogBlockManager : public BlockManager {
private:
StatusOr<LogBlockContainerPtr> get_or_create_container(const DirPtr& dir, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, const std::string& plan_node_name,
bool direct_io);
bool direct_io, BlockAffinityGroup affinity_group);

private:
typedef std::unordered_map<uint64_t, LogBlockContainerPtr> ContainerMap;
typedef phmap::flat_hash_map<uint64_t, LogBlockContainerPtr> ContainerMap;
typedef std::queue<LogBlockContainerPtr> ContainerQueue;
typedef std::shared_ptr<ContainerQueue> ContainerQueuePtr;

Expand All @@ -74,11 +77,10 @@ class LogBlockManager : public BlockManager {
std::atomic<uint64_t> _next_container_id = 0;
std::mutex _mutex;

typedef std::unordered_map<int32_t, ContainerQueuePtr> PlanNodeContainerMap;
typedef std::unordered_map<TUniqueId, std::shared_ptr<PlanNodeContainerMap>> QueryContainerMap;
typedef std::unordered_map<std::string, std::shared_ptr<QueryContainerMap>> DirContainerMap;
typedef phmap::flat_hash_map<int32_t, ContainerQueuePtr> PlanNodeContainerMap;
typedef phmap::flat_hash_map<Dir*, std::shared_ptr<PlanNodeContainerMap>> DirContainerMap;

std::unordered_map<Dir*, std::shared_ptr<PlanNodeContainerMap>> _available_containers;
phmap::flat_hash_map<BlockAffinityGroup, std::shared_ptr<DirContainerMap>> _available_containers;

std::vector<LogBlockContainerPtr> _full_containers;
DirManager* _dir_mgr = nullptr;
Expand Down
16 changes: 12 additions & 4 deletions be/src/exec/spill/spill_components.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <numeric>

#include "block_manager.h"
#include "column/vectorized_fwd.h"
#include "common/config.h"
#include "exec/spill/common.h"
Expand Down Expand Up @@ -271,7 +272,6 @@ void PartitionedSpillerWriter::reset_partition(RuntimeState* state, size_t num_p
num_partitions = BitUtil::next_power_of_two(num_partitions);
num_partitions = std::min<size_t>(num_partitions, 1 << config::spill_max_partition_level);
num_partitions = std::max<size_t>(num_partitions, _spiller->options().init_partition_nums);

_level_to_partitions.clear();
_id_to_partitions.clear();
std::fill(_partition_set.begin(), _partition_set.end(), false);
Expand Down Expand Up @@ -319,21 +319,28 @@ void PartitionedSpillerWriter::_add_partition(SpilledPartitionPtr&& partition_pt
std::sort(partitions.begin(), partitions.end(),
[](const auto& left, const auto& right) { return left->partition_id < right->partition_id; });
_partition_set[partition->partition_id] = true;
_total_partition_num += 1;
}

void PartitionedSpillerWriter::_remove_partition(const SpilledPartition* partition) {
auto affinity_group = partition->block_group->get_affinity_group();
DCHECK(affinity_group != kDefaultBlockAffinityGroup);
_id_to_partitions.erase(partition->partition_id);
size_t level = partition->level;
auto& partitions = _level_to_partitions[level];
_partition_set[partition->partition_id] = false;
partitions.erase(std::find_if(partitions.begin(), partitions.end(),
[partition](auto& val) { return val->partition_id == partition->partition_id; }));
auto iter = std::find_if(partitions.begin(), partitions.end(),
[partition](auto& val) { return val->partition_id == partition->partition_id; });
_total_partition_num -= (iter != partitions.end());
partitions.erase(iter);
if (partitions.empty()) {
_level_to_partitions.erase(level);
if (_min_level == level) {
_min_level = level + 1;
}
}
WARN_IF_ERROR(_spiller->block_manager()->release_affinity_group(affinity_group),
fmt::format("release affinity group {} error", affinity_group));
}

Status PartitionedSpillerWriter::_choose_partitions_to_flush(bool is_final_flush,
Expand Down Expand Up @@ -466,7 +473,8 @@ Status PartitionedSpillerWriter::spill_partition(workgroup::YieldContext& yield_
auto mem_table = partition->spill_writer->mem_table();
auto mem_table_mem_usage = mem_table->mem_usage();
if (partition->spill_output_stream == nullptr) {
auto block_group = std::make_shared<BlockGroup>();
BlockAffinityGroup affinity_group = _spiller->block_manager()->acquire_affinity_group();
auto block_group = std::make_shared<BlockGroup>(affinity_group);
partition->block_group = block_group;
partition->spill_writer->add_block_group(std::move(block_group));
auto output = create_spill_output_stream(_spiller, partition->block_group.get(), _spiller->block_manager());
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/spill/spill_components.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ class PartitionedSpillerWriter final : public SpillerWriter {

// level to partition
std::map<int, std::vector<SpilledPartitionPtr>> _level_to_partitions;
size_t _total_partition_num = 0;

std::unordered_map<int, SpilledPartition*> _id_to_partitions;

Expand Down

0 comments on commit ed156ea

Please sign in to comment.