Skip to content

Commit

Permalink
perf: optimize physical_sort.cpp implementation (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould authored Dec 22, 2023
1 parent 5a782e3 commit c796736
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 122 deletions.
195 changes: 132 additions & 63 deletions benchmark/local_infinity/infinity_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ using namespace infinity;

constexpr u64 second_unit = 1000 * 1000 * 1000;

double Measurement(SizeT thread_num, SizeT times, const StdFunction<void(SizeT, SharedPtr<Infinity>, std::thread::id)> &closure) {
infinity::BaseProfiler profiler;
double Measurement(String name, SizeT thread_num, SizeT times, const StdFunction<void(SizeT, SharedPtr<Infinity>, std::thread::id)> &closure) {
infinity::BaseProfiler profiler(name);
Vector<std::thread> threads;
threads.reserve(thread_num);

Expand Down Expand Up @@ -88,28 +88,28 @@ int main() {
Vector<String> results;
// Database
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("Get Database", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->GetDatabase("default");
});
results.push_back(Format("-> Get Database QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("List Databases", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->ListDatabases();
});
results.push_back(Format("-> List Databases QPS: {}", total_times / tims_costing_second));
}
{
CreateDatabaseOptions create_db_opts;
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("Create Database", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->CreateDatabase(ToStr(i), create_db_opts);
});
results.push_back(Format("-> Create Database QPS: {}", total_times / tims_costing_second));
}

{
DropDatabaseOptions drop_db_opts;
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("Drop Database", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->DropDatabase(ToStr(i), drop_db_opts);
});
results.push_back(Format("-> Drop Database QPS: {}", total_times / tims_costing_second));
Expand Down Expand Up @@ -140,19 +140,20 @@ int main() {
infinity->LocalDisconnect();
}
// {
// auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
// auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id
// thread_id) {
// __attribute__((unused)) auto ignored = infinity->GetDatabase("default")->ListTables();
// });
// results.push_back(Format("-> List Tables QPS: {}", total_times / tims_costing_second));
// }
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("Get Tables", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test");
});
results.push_back(Format("-> Get Tables QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("Describe Tables", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->DescribeTable("benchmark_test");
});
results.push_back(Format("-> Describe Tables QPS: {}", total_times / tims_costing_second));
Expand All @@ -179,32 +180,120 @@ int main() {
results.push_back(Format("-> Create Table QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
auto tims_costing_second = Measurement("Drop Table", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->DropTable(ToStr(i), drop_table_options);
});
results.push_back(Format("-> Drop Table QPS: {}", total_times / tims_costing_second));
}
{
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id
thread_id) {
Vector<ParsedExpr *> *output_columns = new Vector<ParsedExpr *>();
ColumnExpr *col1 = new ColumnExpr();
col1->names_.emplace_back("col1");
output_columns->emplace_back(col1);
auto tims_costing_second =
Measurement("Select", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
Vector<ParsedExpr *> *output_columns = new Vector<ParsedExpr *>();
ColumnExpr *col1 = new ColumnExpr();
col1->names_.emplace_back("col1");
output_columns->emplace_back(col1);

ColumnExpr *col2 = new ColumnExpr();
col2->names_.emplace_back("col2");
output_columns->emplace_back(col2);

__attribute__((unused)) auto ignored =
infinity->GetDatabase("default")->GetTable("benchmark_test")->Search(nullptr, nullptr, output_columns);
});
results.push_back(Format("-> Select QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second =
Measurement("Insert", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
Vector<Vector<ParsedExpr *> *> *values = new Vector<Vector<ParsedExpr *> *>();
values->emplace_back(new Vector<ParsedExpr *>());

ColumnExpr *col2 = new ColumnExpr();
col2->names_.emplace_back("col2");
output_columns->emplace_back(col2);
Vector<String> *columns = new Vector<String>();
columns->emplace_back(col_name_1);
columns->emplace_back(col_name_2);

__attribute__((unused)) auto ignored =
infinity->GetDatabase("default")->GetTable("benchmark_test")->Search(nullptr, nullptr, output_columns);
});
results.push_back(Format("-> Select QPS: {}", total_times / tims_costing_second));
ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger);
value1->integer_value_ = i;
values->at(0)->emplace_back(value1);

ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger);
value2->integer_value_ = i;
values->at(0)->emplace_back(value2);

