Skip to content

Commit

Permalink
Finish basic simple aggregate count, sum, min, max
Browse files Browse the repository at this point in the history
  • Loading branch information
loloxwg committed Dec 26, 2023
1 parent 06d6093 commit 536967a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 44 deletions.
8 changes: 8 additions & 0 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
return;
}
case PhysicalOperatorType::kAggregate: {
current_fragment_ptr->AddOperator(phys_op);
if (phys_op->left() == nullptr) {
Error<SchedulerException>("No input node of aggregate operator");
} else {
current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize);
BuildFragments(phys_op->left(), current_fragment_ptr);
}
return;
current_fragment_ptr->AddOperator(phys_op);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
if (phys_op->left() == nullptr) {
Expand Down
112 changes: 69 additions & 43 deletions src/executor/operator/physical_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,41 @@ import parser;
import operator_state;
import data_block;
import utility;
import logger;
import column_vector;

import infinity_exception;
import default_values;
import parser;
import expression_state;
import expression_evaluator;

module physical_aggregate;

namespace infinity {

// Initialization hook for the aggregate operator; the body is intentionally empty,
// so no setup work is performed here.
void PhysicalAggregate::Init() {}

bool PhysicalAggregate::Execute(QueryContext *, OperatorState *) {
#if 0
input_table_ = left_->output();
ExecutorAssert(input_table_ != nullptr, "No left input.");
bool PhysicalAggregate::Execute(QueryContext *query_context, OperatorState *operator_state) {
OperatorState *prev_op_state = operator_state->prev_op_state_;
auto *aggregate_operator_state = static_cast<AggregateOperatorState *>(operator_state);

// 1. Execute group-by expressions to generate unique key.
ExpressionEvaluator groupby_executor;
groupby_executor.Init(groups_);
// ExpressionEvaluator groupby_executor;
// groupby_executor.Init(groups_);

Vector<SharedPtr<ColumnDef>> groupby_columns;
SizeT group_count = groups_.size();

if(group_count == 0) {
// Aggregate without group by expression
// e.g. SELECT count(a) FROM table;
SimpleAggregate(this->output_);
return ;
if (SimpleAggregate(this->output_, prev_op_state, aggregate_operator_state)) {
return true;
} else {
return false;
}
}

#if 0
groupby_columns.reserve(group_count);

Vector<DataType> types;
Expand Down Expand Up @@ -553,10 +559,13 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr<DataTable> &input_
#endif
}

void PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &) {
#if 0
bool PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &output_table,
OperatorState *pre_operator_state,
AggregateOperatorState *aggregate_operator_state) {
SizeT aggregates_count = aggregates_.size();
ExecutorAssert(aggregates_count > 0, "Simple Aggregate without aggregate expression");
if (aggregates_count <= 0) {
Error<ExecutorException>("Simple Aggregate without aggregate expression.");
}

// Prepare the output table columns
Vector<SharedPtr<ColumnDef>> aggregate_columns;
Expand All @@ -570,10 +579,11 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &) {
Vector<SharedPtr<DataType>> output_types;
output_types.reserve(aggregates_count);

SizeT input_data_block_count = input_table_->DataBlockCount();
SizeT input_block_count = pre_operator_state->data_block_array_.size();

for (i64 idx = 0; auto &expr: aggregates_) {
// expression state
expr_states.emplace_back(ExpressionState::CreateState(expr, input_data_block_count));
expr_states.emplace_back(ExpressionState::CreateState(expr));

SharedPtr<DataType> output_type = MakeShared<DataType>(expr->Type());

Expand All @@ -590,41 +600,57 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &) {
++idx;
}

// output aggregate table definition
SharedPtr<TableDef> aggregate_tabledef = TableDef::Make(MakeShared<String>("default"),
MakeShared<String>("aggregate"),
aggregate_columns);

output_table = DataTable::Make(aggregate_tabledef, TableType::kAggregate);

SharedPtr<DataBlock> output_data_block = DataBlock::Make();
output_data_block->Init(output_types);

if(input_data_block_count == 0) {
if (input_block_count == 0) {
// No input data
LOG_TRACE("No input, no aggregate result")
return ;
LOG_TRACE("No input, no aggregate result");
return true;
}
// Loop blocks

ExpressionEvaluator evaluator;
evaluator.Init(input_table_->data_blocks_);
for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) {
Vector<SharedPtr<ColumnVector>> blocks_column;
blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
evaluator.Execute(aggregates_[expr_idx],
expr_states[expr_idx],
blocks_column);
if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) {
// The column vector in blocks_column might have been replaced by the column vector from a column reference.
// This check and assignment make sure the right column vector is assigned to output_data_block.
output_data_block->column_vectors[expr_idx] = blocks_column[0];
// ExpressionEvaluator evaluator;
// //evaluator.Init(input_table_->data_blocks_);
// for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) {
//
// ExpressionEvaluator evaluator;
// evaluator.Init(aggregates_[])
// Vector<SharedPtr<ColumnVector>> blocks_column;
// blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
// evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], blocks_column[expr_idx]);
// if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) {
// // column vector in blocks column might be changed to the column vector from column reference.
// // This check and assignment is to make sure the right column vector are assign to output_data_block
// output_data_block->column_vectors[expr_idx] = blocks_column[0];
// }
// }
//
// output_data_block->Finalize();

for (SizeT block_idx = 0; block_idx < input_block_count; ++block_idx) {
DataBlock *input_data_block = pre_operator_state->data_block_array_[block_idx].get();

aggregate_operator_state->data_block_array_.emplace_back(DataBlock::MakeUniquePtr());
DataBlock *output_data_block = aggregate_operator_state->data_block_array_.back().get();
output_data_block->Init(*GetOutputTypes());

ExpressionEvaluator evaluator;
evaluator.Init(input_data_block);

SizeT expression_count = aggregates_count;
// Prepare the expression states

for (SizeT expr_idx = 0; expr_idx < expression_count; ++expr_idx) {
// Vector<SharedPtr<ColumnVector>> blocks_column;
// blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], output_data_block->column_vectors[expr_idx]);
}
output_data_block->Finalize();
}

output_data_block->Finalize();
output_table->Append(output_data_block);
#endif
pre_operator_state->data_block_array_.clear();
if (pre_operator_state->Complete()) {
aggregate_operator_state->SetComplete();
}
return true;
}

SharedPtr<Vector<String>> PhysicalAggregate::GetOutputNames() const {
Expand Down
4 changes: 3 additions & 1 deletion src/executor/operator/physical_aggregate.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public:
Vector<SharedPtr<BaseExpression>> aggregates_{};
HashTable hash_table_;

void SimpleAggregate(SharedPtr<DataTable> &output_table);
bool SimpleAggregate(SharedPtr<DataTable> &output_table,
OperatorState *pre_operator_state,
AggregateOperatorState *aggregate_operator_state);

// Accessor: returns the stored groupby_index_ value.
inline u64 GroupTableIndex() const { return groupby_index_; }

Expand Down

0 comments on commit 536967a

Please sign in to comment.