From ec1c922f07efec56acbe61e9c07c4baf2cc2a9c0 Mon Sep 17 00:00:00 2001 From: Xwg Date: Tue, 26 Dec 2023 21:23:02 +0800 Subject: [PATCH] Finish basic simple aggregate count, sum, min, max, avg one block (#381) * Finish basic simple aggregate count, sum, min, max * temp * Add physical op virtual function TaskletCount * Fix drop stl --- src/executor/fragment_builder.cpp | 10 +- src/executor/operator/physcial_drop_view.cppm | 9 +- src/executor/operator/physical_aggregate.cpp | 112 +++++++++++------- src/executor/operator/physical_aggregate.cppm | 10 +- src/executor/operator/physical_alter.cppm | 6 + src/executor/operator/physical_command.cppm | 6 + .../operator/physical_create_collection.cppm | 6 + .../operator/physical_create_index.cppm | 6 + .../operator/physical_create_schema.cppm | 6 + .../operator/physical_create_table.cppm | 5 + .../operator/physical_create_view.cppm | 6 + .../operator/physical_cross_product.cppm | 6 + src/executor/operator/physical_delete.cppm | 6 + .../operator/physical_drop_collection.cppm | 6 + .../operator/physical_drop_index.cppm | 6 + .../operator/physical_drop_schema.cppm | 6 + .../operator/physical_drop_table.cppm | 6 + .../operator/physical_dummy_operator.cppm | 6 + .../operator/physical_dummy_scan.cppm | 9 +- src/executor/operator/physical_except.cppm | 6 + src/executor/operator/physical_explain.cppm | 6 + src/executor/operator/physical_export.cppm | 6 + src/executor/operator/physical_filter.cppm | 6 + src/executor/operator/physical_flush.cppm | 6 + src/executor/operator/physical_fusion.cppm | 6 + src/executor/operator/physical_hash.cppm | 6 + src/executor/operator/physical_hash_join.cppm | 6 + src/executor/operator/physical_import.cppm | 6 + .../operator/physical_index_join.cppm | 6 + .../operator/physical_index_scan.cppm | 6 + src/executor/operator/physical_insert.cppm | 6 + src/executor/operator/physical_intersect.cppm | 6 + src/executor/operator/physical_knn_scan.cppm | 5 + src/executor/operator/physical_limit.cppm | 6 + src/executor/operator/physical_match.cppm | 6 + .../operator/physical_merge_aggregate.cpp | 32 +++++ .../operator/physical_merge_aggregate.cppm | 64 ++++++++++ .../operator/physical_merge_hash.cppm | 6 + src/executor/operator/physical_merge_knn.cppm | 6 + .../operator/physical_merge_limit.cppm | 6 + .../physical_merge_parallel_aggregate.cppm | 6 + .../operator/physical_merge_sort.cppm | 6 + src/executor/operator/physical_merge_top.cppm | 6 + .../operator/physical_nested_loop_join.cppm | 6 + src/executor/operator/physical_optimize.cppm | 6 + .../operator/physical_parallel_aggregate.cppm | 6 + .../operator/physical_prepared_plan.cppm | 6 + src/executor/operator/physical_project.cppm | 6 + src/executor/operator/physical_show.cppm | 6 + src/executor/operator/physical_sink.cpp | 13 ++ src/executor/operator/physical_sink.cppm | 6 + src/executor/operator/physical_sort.cppm | 6 + .../operator/physical_sort_merge_join.cppm | 8 +- src/executor/operator/physical_source.cppm | 6 + src/executor/operator/physical_table_scan.cpp | 8 +- .../operator/physical_table_scan.cppm | 2 + src/executor/operator/physical_top.cppm | 6 + src/executor/operator/physical_union_all.cppm | 6 + src/executor/operator/physical_update.cppm | 6 + src/executor/operator_state.cppm | 5 + src/executor/physical_operator.cppm | 2 + src/executor/physical_operator_type.cpp | 2 + src/executor/physical_operator_type.cppm | 1 + src/executor/physical_planner.cpp | 47 +++++--- src/planner/bound_select_statement.cpp | 2 + src/planner/node/logical_aggregate.cppm | 8 +- src/planner/subquery/subquery_unnest.cpp | 2 +- src/scheduler/fragment_context.cpp | 10 +- test/sql/ddl/drop/test_drop.slt | 20 ++-- test/sql/dql/aggregate/test_simple_agg.slt | 59 +++++++++ tools/sqllogictest.py | 1 + 71 files changed, 639 insertions(+), 83 deletions(-) create mode 100644 src/executor/operator/physical_merge_aggregate.cpp create mode 100644 src/executor/operator/physical_merge_aggregate.cppm create mode 100644 test/sql/dql/aggregate/test_simple_agg.slt diff --git a/src/executor/fragment_builder.cpp b/src/executor/fragment_builder.cpp index 1639787853..9e93409ef6 100644 --- a/src/executor/fragment_builder.cpp +++ b/src/executor/fragment_builder.cpp @@ -120,6 +120,14 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu return; } case PhysicalOperatorType::kAggregate: { + current_fragment_ptr->AddOperator(phys_op); + if (phys_op->left() == nullptr) { + Error("No input node of aggregate operator"); + } else { + current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize); + BuildFragments(phys_op->left(), current_fragment_ptr); + } + return; current_fragment_ptr->AddOperator(phys_op); current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes()); if (phys_op->left() == nullptr) { @@ -160,7 +168,7 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu break; } case PhysicalOperatorType::kFusion: - case PhysicalOperatorType::kMergeParallelAggregate: + case PhysicalOperatorType::kMergeAggregate: case PhysicalOperatorType::kMergeHash: case PhysicalOperatorType::kMergeLimit: case PhysicalOperatorType::kMergeTop: diff --git a/src/executor/operator/physcial_drop_view.cppm b/src/executor/operator/physcial_drop_view.cppm index aa0579a5f9..b975b48adb 100644 --- a/src/executor/operator/physcial_drop_view.cppm +++ b/src/executor/operator/physcial_drop_view.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_drop_view; @@ -35,8 +36,7 @@ public: u64 id, SharedPtr> load_metas) : PhysicalOperator(PhysicalOperatorType::kDropView, nullptr, nullptr, id, load_metas), schema_name_(Move(schema_name)), - view_name_(Move(view_name)), conflict_type_(conflict_type), output_names_(Move(output_names)), - output_types_(Move(output_types)) {} + view_name_(Move(view_name)), conflict_type_(conflict_type), output_names_(Move(output_names)), output_types_(Move(output_types)) {} ~PhysicalDropView() override = default; @@ -44,6 +44,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_aggregate.cpp b/src/executor/operator/physical_aggregate.cpp index 81da357e1e..825f159c12 100644 --- a/src/executor/operator/physical_aggregate.cpp +++ b/src/executor/operator/physical_aggregate.cpp @@ -24,24 +24,27 @@ import parser; import operator_state; import data_block; import utility; +import logger; +import column_vector; import infinity_exception; import default_values; +import parser; +import expression_state; +import expression_evaluator; module physical_aggregate; - namespace infinity { void PhysicalAggregate::Init() {} -bool PhysicalAggregate::Execute(QueryContext *, OperatorState *) { -#if 0 - input_table_ = left_->output(); - ExecutorAssert(input_table_ != nullptr, "No left input."); +bool PhysicalAggregate::Execute(QueryContext *query_context, OperatorState *operator_state) { + OperatorState *prev_op_state = operator_state->prev_op_state_; + auto *aggregate_operator_state = static_cast(operator_state); // 1. Execute group-by expressions to generate unique key. - ExpressionEvaluator groupby_executor; - groupby_executor.Init(groups_); + // ExpressionEvaluator groupby_executor; + // groupby_executor.Init(groups_); Vector> groupby_columns; SizeT group_count = groups_.size(); @@ -49,10 +52,13 @@ bool PhysicalAggregate::Execute(QueryContext *, OperatorState *) { if(group_count == 0) { // Aggregate without group by expression // e.g. SELECT count(a) FROM table; - SimpleAggregate(this->output_); - return ; + if (SimpleAggregate(this->output_, prev_op_state, aggregate_operator_state)) { + return true; + } else { + return false; + } } - +#if 0 groupby_columns.reserve(group_count); Vector types; @@ -553,10 +559,13 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr &input_ #endif } -void PhysicalAggregate::SimpleAggregate(SharedPtr &) { -#if 0 +bool PhysicalAggregate::SimpleAggregate(SharedPtr &output_table, + OperatorState *pre_operator_state, + AggregateOperatorState *aggregate_operator_state) { SizeT aggregates_count = aggregates_.size(); - ExecutorAssert(aggregates_count > 0, "Simple Aggregate without aggregate expression"); + if (aggregates_count <= 0) { + Error("Simple Aggregate without aggregate expression."); + } // Prepare the output table columns Vector> aggregate_columns; @@ -570,10 +579,11 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr &) { Vector> output_types; output_types.reserve(aggregates_count); - SizeT input_data_block_count = input_table_->DataBlockCount(); + SizeT input_block_count = pre_operator_state->data_block_array_.size(); + for (i64 idx = 0; auto &expr: aggregates_) { // expression state - expr_states.emplace_back(ExpressionState::CreateState(expr, input_data_block_count)); + expr_states.emplace_back(ExpressionState::CreateState(expr)); SharedPtr output_type = MakeShared(expr->Type()); @@ -590,41 +600,57 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr &) { ++idx; } - // output aggregate table definition - SharedPtr aggregate_tabledef = TableDef::Make(MakeShared("default"), - MakeShared("aggregate"), - aggregate_columns); - - output_table = DataTable::Make(aggregate_tabledef, TableType::kAggregate); - - SharedPtr output_data_block = DataBlock::Make(); - output_data_block->Init(output_types); - - if(input_data_block_count == 0) { + if (input_block_count == 0) { // No input data - LOG_TRACE("No input, no aggregate result") - return ; + LOG_TRACE("No input, no aggregate result"); + return true; } // Loop blocks - ExpressionEvaluator evaluator; - evaluator.Init(input_table_->data_blocks_); - for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) { - Vector> blocks_column; - blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]); - evaluator.Execute(aggregates_[expr_idx], - expr_states[expr_idx], - blocks_column); - if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) { - // column vector in blocks column might be changed to the column vector from column reference. - // This check and assignment is to make sure the right column vector are assign to output_data_block - output_data_block->column_vectors[expr_idx] = blocks_column[0]; + // ExpressionEvaluator evaluator; + // //evaluator.Init(input_table_->data_blocks_); + // for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) { + // + // ExpressionEvaluator evaluator; + // evaluator.Init(aggregates_[]) + // Vector> blocks_column; + // blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]); + // evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], blocks_column[expr_idx]); + // if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) { + // // column vector in blocks column might be changed to the column vector from column reference. + // // This check and assignment is to make sure the right column vector are assign to output_data_block + // output_data_block->column_vectors[expr_idx] = blocks_column[0]; + // } + // } + // + // output_data_block->Finalize(); + + for (SizeT block_idx = 0; block_idx < input_block_count; ++block_idx) { + DataBlock *input_data_block = pre_operator_state->data_block_array_[block_idx].get(); + + aggregate_operator_state->data_block_array_.emplace_back(DataBlock::MakeUniquePtr()); + DataBlock *output_data_block = aggregate_operator_state->data_block_array_.back().get(); + output_data_block->Init(*GetOutputTypes()); + + ExpressionEvaluator evaluator; + evaluator.Init(input_data_block); + + SizeT expression_count = aggregates_count; + // Prepare the expression states + + for (SizeT expr_idx = 0; expr_idx < expression_count; ++expr_idx) { + // Vector> blocks_column; + // blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]); + evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], output_data_block->column_vectors[expr_idx]); } + output_data_block->Finalize(); } - output_data_block->Finalize(); - output_table->Append(output_data_block); -#endif + pre_operator_state->data_block_array_.clear(); + if (pre_operator_state->Complete()) { + aggregate_operator_state->SetComplete(); + } + return true; } SharedPtr> PhysicalAggregate::GetOutputNames() const { diff --git a/src/executor/operator/physical_aggregate.cppm b/src/executor/operator/physical_aggregate.cppm index 35f7a2d6a8..f9f14762f9 100644 --- a/src/executor/operator/physical_aggregate.cppm +++ b/src/executor/operator/physical_aggregate.cppm @@ -24,6 +24,7 @@ import data_table; import hash_table; import base_expression; import load_meta; +import infinity_exception; export module physical_aggregate; @@ -52,6 +53,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + void GroupByInputTable(const SharedPtr &input_table, SharedPtr &output_table); void GenerateGroupByResult(const SharedPtr &input_table, SharedPtr &output_table); @@ -60,7 +66,9 @@ public: Vector> aggregates_{}; HashTable hash_table_; - void SimpleAggregate(SharedPtr &output_table); + bool SimpleAggregate(SharedPtr &output_table, + OperatorState *pre_operator_state, + AggregateOperatorState *aggregate_operator_state); inline u64 GroupTableIndex() const { return groupby_index_; } diff --git a/src/executor/operator/physical_alter.cppm b/src/executor/operator/physical_alter.cppm index 2cf0296bcb..a25a3bdebe 100644 --- a/src/executor/operator/physical_alter.cppm +++ b/src/executor/operator/physical_alter.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_alter; @@ -41,6 +42,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_command.cppm b/src/executor/operator/physical_command.cppm index 1b41e0f239..a949dc66c2 100644 --- a/src/executor/operator/physical_command.cppm +++ b/src/executor/operator/physical_command.cppm @@ -22,6 +22,7 @@ import query_context; import parser; import operator_state; import load_meta; +import infinity_exception; export module physical_command; @@ -43,6 +44,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) override; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const override { return output_names_; } inline SharedPtr>> GetOutputTypes() const override { return output_types_; } diff --git a/src/executor/operator/physical_create_collection.cppm b/src/executor/operator/physical_create_collection.cppm index 2631ac2298..b9e9593409 100644 --- a/src/executor/operator/physical_create_collection.cppm +++ b/src/executor/operator/physical_create_collection.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_create_collection; @@ -43,6 +44,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_create_index.cppm b/src/executor/operator/physical_create_index.cppm index 431a8517fa..4d005b4aa6 100644 --- a/src/executor/operator/physical_create_index.cppm +++ b/src/executor/operator/physical_create_index.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import index_def; import load_meta; +import infinity_exception; export module physical_create_index; @@ -32,6 +33,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) override; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + SharedPtr> GetOutputNames() const override { return output_names_; } SharedPtr>> GetOutputTypes() const override { return output_types_; } diff --git a/src/executor/operator/physical_create_schema.cppm b/src/executor/operator/physical_create_schema.cppm index 3a2342267d..c2e0b89ea5 100644 --- a/src/executor/operator/physical_create_schema.cppm +++ b/src/executor/operator/physical_create_schema.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import index_def; import load_meta; +import infinity_exception; export module physical_create_schema; @@ -44,6 +45,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_create_table.cppm b/src/executor/operator/physical_create_table.cppm index 7dcce607cc..d73d30ba7b 100644 --- a/src/executor/operator/physical_create_table.cppm +++ b/src/executor/operator/physical_create_table.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import index_def; import table_def; import load_meta; +import infinity_exception; export module physical_create_table; @@ -54,6 +55,10 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_create_view.cppm b/src/executor/operator/physical_create_view.cppm index 11caf680b8..964adee02a 100644 --- a/src/executor/operator/physical_create_view.cppm +++ b/src/executor/operator/physical_create_view.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import index_def; import load_meta; +import infinity_exception; export module physical_create_view; @@ -43,6 +44,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline const SharedPtr &bound_select_statement() const { return create_view_info_; }; inline SharedPtr> GetOutputNames() const final { return output_names_; } diff --git a/src/executor/operator/physical_cross_product.cppm b/src/executor/operator/physical_cross_product.cppm index 4f988bce6f..44c0c10fa3 100644 --- a/src/executor/operator/physical_cross_product.cppm +++ b/src/executor/operator/physical_cross_product.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import data_table; import load_meta; +import infinity_exception; export module physical_cross_product; @@ -38,6 +39,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + SharedPtr> GetOutputNames() const final; SharedPtr>> GetOutputTypes() const final; diff --git a/src/executor/operator/physical_delete.cppm b/src/executor/operator/physical_delete.cppm index d0a6a51606..83c306586e 100644 --- a/src/executor/operator/physical_delete.cppm +++ b/src/executor/operator/physical_delete.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import table_collection_entry; import load_meta; +import infinity_exception; export module physical_delete; @@ -38,6 +39,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_drop_collection.cppm b/src/executor/operator/physical_drop_collection.cppm index c2a7460414..f5950bf882 100644 --- a/src/executor/operator/physical_drop_collection.cppm +++ b/src/executor/operator/physical_drop_collection.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_drop_collection; @@ -42,6 +43,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_drop_index.cppm b/src/executor/operator/physical_drop_index.cppm index fcce12f713..891c2908bc 100644 --- a/src/executor/operator/physical_drop_index.cppm +++ b/src/executor/operator/physical_drop_index.cppm @@ -22,6 +22,7 @@ import query_context; import parser; import operator_state; import load_meta; +import infinity_exception; export module physical_drop_index; @@ -47,6 +48,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) override; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const override { return output_names_; } inline SharedPtr>> GetOutputTypes() const override { return output_types_; } diff --git a/src/executor/operator/physical_drop_schema.cppm b/src/executor/operator/physical_drop_schema.cppm index 24d40006c1..6b0548edbf 100644 --- a/src/executor/operator/physical_drop_schema.cppm +++ b/src/executor/operator/physical_drop_schema.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_drop_schema; @@ -43,6 +44,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_drop_table.cppm b/src/executor/operator/physical_drop_table.cppm index 17239b45bf..c759e65761 100644 --- a/src/executor/operator/physical_drop_table.cppm +++ b/src/executor/operator/physical_drop_table.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_drop_table; @@ -45,6 +46,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_dummy_operator.cppm b/src/executor/operator/physical_dummy_operator.cppm index 6eb5878ac2..b4da1418ca 100644 --- a/src/executor/operator/physical_dummy_operator.cppm +++ b/src/executor/operator/physical_dummy_operator.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_dummy_operator; @@ -36,6 +37,11 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SharedPtr> GetOutputNames() const final { return output_names_; } inline SharedPtr>> GetOutputTypes() const final { return output_types_; } diff --git a/src/executor/operator/physical_dummy_scan.cppm b/src/executor/operator/physical_dummy_scan.cppm index 714506e25d..ba525b07ee 100644 --- a/src/executor/operator/physical_dummy_scan.cppm +++ b/src/executor/operator/physical_dummy_scan.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_dummy_scan; @@ -28,7 +29,8 @@ namespace infinity { export class PhysicalDummyScan final : public PhysicalOperator { public: - explicit PhysicalDummyScan(u64 id, SharedPtr> load_metas) : PhysicalOperator(PhysicalOperatorType::kDummyScan, nullptr, nullptr, id, load_metas) {} + explicit PhysicalDummyScan(u64 id, SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kDummyScan, nullptr, nullptr, id, load_metas) {} ~PhysicalDummyScan() override = default; @@ -40,6 +42,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_except.cppm b/src/executor/operator/physical_except.cppm index 3b69cf4705..246f256399 100644 --- a/src/executor/operator/physical_except.cppm +++ b/src/executor/operator/physical_except.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_except; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_explain.cppm b/src/executor/operator/physical_explain.cppm index 383423e63c..316218a21b 100644 --- a/src/executor/operator/physical_explain.cppm +++ b/src/executor/operator/physical_explain.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_explain; @@ -49,6 +50,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline ExplainType explain_type() const { return explain_type_; } static void AlignParagraphs(Vector> &array1, Vector> &array2); diff --git a/src/executor/operator/physical_export.cppm b/src/executor/operator/physical_export.cppm index 5ffd31ee2f..ddeaab6d73 100644 --- a/src/executor/operator/physical_export.cppm +++ b/src/executor/operator/physical_export.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_export; @@ -49,6 +50,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + void ExportCSV(QueryContext *query_context); void ExportJSON(QueryContext *query_context); diff --git a/src/executor/operator/physical_filter.cppm b/src/executor/operator/physical_filter.cppm index a8f036d807..5d4c15e661 100644 --- a/src/executor/operator/physical_filter.cppm +++ b/src/executor/operator/physical_filter.cppm @@ -25,6 +25,7 @@ import data_table; import expression_evaluator; import expression_selector; import load_meta; +import infinity_exception; export module physical_filter; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline const SharedPtr &condition() const { return condition_; } private: diff --git a/src/executor/operator/physical_flush.cppm b/src/executor/operator/physical_flush.cppm index 38b3117fee..b8a4589138 100644 --- a/src/executor/operator/physical_flush.cppm +++ b/src/executor/operator/physical_flush.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import base_expression; import load_meta; +import infinity_exception; export module physical_flush; @@ -42,6 +43,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline FlushType flush_type() const { return flush_type_; } private: diff --git a/src/executor/operator/physical_fusion.cppm b/src/executor/operator/physical_fusion.cppm index e34f01a2d1..4d2f08900c 100644 --- a/src/executor/operator/physical_fusion.cppm +++ b/src/executor/operator/physical_fusion.cppm @@ -24,6 +24,7 @@ import table_collection_entry; import base_expression; import fusion_expression; import load_meta; +import infinity_exception; export module physical_fusion; @@ -46,6 +47,11 @@ public: SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); }; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + String ToString(i64 &space) const; SharedPtr fusion_expr_; diff --git a/src/executor/operator/physical_hash.cppm b/src/executor/operator/physical_hash.cppm index 8e0b761783..c2d9134314 100644 --- a/src/executor/operator/physical_hash.cppm +++ b/src/executor/operator/physical_hash.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_hash; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_hash_join.cppm b/src/executor/operator/physical_hash_join.cppm index dbd51aaac6..2d89804eca 100644 --- a/src/executor/operator/physical_hash_join.cppm +++ b/src/executor/operator/physical_hash_join.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_hash_join; @@ -40,6 +41,11 @@ public: SharedPtr> GetOutputNames() const final; SharedPtr>> GetOutputTypes() const final; + + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } }; } // namespace infinity diff --git a/src/executor/operator/physical_import.cppm b/src/executor/operator/physical_import.cppm index 106055b0ab..9c6abc7ba5 100644 --- a/src/executor/operator/physical_import.cppm +++ b/src/executor/operator/physical_import.cppm @@ -29,6 +29,7 @@ import block_entry; import block_column_entry; import zsv; import load_meta; +import infinity_exception; export module physical_import; @@ -78,6 +79,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + void ImportFVECS(QueryContext *query_context, ImportOperatorState *import_op_state); /// for push based execution diff --git a/src/executor/operator/physical_index_join.cppm b/src/executor/operator/physical_index_join.cppm index b8c9a60fa2..6818f204e5 100644 --- a/src/executor/operator/physical_index_join.cppm +++ b/src/executor/operator/physical_index_join.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_index_join; @@ -40,6 +41,11 @@ public: SharedPtr> GetOutputNames() const final; SharedPtr>> GetOutputTypes() const final; + + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } }; } // namespace infinity diff --git a/src/executor/operator/physical_index_scan.cppm b/src/executor/operator/physical_index_scan.cppm index 3fad332cf8..6cee077b80 100644 --- a/src/executor/operator/physical_index_scan.cppm +++ b/src/executor/operator/physical_index_scan.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_index_scan; @@ -41,6 +42,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_insert.cppm b/src/executor/operator/physical_insert.cppm index 6411a1131e..9d9f3eb133 100644 --- a/src/executor/operator/physical_insert.cppm +++ b/src/executor/operator/physical_insert.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import base_expression; import load_meta; +import infinity_exception; export module physical_insert; @@ -53,6 +54,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: TableCollectionEntry *table_collection_entry_{}; u64 table_index_{}; diff --git a/src/executor/operator/physical_intersect.cppm b/src/executor/operator/physical_intersect.cppm index 5c782f1b92..237111d81a 100644 --- a/src/executor/operator/physical_intersect.cppm +++ b/src/executor/operator/physical_intersect.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_intersect; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_knn_scan.cppm b/src/executor/operator/physical_knn_scan.cppm index 7cad2d3488..99bb304d1d 100644 --- a/src/executor/operator/physical_knn_scan.cppm +++ b/src/executor/operator/physical_knn_scan.cppm @@ -27,6 +27,7 @@ import table_collection_entry; import block_index; import load_meta; import knn_expression; +import infinity_exception; export module physical_knn_scan; @@ -75,6 +76,10 @@ public: return block_column_entries_->size() + index_entries_->size(); } + SizeT TaskletCount() override { + return block_column_entries_->size() + index_entries_->size(); + } + void FillingTableRefs(HashMap> &table_refs) override { table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); } diff --git a/src/executor/operator/physical_limit.cppm b/src/executor/operator/physical_limit.cppm index 7f5d6747ff..f1a7c7c45f 100644 --- a/src/executor/operator/physical_limit.cppm +++ b/src/executor/operator/physical_limit.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import base_expression; import data_table; import load_meta; +import infinity_exception; export module physical_limit; @@ -48,6 +49,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline const SharedPtr &limit_expr() const { return limit_expr_; } inline const SharedPtr &offset_expr() const { return offset_expr_; } diff --git a/src/executor/operator/physical_match.cppm b/src/executor/operator/physical_match.cppm index 1406c0676d..269edf8552 100644 --- a/src/executor/operator/physical_match.cppm +++ b/src/executor/operator/physical_match.cppm @@ -25,6 +25,7 @@ import base_expression; import match_expression; import base_table_ref; import load_meta; +import infinity_exception; export module physical_match; @@ -48,6 +49,11 @@ public: SharedPtr>> GetOutputTypes() const final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + void FillingTableRefs(HashMap> &table_refs) override { table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); } diff --git a/src/executor/operator/physical_merge_aggregate.cpp b/src/executor/operator/physical_merge_aggregate.cpp new file mode 100644 index 0000000000..7775588187 --- /dev/null +++ b/src/executor/operator/physical_merge_aggregate.cpp @@ -0,0 +1,32 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import query_context; +import operator_state; +import infinity_exception; + +module physical_merge_aggregate; + +namespace infinity { + +void PhysicalMergeAggregate::Init() {} + +bool PhysicalMergeAggregate::Execute(QueryContext *query_context, OperatorState *operator_state) { + Error("Not Implement"); + return false; +} + +} // namespace infinity diff --git a/src/executor/operator/physical_merge_aggregate.cppm b/src/executor/operator/physical_merge_aggregate.cppm new file mode 100644 index 0000000000..8dda917458 --- /dev/null +++ b/src/executor/operator/physical_merge_aggregate.cppm @@ -0,0 +1,64 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import query_context; +import operator_state; +import physical_operator; +import physical_operator_type; +import load_meta; +import base_table_ref; +import infinity_exception; + +export module physical_merge_aggregate; + +namespace infinity { + +export class PhysicalMergeAggregate final : public PhysicalOperator { +public: + explicit PhysicalMergeAggregate(u64 id, + SharedPtr table_ref, + UniquePtr left, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kMergeAggregate, Move(left), nullptr, id, load_metas), output_names_(Move(output_names)), + output_types_(Move(output_types)), table_ref_(Move(table_ref)) {} + + ~PhysicalMergeAggregate() override = default; + + void Init() override; + + bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + + inline SharedPtr> GetOutputNames() const final { return output_names_; } + + inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } +private: + SharedPtr> output_names_{}; + SharedPtr>> output_types_{}; + +public: + SharedPtr table_ref_{}; +}; + +} // namespace infinity diff --git a/src/executor/operator/physical_merge_hash.cppm b/src/executor/operator/physical_merge_hash.cppm index d27a4ccec7..b24c6a5311 100644 --- a/src/executor/operator/physical_merge_hash.cppm +++ b/src/executor/operator/physical_merge_hash.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_merge_hash; @@ -42,6 +43,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_merge_knn.cppm b/src/executor/operator/physical_merge_knn.cppm index 7a61b74260..1af3792d6e 100644 --- a/src/executor/operator/physical_merge_knn.cppm +++ b/src/executor/operator/physical_merge_knn.cppm @@ -25,6 +25,7 @@ import data_table; import base_table_ref; import load_meta; import knn_expression; +import infinity_exception; export module physical_merge_knn; @@ -53,6 +54,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline u64 knn_table_index() const { return knn_table_index_; } private: diff --git a/src/executor/operator/physical_merge_limit.cppm b/src/executor/operator/physical_merge_limit.cppm index 71a6b5c6e4..50e956eaf0 100644 --- a/src/executor/operator/physical_merge_limit.cppm +++ b/src/executor/operator/physical_merge_limit.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_merge_limit; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_merge_parallel_aggregate.cppm b/src/executor/operator/physical_merge_parallel_aggregate.cppm index 00e49b479e..d11821e636 100644 --- a/src/executor/operator/physical_merge_parallel_aggregate.cppm +++ b/src/executor/operator/physical_merge_parallel_aggregate.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_merge_parallel_aggregate; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_merge_sort.cppm b/src/executor/operator/physical_merge_sort.cppm index 15eb14352c..be959ae651 100644 --- a/src/executor/operator/physical_merge_sort.cppm +++ b/src/executor/operator/physical_merge_sort.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_merge_sort; @@ -45,6 +46,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_merge_top.cppm b/src/executor/operator/physical_merge_top.cppm index dd83e82be8..926e9a1dff 100644 --- a/src/executor/operator/physical_merge_top.cppm +++ b/src/executor/operator/physical_merge_top.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_merge_top; @@ -42,6 +43,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_nested_loop_join.cppm b/src/executor/operator/physical_nested_loop_join.cppm index f3d5fe3f13..242d1f31f0 100644 --- a/src/executor/operator/physical_nested_loop_join.cppm +++ b/src/executor/operator/physical_nested_loop_join.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import base_expression; import data_table; import load_meta; +import infinity_exception; export module physical_nested_loop_join; @@ -49,6 +50,11 @@ public: SharedPtr>> GetOutputTypes() const final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline const Vector> &conditions() const { return conditions_; } private: diff --git a/src/executor/operator/physical_optimize.cppm b/src/executor/operator/physical_optimize.cppm index 68354f768e..73b6b97f9a 100644 --- a/src/executor/operator/physical_optimize.cppm +++ b/src/executor/operator/physical_optimize.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import base_expression; import load_meta; +import infinity_exception; export module physical_optimize; @@ -43,6 +44,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline OptimizeType optimize_type() const { return optimize_type_; } private: diff --git a/src/executor/operator/physical_parallel_aggregate.cppm b/src/executor/operator/physical_parallel_aggregate.cppm index e5a656d484..962b1f5eb9 100644 --- a/src/executor/operator/physical_parallel_aggregate.cppm +++ b/src/executor/operator/physical_parallel_aggregate.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import base_expression; import load_meta; +import infinity_exception; export module physical_parallel_aggregate; @@ -46,6 +47,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_prepared_plan.cppm b/src/executor/operator/physical_prepared_plan.cppm index d25441e269..cc4c505488 100644 --- a/src/executor/operator/physical_prepared_plan.cppm +++ b/src/executor/operator/physical_prepared_plan.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import base_expression; import load_meta; +import infinity_exception; export module physical_prepared_plan; @@ -42,6 +43,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_project.cppm b/src/executor/operator/physical_project.cppm index 0b689e0db0..89db6b19da 100644 --- a/src/executor/operator/physical_project.cppm +++ b/src/executor/operator/physical_project.cppm @@ -22,6 +22,7 @@ import physical_operator; import physical_operator_type; import base_expression; import load_meta; +import infinity_exception; export module physical_project; @@ -47,6 +48,11 @@ public: SharedPtr>> GetOutputTypes() const final; + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + Vector> expressions_{}; inline u64 TableIndex() const { return projection_table_index_; } diff --git a/src/executor/operator/physical_show.cppm b/src/executor/operator/physical_show.cppm index 8d8bc727f0..c3b97eb9f3 100644 --- a/src/executor/operator/physical_show.cppm +++ b/src/executor/operator/physical_show.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import base_expression; import logical_show; import load_meta; +import infinity_exception; export module physical_show; @@ -51,6 +52,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline ShowType scan_type() const { return scan_type_; } inline const String &db_name() const { return db_name_; }; diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index 6c44160b0c..c970fb199a 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -144,6 +144,19 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate knn_output_state->data_block_array_.clear(); break; } + case PhysicalOperatorType::kAggregate: { + AggregateOperatorState *agg_output_state = static_cast(task_op_state); + if (agg_output_state->data_block_array_.empty()) { + if(materialize_sink_state->Error()) { + materialize_sink_state->empty_result_ = true; + } else { + Error("Empty sort output"); + } + } else { + materialize_sink_state->data_block_array_ = Move(agg_output_state->data_block_array_); + } + break; + } default: { Error(Format("{} isn't supported here.", PhysicalOperatorToString(task_op_state->operator_type_))); } diff --git a/src/executor/operator/physical_sink.cppm b/src/executor/operator/physical_sink.cppm index eb2a4e98e8..fb5379711d 100644 --- a/src/executor/operator/physical_sink.cppm +++ b/src/executor/operator/physical_sink.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_sink; @@ -51,6 +52,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SinkType sink_type() const { return type_; } private: diff --git a/src/executor/operator/physical_sort.cppm b/src/executor/operator/physical_sort.cppm index 5bbeca264e..09e121edf6 100644 --- a/src/executor/operator/physical_sort.cppm +++ b/src/executor/operator/physical_sort.cppm @@ -25,6 +25,7 @@ import base_expression; import data_table; import data_block; import load_meta; +import infinity_exception; export module physical_sort; @@ -55,6 +56,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + Vector> expressions_; Vector order_by_types_{}; diff --git a/src/executor/operator/physical_sort_merge_join.cppm b/src/executor/operator/physical_sort_merge_join.cppm index 9450b32065..9e5b81b7d3 100644 --- a/src/executor/operator/physical_sort_merge_join.cppm +++ b/src/executor/operator/physical_sort_merge_join.cppm @@ -21,8 +21,9 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; - export module physical_merge_join; +export module physical_merge_join; namespace infinity { @@ -41,6 +42,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_source.cppm b/src/executor/operator/physical_source.cppm index 7e4046fe61..7b85afc5eb 100644 --- a/src/executor/operator/physical_source.cppm +++ b/src/executor/operator/physical_source.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import load_meta; +import infinity_exception; export module physical_source; @@ -58,6 +59,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + inline SourceType source_type() const { return type_; } private: diff --git a/src/executor/operator/physical_table_scan.cpp b/src/executor/operator/physical_table_scan.cpp index 2eec382a76..86093198dc 100644 --- a/src/executor/operator/physical_table_scan.cpp +++ b/src/executor/operator/physical_table_scan.cpp @@ -34,6 +34,7 @@ import block_index; import table_collection_entry; import default_values; import infinity_exception; +import infinity_exception; module physical_table_scan; @@ -77,6 +78,8 @@ TableCollectionEntry *PhysicalTableScan::TableEntry() const { return base_table_ SizeT PhysicalTableScan::BlockEntryCount() const { return base_table_ref_->block_index_->BlockCount(); } +SizeT PhysicalTableScan::TaskletCount() { return base_table_ref_->block_index_->BlockCount(); } + BlockIndex *PhysicalTableScan::GetBlockIndex() const { return base_table_ref_->block_index_.get(); } Vector &PhysicalTableScan::ColumnIDs() const { @@ -110,9 +113,8 @@ Vector>> PhysicalTableScan::PlanBlockEntries(i64 return result; } -void PhysicalTableScan::ExecuteInternal(QueryContext *query_context, - TableScanOperatorState *table_scan_operator_state) { - if(!table_scan_operator_state->data_block_array_.empty()) { +void PhysicalTableScan::ExecuteInternal(QueryContext *query_context, TableScanOperatorState *table_scan_operator_state) { + if (!table_scan_operator_state->data_block_array_.empty()) { Error("Table scan output data block array should be empty"); } diff --git a/src/executor/operator/physical_table_scan.cppm b/src/executor/operator/physical_table_scan.cppm index c8fc6dea9f..bbeddc6291 100644 --- a/src/executor/operator/physical_table_scan.cppm +++ b/src/executor/operator/physical_table_scan.cppm @@ -48,6 +48,8 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + SizeT TaskletCount() override; + SharedPtr> GetOutputNames() const final; SharedPtr>> GetOutputTypes() const final; diff --git a/src/executor/operator/physical_top.cppm b/src/executor/operator/physical_top.cppm index d5776b7e19..795f959ae9 100644 --- a/src/executor/operator/physical_top.cppm +++ b/src/executor/operator/physical_top.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import base_expression; import data_table; import load_meta; +import infinity_exception; export module physical_top; @@ -42,6 +43,11 @@ public: inline SharedPtr> GetOutputNames() const final { return left_->GetOutputNames(); } inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } + + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } }; } // namespace infinity diff --git a/src/executor/operator/physical_union_all.cppm b/src/executor/operator/physical_union_all.cppm index 3c8189ed7b..1e4b12e07a 100644 --- a/src/executor/operator/physical_union_all.cppm +++ b/src/executor/operator/physical_union_all.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import base_expression; import data_table; import load_meta; +import infinity_exception; export module physical_union_all; @@ -43,6 +44,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; diff --git a/src/executor/operator/physical_update.cppm b/src/executor/operator/physical_update.cppm index e029c82364..75cae63ee2 100644 --- a/src/executor/operator/physical_update.cppm +++ b/src/executor/operator/physical_update.cppm @@ -23,6 +23,7 @@ import physical_operator_type; import table_collection_entry; import base_expression; import load_meta; +import infinity_exception; export module physical_update; @@ -48,6 +49,11 @@ public: inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + SizeT TaskletCount() override { + Error("TaskletCount not Implement"); + return 0; + } + TableCollectionEntry *table_entry_ptr_; const Vector>> &update_columns_; diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm index 01cec78889..25a7f5b479 100644 --- a/src/executor/operator_state.cppm +++ b/src/executor/operator_state.cppm @@ -56,6 +56,11 @@ export struct AggregateOperatorState : public OperatorState { inline explicit AggregateOperatorState() : OperatorState(PhysicalOperatorType::kAggregate) {} }; +// Merge Aggregate +export struct MergeAggregateOperatorState : public OperatorState { + inline explicit MergeAggregateOperatorState() : OperatorState(PhysicalOperatorType::kMergeAggregate) {} +}; + // Merge Parallel Aggregate export struct MergeParallelAggregateOperatorState : public OperatorState { inline explicit MergeParallelAggregateOperatorState() : OperatorState(PhysicalOperatorType::kMergeParallelAggregate) {} diff --git a/src/executor/physical_operator.cppm b/src/executor/physical_operator.cppm index 67c153a3af..74f9757861 100644 --- a/src/executor/physical_operator.cppm +++ b/src/executor/physical_operator.cppm @@ -39,6 +39,8 @@ public: virtual void Init() = 0; + virtual SizeT TaskletCount() = 0; + inline PhysicalOperator* left() const { return left_.get(); } inline PhysicalOperator* right() const { return right_.get(); } diff --git a/src/executor/physical_operator_type.cpp b/src/executor/physical_operator_type.cpp index 264a2c8a3b..3fea83d9b7 100644 --- a/src/executor/physical_operator_type.cpp +++ b/src/executor/physical_operator_type.cpp @@ -130,6 +130,8 @@ String PhysicalOperatorToString(PhysicalOperatorType type) { return "Match"; case PhysicalOperatorType::kFusion: return "Fusion"; + case PhysicalOperatorType::kMergeAggregate: + return "MergeAggregate"; } Error("Unknown physical operator type"); diff --git a/src/executor/physical_operator_type.cppm b/src/executor/physical_operator_type.cppm index c11f61fad2..5d24b12b76 100644 --- a/src/executor/physical_operator_type.cppm +++ b/src/executor/physical_operator_type.cppm @@ -24,6 +24,7 @@ export enum class PhysicalOperatorType : i8 { kInvalid = 0, kAggregate, + kMergeAggregate, kParallelAggregate, kMergeParallelAggregate, diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp index fc69894498..ed60458b5b 100644 --- a/src/executor/physical_planner.cpp +++ b/src/executor/physical_planner.cpp @@ -53,6 +53,8 @@ import physical_merge_hash; import physical_merge_join; import physical_merge_knn; import physical_merge_limit; +import physical_aggregate; +import physical_merge_aggregate; import physical_merge_parallel_aggregate; import physical_merge_sort; import physical_merge_top; @@ -490,13 +492,26 @@ UniquePtr PhysicalPlanner::BuildAggregate(const SharedPtr(logical_aggregate->node_id(), - Move(input_physical_operator), - logical_aggregate->groups_, - logical_aggregate->groupby_index_, - logical_aggregate->aggregates_, - logical_aggregate->aggregate_index_, - logical_operator->load_metas()); + SizeT tasklet_count = input_physical_operator->TaskletCount(); + + auto physical_agg_op = MakeUnique(logical_aggregate->node_id(), + Move(input_physical_operator), + logical_aggregate->groups_, + logical_aggregate->groupby_index_, + logical_aggregate->aggregates_, + logical_aggregate->aggregate_index_, + logical_operator->load_metas()); + + if (tasklet_count == 1) { + return physical_agg_op; + } else { + return MakeUnique(query_context_ptr_->GetNextNodeID(), + logical_aggregate->base_table_ref_, + Move(physical_agg_op), + logical_aggregate->GetOutputNames(), + logical_aggregate->GetOutputTypes(), + logical_operator->load_metas()); + } } UniquePtr PhysicalPlanner::BuildJoin(const SharedPtr &logical_operator) const { @@ -713,18 +728,18 @@ UniquePtr PhysicalPlanner::BuildFusion(const SharedPtr PhysicalPlanner::BuildKnn(const SharedPtr &logical_operator) const { auto *logical_knn_scan = (LogicalKnnScan *)(logical_operator.get()); -// logical_knn_scan-> + // logical_knn_scan-> UniquePtr knn_scan_op = MakeUnique(logical_knn_scan->node_id(), - logical_knn_scan->base_table_ref_, - logical_knn_scan->knn_expression_, - logical_knn_scan->filter_expression_, - logical_knn_scan->GetOutputNames(), - logical_knn_scan->GetOutputTypes(), - logical_knn_scan->knn_table_index_, - logical_operator->load_metas()); + logical_knn_scan->base_table_ref_, + logical_knn_scan->knn_expression_, + logical_knn_scan->filter_expression_, + logical_knn_scan->GetOutputNames(), + logical_knn_scan->GetOutputTypes(), + logical_knn_scan->knn_table_index_, + logical_operator->load_metas()); knn_scan_op->PlanWithIndex(query_context_ptr_); - if(knn_scan_op->TaskCount() == 1) { + if (knn_scan_op->TaskletCount() == 1) { return knn_scan_op; } else { return MakeUnique(query_context_ptr_->GetNextNodeID(), diff --git a/src/planner/bound_select_statement.cpp b/src/planner/bound_select_statement.cpp index cc39e8049a..96188bd692 100644 --- a/src/planner/bound_select_statement.cpp +++ b/src/planner/bound_select_statement.cpp @@ -83,7 +83,9 @@ SharedPtr BoundSelectStatement::BuildPlan(QueryContext *query_conte if (!group_by_expressions_.empty() || !aggregate_expressions_.empty()) { // Build logical aggregate + auto base_table_ref = static_pointer_cast(table_ref_ptr_); auto aggregate = MakeShared(bind_context->GetNewLogicalNodeId(), + base_table_ref, group_by_expressions_, groupby_index_, aggregate_expressions_, diff --git a/src/planner/node/logical_aggregate.cppm b/src/planner/node/logical_aggregate.cppm index 1e8b5620e4..979727b4b0 100644 --- a/src/planner/node/logical_aggregate.cppm +++ b/src/planner/node/logical_aggregate.cppm @@ -20,6 +20,7 @@ import column_binding; import logical_node; import base_expression; import parser; +import base_table_ref; export module logical_aggregate; @@ -28,12 +29,13 @@ namespace infinity { export class LogicalAggregate : public LogicalNode { public: explicit LogicalAggregate(u64 node_id, + SharedPtr base_table_ref, Vector> groups, u64 groupby_index, Vector> aggregates, u64 aggregate_index) - : LogicalNode(node_id, LogicalNodeType::kAggregate), groups_(Move(groups)), groupby_index_(groupby_index), - aggregates_(Move(aggregates)), aggregate_index_(aggregate_index) {} + : LogicalNode(node_id, LogicalNodeType::kAggregate), groups_(Move(groups)), groupby_index_(groupby_index), aggregates_(Move(aggregates)), + aggregate_index_(aggregate_index), base_table_ref_(Move(base_table_ref)) {} [[nodiscard]] Vector GetColumnBindings() const final; @@ -50,6 +52,8 @@ public: Vector> aggregates_{}; u64 aggregate_index_{}; + + SharedPtr base_table_ref_; }; } // namespace infinity diff --git a/src/planner/subquery/subquery_unnest.cpp b/src/planner/subquery/subquery_unnest.cpp index 0e0f93f68e..90e2c29234 100644 --- a/src/planner/subquery/subquery_unnest.cpp +++ b/src/planner/subquery/subquery_unnest.cpp @@ -126,7 +126,7 @@ SharedPtr SubqueryUnnest::UnnestUncorrelated(SubqueryExpression u64 group_by_index = bind_context->GenerateTableIndex(); u64 aggregate_index = bind_context->GenerateTableIndex(); SharedPtr aggregate_node = - MakeShared(bind_context->GetNewLogicalNodeId(), groups, group_by_index, aggregates, aggregate_index); + MakeShared(bind_context->GetNewLogicalNodeId(), nullptr, groups, group_by_index, aggregates, aggregate_index); aggregate_node->set_left_node(limit_node); diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index c8624db107..7d9ad8f448 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -144,6 +144,9 @@ MakeTaskState(SizeT operator_id, const Vector &physical_ops, case PhysicalOperatorType::kAggregate: { return MakeTaskStateTemplate(physical_ops[operator_id]); } + case PhysicalOperatorType::kMergeAggregate: { + return MakeTaskStateTemplate(physical_ops[operator_id]); + } case PhysicalOperatorType::kParallelAggregate: { return MakeTaskStateTemplate(physical_ops[operator_id]); } @@ -494,7 +497,7 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { switch (first_operator->operator_type()) { case PhysicalOperatorType::kTableScan: { auto *table_scan_operator = static_cast(first_operator); - parallel_count = Min(parallel_count, (i64)(table_scan_operator->BlockEntryCount())); + parallel_count = Min(parallel_count, (i64)(table_scan_operator->TaskletCount())); if (parallel_count == 0) { parallel_count = 1; } @@ -580,7 +583,7 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { Error( Format("{} shouldn't be the first operator of the fragment", PhysicalOperatorToString(first_operator->operator_type()))); } - case PhysicalOperatorType::kMergeParallelAggregate: + case PhysicalOperatorType::kMergeAggregate: case PhysicalOperatorType::kMergeHash: case PhysicalOperatorType::kMergeLimit: case PhysicalOperatorType::kMergeTop: @@ -691,8 +694,7 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { case PhysicalOperatorType::kLimit: case PhysicalOperatorType::kTop: { if (fragment_type_ != FragmentType::kParallelStream) { - Error( - Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); + Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); } if ((i64)tasks_.size() != parallel_count) { diff --git a/test/sql/ddl/drop/test_drop.slt b/test/sql/ddl/drop/test_drop.slt index 7fcaae10e4..5691f3d35e 100644 --- a/test/sql/ddl/drop/test_drop.slt +++ b/test/sql/ddl/drop/test_drop.slt @@ -7,20 +7,24 @@ # expect: success statement ok -CREATE TABLE a (i INTEGER); +DROP TABLE IF EXISTS test_drop; + +statement ok +CREATE TABLE test_drop (i INTEGER); -# crash! -#statement ok -#INSERT INTO a VALUES (42); +query I +INSERT INTO test_drop VALUES (42); +---- -statement error -SELECT COUNT(*) FROM a; +#statement error +#SELECT COUNT(*) FROM a; query I -SELECT * FROM a; +SELECT * FROM test_drop; ---- +42 statement ok -DROP TABLE a; \ No newline at end of file +DROP TABLE test_drop; \ No newline at end of file diff --git a/test/sql/dql/aggregate/test_simple_agg.slt b/test/sql/dql/aggregate/test_simple_agg.slt new file mode 100644 index 0000000000..91d6c79730 --- /dev/null +++ b/test/sql/dql/aggregate/test_simple_agg.slt @@ -0,0 +1,59 @@ +statement ok +DROP TABLE IF EXISTS simple_agg; + +statement ok +CREATE TABLE simple_agg (c1 INTEGER , c2 FLOAT); + +# insert data +query I +INSERT INTO simple_agg VALUES (1, 1.0),(2,2.0),(3,3.0); +---- + +query I +SELECT SUM(c1) FROM simple_agg +---- +6 + +query II +SELECT SUM(c2) FROM simple_agg +---- +6.000000 + +query I +SELECT AVG(c1) FROM simple_agg +---- +2.000000 + +query II +SELECT AVG(c2) FROM simple_agg +---- +2.000000 + +query I +SELECT MIN(c1) FROM simple_agg +---- +1 + +query II +SELECT MIN(c2) FROM simple_agg +---- +1.000000 + +query I +SELECT MAX(c1) FROM simple_agg +---- +3 + +query II +SELECT MAX(c2) FROM simple_agg +---- +3.000000 + +query I +SELECT COUNT(c1) FROM simple_agg +---- +3 + + +statement ok +DROP TABLE simple_agg; diff --git a/tools/sqllogictest.py b/tools/sqllogictest.py index b87e44f474..f6bbf53a81 100644 --- a/tools/sqllogictest.py +++ b/tools/sqllogictest.py @@ -10,6 +10,7 @@ def python_skd_test(python_test_dir: str): print("python test path is {}".format(python_test_dir)) # os.system(f"cd {python_test_dir}/test") + os.system(f"pip install infinity_sdk") os.system(f"python -m pytest {python_test_dir}/test")