__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Insert(columns, values);
});
results.push_back(Format("-> Insert QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second =
Measurement("Update", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
Vector<UpdateExpr *> *values = new Vector<UpdateExpr *>();

Vector<String> *columns = new Vector<String>();
columns->emplace_back(col_name_1);
columns->emplace_back(col_name_2);

ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger);
value1->integer_value_ = i;
UpdateExpr *update_expr1 = new UpdateExpr();
update_expr1->column_name = col_name_1;
update_expr1->value = value1;

ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger);
value1->integer_value_ = i;
UpdateExpr *update_expr2 = new UpdateExpr();
update_expr2->column_name = col_name_2;
update_expr2->value = value2;

values->push_back(update_expr1);
values->push_back(update_expr2);

__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Update(nullptr,
values);
});
results.push_back(Format("-> Update QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id
thread_id) {
auto tims_costing_second =
Measurement("Delete", thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Delete(nullptr);
});
results.push_back(Format("-> Delete QPS: {}", total_times / tims_costing_second));
}
}
}
{
SizeT sort_row = 1000 * 1000;
SizeT sort_times = 48 * 10;

String col_name_1 = "c1";
String col_name_2 = "c2";

std::srand(static_cast<unsigned int>(std::time(nullptr)));

SharedPtr<DataType> col_type = MakeShared<DataType>(LogicalType::kInteger);
{
CreateTableOptions create_table_opts;

SizeT column_count = 2;
Vector<ColumnDef *> column_definitions;
column_definitions.reserve(column_count);

auto col_def_1 = new ColumnDef(0, col_type, col_name_1, HashSet<ConstraintType>());
column_definitions.emplace_back(col_def_1);

auto col_def_2 = new ColumnDef(1, col_type, col_name_2, HashSet<ConstraintType>());
column_definitions.emplace_back(col_def_2);

SharedPtr<Infinity> infinity = Infinity::LocalConnect();
__attribute__((unused)) auto ignored =
infinity->GetDatabase("default")->CreateTable("benchmark_sort", column_definitions, Vector<TableConstraint *>(), create_table_opts);
infinity->LocalDisconnect();
}
{
auto tims_costing_second =
Measurement("Insert for Select Sort", thread_num, sort_row, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
Vector<Vector<ParsedExpr *> *> *values = new Vector<Vector<ParsedExpr *> *>();
values->emplace_back(new Vector<ParsedExpr *>());

Expand All @@ -213,52 +302,32 @@ int main() {
columns->emplace_back(col_name_2);

ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger);
value1->integer_value_ = i;
value1->integer_value_ = std::rand();
values->at(0)->emplace_back(value1);

ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger);
value2->integer_value_ = i;
value2->integer_value_ = std::rand();
values->at(0)->emplace_back(value2);

__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Insert(columns, values);
__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_sort")->Insert(columns, values);
});
results.push_back(Format("-> Insert QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second = Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id
thread_id) {
Vector<UpdateExpr *> *values = new Vector<UpdateExpr *>();

Vector<String> *columns = new Vector<String>();
columns->emplace_back(col_name_1);
columns->emplace_back(col_name_2);

ConstantExpr *value1 = new ConstantExpr(LiteralType::kInteger);
value1->integer_value_ = i;
UpdateExpr *update_expr1 = new UpdateExpr();
update_expr1->column_name = col_name_1;
update_expr1->value = value1;

ConstantExpr *value2 = new ConstantExpr(LiteralType::kInteger);
value1->integer_value_ = i;
UpdateExpr *update_expr2 = new UpdateExpr();
update_expr2->column_name = col_name_2;
update_expr2->value = value2;
results.push_back(Format("-> Insert for Sort Time: {}s", tims_costing_second));
}
{
auto tims_costing_second =
Measurement("Select Sort", thread_num, sort_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
Vector<ParsedExpr *> *output_columns = new Vector<ParsedExpr *>();
ColumnExpr *col1 = new ColumnExpr();
col1->names_.emplace_back("col1");
output_columns->emplace_back(col1);

values->push_back(update_expr1);
values->push_back(update_expr2);
ColumnExpr *col2 = new ColumnExpr();
col2->names_.emplace_back("col2");
output_columns->emplace_back(col2);

__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Update(nullptr, values);
__attribute__((unused)) auto ignored = infinity->Query("select c1, c2 from benchmark_sort order by c1");
});
results.push_back(Format("-> Update QPS: {}", total_times / tims_costing_second));
}
{
auto tims_costing_second =
Measurement(thread_num, total_times, [&](SizeT i, SharedPtr<Infinity> infinity, std::thread::id thread_id) {
__attribute__((unused)) auto ignored = infinity->GetDatabase("default")->GetTable("benchmark_test")->Delete(nullptr);
});
results.push_back(Format("-> Delete QPS: {}", total_times / tims_costing_second));
}
results.push_back(Format("-> Select Sort Time QPS: {}", sort_times / tims_costing_second));
}
}

Expand Down Expand Up @@ -318,7 +387,7 @@ int main() {
}

table->CreateIndex(index_name, index_info_list, CreateIndexOptions());
} while(false);
} while (false);

// {
// std::cout << "--- Start to run search benchmark: ";
Expand Down
Loading

0 comments on commit c796736

Please sign in to comment.