From 7632a96b8c98aa739f2f40f7bea8cd546b126f90 Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 27 Dec 2023 17:06:38 +0800 Subject: [PATCH 1/7] feat: impl limit --- src/executor/fragment/plan_fragment.cppm | 1 + src/executor/fragment_builder.cpp | 1 + src/executor/operator/physical_limit.cpp | 273 +++++++++++------- src/executor/operator/physical_limit.cppm | 63 +++- .../operator/physical_merge_limit.cpp | 36 ++- .../operator/physical_merge_limit.cppm | 23 +- src/executor/operator/physical_sink.cpp | 15 +- src/executor/operator_state.cpp | 11 +- src/executor/operator_state.cppm | 27 +- src/executor/physical_planner.cpp | 11 +- src/planner/bound_select_statement.cpp | 6 + src/planner/optimizer/lazy_load.cpp | 1 + src/scheduler/fragment_context.cpp | 33 ++- src/scheduler/fragment_context.cppm | 16 +- src/scheduler/fragment_data.cppm | 2 +- src/storage/data_block.cpp | 2 +- src/storage/data_block.cppm | 2 +- src/storage/txn/txn_store.cpp | 6 +- tools/generate_limit.py | 82 ++++++ tools/sqllogictest.py | 2 + 20 files changed, 446 insertions(+), 167 deletions(-) create mode 100644 tools/generate_limit.py diff --git a/src/executor/fragment/plan_fragment.cppm b/src/executor/fragment/plan_fragment.cppm index 374766f9e3..1eefcfce62 100644 --- a/src/executor/fragment/plan_fragment.cppm +++ b/src/executor/fragment/plan_fragment.cppm @@ -18,6 +18,7 @@ import stl; import parser; import data_table; import fragment_context; +import operator_state; import physical_operator; import physical_source; import physical_sink; diff --git a/src/executor/fragment_builder.cpp b/src/executor/fragment_builder.cpp index cf69d1d76b..41a529b173 100644 --- a/src/executor/fragment_builder.cpp +++ b/src/executor/fragment_builder.cpp @@ -22,6 +22,7 @@ import physical_sink; import physical_source; import physical_explain; import physical_knn_scan; +import operator_state; import infinity_exception; import parser; diff --git a/src/executor/operator/physical_limit.cpp b/src/executor/operator/physical_limit.cpp index b8ad6ea2ea..35a2581a17 100644 --- a/src/executor/operator/physical_limit.cpp +++ b/src/executor/operator/physical_limit.cpp @@ -18,9 +18,13 @@ module; import stl; import txn; +import base_expression; +import default_values; +import load_meta; import query_context; import table_def; import data_table; +import default_values; import parser; import physical_operator_type; import operator_state; @@ -34,130 +38,201 @@ module physical_limit; namespace infinity { -void PhysicalLimit::Init() {} +SizeT AtomicCounter::Offset(SizeT row_count) { + auto success = false; + SizeT result; -bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) { + while (!success) { + i64 current_offset = offset_; + if (current_offset <= 0) { + return 0; + } + i64 last_offset = current_offset - row_count; -#if 0 - // output table definition is same as input - input_table_ = left_->output(); - Assert(input_table_.get() != nullptr, "No input"); + if (last_offset > 0) { + success = offset_.compare_exchange_strong(current_offset, last_offset); + result = row_count; + } else { + success = offset_.compare_exchange_strong(current_offset, 0); + result = current_offset; + } + } + + return result; +} - Assert(limit_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression"); +SizeT AtomicCounter::Limit(SizeT row_count) { + auto success = false; + SizeT result; - i64 limit = (static_pointer_cast(limit_expr_))->GetValue().value_.big_int; - Assert(limit > 0, "Limit should be larger than 0"); + 
while (!success) { + i64 current_limit = limit_; + if (current_limit <= 0) { + return 0; + } + i64 last_limit = current_limit - row_count; + + if (last_limit > 0) { + success = limit_.compare_exchange_strong(current_limit, last_limit); + result = row_count; + } else { + success = limit_.compare_exchange_strong(current_limit, 0); + result = current_limit; + } + } + + return result; +} + +bool AtomicCounter::IsLimitOver() { + if (limit_ < 0) { + Error("limit is not allowed to be smaller than 0"); + } + return limit_ == 0; +} + +SizeT UnSyncCounter::Offset(SizeT row_count) { + SizeT result; + + if (offset_ <= 0) { + return 0; + } + i64 last_offset = offset_ - row_count; + + if (last_offset > 0) { + result = row_count; + offset_ = last_offset; + } else { + result = offset_; + offset_ = 0; + } + + return result; +} + +SizeT UnSyncCounter::Limit(SizeT row_count) { + SizeT result; + + if (limit_ <= 0) { + return 0; + } + i64 last_limit = limit_ - row_count; + + if (last_limit > 0) { + result = row_count; + limit_ = last_limit; + } else { + result = limit_; + limit_ = 0; + } + + return result; +} + +bool UnSyncCounter::IsLimitOver() { + if (limit_ < 0) { + Error("limit is not allowed to be smaller than 0"); + } + return limit_ == 0; +} + +PhysicalLimit::PhysicalLimit(u64 id, + UniquePtr left, + SharedPtr limit_expr, + SharedPtr offset_expr, + SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)), + offset_expr_(Move(offset_expr)) { i64 offset = 0; + i64 limit = (static_pointer_cast(limit_expr_))->GetValue().value_.big_int; + if (offset_expr_ != nullptr) { - Assert(offset_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression"); offset = (static_pointer_cast(offset_expr_))->GetValue().value_.big_int; - Assert(offset >= 0 && offset < input_table_->row_count(), - "Offset should be larger or equal than 0 and less than row number"); } - output_ = GetLimitOutput(input_table_, limit, offset); -#endif - return true; + counter_ = MakeShared(offset, limit); } -SharedPtr PhysicalLimit::GetLimitOutput(const SharedPtr &input_table, i64 limit, i64 offset) { - SizeT start_block = 0; - SizeT start_row_id = 0; - SizeT end_block = 0; - SizeT end_row_id = 0; +void PhysicalLimit::Init() {} - if (offset == 0) { - if (limit >= (i64)input_table->row_count()) { - return input_table; - } else { - start_block = 0; - start_row_id = 0; - SizeT block_count = input_table->DataBlockCount(); - i64 total_row_count = limit; - for (SizeT block_id = 0; block_id < block_count; ++block_id) { - SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count(); - if (total_row_count > (i64)block_row_count) { - total_row_count -= block_row_count; - } else { - end_block = block_id; - end_row_id = total_row_count; - break; - } - } - } - } else { - i64 total_row_count = offset; - SizeT block_count = input_table->DataBlockCount(); - SizeT rest_row_count = 0; - for (SizeT block_id = 0; block_id < block_count; ++block_id) { - SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count(); - if (total_row_count >= (i64)block_row_count) { - total_row_count -= block_row_count; - } else { - start_block = block_id; - start_row_id = total_row_count; - rest_row_count = block_row_count - total_row_count; - break; - } - } +SizeT PhysicalLimit::TaskletCount() { + i64 limit = (static_pointer_cast(limit_expr_))->GetValue().value_.big_int; - total_row_count = limit; - if (total_row_count <= 
(i64)rest_row_count) { - end_block = start_block; - end_row_id = total_row_count; - } else { - total_row_count -= rest_row_count; - for (SizeT block_id = start_block + 1; block_id < block_count; ++block_id) { - SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count(); - if (total_row_count > (i64)block_row_count) { - total_row_count -= block_row_count; - } else { - end_block = block_id; - end_row_id = total_row_count; - break; - } - } - } + return limit / DEFAULT_BLOCK_CAPACITY; +} + +// offset limit + offset +// left right +// | a | b | c | d | e | f +bool PhysicalLimit::Execute(QueryContext *query_context, + const Vector> &input_blocks, + Vector> &output_blocks, + SharedPtr counter) { + SizeT input_row_count = 0; + + for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) { + input_row_count += input_blocks[block_id]->row_count(); } - // Copy from input table to output table - SizeT column_count = input_table->ColumnCount(); - Vector> types; - types.reserve(column_count); - Vector> columns; - columns.reserve(column_count); - for (SizeT idx = 0; idx < column_count; ++idx) { - SharedPtr col_type = input_table->GetColumnTypeById(idx); - types.emplace_back(col_type); + SizeT offset = counter->Offset(input_row_count); + if (offset == input_row_count) { + return true; + } - String col_name = input_table->GetColumnNameById(idx); + SizeT limit = counter->Limit(input_row_count - offset); + SizeT block_start_idx = 0; - SharedPtr col_def = MakeShared(idx, col_type, col_name, HashSet()); - columns.emplace_back(col_def); - } + for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) { + if (input_blocks[block_id]->row_count() == 0) { + continue; + } + SizeT max_offset = input_blocks[block_id]->row_count() - 1; - SharedPtr table_def = TableDef::Make(MakeShared("default"), MakeShared("limit"), columns); - SharedPtr output_table = DataTable::Make(table_def, TableType::kIntermediate); + if (offset > max_offset) { + offset -= max_offset; + continue; + } else { + block_start_idx = block_id; + break; + } + } - const Vector> &input_datablocks = input_table->data_blocks_; + for (SizeT block_id = block_start_idx; block_id < input_blocks.size(); block_id++) { + auto &input_block = input_blocks[block_id]; + auto row_count = input_block->row_count(); + if (row_count == 0) { + continue; + } + auto block = DataBlock::MakeUniquePtr(); - for (SizeT block_id = start_block; block_id <= end_block; ++block_id) { - SizeT input_start_offset = start_row_id; - SizeT input_end_offset; - if (end_block == block_id) { - input_end_offset = end_row_id; + block->Init(input_block->types()); + if (limit >= row_count) { + block->AppendWith(input_block.get(), offset, row_count); + limit -= row_count; } else { - // current input block isn't the last one. 
- input_end_offset = input_datablocks[block_id]->row_count(); + block->AppendWith(input_block.get(), offset, limit); + limit = 0; + } + block->Finalize(); + output_blocks.push_back(Move(block)); + offset = 0; + + if (limit == 0) { + break; } + } - SharedPtr output_datablock = DataBlock::Make(); - output_datablock->Init(input_datablocks[block_id], input_start_offset, input_end_offset); - output_table->Append(output_datablock); + return true; +} + +bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) { + auto result = Execute(query_context, operator_state->prev_op_state_->data_block_array_, operator_state->data_block_array_, counter_); - start_row_id = 0; + operator_state->prev_op_state_->data_block_array_.clear(); + if (counter_->IsLimitOver() || operator_state->prev_op_state_->Complete()) { + operator_state->SetComplete(); } - return output_table; + return result; } } // namespace infinity diff --git a/src/executor/operator/physical_limit.cppm b/src/executor/operator/physical_limit.cppm index f1a7c7c45f..d793638ccb 100644 --- a/src/executor/operator/physical_limit.cppm +++ b/src/executor/operator/physical_limit.cppm @@ -21,6 +21,7 @@ import operator_state; import physical_operator; import physical_operator_type; import base_expression; +import value_expression; import data_table; import load_meta; import infinity_exception; @@ -29,43 +30,83 @@ export module physical_limit; namespace infinity { +class DataBlock; + +export class LimitCounter { +public: + // Returns the left index after offset + virtual SizeT Offset(SizeT row_count) = 0; + + // Returns the right index after limit + virtual SizeT Limit(SizeT row_count) = 0; + + virtual bool IsLimitOver() = 0; +}; + +export class AtomicCounter : public LimitCounter { +public: + AtomicCounter(const i64 &offset, const i64 &limit) : offset_(offset), limit_(limit) {} + + SizeT Offset(SizeT row_count); + + SizeT Limit(SizeT row_count); + + bool IsLimitOver(); + +private: + ai64 offset_; + ai64 limit_; +}; + +export class UnSyncCounter : public LimitCounter { +public: + UnSyncCounter(const i64 &offset, const i64 &limit) : offset_(offset), limit_(limit) {} + + SizeT Offset(SizeT row_count); + + SizeT Limit(SizeT row_count); + + bool IsLimitOver(); + +private: + i64 offset_; + i64 limit_; +}; + export class PhysicalLimit : public PhysicalOperator { public: explicit PhysicalLimit(u64 id, UniquePtr left, SharedPtr limit_expr, SharedPtr offset_expr, - SharedPtr> load_metas) - : PhysicalOperator(PhysicalOperatorType::kLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)), - offset_expr_(Move(offset_expr)) {} + SharedPtr> load_metas); ~PhysicalLimit() override = default; void Init() override; + static bool Execute(QueryContext *query_context, + const Vector> &input_blocks, + Vector> &output_blocks, + SharedPtr counter); + bool Execute(QueryContext *query_context, OperatorState *operator_state) final; inline SharedPtr> GetOutputNames() const final { return left_->GetOutputNames(); } inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } - SizeT TaskletCount() override { - Error("TaskletCount not Implement"); - return 0; - } + SizeT TaskletCount() override; inline const SharedPtr &limit_expr() const { return limit_expr_; } inline const SharedPtr &offset_expr() const { return offset_expr_; } - static SharedPtr GetLimitOutput(const SharedPtr &input_table, i64 limit, i64 offset); - private: SharedPtr limit_expr_{}; SharedPtr offset_expr_{}; - SharedPtr input_table_{}; - u64 
input_table_index_{}; + SharedPtr counter_; }; } // namespace infinity diff --git a/src/executor/operator/physical_merge_limit.cpp b/src/executor/operator/physical_merge_limit.cpp index 90a16596ff..3095277391 100644 --- a/src/executor/operator/physical_merge_limit.cpp +++ b/src/executor/operator/physical_merge_limit.cpp @@ -14,15 +14,49 @@ module; +#include + +import stl; import query_context; +import base_expression; +import load_meta; +import physical_operator_type; +import value_expression; +import physical_limit; import operator_state; module physical_merge_limit; namespace infinity { +PhysicalMergeLimit::PhysicalMergeLimit(u64 id, + UniquePtr left, + SharedPtr limit_expr, + SharedPtr offset_expr, + SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kMergeLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)), + offset_expr_(Move(offset_expr)) { + i64 offset = 0; + i64 limit = (static_pointer_cast(limit_expr_))->GetValue().value_.big_int; + + if (offset_expr_ != nullptr) { + offset = (static_pointer_cast(offset_expr_))->GetValue().value_.big_int; + } + counter_ = MakeShared(offset, limit); +} + void PhysicalMergeLimit::Init() {} -bool PhysicalMergeLimit::Execute(QueryContext *, OperatorState *) { return true; } +bool PhysicalMergeLimit::Execute(QueryContext *query_context, OperatorState *operator_state) { + MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)operator_state; + auto result = PhysicalLimit::Execute(query_context, limit_op_state->input_data_blocks_, limit_op_state->data_block_array_, counter_); + + if (counter_->IsLimitOver() || limit_op_state->input_complete_) { + limit_op_state->input_complete_ = true; + limit_op_state->SetComplete(); + } + limit_op_state->input_data_blocks_.clear(); + return result; +} } // namespace infinity diff --git a/src/executor/operator/physical_merge_limit.cppm b/src/executor/operator/physical_merge_limit.cppm index 50e956eaf0..fbae336ca2 100644 --- a/src/executor/operator/physical_merge_limit.cppm +++ b/src/executor/operator/physical_merge_limit.cppm @@ -16,9 +16,11 @@ module; import stl; import parser; +import base_expression; import query_context; import operator_state; import physical_operator; +import physical_limit; import physical_operator_type; import load_meta; import infinity_exception; @@ -29,12 +31,11 @@ namespace infinity { export class PhysicalMergeLimit final : public PhysicalOperator { public: - explicit PhysicalMergeLimit(SharedPtr> output_names, - SharedPtr>> output_types, - u64 id, - SharedPtr> load_metas) - : PhysicalOperator(PhysicalOperatorType::kMergeLimit, nullptr, nullptr, id, load_metas), output_names_(Move(output_names)), - output_types_(Move(output_types)) {} + explicit PhysicalMergeLimit(u64 id, + UniquePtr left, + SharedPtr limit_expr, + SharedPtr offset_expr, + SharedPtr> load_metas); ~PhysicalMergeLimit() override = default; @@ -42,9 +43,9 @@ public: bool Execute(QueryContext *query_context, OperatorState *operator_state) final; - inline SharedPtr> GetOutputNames() const final { return output_names_; } + inline SharedPtr> GetOutputNames() const final { return left_->GetOutputNames(); } - inline SharedPtr>> GetOutputTypes() const final { return output_types_; } + inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } SizeT TaskletCount() override { Error("TaskletCount not Implement"); @@ -52,8 +53,10 @@ public: } private: - SharedPtr> output_names_{}; - SharedPtr>> output_types_{}; + SharedPtr limit_expr_{}; + SharedPtr offset_expr_{}; 
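+    // Remaining offset/limit budget for this operator, decremented as input blocks are consumed.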
+ + SharedPtr counter_; }; } // namespace infinity diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index 1cfc3acd6d..b60a06c560 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -20,6 +20,7 @@ import query_context; import parser; import operator_state; import physical_operator_type; +import fragment_context; import third_party; import fragment_data; import data_block; @@ -102,11 +103,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate case PhysicalOperatorType::kProjection: { ProjectionOperatorState *projection_output_state = static_cast(task_op_state); if (projection_output_state->data_block_array_.empty()) { - if(materialize_sink_state->Error()) { - materialize_sink_state->empty_result_ = true; - } else { - Error("Empty projection output"); - } + materialize_sink_state->empty_result_ = true; } else { for (auto &data_block : projection_output_state->data_block_array_) { materialize_sink_state->data_block_array_.emplace_back(Move(data_block)); @@ -137,7 +134,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate if(materialize_sink_state->Error()) { materialize_sink_state->empty_result_ = true; } else { - Error("Empty sort output"); + Error("Empty agg output"); } } else { materialize_sink_state->data_block_array_ = Move(agg_output_state->data_block_array_); @@ -358,7 +355,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink return; } - if (!task_operator_state->Complete()) { + if (!task_operator_state->Complete() && queue_sink_state->IsMaterialize()) { LOG_TRACE("Task not completed"); return; } @@ -376,6 +373,10 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink queue_sink_state->task_id_, idx, output_data_block_count); + if (task_operator_state->Complete() && !queue_sink_state->IsMaterialize()) { + fragment_data->data_idx_ = None; + } + for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { next_fragment_queue->Enqueue(fragment_data); } diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp index 0c439534c7..6c2084bd52 100644 --- a/src/executor/operator_state.cpp +++ b/src/executor/operator_state.cpp @@ -45,7 +45,10 @@ bool QueueSourceState::GetData() { switch (fragment_data_base->type_) { case FragmentDataType::kData: { auto *fragment_data = static_cast(fragment_data_base.get()); - if (fragment_data->data_idx_ + 1 == fragment_data->data_count_) { + if (!fragment_data->data_idx_.has_value()) { + // fragment completed + MarkCompletedTask(fragment_data->fragment_id_); + } else if (fragment_data->data_idx_.value() + 1 == fragment_data->data_count_) { // Get an all data from this MarkCompletedTask(fragment_data->fragment_id_); } @@ -99,6 +102,12 @@ bool QueueSourceState::GetData() { create_index_finish_op_state->input_complete_ = completed; break; } + case PhysicalOperatorType::kMergeLimit: { + MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state; + limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); + limit_op_state->input_complete_ = completed; + break; + } default: { Error("Not support operator type"); break; diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm index 80500c5647..49cf8dc43c 100644 --- a/src/executor/operator_state.cppm +++ b/src/executor/operator_state.cppm @@ -31,6 +31,13 @@ export module operator_state; 
namespace infinity { +export enum class FragmentType { + kInvalid, + kSerialMaterialize, + kParallelMaterialize, + kParallelStream, +}; + export struct OperatorState { inline explicit OperatorState(PhysicalOperatorType operator_type) : operator_type_(operator_type) {} virtual ~OperatorState() = default; @@ -153,6 +160,9 @@ export struct LimitOperatorState : public OperatorState { // Merge Limit export struct MergeLimitOperatorState : public OperatorState { inline explicit MergeLimitOperatorState() : OperatorState(PhysicalOperatorType::kMergeLimit) {} + + Vector> input_data_blocks_{}; // Since merge knn is the first op, no previous operator state. This ptr is to get input data. + bool input_complete_{false}; }; // Merge Top @@ -407,31 +417,34 @@ export enum class SinkStateType { export struct SinkState { virtual ~SinkState() {} - inline explicit SinkState(SinkStateType state_type, u64 fragment_id, u64 task_id) - : fragment_id_(fragment_id), task_id_(task_id), state_type_(state_type) {} + inline explicit SinkState(SinkStateType state_type, u64 fragment_id, FragmentType fragment_type, u64 task_id) + : fragment_id_(fragment_id), task_id_(task_id), fragment_type_(fragment_type), state_type_(state_type) {} inline void SetPrevOpState(OperatorState *prev_op_state) { prev_op_state_ = prev_op_state; } [[nodiscard]] inline SinkStateType state_type() const { return state_type_; } + [[nodiscard]] inline bool IsMaterialize() const { return fragment_type_ == FragmentType::kSerialMaterialize || fragment_type_ == FragmentType::kParallelMaterialize; } + inline bool Error() const { return error_message_.get() != nullptr; } u64 fragment_id_{}; u64 task_id_{}; + FragmentType fragment_type_{}; OperatorState *prev_op_state_{}; SinkStateType state_type_{SinkStateType::kInvalid}; UniquePtr error_message_{}; }; export struct QueueSinkState : public SinkState { - inline explicit QueueSinkState(u64 fragment_id, u64 task_id) : SinkState(SinkStateType::kQueue, fragment_id, task_id) {} + inline explicit QueueSinkState(u64 fragment_id, FragmentType fragment_type, u64 task_id) : SinkState(SinkStateType::kQueue, fragment_id, fragment_type, task_id) {} Vector> data_block_array_{}; Vector> *> fragment_data_queues_; }; export struct MaterializeSinkState : public SinkState { - inline explicit MaterializeSinkState(u64 fragment_id, u64 task_id) : SinkState(SinkStateType::kMaterialize, fragment_id, task_id) {} + inline explicit MaterializeSinkState(u64 fragment_id, FragmentType fragment_type, u64 task_id) : SinkState(SinkStateType::kMaterialize, fragment_id, fragment_type, task_id) {} SharedPtr>> column_types_{}; SharedPtr> column_names_{}; @@ -441,19 +454,19 @@ export struct MaterializeSinkState : public SinkState { }; export struct ResultSinkState : public SinkState { - inline explicit ResultSinkState() : SinkState(SinkStateType::kResult, 0, 0) {} + inline explicit ResultSinkState() : SinkState(SinkStateType::kResult, 0, FragmentType::kInvalid, 0) {} SharedPtr result_def_{}; }; export struct MessageSinkState : public SinkState { - inline explicit MessageSinkState() : SinkState(SinkStateType::kMessage, 0, 0) {} + inline explicit MessageSinkState() : SinkState(SinkStateType::kMessage, 0, FragmentType::kInvalid, 0) {} UniquePtr message_{}; }; export struct SummarySinkState : public SinkState { - inline explicit SummarySinkState() : SinkState(SinkStateType::kSummary, 0, 0), count_(0), sum_(0) {} + inline explicit SummarySinkState() : SinkState(SinkStateType::kSummary, 0, FragmentType::kInvalid, 0), count_(0), sum_(0) {} u64 count_; 
u64 sum_; diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp index 011e2ae19b..121a400e76 100644 --- a/src/executor/physical_planner.cpp +++ b/src/executor/physical_planner.cpp @@ -638,11 +638,20 @@ UniquePtr PhysicalPlanner::BuildLimit(const SharedPtr logical_limit = static_pointer_cast(logical_operator); UniquePtr input_physical_operator = BuildPhysicalOperator(input_logical_node); - return MakeUnique(logical_operator->node_id(), + auto limit_op = MakeUnique(logical_operator->node_id(), Move(input_physical_operator), logical_limit->limit_expression_, logical_limit->offset_expression_, logical_operator->load_metas()); + if (limit_op->TaskletCount() <= 1) { + return limit_op; + } else { + return MakeUnique(query_context_ptr_->GetNextNodeID(), + Move(limit_op), + logical_limit->limit_expression_, + logical_limit->offset_expression_, + logical_operator->load_metas()); + } } UniquePtr PhysicalPlanner::BuildProjection(const SharedPtr &logical_operator) const { diff --git a/src/planner/bound_select_statement.cpp b/src/planner/bound_select_statement.cpp index 96188bd692..33ee8b5cb0 100644 --- a/src/planner/bound_select_statement.cpp +++ b/src/planner/bound_select_statement.cpp @@ -110,6 +110,12 @@ SharedPtr BoundSelectStatement::BuildPlan(QueryContext *query_conte root = sort; } + if (limit_expression_ != nullptr) { + auto limit = MakeShared(bind_context->GetNewLogicalNodeId(), limit_expression_, offset_expression_); + limit->set_left_node(root); + root = limit; + } + auto project = MakeShared(bind_context->GetNewLogicalNodeId(), projection_expressions_, projection_index_); project->set_left_node(root); root = project; diff --git a/src/planner/optimizer/lazy_load.cpp b/src/planner/optimizer/lazy_load.cpp index baa6a0a584..33a1a976c0 100644 --- a/src/planner/optimizer/lazy_load.cpp +++ b/src/planner/optimizer/lazy_load.cpp @@ -135,6 +135,7 @@ void CleanScan::VisitNode(LogicalNode &op) { match.base_table_ref_->RetainColumnByIndices(Move(project_indices)); break; } + case LogicalNodeType::kLimit: case LogicalNodeType::kFusion: { // Skip VisitNodeChildren(op); diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index c2ad399599..93ee5e4578 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -677,7 +677,6 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { case PhysicalOperatorType::kAggregate: case PhysicalOperatorType::kParallelAggregate: case PhysicalOperatorType::kHash: - case PhysicalOperatorType::kLimit: case PhysicalOperatorType::kTop: { if (fragment_type_ != FragmentType::kParallelStream) { Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); @@ -688,7 +687,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), task_id); + auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); sink_state->column_types_ = last_operator->GetOutputTypes(); sink_state->column_names_ = last_operator->GetOutputNames(); @@ -696,6 +695,22 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } break; } + case PhysicalOperatorType::kLimit: { + if (fragment_type_ != FragmentType::kParallelStream) { + Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); + } + + if ((i64)tasks_.size() != parallel_count) 
{ + Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); + } + + for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { + auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + + tasks_[task_id]->sink_state_ = Move(sink_state); + } + break; + } case PhysicalOperatorType::kMergeParallelAggregate: case PhysicalOperatorType::kMergeHash: case PhysicalOperatorType::kMergeLimit: @@ -711,7 +726,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); } - tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), 0); + tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), 0); break; } @@ -726,7 +741,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); } - tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), 0); + tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), 0); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[0]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); @@ -734,7 +749,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } case PhysicalOperatorType::kMatch: { for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); } break; } @@ -752,7 +767,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); } break; } @@ -769,7 +784,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[task_id]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); @@ -782,7 +797,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { Error("SerialMaterialize type fragment should only have 1 task."); } - tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), 0); + tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), 0); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[0]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); @@ -792,7 +807,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); + tasks_[task_id]->sink_state_ = 
MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[task_id]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); diff --git a/src/scheduler/fragment_context.cppm b/src/scheduler/fragment_context.cppm index 807c5caf83..f9587d6d12 100644 --- a/src/scheduler/fragment_context.cppm +++ b/src/scheduler/fragment_context.cppm @@ -18,6 +18,7 @@ import stl; import fragment_task; import query_context; import profiler; +import operator_state; import physical_operator; import physical_source; import physical_sink; @@ -32,21 +33,6 @@ namespace infinity { class PlanFragment; -//class KnnScanSharedData; - -// enum class FragmentStatus { -// kNotStart, -// k -// kStart, -// kFinish, -// }; -export enum class FragmentType { - kInvalid, - kSerialMaterialize, - kParallelMaterialize, - kParallelStream, -}; - export class FragmentContext { public: static void diff --git a/src/scheduler/fragment_data.cppm b/src/scheduler/fragment_data.cppm index d8dfcebbdb..bbda608f7a 100644 --- a/src/scheduler/fragment_data.cppm +++ b/src/scheduler/fragment_data.cppm @@ -45,7 +45,7 @@ export struct FragmentError : public FragmentDataBase { export struct FragmentData : public FragmentDataBase { UniquePtr data_block_{}; i64 task_id_{-1}; - SizeT data_idx_{u64_max}; + Optional data_idx_{}; SizeT data_count_{u64_max}; FragmentData(u64 fragment_id, UniquePtr data_block, i64 task_id, SizeT data_idx, SizeT data_count) diff --git a/src/storage/data_block.cpp b/src/storage/data_block.cpp index df1f66d570..d505726d4e 100644 --- a/src/storage/data_block.cpp +++ b/src/storage/data_block.cpp @@ -271,7 +271,7 @@ void DataBlock::AppendWith(const DataBlock *other) { } } -void DataBlock::AppendWith(const SharedPtr &other, SizeT from, SizeT count) { +void DataBlock::AppendWith(const DataBlock *other, SizeT from, SizeT count) { if (other->column_count() != this->column_count()) { Error( Format("Attempt merge block with column count {} into block with column count {}", other->column_count(), this->column_count())); diff --git a/src/storage/data_block.cppm b/src/storage/data_block.cppm index 5e2a3c77ba..96fbbbb5bf 100644 --- a/src/storage/data_block.cppm +++ b/src/storage/data_block.cppm @@ -80,7 +80,7 @@ public: void AppendWith(const DataBlock *other); - void AppendWith(const SharedPtr &other, SizeT from, SizeT count); + void AppendWith(const DataBlock *other, SizeT from, SizeT count); void InsertVector(const SharedPtr &vector, SizeT index); diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp index 3d3fb53fe1..8c51750f94 100644 --- a/src/storage/txn/txn_store.cpp +++ b/src/storage/txn/txn_store.cpp @@ -64,17 +64,17 @@ UniquePtr TxnTableStore::Append(const SharedPtr &input_block) if (current_block->row_count() + input_block->row_count() > current_block->capacity()) { SizeT to_append = current_block->capacity() - current_block->row_count(); - current_block->AppendWith(input_block, 0, to_append); + current_block->AppendWith(input_block.get(), 0, to_append); current_block->Finalize(); blocks_.emplace_back(DataBlock::Make()); blocks_.back()->Init(column_types); ++current_block_id_; current_block = blocks_[current_block_id_].get(); - current_block->AppendWith(input_block, to_append, input_block->row_count() - to_append); + current_block->AppendWith(input_block.get(), to_append, input_block->row_count() - to_append); } else { SizeT to_append = 
input_block->row_count(); - current_block->AppendWith(input_block, 0, to_append); + current_block->AppendWith(input_block.get(), 0, to_append); } current_block->Finalize(); diff --git a/tools/generate_limit.py b/tools/generate_limit.py new file mode 100644 index 0000000000..ee9a0750b4 --- /dev/null +++ b/tools/generate_limit.py @@ -0,0 +1,82 @@ +import numpy as np +import random +import os +import argparse + + +def generate(generate_if_exists: bool, copy_dir: str): + row_n = 9000 + limit = 8500 + offset = 20 + dim = 128 + limit_dir = "./test/data/csv" + slt_dir = "./test/sql/dql" + + table_name = "test_limit" + limit_path = limit_dir + "/test_limit.csv" + slt_path = slt_dir + "/limit.slt" + copy_path = copy_dir + "/test_limit.csv" + + os.makedirs(limit_dir, exist_ok=True) + os.makedirs(slt_dir, exist_ok=True) + if os.path.exists(limit_path) and os.path.exists(slt_path) and generate_if_exists: + print( + "File {} and {} already existed exists. Skip Generating.".format( + slt_path, limit_path + ) + ) + return + with open(limit_path, "w") as limit_file, open(slt_path, "w") as slt_file: + slt_file.write("statement ok\n") + slt_file.write("DROP TABLE IF EXISTS {};\n".format(table_name)) + slt_file.write("\n") + slt_file.write("statement ok\n") + slt_file.write( + "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name, dim) + ) + slt_file.write("\n") + slt_file.write("query I\n") + slt_file.write( + "COPY {} FROM '{}' WITH ( DELIMITER ',' );\n".format( + table_name, copy_path + ) + ) + slt_file.write("----\n") + slt_file.write("\n") + slt_file.write("query I\n") + slt_file.write("SELECT * FROM {} limit {} offset {};\n".format(table_name, limit, offset)) + slt_file.write("----\n") + + for _ in range(row_n): + limit_file.write("0,0") + limit_file.write("\n") + + for _ in range(limit): + slt_file.write("0,0") + slt_file.write("\n") + + slt_file.write("\n") + slt_file.write("statement ok\n") + slt_file.write("DROP TABLE {};\n".format(table_name)) + random.random() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Generate limit data for test") + + parser.add_argument( + "-g", + "--generate", + type=bool, + default=False, + dest="generate_if_exists", + ) + parser.add_argument( + "-c", + "--copy", + type=str, + default="/tmp/infinity/test_data", + dest="copy_dir", + ) + args = parser.parse_args() + generate(args.generate_if_exists, args.copy_dir) diff --git a/tools/sqllogictest.py b/tools/sqllogictest.py index f6bbf53a81..5c49dccfb1 100644 --- a/tools/sqllogictest.py +++ b/tools/sqllogictest.py @@ -5,6 +5,7 @@ from generate_big import generate as generate1 from generate_fvecs import generate as generate2 from generate_sort import generate as generate3 +from generate_limit import generate as generate4 def python_skd_test(python_test_dir: str): @@ -106,6 +107,7 @@ def copy_all(data_dir, copy_dir): generate1(args.generate_if_exists, args.copy) generate2(args.generate_if_exists, args.copy) generate3(args.generate_if_exists, args.copy) + generate4(args.generate_if_exists, args.copy) print("Generate file finshed.") python_skd_test(python_test_dir) test_process(args.path, args.test, args.data, args.copy) From 56f1bc34212c9dc688f0cf09b05a6b9e27553762 Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 27 Dec 2023 17:20:36 +0800 Subject: [PATCH 2/7] test: add limit.slt for limit case --- test/sql/dql/limit.slt | 42 +++++++++++++++++++++++++++++++++++++++++ tools/generate_limit.py | 13 ++++++------- tools/generate_sort.py | 3 +-- 3 files changed, 49 insertions(+), 9 deletions(-) 
create mode 100644 test/sql/dql/limit.slt diff --git a/test/sql/dql/limit.slt b/test/sql/dql/limit.slt new file mode 100644 index 0000000000..c16bd70e9a --- /dev/null +++ b/test/sql/dql/limit.slt @@ -0,0 +1,42 @@ +statement ok +DROP TABLE IF EXISTS test_limit; + +statement ok +CREATE TABLE test_limit (c1 INTEGER, c2 INTEGER); + +statement ok +INSERT INTO test_limit VALUES(0,1),(2,3),(4,5),(6,7); + +query II +SELECT * FROM test_limit limit 0; +---- + +query II +SELECT * FROM test_limit limit 10; +---- +0 1 +2 3 +4 5 +6 7 + +query II +SELECT * FROM test_limit limit 2; +---- +0 1 +2 3 + +query II +SELECT * FROM test_limit limit 2 offset 0; +---- +0 1 +2 3 + +query II +SELECT * FROM test_limit limit 2 offset 2; +---- +4 5 +6 7 + +query II +SELECT * FROM test_limit limit 2 offset 4; +---- \ No newline at end of file diff --git a/tools/generate_limit.py b/tools/generate_limit.py index ee9a0750b4..8a87418020 100644 --- a/tools/generate_limit.py +++ b/tools/generate_limit.py @@ -8,14 +8,13 @@ def generate(generate_if_exists: bool, copy_dir: str): row_n = 9000 limit = 8500 offset = 20 - dim = 128 limit_dir = "./test/data/csv" slt_dir = "./test/sql/dql" - table_name = "test_limit" - limit_path = limit_dir + "/test_limit.csv" - slt_path = slt_dir + "/limit.slt" - copy_path = copy_dir + "/test_limit.csv" + table_name = "test_big_limit" + limit_path = limit_dir + "/test_big_limit.csv" + slt_path = slt_dir + "/big_limit.slt" + copy_path = copy_dir + "/test_big_limit.csv" os.makedirs(limit_dir, exist_ok=True) os.makedirs(slt_dir, exist_ok=True) @@ -32,7 +31,7 @@ def generate(generate_if_exists: bool, copy_dir: str): slt_file.write("\n") slt_file.write("statement ok\n") slt_file.write( - "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name, dim) + "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name) ) slt_file.write("\n") slt_file.write("query I\n") @@ -52,7 +51,7 @@ def generate(generate_if_exists: bool, copy_dir: str): limit_file.write("\n") for _ in range(limit): - slt_file.write("0,0") + slt_file.write("0 0") slt_file.write("\n") slt_file.write("\n") diff --git a/tools/generate_sort.py b/tools/generate_sort.py index 256536df2c..f605438d2c 100644 --- a/tools/generate_sort.py +++ b/tools/generate_sort.py @@ -6,7 +6,6 @@ def generate(generate_if_exists: bool, copy_dir: str): row_n = 9000 - dim = 128 sort_dir = "./test/data/csv" slt_dir = "./test/sql/dql" @@ -30,7 +29,7 @@ def generate(generate_if_exists: bool, copy_dir: str): slt_file.write("\n") slt_file.write("statement ok\n") slt_file.write( - "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name, dim) + "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name) ) slt_file.write("\n") slt_file.write("query I\n") From d2dc462e6684022931c20071cf93464f40a16e78 Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 27 Dec 2023 20:02:32 +0800 Subject: [PATCH 3/7] fix conflict --- src/executor/operator_state.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp index 6c2084bd52..ebb4e60925 100644 --- a/src/executor/operator_state.cpp +++ b/src/executor/operator_state.cpp @@ -103,6 +103,7 @@ bool QueueSourceState::GetData() { break; } case PhysicalOperatorType::kMergeLimit: { + auto *fragment_data = static_cast(fragment_data_base.get()); MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state; limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); limit_op_state->input_complete_ = completed; From f1356d3b05c4d03f73a0dbe3ce3a112dd58d8631 Mon 
Sep 17 00:00:00 2001 From: kould Date: Wed, 27 Dec 2023 19:50:09 +0800 Subject: [PATCH 4/7] perf: Implement MergeLimit to end the upstream operator early --- src/common/blocking_queue.cppm | 19 +++++++++++++++++-- src/executor/operator/physical_sink.cpp | 11 ++++++++--- src/executor/operator_state.cpp | 7 ++++++- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/common/blocking_queue.cppm b/src/common/blocking_queue.cppm index d4a203db00..bc933c4eaa 100644 --- a/src/common/blocking_queue.cppm +++ b/src/common/blocking_queue.cppm @@ -26,18 +26,32 @@ class BlockingQueue { public: explicit BlockingQueue(SizeT capacity = DEFAULT_BLOCKING_QUEUE_SIZE) : capacity_(capacity) {} - void Enqueue(T& task) { + void NotAllowEnqueue() { + allow_enqueue_ = false; + } + + bool Enqueue(T& task) { UniqueLock lock(queue_mutex_); + + if (!allow_enqueue_) { + return false; + } full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push_back(task); empty_cv_.notify_one(); + return true; } - void Enqueue(T&& task) { + bool Enqueue(T&& task) { UniqueLock lock(queue_mutex_); + + if (!allow_enqueue_) { + return false; + } full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push_back(Forward(task)); empty_cv_.notify_one(); + return true; } void EnqueueBulk(List &input_queue) { @@ -99,6 +113,7 @@ public: } protected: + atomic_bool allow_enqueue_{true}; mutable Mutex queue_mutex_{}; CondVar full_cv_{}; CondVar empty_cv_{}; diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index b60a06c560..3ab60fe5bd 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -115,7 +115,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate case PhysicalOperatorType::kSort: { SortOperatorState *sort_output_state = static_cast(task_op_state); if (sort_output_state->data_block_array_.empty()) { - if(materialize_sink_state->Error()) { + if (materialize_sink_state->Error()) { materialize_sink_state->empty_result_ = true; } else { Error("Empty sort output"); @@ -131,7 +131,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate case PhysicalOperatorType::kAggregate: { AggregateOperatorState *agg_output_state = static_cast(task_op_state); if (agg_output_state->data_block_array_.empty()) { - if(materialize_sink_state->Error()) { + if (materialize_sink_state->Error()) { materialize_sink_state->empty_result_ = true; } else { Error("Empty agg output"); @@ -378,7 +378,12 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink } for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { - next_fragment_queue->Enqueue(fragment_data); + // when the Enqueue returns false, + // it means that the downstream has collected enough data, + // preventing the Queue from Enqueue in data again to avoid redundant calculations. 
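+            // SetComplete() on this operator state is what ends the upstream operator early:
+            // once MergeLimit has gathered enough rows, QueueSourceState::GetData calls
+            // NotAllowEnqueue on the source queue, later Enqueue calls return false, and the
+            // producing task stops generating further blocks.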
+ if (!next_fragment_queue->Enqueue(fragment_data)) { + task_operator_state->SetComplete(); + } } } task_operator_state->data_block_array_.clear(); diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp index ebb4e60925..7f8967dabd 100644 --- a/src/executor/operator_state.cpp +++ b/src/executor/operator_state.cpp @@ -106,7 +106,12 @@ bool QueueSourceState::GetData() { auto *fragment_data = static_cast(fragment_data_base.get()); MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state; limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); - limit_op_state->input_complete_ = completed; + if (!limit_op_state->input_complete_) { + limit_op_state->input_complete_ = completed; + } + if (limit_op_state->input_complete_) { + source_queue_.NotAllowEnqueue(); + } break; } default: { From d65df9d2cf13a973b8806267e0cefa006b64c6ed Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 27 Dec 2023 20:19:10 +0800 Subject: [PATCH 5/7] code fmt --- src/common/blocking_queue.cppm | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/blocking_queue.cppm b/src/common/blocking_queue.cppm index bc933c4eaa..7cfaf0eb68 100644 --- a/src/common/blocking_queue.cppm +++ b/src/common/blocking_queue.cppm @@ -31,11 +31,11 @@ public: } bool Enqueue(T& task) { - UniqueLock lock(queue_mutex_); - if (!allow_enqueue_) { return false; } + + UniqueLock lock(queue_mutex_); full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push_back(task); empty_cv_.notify_one(); @@ -43,11 +43,11 @@ public: } bool Enqueue(T&& task) { - UniqueLock lock(queue_mutex_); - if (!allow_enqueue_) { return false; } + + UniqueLock lock(queue_mutex_); full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push_back(Forward(task)); empty_cv_.notify_one(); From e8ba79ae5e3337cbedaa8dd79f4ec4cf39b153ee Mon Sep 17 00:00:00 2001 From: kould Date: Thu, 28 Dec 2023 12:08:03 +0800 Subject: [PATCH 6/7] fix: the limit quantity is incorrect. 
When executing `SELECT * FROM test_big_limit limit 8500 offset 20;` when 9000 pieces of data are executed --- src/executor/fragment/plan_fragment.cppm | 1 - src/executor/fragment_builder.cpp | 1 - src/executor/operator/physical_filter.cppm | 3 +- src/executor/operator/physical_limit.cpp | 17 +++------ src/executor/operator/physical_limit.cppm | 12 +++--- .../operator/physical_merge_limit.cpp | 8 +++- .../operator/physical_merge_limit.cppm | 2 +- src/executor/operator/physical_project.cppm | 3 +- src/executor/operator/physical_sink.cpp | 10 ++--- src/executor/operator/physical_sink.cppm | 6 ++- src/executor/operator/physical_sort.cppm | 3 +- src/executor/operator_state.cppm | 24 ++++-------- src/executor/physical_planner.cpp | 37 ++++++++++++------- src/scheduler/fragment_context.cpp | 18 ++++----- src/scheduler/fragment_context.cppm | 21 ++++++++++- src/scheduler/fragment_task.cpp | 2 +- test/sql/dql/limit.slt | 13 +++++++ 17 files changed, 105 insertions(+), 76 deletions(-) diff --git a/src/executor/fragment/plan_fragment.cppm b/src/executor/fragment/plan_fragment.cppm index 1eefcfce62..374766f9e3 100644 --- a/src/executor/fragment/plan_fragment.cppm +++ b/src/executor/fragment/plan_fragment.cppm @@ -18,7 +18,6 @@ import stl; import parser; import data_table; import fragment_context; -import operator_state; import physical_operator; import physical_source; import physical_sink; diff --git a/src/executor/fragment_builder.cpp b/src/executor/fragment_builder.cpp index 41a529b173..cf69d1d76b 100644 --- a/src/executor/fragment_builder.cpp +++ b/src/executor/fragment_builder.cpp @@ -22,7 +22,6 @@ import physical_sink; import physical_source; import physical_explain; import physical_knn_scan; -import operator_state; import infinity_exception; import parser; diff --git a/src/executor/operator/physical_filter.cppm b/src/executor/operator/physical_filter.cppm index 5d4c15e661..60a0982a69 100644 --- a/src/executor/operator/physical_filter.cppm +++ b/src/executor/operator/physical_filter.cppm @@ -47,8 +47,7 @@ public: inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } SizeT TaskletCount() override { - Error("TaskletCount not Implement"); - return 0; + return left_->TaskletCount(); } inline const SharedPtr &condition() const { return condition_; } diff --git a/src/executor/operator/physical_limit.cpp b/src/executor/operator/physical_limit.cpp index 35a2581a17..b073e799fb 100644 --- a/src/executor/operator/physical_limit.cpp +++ b/src/executor/operator/physical_limit.cpp @@ -40,7 +40,7 @@ namespace infinity { SizeT AtomicCounter::Offset(SizeT row_count) { auto success = false; - SizeT result; + SizeT result = 0; while (!success) { i64 current_offset = offset_; @@ -63,7 +63,7 @@ SizeT AtomicCounter::Offset(SizeT row_count) { SizeT AtomicCounter::Limit(SizeT row_count) { auto success = false; - SizeT result; + SizeT result = 0; while (!success) { i64 current_limit = limit_; @@ -150,24 +150,18 @@ PhysicalLimit::PhysicalLimit(u64 id, offset = (static_pointer_cast(offset_expr_))->GetValue().value_.big_int; } - counter_ = MakeShared(offset, limit); + counter_ = MakeUnique(offset, limit); } void PhysicalLimit::Init() {} -SizeT PhysicalLimit::TaskletCount() { - i64 limit = (static_pointer_cast(limit_expr_))->GetValue().value_.big_int; - - return limit / DEFAULT_BLOCK_CAPACITY; -} - // offset limit + offset // left right // | a | b | c | d | e | f bool PhysicalLimit::Execute(QueryContext *query_context, const Vector> &input_blocks, Vector> &output_blocks, - SharedPtr counter) { 
+ LimitCounter *counter) { SizeT input_row_count = 0; for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) { @@ -190,7 +184,6 @@ bool PhysicalLimit::Execute(QueryContext *query_context, if (offset > max_offset) { offset -= max_offset; - continue; } else { block_start_idx = block_id; break; @@ -226,7 +219,7 @@ bool PhysicalLimit::Execute(QueryContext *query_context, } bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) { - auto result = Execute(query_context, operator_state->prev_op_state_->data_block_array_, operator_state->data_block_array_, counter_); + auto result = Execute(query_context, operator_state->prev_op_state_->data_block_array_, operator_state->data_block_array_, counter_.get()); operator_state->prev_op_state_->data_block_array_.clear(); if (counter_->IsLimitOver() || operator_state->prev_op_state_->Complete()) { diff --git a/src/executor/operator/physical_limit.cppm b/src/executor/operator/physical_limit.cppm index d793638ccb..9496fe3c10 100644 --- a/src/executor/operator/physical_limit.cppm +++ b/src/executor/operator/physical_limit.cppm @@ -45,7 +45,7 @@ public: export class AtomicCounter : public LimitCounter { public: - AtomicCounter(const i64 &offset, const i64 &limit) : offset_(offset), limit_(limit) {} + AtomicCounter(i64 offset, i64 limit) : offset_(offset), limit_(limit) {} SizeT Offset(SizeT row_count); @@ -60,7 +60,7 @@ private: export class UnSyncCounter : public LimitCounter { public: - UnSyncCounter(const i64 &offset, const i64 &limit) : offset_(offset), limit_(limit) {} + UnSyncCounter(i64 offset, i64 limit) : offset_(offset), limit_(limit) {} SizeT Offset(SizeT row_count); @@ -88,7 +88,7 @@ public: static bool Execute(QueryContext *query_context, const Vector> &input_blocks, Vector> &output_blocks, - SharedPtr counter); + LimitCounter *counter); bool Execute(QueryContext *query_context, OperatorState *operator_state) final; @@ -96,7 +96,9 @@ public: inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } - SizeT TaskletCount() override; + SizeT TaskletCount() override { + return left_->TaskletCount(); + } inline const SharedPtr &limit_expr() const { return limit_expr_; } @@ -106,7 +108,7 @@ private: SharedPtr limit_expr_{}; SharedPtr offset_expr_{}; - SharedPtr counter_; + UniquePtr counter_; }; } // namespace infinity diff --git a/src/executor/operator/physical_merge_limit.cpp b/src/executor/operator/physical_merge_limit.cpp index 3095277391..3870f24421 100644 --- a/src/executor/operator/physical_merge_limit.cpp +++ b/src/executor/operator/physical_merge_limit.cpp @@ -42,14 +42,18 @@ PhysicalMergeLimit::PhysicalMergeLimit(u64 id, if (offset_expr_ != nullptr) { offset = (static_pointer_cast(offset_expr_))->GetValue().value_.big_int; } - counter_ = MakeShared(offset, limit); + counter_ = MakeUnique(offset, limit); } void PhysicalMergeLimit::Init() {} bool PhysicalMergeLimit::Execute(QueryContext *query_context, OperatorState *operator_state) { MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)operator_state; - auto result = PhysicalLimit::Execute(query_context, limit_op_state->input_data_blocks_, limit_op_state->data_block_array_, counter_); + + if (limit_op_state->input_data_blocks_.empty()) { + return false; + } + auto result = PhysicalLimit::Execute(query_context, limit_op_state->input_data_blocks_, limit_op_state->data_block_array_, counter_.get()); if (counter_->IsLimitOver() || limit_op_state->input_complete_) { limit_op_state->input_complete_ = true; diff 
--git a/src/executor/operator/physical_merge_limit.cppm b/src/executor/operator/physical_merge_limit.cppm index fbae336ca2..0db34acb18 100644 --- a/src/executor/operator/physical_merge_limit.cppm +++ b/src/executor/operator/physical_merge_limit.cppm @@ -56,7 +56,7 @@ private: SharedPtr limit_expr_{}; SharedPtr offset_expr_{}; - SharedPtr counter_; + UniquePtr counter_; }; } // namespace infinity diff --git a/src/executor/operator/physical_project.cppm b/src/executor/operator/physical_project.cppm index 89db6b19da..101d36b77c 100644 --- a/src/executor/operator/physical_project.cppm +++ b/src/executor/operator/physical_project.cppm @@ -49,8 +49,7 @@ public: SharedPtr>> GetOutputTypes() const final; SizeT TaskletCount() override { - Error("TaskletCount not Implement"); - return 0; + return left_->TaskletCount(); } Vector> expressions_{}; diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index 3ab60fe5bd..f753d2caad 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -36,7 +36,7 @@ void PhysicalSink::Init() {} bool PhysicalSink::Execute(QueryContext *, OperatorState *) { return true; } -bool PhysicalSink::Execute(QueryContext *, SinkState *sink_state) { +bool PhysicalSink::Execute(QueryContext *, FragmentContext *fragment_context, SinkState *sink_state) { switch (sink_state->state_type_) { case SinkStateType::kInvalid: { Error("Invalid sinker type"); @@ -68,7 +68,7 @@ bool PhysicalSink::Execute(QueryContext *, SinkState *sink_state) { } case SinkStateType::kQueue: { QueueSinkState *queue_sink_state = static_cast(sink_state); - FillSinkStateFromLastOperatorState(queue_sink_state, queue_sink_state->prev_op_state_); + FillSinkStateFromLastOperatorState(fragment_context, queue_sink_state, queue_sink_state->prev_op_state_); break; } } @@ -345,7 +345,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MessageSinkState *message_ } } -void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink_state, OperatorState *task_operator_state) { +void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_context, QueueSinkState *queue_sink_state, OperatorState *task_operator_state) { if (queue_sink_state->error_message_.get() != nullptr) { LOG_TRACE(Format("Error: {} is sent to notify next fragment", *queue_sink_state->error_message_)); auto fragment_error = MakeShared(queue_sink_state->fragment_id_, MakeUnique(*queue_sink_state->error_message_)); @@ -355,7 +355,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink return; } - if (!task_operator_state->Complete() && queue_sink_state->IsMaterialize()) { + if (!task_operator_state->Complete() && fragment_context->IsMaterialize()) { LOG_TRACE("Task not completed"); return; } @@ -373,7 +373,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink queue_sink_state->task_id_, idx, output_data_block_count); - if (task_operator_state->Complete() && !queue_sink_state->IsMaterialize()) { + if (task_operator_state->Complete() && !fragment_context->IsMaterialize()) { fragment_data->data_idx_ = None; } diff --git a/src/executor/operator/physical_sink.cppm b/src/executor/operator/physical_sink.cppm index fb5379711d..51a6f721af 100644 --- a/src/executor/operator/physical_sink.cppm +++ b/src/executor/operator/physical_sink.cppm @@ -27,6 +27,8 @@ export module physical_sink; namespace infinity { +class FragmentContext; + export enum class SinkType { kInvalid, kLocalQueue, @@ 
-46,7 +48,7 @@ public: bool Execute(QueryContext *query_context, OperatorState *output_state) final; - bool Execute(QueryContext *query_context, SinkState *sink_state); + bool Execute(QueryContext *query_context, FragmentContext *fragment_context, SinkState *sink_state); inline SharedPtr> GetOutputNames() const final { return output_names_; } @@ -68,7 +70,7 @@ private: void FillSinkStateFromLastOperatorState(SummarySinkState *message_sink_state, OperatorState *task_operator_state); - void FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink_state, OperatorState *task_operator_state); + void FillSinkStateFromLastOperatorState(FragmentContext *fragment_context, QueueSinkState *queue_sink_state, OperatorState *task_operator_state); private: SharedPtr> output_names_{}; diff --git a/src/executor/operator/physical_sort.cppm b/src/executor/operator/physical_sort.cppm index 09e121edf6..e556ce0265 100644 --- a/src/executor/operator/physical_sort.cppm +++ b/src/executor/operator/physical_sort.cppm @@ -57,8 +57,7 @@ public: inline SharedPtr>> GetOutputTypes() const final { return left_->GetOutputTypes(); } SizeT TaskletCount() override { - Error("TaskletCount not Implement"); - return 0; + return left_->TaskletCount(); } Vector> expressions_; diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm index 49cf8dc43c..a5b2515027 100644 --- a/src/executor/operator_state.cppm +++ b/src/executor/operator_state.cppm @@ -31,13 +31,6 @@ export module operator_state; namespace infinity { -export enum class FragmentType { - kInvalid, - kSerialMaterialize, - kParallelMaterialize, - kParallelStream, -}; - export struct OperatorState { inline explicit OperatorState(PhysicalOperatorType operator_type) : operator_type_(operator_type) {} virtual ~OperatorState() = default; @@ -417,34 +410,31 @@ export enum class SinkStateType { export struct SinkState { virtual ~SinkState() {} - inline explicit SinkState(SinkStateType state_type, u64 fragment_id, FragmentType fragment_type, u64 task_id) - : fragment_id_(fragment_id), task_id_(task_id), fragment_type_(fragment_type), state_type_(state_type) {} + inline explicit SinkState(SinkStateType state_type, u64 fragment_id, u64 task_id) + : fragment_id_(fragment_id), task_id_(task_id), state_type_(state_type) {} inline void SetPrevOpState(OperatorState *prev_op_state) { prev_op_state_ = prev_op_state; } [[nodiscard]] inline SinkStateType state_type() const { return state_type_; } - [[nodiscard]] inline bool IsMaterialize() const { return fragment_type_ == FragmentType::kSerialMaterialize || fragment_type_ == FragmentType::kParallelMaterialize; } - inline bool Error() const { return error_message_.get() != nullptr; } u64 fragment_id_{}; u64 task_id_{}; - FragmentType fragment_type_{}; OperatorState *prev_op_state_{}; SinkStateType state_type_{SinkStateType::kInvalid}; UniquePtr error_message_{}; }; export struct QueueSinkState : public SinkState { - inline explicit QueueSinkState(u64 fragment_id, FragmentType fragment_type, u64 task_id) : SinkState(SinkStateType::kQueue, fragment_id, fragment_type, task_id) {} + inline explicit QueueSinkState(u64 fragment_id, u64 task_id) : SinkState(SinkStateType::kQueue, fragment_id, task_id) {} Vector> data_block_array_{}; Vector> *> fragment_data_queues_; }; export struct MaterializeSinkState : public SinkState { - inline explicit MaterializeSinkState(u64 fragment_id, FragmentType fragment_type, u64 task_id) : SinkState(SinkStateType::kMaterialize, fragment_id, fragment_type, task_id) {} + inline explicit 
MaterializeSinkState(u64 fragment_id, u64 task_id) : SinkState(SinkStateType::kMaterialize, fragment_id, task_id) {} SharedPtr>> column_types_{}; SharedPtr> column_names_{}; @@ -454,19 +444,19 @@ export struct MaterializeSinkState : public SinkState { }; export struct ResultSinkState : public SinkState { - inline explicit ResultSinkState() : SinkState(SinkStateType::kResult, 0, FragmentType::kInvalid, 0) {} + inline explicit ResultSinkState() : SinkState(SinkStateType::kResult, 0, 0) {} SharedPtr result_def_{}; }; export struct MessageSinkState : public SinkState { - inline explicit MessageSinkState() : SinkState(SinkStateType::kMessage, 0, FragmentType::kInvalid, 0) {} + inline explicit MessageSinkState() : SinkState(SinkStateType::kMessage, 0, 0) {} UniquePtr message_{}; }; export struct SummarySinkState : public SinkState { - inline explicit SummarySinkState() : SinkState(SinkStateType::kSummary, 0, FragmentType::kInvalid, 0), count_(0), sum_(0) {} + inline explicit SummarySinkState() : SinkState(SinkStateType::kSummary, 0, 0), count_(0), sum_(0) {} u64 count_; u64 sum_; diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp index 121a400e76..eba2599e84 100644 --- a/src/executor/physical_planner.cpp +++ b/src/executor/physical_planner.cpp @@ -115,6 +115,8 @@ import logical_match; import logical_fusion; import parser; +import value; +import value_expression; import explain_physical_plan; import infinity_exception; @@ -543,11 +545,11 @@ UniquePtr PhysicalPlanner::BuildAggregate(const SharedPtr(query_context_ptr_->GetNextNodeID(), - logical_aggregate->base_table_ref_, - Move(physical_agg_op), - logical_aggregate->GetOutputNames(), - logical_aggregate->GetOutputTypes(), - logical_operator->load_metas()); + logical_aggregate->base_table_ref_, + Move(physical_agg_op), + logical_aggregate->GetOutputNames(), + logical_aggregate->GetOutputTypes(), + logical_operator->load_metas()); } } @@ -638,16 +640,25 @@ UniquePtr PhysicalPlanner::BuildLimit(const SharedPtr logical_limit = static_pointer_cast(logical_operator); UniquePtr input_physical_operator = BuildPhysicalOperator(input_logical_node); - auto limit_op = MakeUnique(logical_operator->node_id(), - Move(input_physical_operator), - logical_limit->limit_expression_, - logical_limit->offset_expression_, - logical_operator->load_metas()); - if (limit_op->TaskletCount() <= 1) { - return limit_op; + if (input_physical_operator->TaskletCount() <= 1) { + return MakeUnique(logical_operator->node_id(), + Move(input_physical_operator), + logical_limit->limit_expression_, + logical_limit->offset_expression_, + logical_operator->load_metas()); } else { + i64 child_limit = (static_pointer_cast(logical_limit->limit_expression_))->GetValue().value_.big_int; + + if (logical_limit->offset_expression_ != nullptr) { + child_limit += (static_pointer_cast(logical_limit->offset_expression_))->GetValue().value_.big_int; + } + auto child_limit_op = MakeUnique(logical_operator->node_id(), + Move(input_physical_operator), + MakeShared(Value::MakeBigInt(child_limit)), + nullptr, + logical_operator->load_metas()); return MakeUnique(query_context_ptr_->GetNextNodeID(), - Move(limit_op), + Move(child_limit_op), logical_limit->limit_expression_, logical_limit->offset_expression_, logical_operator->load_metas()); diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index 93ee5e4578..f0c890723c 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -687,7 +687,7 @@ void 
FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), task_id); sink_state->column_types_ = last_operator->GetOutputTypes(); sink_state->column_names_ = last_operator->GetOutputNames(); @@ -705,7 +705,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), task_id); tasks_[task_id]->sink_state_ = Move(sink_state); } @@ -726,7 +726,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); } - tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), 0); + tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), 0); break; } @@ -741,7 +741,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); } - tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), 0); + tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), 0); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[0]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); @@ -749,7 +749,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } case PhysicalOperatorType::kMatch: { for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); } break; } @@ -767,7 +767,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); } break; } @@ -784,7 +784,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[task_id]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); @@ -797,7 +797,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { Error("SerialMaterialize type fragment should only have 1 task."); } - tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), 0); + tasks_[0]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), 0); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[0]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = 
last_operator->GetOutputNames(); @@ -807,7 +807,7 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), fragment_ptr_->GetFragmentType(), task_id); + tasks_[task_id]->sink_state_ = MakeUnique(fragment_ptr_->FragmentID(), task_id); MaterializeSinkState *sink_state_ptr = static_cast(tasks_[task_id]->sink_state_.get()); sink_state_ptr->column_types_ = last_operator->GetOutputTypes(); sink_state_ptr->column_names_ = last_operator->GetOutputNames(); diff --git a/src/scheduler/fragment_context.cppm b/src/scheduler/fragment_context.cppm index f9587d6d12..1ff604e58c 100644 --- a/src/scheduler/fragment_context.cppm +++ b/src/scheduler/fragment_context.cppm @@ -18,7 +18,6 @@ import stl; import fragment_task; import query_context; import profiler; -import operator_state; import physical_operator; import physical_source; import physical_sink; @@ -33,6 +32,23 @@ namespace infinity { class PlanFragment; +//class KnnScanSharedData; + +// enum class FragmentStatus { +// kNotStart, +// k +// kStart, +// kFinish, +// }; +export enum class FragmentType { + kInvalid, + kSerialMaterialize, + kParallelMaterialize, + kParallelStream, +}; + +class PlanFragment; + export class FragmentContext { public: static void @@ -64,6 +80,9 @@ public: inline Vector> &Tasks() { return tasks_; } + [[nodiscard]] inline bool IsMaterialize() const { return fragment_type_ == FragmentType::kSerialMaterialize || fragment_type_ == FragmentType::kParallelMaterialize; } + + inline SharedPtr GetResult() { UniqueLock lk(locker_); cv_.wait(lk, [&] { return completed_; }); diff --git a/src/scheduler/fragment_task.cpp b/src/scheduler/fragment_task.cpp index 377ef8b7bc..a7e544d2bd 100644 --- a/src/scheduler/fragment_task.cpp +++ b/src/scheduler/fragment_task.cpp @@ -100,7 +100,7 @@ void FragmentTask::OnExecute(i64) { if (execute_success or sink_state_->error_message_.get() != nullptr) { PhysicalSink *sink_op = fragment_context->GetSinkOperator(); - sink_op->Execute(query_context, sink_state_.get()); + sink_op->Execute(query_context, fragment_context, sink_state_.get()); } } diff --git a/test/sql/dql/limit.slt b/test/sql/dql/limit.slt index c16bd70e9a..eafcc55e49 100644 --- a/test/sql/dql/limit.slt +++ b/test/sql/dql/limit.slt @@ -31,12 +31,25 @@ SELECT * FROM test_limit limit 2 offset 0; 0 1 2 3 +query II +SELECT * FROM test_limit where c1 > 0 limit 2 offset 1; +---- +4 5 +6 7 + + query II SELECT * FROM test_limit limit 2 offset 2; ---- 4 5 6 7 +query II +SELECT * FROM test_limit order by c1 desc limit 2 offset 2; +---- +2 3 +0 1 + query II SELECT * FROM test_limit limit 2 offset 4; ---- \ No newline at end of file From 64004a63b6861fd3232bd7b0a386016b7a516629 Mon Sep 17 00:00:00 2001 From: kould Date: Thu, 28 Dec 2023 13:43:59 +0800 Subject: [PATCH 7/7] code fmt --- src/executor/operator/physical_limit.cpp | 4 ++-- src/executor/operator/physical_limit.cppm | 10 +++++----- src/executor/operator/physical_merge_limit.cppm | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/executor/operator/physical_limit.cpp b/src/executor/operator/physical_limit.cpp index b073e799fb..e2e9799e83 100644 --- a/src/executor/operator/physical_limit.cpp +++ b/src/executor/operator/physical_limit.cpp @@ -92,7 +92,7 @@ bool AtomicCounter::IsLimitOver() { } SizeT UnSyncCounter::Offset(SizeT row_count) { - SizeT result; + SizeT result = 0; if (offset_ <= 0) { return 0; @@ -111,7 +111,7 @@ SizeT 
UnSyncCounter::Offset(SizeT row_count) { } SizeT UnSyncCounter::Limit(SizeT row_count) { - SizeT result; + SizeT result = 0; if (limit_ <= 0) { return 0; diff --git a/src/executor/operator/physical_limit.cppm b/src/executor/operator/physical_limit.cppm index 9496fe3c10..8cb04602c1 100644 --- a/src/executor/operator/physical_limit.cppm +++ b/src/executor/operator/physical_limit.cppm @@ -54,8 +54,8 @@ public: bool IsLimitOver(); private: - ai64 offset_; - ai64 limit_; + ai64 offset_{}; + ai64 limit_{}; }; export class UnSyncCounter : public LimitCounter { @@ -69,8 +69,8 @@ public: bool IsLimitOver(); private: - i64 offset_; - i64 limit_; + i64 offset_{}; + i64 limit_{}; }; export class PhysicalLimit : public PhysicalOperator { @@ -108,7 +108,7 @@ private: SharedPtr limit_expr_{}; SharedPtr offset_expr_{}; - UniquePtr counter_; + UniquePtr counter_{}; }; } // namespace infinity diff --git a/src/executor/operator/physical_merge_limit.cppm b/src/executor/operator/physical_merge_limit.cppm index 0db34acb18..2db6157e85 100644 --- a/src/executor/operator/physical_merge_limit.cppm +++ b/src/executor/operator/physical_merge_limit.cppm @@ -56,7 +56,7 @@ private: SharedPtr limit_expr_{}; SharedPtr offset_expr_{}; - UniquePtr counter_; + UniquePtr counter_{}; }; } // namespace infinity
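
Illustrative note (not part of the patch): the core of the parallel LIMIT support above is a shared offset/limit counter that every task consults before emitting a block of rows. Below is a minimal standalone C++ sketch of that handshake. The names Counter, TakeOffset and TakeLimit are invented for the example and do not exist in the codebase; the logic is only a simplified analogue of what AtomicCounter::Offset/Limit do in the patch.

#include <atomic>
#include <cstdint>
#include <iostream>

class Counter {
public:
    Counter(int64_t offset, int64_t limit) : offset_(offset), limit_(limit) {}

    // How many of `row_count` freshly produced rows are still consumed by OFFSET.
    int64_t TakeOffset(int64_t row_count) {
        int64_t current = offset_.load();
        while (current > 0) {
            int64_t taken = current >= row_count ? row_count : current;
            if (offset_.compare_exchange_weak(current, current - taken)) {
                return taken;
            }
            // CAS failed: `current` now holds the new remainder, retry.
        }
        return 0;
    }

    // How many of `row_count` remaining rows still fit under LIMIT.
    int64_t TakeLimit(int64_t row_count) {
        int64_t current = limit_.load();
        while (current > 0) {
            int64_t taken = current >= row_count ? row_count : current;
            if (limit_.compare_exchange_weak(current, current - taken)) {
                return taken;
            }
        }
        return 0;
    }

private:
    std::atomic<int64_t> offset_;
    std::atomic<int64_t> limit_;
};

int main() {
    Counter counter(/*offset=*/3, /*limit=*/4);   // LIMIT 4 OFFSET 3
    for (int task = 0; task < 2; ++task) {        // two tasks, 5 rows each
        int64_t rows = 5;
        int64_t skipped = counter.TakeOffset(rows);
        int64_t kept = counter.TakeLimit(rows - skipped);
        std::cout << "task " << task << ": skip " << skipped
                  << ", keep " << kept << "\n";   // skips 3/keeps 2, then skips 0/keeps 2
    }
    return 0;                                     // 4 rows kept in total
}

The planner change in BuildLimit follows the same idea: when the child plan runs with more than one tasklet, each parallel fragment gets a PhysicalLimit capped at limit + offset (so no fragment ever produces more rows than the query could need), and the PhysicalMergeLimit placed on top then applies the original query's exact OFFSET and LIMIT to the merged stream.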