Skip to content

Commit

Permalink
[Enhancement] split chunk of HashTable (#51175)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork authored Oct 18, 2024
1 parent 22a1f91 commit 5dd0cc5
Show file tree
Hide file tree
Showing 17 changed files with 844 additions and 55 deletions.
183 changes: 179 additions & 4 deletions be/src/bench/shuffle_chunk_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
#include <testutil/assert.h>

#include <memory>
#include <random>

#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/datum_tuple.h"
#include "common/config.h"
#include "runtime/chunk_cursor.h"
#include "runtime/runtime_state.h"
#include "column/vectorized_fwd.h"
#include "runtime/types.h"
#include "storage/chunk_helper.h"
#include "types/logical_type.h"

namespace starrocks {

Expand Down Expand Up @@ -196,6 +196,181 @@ static void bench_func(benchmark::State& state) {
perf.do_bench(state);
}

// Benchmark SegmentedColumn::clone_selective && Chunk::append_selective function
class SegmentedChunkPerf {
public:
SegmentedChunkPerf() = default;

void prepare_bench_segmented_chunk_clone(benchmark::State& state) {
// std::cerr << "chunk_size: " << _dest_chunk_size << std::endl;
// std::cerr << "segment_size: " << _segment_size << std::endl;
// std::cerr << "segmented_chunk_size: " << _segment_chunk_size << std::endl;
SegmentedChunkPtr seg_chunk = prepare_chunk();
CHECK_EQ(seg_chunk->num_rows(), _segment_chunk_size);

// random select
random_select(select, _dest_chunk_size, seg_chunk->num_rows());
}

void prepare_bench_chunk_clone(benchmark::State& state) {
ChunkPtr chunk = build_chunk(_segment_size);
CHECK_EQ(chunk->num_rows(), _segment_size);
random_select(select, _dest_chunk_size, chunk->num_rows());
}

void prepare(benchmark::State& state) {
state.PauseTiming();

_column_count = state.range(0);
_data_type = state.range(1);
_num_segments = state.range(2);
_types.clear();

prepare_bench_chunk_clone(state);
prepare_bench_segmented_chunk_clone(state);

state.ResumeTiming();
}

void do_bench_segmented_chunk_clone(benchmark::State& state) {
SegmentedChunkPtr seg_chunk = prepare_chunk();
// clone_selective
size_t items = 0;
for (auto _ : state) {
for (auto& column : seg_chunk->columns()) {
auto cloned = column->clone_selective(select.data(), 0, select.size());
}
items += select.size();
}
state.SetItemsProcessed(items);
}

void do_bench_chunk_clone(benchmark::State& state) {
ChunkPtr chunk = prepare_big_chunk();
size_t items = 0;
for (auto _ : state) {
ChunkPtr empty = chunk->clone_empty();
empty->append_selective(*chunk, select.data(), 0, select.size());
items += select.size();
}
state.SetItemsProcessed(items);
}

ChunkPtr prepare_big_chunk() {
if (_big_chunk) {
return _big_chunk;
}
_big_chunk = build_chunk(_segment_chunk_size);
return _big_chunk;
}

SegmentedChunkPtr prepare_chunk() {
if (_seg_chunk) {
return _seg_chunk;
}
ChunkPtr chunk = build_chunk(_dest_chunk_size);

for (int i = 0; i < (_segment_chunk_size / _dest_chunk_size); i++) {
if (!_seg_chunk) {
_seg_chunk = SegmentedChunk::create(_segment_size);
ChunkPtr chunk = build_chunk(_dest_chunk_size);
auto map = chunk->get_slot_id_to_index_map();
for (auto entry : map) {
_seg_chunk->append_column(chunk->get_column_by_slot_id(entry.first), entry.first);
}
_seg_chunk->build_columns();
} else {
// std::cerr << " append " << chunk->num_rows() << "rows, become " << _seg_chunk->num_rows() << std::endl;
_seg_chunk->append_chunk(chunk);
}
}
return _seg_chunk;
}

void random_select(std::vector<uint32_t>& select, size_t count, size_t range) {
select.resize(count);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, range - 1);
std::generate(select.begin(), select.end(), [&]() { return dis(gen); });
}

ChunkPtr build_chunk(size_t chunk_size) {
if (_types.empty()) {
for (int i = 0; i < _column_count; i++) {
if (_data_type == 0) {
_types.emplace_back(TypeDescriptor::create_varchar_type(128));
} else if (_data_type == 1) {
_types.emplace_back(LogicalType::TYPE_INT);
} else {
CHECK(false) << "data type not supported: " << _data_type;
}
}
}

auto chunk = std::make_unique<Chunk>();
for (int i = 0; i < _column_count; i++) {
auto col = init_dest_column(_types[i], chunk_size);
chunk->append_column(col, i);
}
return chunk;
}

ColumnPtr init_dest_column(const TypeDescriptor& type, size_t chunk_size) {
auto c1 = ColumnHelper::create_column(type, true);
c1->reserve(chunk_size);
for (int i = 0; i < chunk_size; i++) {
if (type.is_string_type()) {
std::string str = fmt::format("str{}", i);
c1->append_datum(Slice(str));
} else if (type.is_integer_type()) {
c1->append_datum(i);
} else {
CHECK(false) << "data type not supported";
}
}
return c1;
}

private:
int _column_count = 4;
int _data_type = 0;
size_t _dest_chunk_size = 4096;
size_t _segment_size = 65536;
size_t _num_segments = 10;
size_t _segment_chunk_size = _segment_size * _num_segments;

SegmentedChunkPtr _seg_chunk;
ChunkPtr _big_chunk;
std::vector<uint32_t> select;
std::vector<TypeDescriptor> _types;
};

static void BenchSegmentedChunkClone(benchmark::State& state) {
google::InstallFailureSignalHandler();
auto perf = std::make_unique<SegmentedChunkPerf>();
perf->prepare(state);
perf->do_bench_segmented_chunk_clone(state);
}

static void BenchChunkClone(benchmark::State& state) {
google::InstallFailureSignalHandler();
auto perf = std::make_unique<SegmentedChunkPerf>();
perf->prepare(state);
perf->do_bench_chunk_clone(state);
}

static std::vector<std::vector<int64_t>> chunk_clone_args() {
return {
{1, 2, 3, 4}, // num columns
{0, 1}, // data type
{1, 4, 16, 64} // num_segments
};
}

BENCHMARK(BenchSegmentedChunkClone)->ArgsProduct(chunk_clone_args());
BENCHMARK(BenchChunkClone)->ArgsProduct(chunk_clone_args());

static void process_args(benchmark::internal::Benchmark* b) {
// chunk_count, column_count, node_count, src_chunk_size, null percent
b->Args({400, 400, 140, 4096, 80});
Expand Down
8 changes: 8 additions & 0 deletions be/src/column/binary_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,16 @@ void BinaryColumnBase<T>::check_or_die() const {
}
}

template <typename T>
void BinaryColumnBase<T>::append(const Slice& str) {
_bytes.insert(_bytes.end(), str.data, str.data + str.size);
_offsets.emplace_back(_bytes.size());
_slices_cache = false;
}

template <typename T>
void BinaryColumnBase<T>::append(const Column& src, size_t offset, size_t count) {
DCHECK(offset + count <= src.size());
const auto& b = down_cast<const BinaryColumnBase<T>&>(src);
const unsigned char* p = &b._bytes[b._offsets[offset]];
const unsigned char* e = &b._bytes[b._offsets[offset + count]];
Expand Down
8 changes: 2 additions & 6 deletions be/src/column/binary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class BinaryColumnBase final : public ColumnFactory<Column, BinaryColumnBase<T>>

using Offset = T;
using Offsets = Buffer<T>;

using Byte = uint8_t;
using Bytes = starrocks::raw::RawVectorPad16<uint8_t, ColumnAllocator<uint8_t>>;

struct BinaryDataProxyContainer {
Expand Down Expand Up @@ -172,11 +172,7 @@ class BinaryColumnBase final : public ColumnFactory<Column, BinaryColumnBase<T>>
// No complain about the overloaded-virtual for this function
DIAGNOSTIC_PUSH
DIAGNOSTIC_IGNORE("-Woverloaded-virtual")
void append(const Slice& str) {
_bytes.insert(_bytes.end(), str.data, str.data + str.size);
_offsets.emplace_back(_bytes.size());
_slices_cache = false;
}
void append(const Slice& str);
DIAGNOSTIC_POP

void append_datum(const Datum& datum) override {
Expand Down
27 changes: 26 additions & 1 deletion be/src/column/column_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "column/vectorized_fwd.h"
#include "gutil/casts.h"
#include "simd/simd.h"
#include "storage/chunk_helper.h"
#include "types/logical_type_infra.h"
#include "util/date_func.h"
#include "util/percentile_value.h"
Expand Down Expand Up @@ -465,7 +466,7 @@ size_t ChunkSliceTemplate<Ptr>::skip(size_t skip_rows) {

// Cutoff required rows from this chunk
template <class Ptr>
Ptr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
ChunkUniquePtr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
DCHECK(!empty());
size_t cut_rows = std::min(rows(), required_rows);
auto res = chunk->clone_empty(cut_rows);
Expand All @@ -478,7 +479,31 @@ Ptr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
return res;
}

// Specialized for SegmentedChunkPtr
template <>
ChunkUniquePtr ChunkSliceTemplate<SegmentedChunkPtr>::cutoff(size_t required_rows) {
DCHECK(!empty());
// cutoff a chunk from current segment, if it doesn't meet the requirement just let it be
ChunkPtr segment = chunk->segments()[segment_id];
size_t segment_offset = offset % chunk->segment_size();
size_t cut_rows = std::min(segment->num_rows() - segment_offset, required_rows);

auto res = segment->clone_empty(cut_rows);
res->append(*segment, segment_offset, cut_rows);
offset += cut_rows;

// move to next segment
segment_id = offset / chunk->segment_size();

if (empty()) {
chunk->reset();
offset = 0;
}
return res;
}

template struct ChunkSliceTemplate<ChunkPtr>;
template struct ChunkSliceTemplate<ChunkUniquePtr>;
template struct ChunkSliceTemplate<SegmentedChunkPtr>;

} // namespace starrocks
4 changes: 3 additions & 1 deletion be/src/column/column_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,13 @@ class ColumnHelper {
template <class Ptr = ChunkUniquePtr>
struct ChunkSliceTemplate {
Ptr chunk;
size_t segment_id = 0;
size_t offset = 0;

bool empty() const;
size_t rows() const;
size_t skip(size_t skip_rows);
Ptr cutoff(size_t required_rows);
ChunkUniquePtr cutoff(size_t required_rows);
void reset(Ptr input);
};

Expand Down Expand Up @@ -571,5 +572,6 @@ APPLY_FOR_ALL_STRING_TYPE(GET_CONTAINER)

using ChunkSlice = ChunkSliceTemplate<ChunkUniquePtr>;
using ChunkSharedSlice = ChunkSliceTemplate<ChunkPtr>;
using SegmentedChunkSlice = ChunkSliceTemplate<SegmentedChunkPtr>;

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/column/const_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class ConstColumn final : public ColumnFactory<Column, ConstColumn> {
friend class ColumnFactory<Column, ConstColumn>;

public:
using ValueType = void;

explicit ConstColumn(ColumnPtr data_column);
ConstColumn(ColumnPtr data_column, size_t size);

Expand Down
2 changes: 2 additions & 0 deletions be/src/column/nullable_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class NullableColumn : public ColumnFactory<Column, NullableColumn> {
friend class ColumnFactory<Column, NullableColumn>;

public:
using ValueType = void;

inline static ColumnPtr wrap_if_necessary(ColumnPtr column) {
if (column->is_nullable()) {
return column;
Expand Down
6 changes: 6 additions & 0 deletions be/src/column/vectorized_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ using ChunkPtr = std::shared_ptr<Chunk>;
using ChunkUniquePtr = std::unique_ptr<Chunk>;
using Chunks = std::vector<ChunkPtr>;

class SegmentedColumn;
class SegmentedChunk;
using SegmentedColumnPtr = std::shared_ptr<SegmentedColumn>;
using SegmentedColumns = std::vector<SegmentedColumnPtr>;
using SegmentedChunkPtr = std::shared_ptr<SegmentedChunk>;

using SchemaPtr = std::shared_ptr<Schema>;

using Fields = std::vector<std::shared_ptr<Field>>;
Expand Down
Loading

0 comments on commit 5dd0cc5

Please sign in to comment.