Skip to content

Commit

Permalink
Finish basic simple aggregate count, sum, min, max, avg one block (#381)
Browse files Browse the repository at this point in the history
* Finish basic simple aggregate count, sum, min, max

* temp

* Add physical op virtual function TaskletCount

* Fix drop stl
  • Loading branch information
loloxwg authored Dec 26, 2023
1 parent b4ce17f commit ec1c922
Show file tree
Hide file tree
Showing 71 changed files with 639 additions and 83 deletions.
10 changes: 9 additions & 1 deletion src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
return;
}
case PhysicalOperatorType::kAggregate: {
current_fragment_ptr->AddOperator(phys_op);
if (phys_op->left() == nullptr) {
Error<SchedulerException>("No input node of aggregate operator");
} else {
current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize);
BuildFragments(phys_op->left(), current_fragment_ptr);
}
return;
current_fragment_ptr->AddOperator(phys_op);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
if (phys_op->left() == nullptr) {
Expand Down Expand Up @@ -160,7 +168,7 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
break;
}
case PhysicalOperatorType::kFusion:
case PhysicalOperatorType::kMergeParallelAggregate:
case PhysicalOperatorType::kMergeAggregate:
case PhysicalOperatorType::kMergeHash:
case PhysicalOperatorType::kMergeLimit:
case PhysicalOperatorType::kMergeTop:
Expand Down
9 changes: 7 additions & 2 deletions src/executor/operator/physcial_drop_view.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import operator_state;
import physical_operator;
import physical_operator_type;
import load_meta;
import infinity_exception;

export module physical_drop_view;

Expand All @@ -35,15 +36,19 @@ public:
u64 id,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kDropView, nullptr, nullptr, id, load_metas), schema_name_(Move(schema_name)),
view_name_(Move(view_name)), conflict_type_(conflict_type), output_names_(Move(output_names)),
output_types_(Move(output_types)) {}
view_name_(Move(view_name)), conflict_type_(conflict_type), output_names_(Move(output_names)), output_types_(Move(output_types)) {}

// Compiler-generated destructor overriding the base-class one; no custom cleanup is written here.
~PhysicalDropView() override = default;

