From 536967a7e61f4324f8a1d0f3bb816b63b76328ca Mon Sep 17 00:00:00 2001 From: Xwg Date: Mon, 25 Dec 2023 17:28:16 +0800 Subject: [PATCH] Finish basic simple aggregate count, sum, min, max --- src/executor/fragment_builder.cpp | 8 ++ src/executor/operator/physical_aggregate.cpp | 112 +++++++++++------- src/executor/operator/physical_aggregate.cppm | 4 +- 3 files changed, 80 insertions(+), 44 deletions(-) diff --git a/src/executor/fragment_builder.cpp b/src/executor/fragment_builder.cpp index 1639787853..751d5a78c0 100644 --- a/src/executor/fragment_builder.cpp +++ b/src/executor/fragment_builder.cpp @@ -120,6 +120,14 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu return; } case PhysicalOperatorType::kAggregate: { + current_fragment_ptr->AddOperator(phys_op); + if (phys_op->left() == nullptr) { + Error("No input node of aggregate operator"); + } else { + current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize); + BuildFragments(phys_op->left(), current_fragment_ptr); + } + return; current_fragment_ptr->AddOperator(phys_op); current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes()); if (phys_op->left() == nullptr) { diff --git a/src/executor/operator/physical_aggregate.cpp b/src/executor/operator/physical_aggregate.cpp index 81da357e1e..825f159c12 100644 --- a/src/executor/operator/physical_aggregate.cpp +++ b/src/executor/operator/physical_aggregate.cpp @@ -24,24 +24,27 @@ import parser; import operator_state; import data_block; import utility; +import logger; +import column_vector; import infinity_exception; import default_values; +import parser; +import expression_state; +import expression_evaluator; module physical_aggregate; - namespace infinity { void PhysicalAggregate::Init() {} -bool PhysicalAggregate::Execute(QueryContext *, OperatorState *) { -#if 0 - input_table_ = left_->output(); - ExecutorAssert(input_table_ != nullptr, "No left input."); +bool PhysicalAggregate::Execute(QueryContext *query_context, OperatorState *operator_state) { + OperatorState *prev_op_state = operator_state->prev_op_state_; + auto *aggregate_operator_state = static_cast(operator_state); // 1. Execute group-by expressions to generate unique key. - ExpressionEvaluator groupby_executor; - groupby_executor.Init(groups_); + // ExpressionEvaluator groupby_executor; + // groupby_executor.Init(groups_); Vector> groupby_columns; SizeT group_count = groups_.size(); @@ -49,10 +52,13 @@ bool PhysicalAggregate::Execute(QueryContext *, OperatorState *) { if(group_count == 0) { // Aggregate without group by expression // e.g. SELECT count(a) FROM table; - SimpleAggregate(this->output_); - return ; + if (SimpleAggregate(this->output_, prev_op_state, aggregate_operator_state)) { + return true; + } else { + return false; + } } - +#if 0 groupby_columns.reserve(group_count); Vector types; @@ -553,10 +559,13 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr &input_ #endif } -void PhysicalAggregate::SimpleAggregate(SharedPtr &) { -#if 0 +bool PhysicalAggregate::SimpleAggregate(SharedPtr &output_table, + OperatorState *pre_operator_state, + AggregateOperatorState *aggregate_operator_state) { SizeT aggregates_count = aggregates_.size(); - ExecutorAssert(aggregates_count > 0, "Simple Aggregate without aggregate expression"); + if (aggregates_count <= 0) { + Error("Simple Aggregate without aggregate expression."); + } // Prepare the output table columns Vector> aggregate_columns; @@ -570,10 +579,11 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr &) { Vector> output_types; output_types.reserve(aggregates_count); - SizeT input_data_block_count = input_table_->DataBlockCount(); + SizeT input_block_count = pre_operator_state->data_block_array_.size(); + for (i64 idx = 0; auto &expr: aggregates_) { // expression state - expr_states.emplace_back(ExpressionState::CreateState(expr, input_data_block_count)); + expr_states.emplace_back(ExpressionState::CreateState(expr)); SharedPtr output_type = MakeShared(expr->Type()); @@ -590,41 +600,57 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr &) { ++idx; } - // output aggregate table definition - SharedPtr aggregate_tabledef = TableDef::Make(MakeShared("default"), - MakeShared("aggregate"), - aggregate_columns); - - output_table = DataTable::Make(aggregate_tabledef, TableType::kAggregate); - - SharedPtr output_data_block = DataBlock::Make(); - output_data_block->Init(output_types); - - if(input_data_block_count == 0) { + if (input_block_count == 0) { // No input data - LOG_TRACE("No input, no aggregate result") - return ; + LOG_TRACE("No input, no aggregate result"); + return true; } // Loop blocks - ExpressionEvaluator evaluator; - evaluator.Init(input_table_->data_blocks_); - for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) { - Vector> blocks_column; - blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]); - evaluator.Execute(aggregates_[expr_idx], - expr_states[expr_idx], - blocks_column); - if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) { - // column vector in blocks column might be changed to the column vector from column reference. - // This check and assignment is to make sure the right column vector are assign to output_data_block - output_data_block->column_vectors[expr_idx] = blocks_column[0]; + // ExpressionEvaluator evaluator; + // //evaluator.Init(input_table_->data_blocks_); + // for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) { + // + // ExpressionEvaluator evaluator; + // evaluator.Init(aggregates_[]) + // Vector> blocks_column; + // blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]); + // evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], blocks_column[expr_idx]); + // if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) { + // // column vector in blocks column might be changed to the column vector from column reference. + // // This check and assignment is to make sure the right column vector are assign to output_data_block + // output_data_block->column_vectors[expr_idx] = blocks_column[0]; + // } + // } + // + // output_data_block->Finalize(); + + for (SizeT block_idx = 0; block_idx < input_block_count; ++block_idx) { + DataBlock *input_data_block = pre_operator_state->data_block_array_[block_idx].get(); + + aggregate_operator_state->data_block_array_.emplace_back(DataBlock::MakeUniquePtr()); + DataBlock *output_data_block = aggregate_operator_state->data_block_array_.back().get(); + output_data_block->Init(*GetOutputTypes()); + + ExpressionEvaluator evaluator; + evaluator.Init(input_data_block); + + SizeT expression_count = aggregates_count; + // Prepare the expression states + + for (SizeT expr_idx = 0; expr_idx < expression_count; ++expr_idx) { + // Vector> blocks_column; + // blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]); + evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], output_data_block->column_vectors[expr_idx]); } + output_data_block->Finalize(); } - output_data_block->Finalize(); - output_table->Append(output_data_block); -#endif + pre_operator_state->data_block_array_.clear(); + if (pre_operator_state->Complete()) { + aggregate_operator_state->SetComplete(); + } + return true; } SharedPtr> PhysicalAggregate::GetOutputNames() const { diff --git a/src/executor/operator/physical_aggregate.cppm b/src/executor/operator/physical_aggregate.cppm index 35f7a2d6a8..0ae1a183e7 100644 --- a/src/executor/operator/physical_aggregate.cppm +++ b/src/executor/operator/physical_aggregate.cppm @@ -60,7 +60,9 @@ public: Vector> aggregates_{}; HashTable hash_table_; - void SimpleAggregate(SharedPtr &output_table); + bool SimpleAggregate(SharedPtr &output_table, + OperatorState *pre_operator_state, + AggregateOperatorState *aggregate_operator_state); inline u64 GroupTableIndex() const { return groupby_index_; }