From c796736807664191a8a07f39148e302e1372dc1b Mon Sep 17 00:00:00 2001 From: Kould Date: Fri, 22 Dec 2023 22:34:36 +0800 Subject: [PATCH] perf: optimize physical_sort.cpp implementation (#354) --- .../local_infinity/infinity_benchmark.cpp | 195 ++++++++++++------ src/executor/operator/physical_sort.cpp | 126 +++++------ src/storage/data_block.cppm | 10 + 3 files changed, 209 insertions(+), 122 deletions(-) diff --git a/benchmark/local_infinity/infinity_benchmark.cpp b/benchmark/local_infinity/infinity_benchmark.cpp index 7892e12db3..c79edd6678 100644 --- a/benchmark/local_infinity/infinity_benchmark.cpp +++ b/benchmark/local_infinity/infinity_benchmark.cpp @@ -38,8 +38,8 @@ using namespace infinity; constexpr u64 second_unit = 1000 * 1000 * 1000; -double Measurement(SizeT thread_num, SizeT times, const StdFunction, std::thread::id)> &closure) { - infinity::BaseProfiler profiler; +double Measurement(String name, SizeT thread_num, SizeT times, const StdFunction, std::thread::id)> &closure) { + infinity::BaseProfiler profiler(name); Vector threads; threads.reserve(thread_num); @@ -88,20 +88,20 @@ int main() { Vector results; // Database { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("Get Database", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->GetDatabase("default"); }); results.push_back(Format("-> Get Database QPS: {}", total_times / tims_costing_second)); } { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("List Databases", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->ListDatabases(); }); results.push_back(Format("-> List Databases QPS: {}", total_times / tims_costing_second)); } { CreateDatabaseOptions create_db_opts; - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("Create Database", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->CreateDatabase(ToStr(i), create_db_opts); }); results.push_back(Format("-> Create Database QPS: {}", total_times / tims_costing_second)); @@ -109,7 +109,7 @@ int main() { { DropDatabaseOptions drop_db_opts; - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("Drop Database", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->DropDatabase(ToStr(i), drop_db_opts); }); results.push_back(Format("-> Drop Database QPS: {}", total_times / tims_costing_second)); @@ -140,19 +140,20 @@ int main() { infinity->LocalDisconnect(); } // { - // auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + // auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id + // thread_id) { // __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->ListTables(); // }); // results.push_back(Format("-> List Tables QPS: {}", total_times / tims_costing_second)); // } { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("Get Tables", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test"); }); results.push_back(Format("-> Get Tables QPS: {}", total_times / tims_costing_second)); } { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("Describe Tables", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->DescribeTable("benchmark_test"); }); results.push_back(Format("-> Describe Tables QPS: {}", total_times / tims_costing_second)); @@ -179,32 +180,120 @@ int main() { results.push_back(Format("-> Create Table QPS: {}", total_times / tims_costing_second)); } { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + auto tims_costing_second = Measurement("Drop Table", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->DropTable(ToStr(i), drop_table_options); }); results.push_back(Format("-> Drop Table QPS: {}", total_times / tims_costing_second)); } { { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id - thread_id) { - Vector *output_columns = new Vector(); - ColumnExpr *col1 = new ColumnExpr(); - col1->names_.emplace_back("col1"); - output_columns->emplace_back(col1); + auto tims_costing_second = + Measurement("Select", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + Vector *output_columns = new Vector(); + ColumnExpr *col1 = new ColumnExpr(); + col1->names_.emplace_back("col1"); + output_columns->emplace_back(col1); + + ColumnExpr *col2 = new ColumnExpr(); + col2->names_.emplace_back("col2"); + output_columns->emplace_back(col2); + + __attribute__((unused)) auto ignored = + infinity->GetDatabase("default")->GetTable("benchmark_test")->Search(nullptr, nullptr, output_columns); + }); + results.push_back(Format("-> Select QPS: {}", total_times / tims_costing_second)); + } + { + auto tims_costing_second = + Measurement("Insert", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + Vector *> *values = new Vector *>(); + values->emplace_back(new Vector()); - ColumnExpr *col2 = new ColumnExpr(); - col2->names_.emplace_back("col2"); - output_columns->emplace_back(col2); + Vector *columns = new Vector(); + columns->emplace_back(col_name_1); + columns->emplace_back(col_name_2); - __attribute__((unused)) auto ignored = - infinity->GetDatabase("default")->GetTable("benchmark_test")->Search(nullptr, nullptr, output_columns); - }); - results.push_back(Format("-> Select QPS: {}", total_times / tims_costing_second)); + ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger); + value1->integer_value_ = i; + values->at(0)->emplace_back(value1); + + ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger); + value2->integer_value_ = i; + values->at(0)->emplace_back(value2); + + __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Insert(columns, values); + }); + results.push_back(Format("-> Insert QPS: {}", total_times / tims_costing_second)); + } + { + auto tims_costing_second = + Measurement("Update", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + Vector *values = new Vector(); + + Vector *columns = new Vector(); + columns->emplace_back(col_name_1); + columns->emplace_back(col_name_2); + + ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger); + value1->integer_value_ = i; + UpdateExpr *update_expr1 = new UpdateExpr(); + update_expr1->column_name = col_name_1; + update_expr1->value = value1; + + ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger); + value1->integer_value_ = i; + UpdateExpr *update_expr2 = new UpdateExpr(); + update_expr2->column_name = col_name_2; + update_expr2->value = value2; + + values->push_back(update_expr1); + values->push_back(update_expr2); + + __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Update(nullptr, + values); + }); + results.push_back(Format("-> Update QPS: {}", total_times / tims_costing_second)); } { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id - thread_id) { + auto tims_costing_second = + Measurement("Delete", thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Delete(nullptr); + }); + results.push_back(Format("-> Delete QPS: {}", total_times / tims_costing_second)); + } + } + } + { + SizeT sort_row = 1000 * 1000; + SizeT sort_times = 48 * 10; + + String col_name_1 = "c1"; + String col_name_2 = "c2"; + + std::srand(static_cast(std::time(nullptr))); + + SharedPtr col_type = MakeShared(LogicalType::kInteger); + { + CreateTableOptions create_table_opts; + + SizeT column_count = 2; + Vector column_definitions; + column_definitions.reserve(column_count); + + auto col_def_1 = new ColumnDef(0, col_type, col_name_1, HashSet()); + column_definitions.emplace_back(col_def_1); + + auto col_def_2 = new ColumnDef(1, col_type, col_name_2, HashSet()); + column_definitions.emplace_back(col_def_2); + + SharedPtr infinity = Infinity::LocalConnect(); + __attribute__((unused)) auto ignored = + infinity->GetDatabase("default")->CreateTable("benchmark_sort", column_definitions, Vector(), create_table_opts); + infinity->LocalDisconnect(); + } + { + auto tims_costing_second = + Measurement("Insert for Select Sort", thread_num, sort_row, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { Vector *> *values = new Vector *>(); values->emplace_back(new Vector()); @@ -213,52 +302,32 @@ int main() { columns->emplace_back(col_name_2); ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger); - value1->integer_value_ = i; + value1->integer_value_ = std::rand(); values->at(0)->emplace_back(value1); ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger); - value2->integer_value_ = i; + value2->integer_value_ = std::rand(); values->at(0)->emplace_back(value2); - __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Insert(columns, values); + __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_sort")->Insert(columns, values); }); - results.push_back(Format("-> Insert QPS: {}", total_times / tims_costing_second)); - } - { - auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id - thread_id) { - Vector *values = new Vector(); - - Vector *columns = new Vector(); - columns->emplace_back(col_name_1); - columns->emplace_back(col_name_2); - - ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger); - value1->integer_value_ = i; - UpdateExpr *update_expr1 = new UpdateExpr(); - update_expr1->column_name = col_name_1; - update_expr1->value = value1; - - ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger); - value1->integer_value_ = i; - UpdateExpr *update_expr2 = new UpdateExpr(); - update_expr2->column_name = col_name_2; - update_expr2->value = value2; + results.push_back(Format("-> Insert for Sort Time: {}s", tims_costing_second)); + } + { + auto tims_costing_second = + Measurement("Select Sort", thread_num, sort_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { + Vector *output_columns = new Vector(); + ColumnExpr *col1 = new ColumnExpr(); + col1->names_.emplace_back("col1"); + output_columns->emplace_back(col1); - values->push_back(update_expr1); - values->push_back(update_expr2); + ColumnExpr *col2 = new ColumnExpr(); + col2->names_.emplace_back("col2"); + output_columns->emplace_back(col2); - __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Update(nullptr, values); + __attribute__((unused)) auto ignored = infinity->Query("select c1, c2 from benchmark_sort order by c1"); }); - results.push_back(Format("-> Update QPS: {}", total_times / tims_costing_second)); - } - { - auto tims_costing_second = - Measurement(thread_num, total_times, [&](SizeT i, SharedPtr infinity, std::thread::id thread_id) { - __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Delete(nullptr); - }); - results.push_back(Format("-> Delete QPS: {}", total_times / tims_costing_second)); - } + results.push_back(Format("-> Select Sort Time QPS: {}", sort_times / tims_costing_second)); } } @@ -318,7 +387,7 @@ int main() { } table->CreateIndex(index_name, index_info_list, CreateIndexOptions()); - } while(false); + } while (false); // { // std::cout << "--- Start to run search benchmark: "; diff --git a/src/executor/operator/physical_sort.cpp b/src/executor/operator/physical_sort.cpp index 105cbf2ffe..934e63b5c2 100644 --- a/src/executor/operator/physical_sort.cpp +++ b/src/executor/operator/physical_sort.cpp @@ -58,6 +58,37 @@ class Comparator { Vector> expressions) : order_by_blocks_(order_by_blocks), order_by_types_(order_by_types), expressions_(expressions) {} + void Init() { + if (order_by_blocks_.empty()) { + return; + } + eval_results_.reserve(order_by_blocks_.size()); + + ExpressionEvaluator expr_evaluator; + Vector> expr_states; + for (SizeT expr_id = 0; expr_id < expressions_.size(); ++expr_id) { + expr_states.push_back(ExpressionState::CreateState(expressions_[expr_id])); + } + + for (SizeT block_id = 0; block_id < order_by_blocks_.size(); ++block_id) { + Vector> results; + + expr_evaluator.Init(order_by_blocks_[block_id].get()); + results.reserve(expressions_.size()); + + for (SizeT expr_id = 0; expr_id < expressions_.size(); ++expr_id) { + auto expr = expressions_[expr_id]; + SharedPtr result_vector = MakeShared(MakeShared(expr->Type())); + + result_vector->Initialize(); + expr_evaluator.Execute(expr, expr_states[expr_id], result_vector); + + results.push_back(result_vector); + } + eval_results_.push_back(results); + } + } + bool operator()(BlockRawIndex left_index, BlockRawIndex right_index) { SizeT exprs_count = expressions_.size(); for (SizeT expr_idx = 0; expr_idx < exprs_count; ++expr_idx) { @@ -65,11 +96,8 @@ class Comparator { DataType type = expr->Type(); OrderType order_type = order_by_types_[expr_idx]; - const UniquePtr &left_block = order_by_blocks_[left_index.block_idx]; - SharedPtr left_result_vector = EvalOrderVector(left_block.get(), left_index.block_idx, expr); - - const UniquePtr &right_block = order_by_blocks_[right_index.block_idx]; - SharedPtr right_result_vector = EvalOrderVector(right_block.get(), right_index.block_idx, expr); + SharedPtr &left_result_vector = eval_results_[left_index.block_idx][expr_idx]; + SharedPtr &right_result_vector = eval_results_[right_index.block_idx][expr_idx]; switch (type.type()) { case kBoolean: { @@ -102,45 +130,15 @@ class Comparator { } private: - struct HashPair { - template - SizeT operator()(const Pair &p) const { - auto hash_1 = Hash(p.first); - auto hash_2 = Hash(p.second); - return hash_1 ^ hash_2; - } - }; - - SharedPtr EvalOrderVector(DataBlock *block, SizeT block_idx, SharedPtr &expr) { - Pair> eval_key = {block_idx, expr}; - - if (eval_cache_.contains(eval_key)) { - return eval_cache_[eval_key]; - } else { - ExpressionEvaluator expr_evaluator; - auto state = ExpressionState::CreateState(expr); - SharedPtr result_vector = MakeShared(MakeShared(expr->Type())); - - result_vector->Initialize(); - expr_evaluator.Init(block); - expr_evaluator.Execute(expr, state, result_vector); - - eval_cache_[eval_key] = result_vector; - - return result_vector; - } - } - const Vector> &order_by_blocks_; const Vector &order_by_types_; Vector> expressions_; - // K: BlockIdx & Expression - // V: ColumnVector - HashMap>, SharedPtr, HashPair> eval_cache_; + // Blocks -> Expressions + Vector>> eval_results_; }; -Vector MergeTwoIndexes(Vector &indexes_a, Vector &indexes_b, Comparator &comparator) { +Vector MergeTwoIndexes(Vector &&indexes_a, Vector &&indexes_b, Comparator &comparator) { if (indexes_a.empty() || indexes_b.empty()) { return indexes_a.empty() ? indexes_b : indexes_a; } @@ -169,25 +167,31 @@ Vector MergeTwoIndexes(Vector &indexes_a, Vector MergeIndexes(Vector> &&indexes_group, Comparator &comparator) { - Vector ans; - for (SizeT i = 0; i < indexes_group.size(); ++i) { - ans = MergeTwoIndexes(ans, indexes_group[i], comparator); - } - return ans; +Vector MergeIndexes(Vector> &indexes_group, SizeT l, SizeT r, Comparator &comparator) { + if (l == r) + return indexes_group[l]; + if (l > r) + return Vector(); + SizeT mid = (l + r) >> 1; + return MergeTwoIndexes(Move(MergeIndexes(indexes_group, l, mid, comparator)), + Move(MergeIndexes(indexes_group, mid + 1, r, comparator)), + comparator); } void CopyWithIndexes(const Vector> &input_blocks, Vector> &output_blocks, - const Vector &block_indexes, - SizeT start_block_index, - const SharedPtr>> &types) { + const Vector &block_indexes) { + if (input_blocks.empty()) { + return; + } + + SizeT start_block_index = output_blocks.size(); auto block_count = (block_indexes.size() + DEFAULT_BLOCK_CAPACITY - 1) / DEFAULT_BLOCK_CAPACITY; // copy with block_indexes and push to unmerge_sorted_blocks for (SizeT i = 0; i < block_count; ++i) { auto sorted_datablock = DataBlock::MakeUniquePtr(); - sorted_datablock->Init(*types.get()); + sorted_datablock->Init(input_blocks[0]->types()); output_blocks.push_back(Move(sorted_datablock)); } @@ -197,7 +201,9 @@ void CopyWithIndexes(const Vector> &input_blocks, output_blocks[(index_idx / DEFAULT_BLOCK_CAPACITY) + start_block_index]->column_vectors; for (SizeT column_id = 0; column_id < output_column_vectors.size(); ++column_id) { - output_column_vectors[column_id]->AppendValue(input_blocks[block_index.block_idx]->GetValue(column_id, block_index.offset)); + output_column_vectors[column_id]->AppendWith(*input_blocks[block_index.block_idx]->column_vectors[column_id].get(), + block_index.offset, + 1); } } for (SizeT i = 0; i < block_count; ++i) { @@ -210,12 +216,12 @@ void PhysicalSort::Init() {} bool PhysicalSort::Execute(QueryContext *, OperatorState *operator_state) { auto *prev_op_state = operator_state->prev_op_state_; auto *sort_operator_state = static_cast(operator_state); - auto types = left_->GetOutputTypes(); // Generate block indexes Vector block_indexes; auto pre_op_state = operator_state->prev_op_state_; + block_indexes.reserve(pre_op_state->data_block_array_.size() * DEFAULT_BLOCK_CAPACITY); // filling block_indexes for (SizeT block_id = 0; block_id < pre_op_state->data_block_array_.size(); block_id++) { for (SizeT offset = 0; offset < pre_op_state->data_block_array_[block_id]->row_count(); offset++) { @@ -224,14 +230,13 @@ bool PhysicalSort::Execute(QueryContext *, OperatorState *operator_state) { block_indexes.push_back(block_index); } } + auto block_comparator = Comparator(pre_op_state->data_block_array_, order_by_types_, expressions_); + + block_comparator.Init(); // sort block_indexes - std::sort(block_indexes.begin(), block_indexes.end(), Comparator(pre_op_state->data_block_array_, order_by_types_, expressions_)); + std::sort(block_indexes.begin(), block_indexes.end(), block_comparator); - CopyWithIndexes(pre_op_state->data_block_array_, - sort_operator_state->unmerge_sorted_blocks_, - block_indexes, - sort_operator_state->unmerge_sorted_blocks_.size(), - types); + CopyWithIndexes(pre_op_state->data_block_array_, sort_operator_state->unmerge_sorted_blocks_, block_indexes); prev_op_state->data_block_array_.clear(); if (!prev_op_state->Complete()) { @@ -239,9 +244,11 @@ bool PhysicalSort::Execute(QueryContext *, OperatorState *operator_state) { } auto &unmerge_sorted_blocks = sort_operator_state->unmerge_sorted_blocks_; auto merge_comparator = Comparator(unmerge_sorted_blocks, order_by_types_, expressions_); - Vector> indexes_group; + + merge_comparator.Init(); indexes_group.reserve(unmerge_sorted_blocks.size()); + for (SizeT block_id = 0; block_id < unmerge_sorted_blocks.size(); ++block_id) { Vector indexes; indexes.reserve(unmerge_sorted_blocks[block_id]->row_count()); @@ -252,9 +259,10 @@ bool PhysicalSort::Execute(QueryContext *, OperatorState *operator_state) { } indexes_group.push_back(indexes); } - auto merge_indexes = MergeIndexes(Move(indexes_group), merge_comparator); + auto merge_indexes = MergeIndexes(indexes_group, 0, indexes_group.size() - 1, merge_comparator); + indexes_group.clear(); - CopyWithIndexes(sort_operator_state->unmerge_sorted_blocks_, sort_operator_state->data_block_array_, merge_indexes, 0, types); + CopyWithIndexes(sort_operator_state->unmerge_sorted_blocks_, sort_operator_state->data_block_array_, merge_indexes); sort_operator_state->unmerge_sorted_blocks_.clear(); sort_operator_state->SetComplete(); return true; diff --git a/src/storage/data_block.cppm b/src/storage/data_block.cppm index 66441ef51d..5e2a3c77ba 100644 --- a/src/storage/data_block.cppm +++ b/src/storage/data_block.cppm @@ -97,6 +97,16 @@ public: return row_count_; } + [[nodiscard]] inline Vector> types() const { + Vector> types; + + types.reserve(column_count()); + for (SizeT colum_idx = 0; colum_idx < column_vectors.size(); ++colum_idx) { + types.push_back(column_vectors[colum_idx]->data_type()); + } + return types; + } + [[nodiscard]] inline SizeT capacity() const { return capacity_; } [[nodiscard]] inline SizeT available_capacity() const { return capacity_ - row_count_; }