void Init() override;

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const final {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final {
    return output_types_;
}
Expand Down
112 changes: 69 additions & 43 deletions src/executor/operator/physical_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,41 @@ import parser;
import operator_state;
import data_block;
import utility;
import logger;
import column_vector;

import infinity_exception;
import default_values;
import parser;
import expression_state;
import expression_evaluator;

module physical_aggregate;

namespace infinity {

// Initialization hook for the aggregate operator; it performs no work.
void PhysicalAggregate::Init() {
    // Intentionally empty.
}

bool PhysicalAggregate::Execute(QueryContext *, OperatorState *) {
#if 0
input_table_ = left_->output();
ExecutorAssert(input_table_ != nullptr, "No left input.");
bool PhysicalAggregate::Execute(QueryContext *query_context, OperatorState *operator_state) {
OperatorState *prev_op_state = operator_state->prev_op_state_;
auto *aggregate_operator_state = static_cast<AggregateOperatorState *>(operator_state);

// 1. Execute group-by expressions to generate unique key.
ExpressionEvaluator groupby_executor;
groupby_executor.Init(groups_);
// ExpressionEvaluator groupby_executor;
// groupby_executor.Init(groups_);

Vector<SharedPtr<ColumnDef>> groupby_columns;
SizeT group_count = groups_.size();

if(group_count == 0) {
// Aggregate without group by expression
// e.g. SELECT count(a) FROM table;
SimpleAggregate(this->output_);
return ;
if (SimpleAggregate(this->output_, prev_op_state, aggregate_operator_state)) {
return true;
} else {
return false;
}
}

#if 0
groupby_columns.reserve(group_count);

Vector<DataType> types;
Expand Down Expand Up @@ -553,10 +559,13 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr<DataTable> &input_
#endif
}

void PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &) {
#if 0
bool PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &output_table,
OperatorState *pre_operator_state,
AggregateOperatorState *aggregate_operator_state) {
SizeT aggregates_count = aggregates_.size();
ExecutorAssert(aggregates_count > 0, "Simple Aggregate without aggregate expression");
if (aggregates_count <= 0) {
Error<ExecutorException>("Simple Aggregate without aggregate expression.");
}

// Prepare the output table columns
Vector<SharedPtr<ColumnDef>> aggregate_columns;
Expand All @@ -570,10 +579,11 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &) {
Vector<SharedPtr<DataType>> output_types;
output_types.reserve(aggregates_count);

SizeT input_data_block_count = input_table_->DataBlockCount();
SizeT input_block_count = pre_operator_state->data_block_array_.size();

for (i64 idx = 0; auto &expr: aggregates_) {
// expression state
expr_states.emplace_back(ExpressionState::CreateState(expr, input_data_block_count));
expr_states.emplace_back(ExpressionState::CreateState(expr));

SharedPtr<DataType> output_type = MakeShared<DataType>(expr->Type());

Expand All @@ -590,41 +600,57 @@ void PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &) {
++idx;
}

// output aggregate table definition
SharedPtr<TableDef> aggregate_tabledef = TableDef::Make(MakeShared<String>("default"),
MakeShared<String>("aggregate"),
aggregate_columns);

output_table = DataTable::Make(aggregate_tabledef, TableType::kAggregate);

SharedPtr<DataBlock> output_data_block = DataBlock::Make();
output_data_block->Init(output_types);

if(input_data_block_count == 0) {
if (input_block_count == 0) {
// No input data
LOG_TRACE("No input, no aggregate result")
return ;
LOG_TRACE("No input, no aggregate result");
return true;
}
// Loop blocks

ExpressionEvaluator evaluator;
evaluator.Init(input_table_->data_blocks_);
for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) {
Vector<SharedPtr<ColumnVector>> blocks_column;
blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
evaluator.Execute(aggregates_[expr_idx],
expr_states[expr_idx],
blocks_column);
if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) {
// column vector in blocks column might be changed to the column vector from column reference.
// This check and assignment is to make sure the right column vector are assign to output_data_block
output_data_block->column_vectors[expr_idx] = blocks_column[0];
// ExpressionEvaluator evaluator;
// //evaluator.Init(input_table_->data_blocks_);
// for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) {
//
// ExpressionEvaluator evaluator;
// evaluator.Init(aggregates_[])
// Vector<SharedPtr<ColumnVector>> blocks_column;
// blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
// evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], blocks_column[expr_idx]);
// if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) {
// // column vector in blocks column might be changed to the column vector from column reference.
// // This check and assignment is to make sure the right column vector are assign to output_data_block
// output_data_block->column_vectors[expr_idx] = blocks_column[0];
// }
// }
//
// output_data_block->Finalize();

for (SizeT block_idx = 0; block_idx < input_block_count; ++block_idx) {
DataBlock *input_data_block = pre_operator_state->data_block_array_[block_idx].get();

aggregate_operator_state->data_block_array_.emplace_back(DataBlock::MakeUniquePtr());
DataBlock *output_data_block = aggregate_operator_state->data_block_array_.back().get();
output_data_block->Init(*GetOutputTypes());

ExpressionEvaluator evaluator;
evaluator.Init(input_data_block);

SizeT expression_count = aggregates_count;
// Prepare the expression states

for (SizeT expr_idx = 0; expr_idx < expression_count; ++expr_idx) {
// Vector<SharedPtr<ColumnVector>> blocks_column;
// blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], output_data_block->column_vectors[expr_idx]);
}
output_data_block->Finalize();
}

output_data_block->Finalize();
output_table->Append(output_data_block);
#endif
pre_operator_state->data_block_array_.clear();
if (pre_operator_state->Complete()) {
aggregate_operator_state->SetComplete();
}
return true;
}

SharedPtr<Vector<String>> PhysicalAggregate::GetOutputNames() const {
Expand Down
10 changes: 9 additions & 1 deletion src/executor/operator/physical_aggregate.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import data_table;
import hash_table;
import base_expression;
import load_meta;
import infinity_exception;

export module physical_aggregate;

Expand Down Expand Up @@ -52,6 +53,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for the aggregate operator: every call is
// reported through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

void GroupByInputTable(const SharedPtr<DataTable> &input_table, SharedPtr<DataTable> &output_table);

void GenerateGroupByResult(const SharedPtr<DataTable> &input_table, SharedPtr<DataTable> &output_table);
Expand All @@ -60,7 +66,9 @@ public:
Vector<SharedPtr<BaseExpression>> aggregates_{};
HashTable hash_table_;

void SimpleAggregate(SharedPtr<DataTable> &output_table);
bool SimpleAggregate(SharedPtr<DataTable> &output_table,
OperatorState *pre_operator_state,
AggregateOperatorState *aggregate_operator_state);

// Returns the value stored in groupby_index_.
inline u64 GroupTableIndex() const {
    return groupby_index_;
}

Expand Down
6 changes: 6 additions & 0 deletions src/executor/operator/physical_alter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import operator_state;
import physical_operator;
import physical_operator_type;
import load_meta;
import infinity_exception;

export module physical_alter;

Expand All @@ -41,6 +42,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const final {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final {
    return output_types_;
}
Expand Down
6 changes: 6 additions & 0 deletions src/executor/operator/physical_command.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import query_context;
import parser;
import operator_state;
import load_meta;
import infinity_exception;

export module physical_command;

Expand All @@ -43,6 +44,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) override;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const override {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const override {
    return output_types_;
}
Expand Down
6 changes: 6 additions & 0 deletions src/executor/operator/physical_create_collection.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import operator_state;
import physical_operator;
import physical_operator_type;
import load_meta;
import infinity_exception;

export module physical_create_collection;

Expand All @@ -43,6 +44,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const final {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final {
    return output_types_;
}
Expand Down
6 changes: 6 additions & 0 deletions src/executor/operator/physical_create_index.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import physical_operator;
import physical_operator_type;
import index_def;
import load_meta;
import infinity_exception;

export module physical_create_index;

Expand All @@ -32,6 +33,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) override;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a copy of the shared pointer held in output_names_.
SharedPtr<Vector<String>> GetOutputNames() const override {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const override {
    return output_types_;
}
Expand Down
6 changes: 6 additions & 0 deletions src/executor/operator/physical_create_schema.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import physical_operator;
import physical_operator_type;
import index_def;
import load_meta;
import infinity_exception;

export module physical_create_schema;

Expand All @@ -44,6 +45,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const final {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final {
    return output_types_;
}
Expand Down
5 changes: 5 additions & 0 deletions src/executor/operator/physical_create_table.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import physical_operator_type;
import index_def;
import table_def;
import load_meta;
import infinity_exception;

export module physical_create_table;

Expand Down Expand Up @@ -54,6 +55,10 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}
// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const final {
    return output_names_;
}

// Returns a copy of the shared pointer held in output_types_.
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final {
    return output_types_;
}
Expand Down
6 changes: 6 additions & 0 deletions src/executor/operator/physical_create_view.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import physical_operator;
import physical_operator_type;
import index_def;
import load_meta;
import infinity_exception;

export module physical_create_view;

Expand All @@ -43,6 +44,11 @@ public:

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

// Tasklet counting is not provided for this operator: every call is reported
// through Error<NotImplementException>.
SizeT TaskletCount() override {
    Error<NotImplementException>("TaskletCount not Implement");
    // Fallback value in case Error<> hands control back (presumably it throws - confirm).
    return SizeT{0};
}

// Returns a const reference to the stored create_view_info_ shared pointer
// (no reference-count change; the reference refers to this object's member).
// The redundant ';' that followed the function body was dropped: it is an
// empty declaration that only produces -Wextra-semi noise.
inline const SharedPtr<CreateViewInfo> &bound_select_statement() const { return create_view_info_; }

// Returns a copy of the shared pointer held in output_names_.
inline SharedPtr<Vector<String>> GetOutputNames() const final {
    return output_names_;
}
Expand Down
Loading

0 comments on commit ec1c922

Please sign in to comment.