diff --git a/src/common/blocking_queue.cppm b/src/common/blocking_queue.cppm
index d4a203db00..7cfaf0eb68 100644
--- a/src/common/blocking_queue.cppm
+++ b/src/common/blocking_queue.cppm
@@ -26,18 +26,32 @@ class BlockingQueue {
 public:
     explicit BlockingQueue(SizeT capacity = DEFAULT_BLOCKING_QUEUE_SIZE) : capacity_(capacity) {}
 
-    void Enqueue(T& task) {
+    void NotAllowEnqueue() {
+        allow_enqueue_ = false;
+    }
+
+    bool Enqueue(T& task) {
+        if (!allow_enqueue_) {
+            return false;
+        }
+
         UniqueLock<Mutex> lock(queue_mutex_);
         full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
         queue_.push_back(task);
         empty_cv_.notify_one();
+        return true;
     }
 
-    void Enqueue(T&& task) {
+    bool Enqueue(T&& task) {
+        if (!allow_enqueue_) {
+            return false;
+        }
+
         UniqueLock<Mutex> lock(queue_mutex_);
         full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
         queue_.push_back(Forward<T>(task));
         empty_cv_.notify_one();
+        return true;
     }
 
     void EnqueueBulk(List<T> &input_queue) {
@@ -99,6 +113,7 @@ public:
     }
 
 protected:
+    atomic_bool allow_enqueue_{true};
     mutable Mutex queue_mutex_{};
     CondVar full_cv_{};
     CondVar empty_cv_{};
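The queue change above is the hook that makes early termination of LIMIT possible: once the consumer closes the queue, a producer calling `Enqueue()` gets `false` back instead of blocking. A minimal, self-contained sketch of the same pattern with standard-library types (the names here are illustrative, not Infinity's aliases):

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <mutex>
#include <utility>

// Sketch of the closable bounded queue used by BlockingQueue above.
template <typename T>
class ClosableQueue {
public:
    explicit ClosableQueue(std::size_t capacity) : capacity_(capacity) {}

    void NotAllowEnqueue() { allow_enqueue_ = false; }

    // Returns false instead of blocking once the queue is closed for writers.
    bool Enqueue(T value) {
        if (!allow_enqueue_) {
            return false;
        }
        std::unique_lock<std::mutex> lock(mutex_);
        full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(value));
        empty_cv_.notify_one();
        return true;
    }

    T Dequeue() {
        std::unique_lock<std::mutex> lock(mutex_);
        empty_cv_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop_front();
        full_cv_.notify_one();
        return value;
    }

private:
    std::atomic_bool allow_enqueue_{true};
    std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable full_cv_;
    std::condition_variable empty_cv_;
    std::deque<T> queue_;
};

int main() {
    ClosableQueue<int> q(4);
    q.Enqueue(1);                      // accepted
    q.NotAllowEnqueue();               // consumer: "I have enough"
    std::cout << q.Enqueue(2) << "\n"; // prints 0: producer learns to stop
}
```

Note that, as in the patch, the flag is read before the lock is taken; that is fine because refusal is advisory — a late block that slips through is still drained by the consumer.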
diff --git a/src/executor/operator/physical_filter.cppm b/src/executor/operator/physical_filter.cppm
index 5d4c15e661..60a0982a69 100644
--- a/src/executor/operator/physical_filter.cppm
+++ b/src/executor/operator/physical_filter.cppm
@@ -47,8 +47,7 @@ public:
     inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }
 
     SizeT TaskletCount() override {
-        Error("TaskletCount not Implement");
-        return 0;
+        return left_->TaskletCount();
     }
 
     inline const SharedPtr<BaseExpression> &condition() const { return condition_; }
diff --git a/src/executor/operator/physical_limit.cpp b/src/executor/operator/physical_limit.cpp
index b8ad6ea2ea..e2e9799e83 100644
--- a/src/executor/operator/physical_limit.cpp
+++ b/src/executor/operator/physical_limit.cpp
@@ -18,9 +18,12 @@ module;
 
 import stl;
 import txn;
+import base_expression;
+import default_values;
+import load_meta;
 import query_context;
 import table_def;
 import data_table;
 import parser;
 import physical_operator_type;
 import operator_state;
@@ -34,130 +37,195 @@ module physical_limit;
 
 namespace infinity {
 
-void PhysicalLimit::Init() {}
+SizeT AtomicCounter::Offset(SizeT row_count) {
+    auto success = false;
+    SizeT result = 0;
 
-bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
+    while (!success) {
+        i64 current_offset = offset_;
+        if (current_offset <= 0) {
+            return 0;
+        }
+        i64 last_offset = current_offset - row_count;
 
-#if 0
-    // output table definition is same as input
-    input_table_ = left_->output();
-    Assert(input_table_.get() != nullptr, "No input");
+        if (last_offset > 0) {
+            success = offset_.compare_exchange_strong(current_offset, last_offset);
+            result = row_count;
+        } else {
+            success = offset_.compare_exchange_strong(current_offset, 0);
+            result = current_offset;
+        }
+    }
 
-    Assert(limit_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression");
+    return result;
+}
 
-    i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;
-    Assert(limit > 0, "Limit should be larger than 0");
+SizeT AtomicCounter::Limit(SizeT row_count) {
+    auto success = false;
+    SizeT result = 0;
+
+    while (!success) {
+        i64 current_limit = limit_;
+        if (current_limit <= 0) {
+            return 0;
+        }
+        i64 last_limit = current_limit - row_count;
+
+        if (last_limit > 0) {
+            success = limit_.compare_exchange_strong(current_limit, last_limit);
+            result = row_count;
+        } else {
+            success = limit_.compare_exchange_strong(current_limit, 0);
+            result = current_limit;
+        }
+    }
+
+    return result;
+}
+
+bool AtomicCounter::IsLimitOver() {
+    if (limit_ < 0) {
+        Error("limit is not allowed to be smaller than 0");
+    }
+    return limit_ == 0;
+}
+
+SizeT UnSyncCounter::Offset(SizeT row_count) {
+    SizeT result = 0;
+
+    if (offset_ <= 0) {
+        return 0;
+    }
+    i64 last_offset = offset_ - row_count;
+
+    if (last_offset > 0) {
+        result = row_count;
+        offset_ = last_offset;
+    } else {
+        result = offset_;
+        offset_ = 0;
+    }
+
+    return result;
+}
+
+SizeT UnSyncCounter::Limit(SizeT row_count) {
+    SizeT result = 0;
+
+    if (limit_ <= 0) {
+        return 0;
+    }
+    i64 last_limit = limit_ - row_count;
+
+    if (last_limit > 0) {
+        result = row_count;
+        limit_ = last_limit;
+    } else {
+        result = limit_;
+        limit_ = 0;
+    }
+
+    return result;
+}
+
+bool UnSyncCounter::IsLimitOver() {
+    if (limit_ < 0) {
+        Error("limit is not allowed to be smaller than 0");
+    }
+    return limit_ == 0;
+}
+
+PhysicalLimit::PhysicalLimit(u64 id,
+                             UniquePtr<PhysicalOperator> left,
+                             SharedPtr<BaseExpression> limit_expr,
+                             SharedPtr<BaseExpression> offset_expr,
+                             SharedPtr<Vector<LoadMeta>> load_metas)
+    : PhysicalOperator(PhysicalOperatorType::kLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)),
+      offset_expr_(Move(offset_expr)) {
     i64 offset = 0;
+    i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;
+
     if (offset_expr_ != nullptr) {
-        Assert(offset_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression");
         offset = (static_pointer_cast<ValueExpression>(offset_expr_))->GetValue().value_.big_int;
-        Assert(offset >= 0 && offset < input_table_->row_count(),
-               "Offset should be larger or equal than 0 and less than row number");
     }
-    output_ = GetLimitOutput(input_table_, limit, offset);
-#endif
-    return true;
+    counter_ = MakeUnique<AtomicCounter>(offset, limit);
 }
 
-SharedPtr<DataTable> PhysicalLimit::GetLimitOutput(const SharedPtr<DataTable> &input_table, i64 limit, i64 offset) {
-    SizeT start_block = 0;
-    SizeT start_row_id = 0;
-    SizeT end_block = 0;
-    SizeT end_row_id = 0;
+void PhysicalLimit::Init() {}
 
-    if (offset == 0) {
-        if (limit >= (i64)input_table->row_count()) {
-            return input_table;
-        } else {
-            start_block = 0;
-            start_row_id = 0;
-            SizeT block_count = input_table->DataBlockCount();
-            i64 total_row_count = limit;
-            for (SizeT block_id = 0; block_id < block_count; ++block_id) {
-                SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
-                if (total_row_count > (i64)block_row_count) {
-                    total_row_count -= block_row_count;
-                } else {
-                    end_block = block_id;
-                    end_row_id = total_row_count;
-                    break;
-                }
-            }
-        }
-    } else {
-        i64 total_row_count = offset;
-        SizeT block_count = input_table->DataBlockCount();
-        SizeT rest_row_count = 0;
-        for (SizeT block_id = 0; block_id < block_count; ++block_id) {
-            SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
-            if (total_row_count >= (i64)block_row_count) {
-                total_row_count -= block_row_count;
-            } else {
-                start_block = block_id;
-                start_row_id = total_row_count;
-                rest_row_count = block_row_count - total_row_count;
-                break;
-            }
-        }
+
+// Rows are kept from the half-open range [offset, limit + offset):
+// `offset` is the left boundary and `limit + offset` the right boundary, e.g.
+// | a | b | c | d | e | f |  with offset = 2, limit = 2 keeps | c | d |.
+bool PhysicalLimit::Execute(QueryContext *query_context,
+                            const Vector<UniquePtr<DataBlock>> &input_blocks,
+                            Vector<UniquePtr<DataBlock>> &output_blocks,
+                            LimitCounter *counter) {
+    SizeT input_row_count = 0;
+
+    for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) {
+        input_row_count += input_blocks[block_id]->row_count();
+    }
 
-        total_row_count = limit;
-        if (total_row_count <= (i64)rest_row_count) {
-            end_block = start_block;
-            end_row_id = total_row_count;
-        } else {
-            total_row_count -= rest_row_count;
-            for (SizeT block_id = start_block + 1; block_id < block_count; ++block_id) {
-                SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
-                if (total_row_count > (i64)block_row_count) {
-                    total_row_count -= block_row_count;
-                } else {
-                    end_block = block_id;
-                    end_row_id = total_row_count;
-                    break;
-                }
-            }
-        }
+    SizeT offset = counter->Offset(input_row_count);
+    if (offset == input_row_count) {
+        return true;
     }
 
-    // Copy from input table to output table
-    SizeT column_count = input_table->ColumnCount();
-    Vector<SharedPtr<DataType>> types;
-    types.reserve(column_count);
-    Vector<SharedPtr<ColumnDef>> columns;
-    columns.reserve(column_count);
-    for (SizeT idx = 0; idx < column_count; ++idx) {
-        SharedPtr<DataType> col_type = input_table->GetColumnTypeById(idx);
-        types.emplace_back(col_type);
+    SizeT limit = counter->Limit(input_row_count - offset);
+    SizeT block_start_idx = 0;
 
-        String col_name = input_table->GetColumnNameById(idx);
+    for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) {
+        if (input_blocks[block_id]->row_count() == 0) {
+            continue;
+        }
+        SizeT block_row_count = input_blocks[block_id]->row_count();
 
-        SharedPtr<ColumnDef> col_def = MakeShared<ColumnDef>(idx, col_type, col_name, HashSet<ConstraintType>());
-        columns.emplace_back(col_def);
+        if (offset >= block_row_count) {
+            offset -= block_row_count;
+        } else {
+            block_start_idx = block_id;
+            break;
+        }
     }
 
-    SharedPtr<TableDef> table_def = TableDef::Make(MakeShared<String>("default"), MakeShared<String>("limit"), columns);
-    SharedPtr<DataTable> output_table = DataTable::Make(table_def, TableType::kIntermediate);
-
-    const Vector<SharedPtr<DataBlock>> &input_datablocks = input_table->data_blocks_;
+    for (SizeT block_id = block_start_idx; block_id < input_blocks.size(); block_id++) {
+        auto &input_block = input_blocks[block_id];
+        auto row_count = input_block->row_count();
+        if (row_count == 0) {
+            continue;
+        }
+        auto block = DataBlock::MakeUniquePtr();
 
-    for (SizeT block_id = start_block; block_id <= end_block; ++block_id) {
-        SizeT input_start_offset = start_row_id;
-        SizeT input_end_offset;
-        if (end_block == block_id) {
-            input_end_offset = end_row_id;
+        block->Init(input_block->types());
+        SizeT avail = row_count - offset;
+        if (limit >= avail) {
+            block->AppendWith(input_block.get(), offset, avail);
+            limit -= avail;
         } else {
-            // current input block isn't the last one.
-            input_end_offset = input_datablocks[block_id]->row_count();
+            block->AppendWith(input_block.get(), offset, limit);
+            limit = 0;
         }
+        block->Finalize();
+        output_blocks.push_back(Move(block));
+        offset = 0;
 
-        SharedPtr<DataBlock> output_datablock = DataBlock::Make();
-        output_datablock->Init(input_datablocks[block_id], input_start_offset, input_end_offset);
-        output_table->Append(output_datablock);
+        if (limit == 0) {
+            break;
+        }
+    }
+
+    return true;
+}
+
+bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
+    auto result = Execute(query_context, operator_state->prev_op_state_->data_block_array_, operator_state->data_block_array_, counter_.get());
 
-        start_row_id = 0;
+    operator_state->prev_op_state_->data_block_array_.clear();
+    if (counter_->IsLimitOver() || operator_state->prev_op_state_->Complete()) {
+        operator_state->SetComplete();
     }
 
-    return output_table;
+    return result;
 }
 
 } // namespace infinity
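Because a single `PhysicalLimit` instance is shared by every task of a parallel fragment, the offset/limit budgets are claimed with a CAS retry loop rather than a mutex. The sketch below is a condensed restatement of the claim logic in `AtomicCounter::Limit` (not Infinity's exact code): each caller atomically subtracts what it takes, so concurrent callers can never hand out more than the remaining budget in total.

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Global LIMIT budget shared by all tasks of the fragment.
static std::atomic<int64_t> limit_budget{5};

// Atomically claim up to row_count rows; returns how many were granted.
int64_t ClaimRows(int64_t row_count) {
    int64_t current = limit_budget.load();
    while (true) {
        if (current <= 0) {
            return 0; // budget exhausted: this caller emits nothing
        }
        int64_t remaining = current - row_count;
        int64_t claimed = remaining > 0 ? row_count : current;
        // On failure compare_exchange_strong reloads `current`, so just retry.
        if (limit_budget.compare_exchange_strong(current, remaining > 0 ? remaining : 0)) {
            return claimed;
        }
    }
}

int main() {
    // Two tasks each offer 3 rows against LIMIT 5: one gets 3, the other 2,
    // regardless of how the claims interleave.
    std::cout << ClaimRows(3) << " " << ClaimRows(3) << "\n"; // 3 2
}
```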
diff --git a/src/executor/operator/physical_limit.cppm b/src/executor/operator/physical_limit.cppm
index f1a7c7c45f..8cb04602c1 100644
--- a/src/executor/operator/physical_limit.cppm
+++ b/src/executor/operator/physical_limit.cppm
@@ -21,6 +21,7 @@ import operator_state;
 import physical_operator;
 import physical_operator_type;
 import base_expression;
+import value_expression;
 import data_table;
 import load_meta;
 import infinity_exception;
@@ -29,20 +30,66 @@ export module physical_limit;
 
 namespace infinity {
 
+class DataBlock;
+
+export class LimitCounter {
+public:
+    // Returns how many of the offered row_count rows are consumed by the remaining offset
+    virtual SizeT Offset(SizeT row_count) = 0;
+
+    // Returns how many of the offered row_count rows pass the remaining limit
+    virtual SizeT Limit(SizeT row_count) = 0;
+
+    virtual bool IsLimitOver() = 0;
+};
+
+export class AtomicCounter : public LimitCounter {
+public:
+    AtomicCounter(i64 offset, i64 limit) : offset_(offset), limit_(limit) {}
+
+    SizeT Offset(SizeT row_count);
+
+    SizeT Limit(SizeT row_count);
+
+    bool IsLimitOver();
+
+private:
+    ai64 offset_{};
+    ai64 limit_{};
+};
+
+export class UnSyncCounter : public LimitCounter {
+public:
+    UnSyncCounter(i64 offset, i64 limit) : offset_(offset), limit_(limit) {}
+
+    SizeT Offset(SizeT row_count);
+
+    SizeT Limit(SizeT row_count);
+
+    bool IsLimitOver();
+
+private:
+    i64 offset_{};
+    i64 limit_{};
+};
+
 export class PhysicalLimit : public PhysicalOperator {
 public:
     explicit PhysicalLimit(u64 id,
                            UniquePtr<PhysicalOperator> left,
                            SharedPtr<BaseExpression> limit_expr,
                            SharedPtr<BaseExpression> offset_expr,
-                           SharedPtr<Vector<LoadMeta>> load_metas)
-        : PhysicalOperator(PhysicalOperatorType::kLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)),
-          offset_expr_(Move(offset_expr)) {}
+                           SharedPtr<Vector<LoadMeta>> load_metas);
 
     ~PhysicalLimit() override = default;
 
     void Init() override;
 
+    static bool Execute(QueryContext *query_context,
+                        const Vector<UniquePtr<DataBlock>> &input_blocks,
+                        Vector<UniquePtr<DataBlock>> &output_blocks,
+                        LimitCounter *counter);
+
     bool Execute(QueryContext *query_context, OperatorState *operator_state) final;
 
     inline SharedPtr<Vector<String>> GetOutputNames() const final { return left_->GetOutputNames(); }
@@ -50,22 +97,18 @@ public:
     inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }
 
     SizeT TaskletCount() override {
-        Error("TaskletCount not Implement");
-        return 0;
+        return left_->TaskletCount();
     }
 
     inline const SharedPtr<BaseExpression> &limit_expr() const { return limit_expr_; }
 
     inline const SharedPtr<BaseExpression> &offset_expr() const { return offset_expr_; }
 
-    static SharedPtr<DataTable> GetLimitOutput(const SharedPtr<DataTable> &input_table, i64 limit, i64 offset);
-
 private:
     SharedPtr<BaseExpression> limit_expr_{};
     SharedPtr<BaseExpression> offset_expr_{};
 
-    SharedPtr<DataTable> input_table_{};
-    u64 input_table_index_{};
+    UniquePtr<LimitCounter> counter_{};
 };
 
 } // namespace infinity
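The header separates the counting policy from the slicing code: `Execute()` only sees the `LimitCounter` interface, so the same loop serves both a lock-free shared budget (`AtomicCounter`, for when several tasks feed one counter) and a plain single-threaded one (`UnSyncCounter`). The model below is a simplified, single-threaded restatement of the `Offset`/`Limit` contract; `CounterModel` is a hypothetical stand-in for illustration, not part of the codebase.

```cpp
#include <cassert>
#include <cstdint>

// Single-threaded model of the counter contract, mirroring
// `SELECT * FROM t LIMIT 2 OFFSET 2` over one 4-row block.
struct CounterModel {
    int64_t offset_;
    int64_t limit_;

    // How many of the n offered rows are swallowed by the remaining offset.
    int64_t Offset(int64_t n) {
        int64_t took = offset_ < n ? offset_ : n;
        offset_ -= took;
        return took;
    }

    // How many of the n remaining rows may still be emitted.
    int64_t Limit(int64_t n) {
        int64_t took = limit_ < n ? limit_ : n;
        limit_ -= took;
        return took;
    }
};

int main() {
    CounterModel c{2, 2};
    assert(c.Offset(4) == 2);     // rows 0-1 are skipped
    assert(c.Limit(4 - 2) == 2);  // rows 2-3 are emitted
    assert(c.Limit(10) == 0);     // limit exhausted: nothing more passes
}
```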
diff --git a/src/executor/operator/physical_merge_limit.cpp b/src/executor/operator/physical_merge_limit.cpp
index 90a16596ff..3870f24421 100644
--- a/src/executor/operator/physical_merge_limit.cpp
+++ b/src/executor/operator/physical_merge_limit.cpp
@@ -14,15 +14,53 @@
 
 module;
 
+#include
+
+import stl;
 import query_context;
+import base_expression;
+import load_meta;
+import physical_operator_type;
+import value_expression;
+import physical_limit;
 import operator_state;
 
 module physical_merge_limit;
 
 namespace infinity {
 
+PhysicalMergeLimit::PhysicalMergeLimit(u64 id,
+                                       UniquePtr<PhysicalOperator> left,
+                                       SharedPtr<BaseExpression> limit_expr,
+                                       SharedPtr<BaseExpression> offset_expr,
+                                       SharedPtr<Vector<LoadMeta>> load_metas)
+    : PhysicalOperator(PhysicalOperatorType::kMergeLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)),
+      offset_expr_(Move(offset_expr)) {
+    i64 offset = 0;
+    i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;
+
+    if (offset_expr_ != nullptr) {
+        offset = (static_pointer_cast<ValueExpression>(offset_expr_))->GetValue().value_.big_int;
+    }
+    counter_ = MakeUnique<UnSyncCounter>(offset, limit);
+}
+
 void PhysicalMergeLimit::Init() {}
 
-bool PhysicalMergeLimit::Execute(QueryContext *, OperatorState *) { return true; }
+bool PhysicalMergeLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
+    MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)operator_state;
+
+    if (limit_op_state->input_data_blocks_.empty()) {
+        return false;
+    }
+    auto result = PhysicalLimit::Execute(query_context, limit_op_state->input_data_blocks_, limit_op_state->data_block_array_, counter_.get());
+
+    if (counter_->IsLimitOver() || limit_op_state->input_complete_) {
+        limit_op_state->input_complete_ = true;
+        limit_op_state->SetComplete();
+    }
+    limit_op_state->input_data_blocks_.clear();
+    return result;
+}
 
 } // namespace infinity
diff --git a/src/executor/operator/physical_merge_limit.cppm b/src/executor/operator/physical_merge_limit.cppm
index 50e956eaf0..2db6157e85 100644
--- a/src/executor/operator/physical_merge_limit.cppm
+++ b/src/executor/operator/physical_merge_limit.cppm
@@ -16,9 +16,11 @@ module;
 
 import stl;
 import parser;
+import base_expression;
 import query_context;
 import operator_state;
 import physical_operator;
+import physical_limit;
 import physical_operator_type;
 import load_meta;
 import infinity_exception;
@@ -29,12 +31,11 @@ namespace infinity {
 
 export class PhysicalMergeLimit final : public PhysicalOperator {
 public:
-    explicit PhysicalMergeLimit(SharedPtr<Vector<String>> output_names,
-                                SharedPtr<Vector<SharedPtr<DataType>>> output_types,
-                                u64 id,
-                                SharedPtr<Vector<LoadMeta>> load_metas)
-        : PhysicalOperator(PhysicalOperatorType::kMergeLimit, nullptr, nullptr, id, load_metas), output_names_(Move(output_names)),
-          output_types_(Move(output_types)) {}
+    explicit PhysicalMergeLimit(u64 id,
+                                UniquePtr<PhysicalOperator> left,
+                                SharedPtr<BaseExpression> limit_expr,
+                                SharedPtr<BaseExpression> offset_expr,
+                                SharedPtr<Vector<LoadMeta>> load_metas);
 
     ~PhysicalMergeLimit() override = default;
 
@@ -42,9 +43,9 @@ public:
 
     bool Execute(QueryContext *query_context, OperatorState *operator_state) final;
 
-    inline SharedPtr<Vector<String>> GetOutputNames() const final { return output_names_; }
+    inline SharedPtr<Vector<String>> GetOutputNames() const final { return left_->GetOutputNames(); }
 
-    inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return output_types_; }
+    inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }
 
     SizeT TaskletCount() override {
         Error("TaskletCount not Implement");
@@ -52,8 +53,10 @@ public:
     }
 
 private:
-    SharedPtr<Vector<String>> output_names_{};
-    SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};
+    SharedPtr<BaseExpression> limit_expr_{};
+    SharedPtr<BaseExpression> offset_expr_{};
+
+    UniquePtr<LimitCounter> counter_{};
 };
 
 } // namespace infinity
diff --git a/src/executor/operator/physical_project.cppm b/src/executor/operator/physical_project.cppm
index 89db6b19da..101d36b77c 100644
--- a/src/executor/operator/physical_project.cppm
+++ b/src/executor/operator/physical_project.cppm
@@ -49,8 +49,7 @@ public:
     SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final;
 
     SizeT TaskletCount() override {
-        Error("TaskletCount not Implement");
-        return 0;
+        return left_->TaskletCount();
     }
 
     Vector<SharedPtr<BaseExpression>> expressions_{};
diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp
index 1cfc3acd6d..f753d2caad 100644
--- a/src/executor/operator/physical_sink.cpp
+++ b/src/executor/operator/physical_sink.cpp
@@ -20,6 +20,7 @@ import query_context;
 import parser;
 import operator_state;
 import physical_operator_type;
+import fragment_context;
 import third_party;
 import fragment_data;
 import data_block;
@@ -35,7 +36,7 @@ void PhysicalSink::Init() {}
 
 bool PhysicalSink::Execute(QueryContext *, OperatorState *) { return true; }
 
-bool PhysicalSink::Execute(QueryContext *, SinkState *sink_state) {
+bool PhysicalSink::Execute(QueryContext *, FragmentContext *fragment_context, SinkState *sink_state) {
     switch (sink_state->state_type_) {
         case SinkStateType::kInvalid: {
             Error("Invalid sinker type");
@@ -67,7 +68,7 @@ bool PhysicalSink::Execute(QueryContext *, SinkState *sink_state) {
         }
         case SinkStateType::kQueue: {
             QueueSinkState *queue_sink_state = static_cast<QueueSinkState *>(sink_state);
-            FillSinkStateFromLastOperatorState(queue_sink_state, queue_sink_state->prev_op_state_);
+            FillSinkStateFromLastOperatorState(fragment_context, queue_sink_state, queue_sink_state->prev_op_state_);
             break;
         }
     }
@@ -102,11 +103,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *materialize_sink_state, OperatorState *task_op_state) {
         case PhysicalOperatorType::kProjection: {
             ProjectionOperatorState *projection_output_state = static_cast<ProjectionOperatorState *>(task_op_state);
             if (projection_output_state->data_block_array_.empty()) {
-                if(materialize_sink_state->Error()) {
-                    materialize_sink_state->empty_result_ = true;
-                } else {
-                    Error("Empty projection output");
-                }
+                materialize_sink_state->empty_result_ = true;
             } else {
                 for (auto &data_block : projection_output_state->data_block_array_) {
                     materialize_sink_state->data_block_array_.emplace_back(Move(data_block));
@@ -118,7 +115,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *materialize_sink_state, OperatorState *task_op_state) {
         case PhysicalOperatorType::kSort: {
             SortOperatorState *sort_output_state = static_cast<SortOperatorState *>(task_op_state);
             if (sort_output_state->data_block_array_.empty()) {
-                if(materialize_sink_state->Error()) {
+                if (materialize_sink_state->Error()) {
                     materialize_sink_state->empty_result_ = true;
                 } else {
                     Error("Empty sort output");
@@ -134,10 +131,10 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *materialize_sink_state, OperatorState *task_op_state) {
         case PhysicalOperatorType::kAggregate: {
             AggregateOperatorState *agg_output_state = static_cast<AggregateOperatorState *>(task_op_state);
             if (agg_output_state->data_block_array_.empty()) {
-                if(materialize_sink_state->Error()) {
+                if (materialize_sink_state->Error()) {
                     materialize_sink_state->empty_result_ = true;
                 } else {
-                    Error("Empty sort output");
+                    Error("Empty agg output");
                 }
             } else {
                 materialize_sink_state->data_block_array_ = Move(agg_output_state->data_block_array_);
@@ -348,7 +345,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MessageSinkState *message_sink_state, OperatorState *task_operator_state) {
     }
 }
 
-void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink_state, OperatorState *task_operator_state) {
+void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_context, QueueSinkState *queue_sink_state, OperatorState *task_operator_state) {
     if (queue_sink_state->error_message_.get() != nullptr) {
         LOG_TRACE(Format("Error: {} is sent to notify next fragment", *queue_sink_state->error_message_));
         auto fragment_error = MakeShared<FragmentError>(queue_sink_state->fragment_id_, MakeUnique<String>(*queue_sink_state->error_message_));
@@ -358,7 +355,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink_state, OperatorState *task_operator_state) {
         return;
     }
 
-    if (!task_operator_state->Complete()) {
+    if (!task_operator_state->Complete() && fragment_context->IsMaterialize()) {
        LOG_TRACE("Task not completed");
        return;
    }
@@ -376,8 +373,17 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink_state, OperatorState *task_operator_state) {
                                                    queue_sink_state->task_id_,
                                                    idx,
                                                    output_data_block_count);
+        if (task_operator_state->Complete() && !fragment_context->IsMaterialize()) {
+            fragment_data->data_idx_ = None;
+        }
+
         for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) {
-            next_fragment_queue->Enqueue(fragment_data);
+            // If Enqueue() returns false, the downstream operator has already
+            // collected enough data; stop feeding the queue so this task does
+            // not keep doing redundant work.
+            if (!next_fragment_queue->Enqueue(fragment_data)) {
+                task_operator_state->SetComplete();
+            }
         }
     }
     task_operator_state->data_block_array_.clear();
diff --git a/src/executor/operator/physical_sink.cppm b/src/executor/operator/physical_sink.cppm
index fb5379711d..51a6f721af 100644
--- a/src/executor/operator/physical_sink.cppm
+++ b/src/executor/operator/physical_sink.cppm
@@ -27,6 +27,8 @@ export module physical_sink;
 
 namespace infinity {
 
+class FragmentContext;
+
 export enum class SinkType {
     kInvalid,
     kLocalQueue,
@@ -46,7 +48,7 @@ public:
 
     bool Execute(QueryContext *query_context, OperatorState *output_state) final;
 
-    bool Execute(QueryContext *query_context, SinkState *sink_state);
+    bool Execute(QueryContext *query_context, FragmentContext *fragment_context, SinkState *sink_state);
 
     inline SharedPtr<Vector<String>> GetOutputNames() const final { return output_names_; }
 
@@ -68,7 +70,7 @@ private:
 
     void FillSinkStateFromLastOperatorState(SummarySinkState *message_sink_state, OperatorState *task_operator_state);
 
-    void FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink_state, OperatorState *task_operator_state);
+    void FillSinkStateFromLastOperatorState(FragmentContext *fragment_context, QueueSinkState *queue_sink_state, OperatorState *task_operator_state);
 
 private:
     SharedPtr<Vector<String>> output_names_{};
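The sink-side change closes the feedback loop: when a downstream merge-limit has gathered enough rows it closes its source queue, the next `Enqueue()` from the upstream sink returns `false`, and the sink marks the producing operator complete so its task stops generating blocks. A toy model of that handshake (hypothetical names, not Infinity's API):

```cpp
#include <atomic>
#include <iostream>

// Stand-in for the source queue shared between sink and downstream operator.
struct Queue {
    std::atomic_bool allow_enqueue{true};
    bool Enqueue(int /*block*/) { return allow_enqueue.load(); }
    void NotAllowEnqueue() { allow_enqueue = false; }
};

int main() {
    Queue queue;
    bool producer_complete = false; // mirrors task_operator_state->Complete()
    int produced = 0;

    for (int block = 0; block < 100 && !producer_complete; ++block) {
        ++produced;
        if (block == 2) {
            queue.NotAllowEnqueue();  // downstream: "I have enough rows"
        }
        if (!queue.Enqueue(block)) {
            producer_complete = true; // mirrors task_operator_state->SetComplete()
        }
    }
    std::cout << produced << "\n"; // 3: production stops right after the refusal
}
```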
diff --git a/src/executor/operator/physical_sort.cppm b/src/executor/operator/physical_sort.cppm
index 09e121edf6..e556ce0265 100644
--- a/src/executor/operator/physical_sort.cppm
+++ b/src/executor/operator/physical_sort.cppm
@@ -57,8 +57,7 @@ public:
     inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }
 
     SizeT TaskletCount() override {
-        Error("TaskletCount not Implement");
-        return 0;
+        return left_->TaskletCount();
     }
 
     Vector<SharedPtr<BaseExpression>> expressions_;
diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp
index 0c439534c7..7f8967dabd 100644
--- a/src/executor/operator_state.cpp
+++ b/src/executor/operator_state.cpp
@@ -45,7 +45,10 @@ bool QueueSourceState::GetData() {
     switch (fragment_data_base->type_) {
         case FragmentDataType::kData: {
             auto *fragment_data = static_cast<FragmentData *>(fragment_data_base.get());
-            if (fragment_data->data_idx_ + 1 == fragment_data->data_count_) {
+            if (!fragment_data->data_idx_.has_value()) {
+                // fragment completed
+                MarkCompletedTask(fragment_data->fragment_id_);
+            } else if (fragment_data->data_idx_.value() + 1 == fragment_data->data_count_) {
                 // Get an all data from this
                 MarkCompletedTask(fragment_data->fragment_id_);
             }
@@ -99,6 +102,18 @@ bool QueueSourceState::GetData() {
             create_index_finish_op_state->input_complete_ = completed;
             break;
         }
+        case PhysicalOperatorType::kMergeLimit: {
+            auto *fragment_data = static_cast<FragmentData *>(fragment_data_base.get());
+            MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state;
+            limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_));
+            if (!limit_op_state->input_complete_) {
+                limit_op_state->input_complete_ = completed;
+            }
+            if (limit_op_state->input_complete_) {
+                source_queue_.NotAllowEnqueue();
+            }
+            break;
+        }
         default: {
             Error("Not support operator type");
             break;
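The source-side counterpart relies on the new `Optional` `data_idx_`: a materialized sender numbers its blocks so the receiver can detect the last one, but a stream fragment does not know its block count up front, so a disengaged `Optional` doubles as an explicit "this sender is done" sentinel. A sketch of that convention with `std::optional` (illustrative struct, not Infinity's `FragmentData`):

```cpp
#include <cstddef>
#include <iostream>
#include <optional>

struct FragmentDataLite {
    std::optional<std::size_t> data_idx; // empty => explicit completion sentinel
    std::size_t data_count = 0;
};

bool SenderFinished(const FragmentDataLite &d) {
    if (!d.data_idx.has_value()) {
        return true; // stream fragment: completion flagged explicitly
    }
    return *d.data_idx + 1 == d.data_count; // materialized: last block observed
}

int main() {
    std::cout << SenderFinished({std::nullopt, 0}) << "\n"; // 1: sentinel
    std::cout << SenderFinished({{2}, 4}) << "\n";          // 0: block 3 of 4
    std::cout << SenderFinished({{3}, 4}) << "\n";          // 1: last block
}
```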
diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm
index 80500c5647..a5b2515027 100644
--- a/src/executor/operator_state.cppm
+++ b/src/executor/operator_state.cppm
@@ -153,6 +153,9 @@ export struct LimitOperatorState : public OperatorState {
 // Merge Limit
 export struct MergeLimitOperatorState : public OperatorState {
     inline explicit MergeLimitOperatorState() : OperatorState(PhysicalOperatorType::kMergeLimit) {}
+
+    Vector<UniquePtr<DataBlock>> input_data_blocks_{}; // Merge limit is the first operator in its fragment, so there is no previous operator state; input data arrives through this vector.
+    bool input_complete_{false};
 };
 
 // Merge Top
diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp
index 011e2ae19b..eba2599e84 100644
--- a/src/executor/physical_planner.cpp
+++ b/src/executor/physical_planner.cpp
@@ -115,6 +115,8 @@ import logical_match;
 import logical_fusion;
 import parser;
 
+import value;
+import value_expression;
 import explain_physical_plan;
 import infinity_exception;
 
@@ -543,11 +545,11 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildAggregate(const SharedPtr<LogicalNode> &logical_operator) const {
         return MakeUnique(query_context_ptr_->GetNextNodeID(),
-                                 logical_aggregate->base_table_ref_,
-                                 Move(physical_agg_op),
-                                 logical_aggregate->GetOutputNames(),
-                                 logical_aggregate->GetOutputTypes(),
-                                 logical_operator->load_metas());
+                          logical_aggregate->base_table_ref_,
+                          Move(physical_agg_op),
+                          logical_aggregate->GetOutputNames(),
+                          logical_aggregate->GetOutputTypes(),
+                          logical_operator->load_metas());
     }
 }
 
@@ -638,11 +640,29 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildLimit(const SharedPtr<LogicalNode> &logical_operator) const {
     SharedPtr<LogicalLimit> logical_limit = static_pointer_cast<LogicalLimit>(logical_operator);
     UniquePtr<PhysicalOperator> input_physical_operator = BuildPhysicalOperator(input_logical_node);
 
-    return MakeUnique<PhysicalLimit>(logical_operator->node_id(),
-                                     Move(input_physical_operator),
-                                     logical_limit->limit_expression_,
-                                     logical_limit->offset_expression_,
-                                     logical_operator->load_metas());
+    if (input_physical_operator->TaskletCount() <= 1) {
+        return MakeUnique<PhysicalLimit>(logical_operator->node_id(),
+                                         Move(input_physical_operator),
+                                         logical_limit->limit_expression_,
+                                         logical_limit->offset_expression_,
+                                         logical_operator->load_metas());
+    } else {
+        i64 child_limit = (static_pointer_cast<ValueExpression>(logical_limit->limit_expression_))->GetValue().value_.big_int;
+
+        if (logical_limit->offset_expression_ != nullptr) {
+            child_limit += (static_pointer_cast<ValueExpression>(logical_limit->offset_expression_))->GetValue().value_.big_int;
+        }
+        auto child_limit_op = MakeUnique<PhysicalLimit>(logical_operator->node_id(),
+                                                        Move(input_physical_operator),
+                                                        MakeShared<ValueExpression>(Value::MakeBigInt(child_limit)),
+                                                        nullptr,
+                                                        logical_operator->load_metas());
+        return MakeUnique<PhysicalMergeLimit>(query_context_ptr_->GetNextNodeID(),
+                                              Move(child_limit_op),
+                                              logical_limit->limit_expression_,
+                                              logical_limit->offset_expression_,
+                                              logical_operator->load_metas());
+    }
 }
 
 UniquePtr<PhysicalOperator> PhysicalPlanner::BuildProjection(const SharedPtr<LogicalNode> &logical_operator) const {
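The planner split deserves a worked example. When the child pipeline is parallel, any single task might hold all of the first `limit + offset` qualifying rows, so `limit + offset` is the only safe per-task bound: the pushed-down `PhysicalLimit` prunes each task's stream with it, and the single `PhysicalMergeLimit` then applies the real offset and limit over the merged stream. The arithmetic, spelled out:

```cpp
#include <cstdint>
#include <iostream>

// Worked example of the pushdown arithmetic in BuildLimit above.
int main() {
    int64_t limit = 2, offset = 2; // SELECT ... LIMIT 2 OFFSET 2
    // Each parallel task must be allowed to pass up to limit + offset rows,
    // because the offset is only applied once, at the merge.
    int64_t child_limit = limit + offset;
    std::cout << "per-task limit: " << child_limit          // 4
              << ", merge keeps rows [" << offset << ", "
              << offset + limit << ")\n";                   // rows [2, 4)
}
```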
diff --git a/src/planner/bound_select_statement.cpp b/src/planner/bound_select_statement.cpp
index 96188bd692..33ee8b5cb0 100644
--- a/src/planner/bound_select_statement.cpp
+++ b/src/planner/bound_select_statement.cpp
@@ -110,6 +110,12 @@ SharedPtr<LogicalNode> BoundSelectStatement::BuildPlan(QueryContext *query_context) {
             root = sort;
         }
 
+        if (limit_expression_ != nullptr) {
+            auto limit = MakeShared<LogicalLimit>(bind_context->GetNewLogicalNodeId(), limit_expression_, offset_expression_);
+            limit->set_left_node(root);
+            root = limit;
+        }
+
         auto project = MakeShared<LogicalProject>(bind_context->GetNewLogicalNodeId(), projection_expressions_, projection_index_);
         project->set_left_node(root);
         root = project;
diff --git a/src/planner/optimizer/lazy_load.cpp b/src/planner/optimizer/lazy_load.cpp
index baa6a0a584..33a1a976c0 100644
--- a/src/planner/optimizer/lazy_load.cpp
+++ b/src/planner/optimizer/lazy_load.cpp
@@ -135,6 +135,7 @@ void CleanScan::VisitNode(LogicalNode &op) {
             match.base_table_ref_->RetainColumnByIndices(Move(project_indices));
             break;
         }
+        case LogicalNodeType::kLimit:
        case LogicalNodeType::kFusion: {
            // Skip
            VisitNodeChildren(op);
diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp
index c2ad399599..f0c890723c 100644
--- a/src/scheduler/fragment_context.cpp
+++ b/src/scheduler/fragment_context.cpp
@@ -677,7 +677,6 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
         case PhysicalOperatorType::kAggregate:
         case PhysicalOperatorType::kParallelAggregate:
         case PhysicalOperatorType::kHash:
-        case PhysicalOperatorType::kLimit:
         case PhysicalOperatorType::kTop: {
             if (fragment_type_ != FragmentType::kParallelStream) {
                 Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type())));
@@ -696,6 +695,22 @@ void FragmentContext::MakeSinkState(i64 parallel_count) {
             }
             break;
         }
+        case PhysicalOperatorType::kLimit: {
+            if (fragment_type_ != FragmentType::kParallelStream) {
+                Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type())));
+            }
+
+            if ((i64)tasks_.size() != parallel_count) {
+                Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type())));
+            }
+
+            for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) {
+                auto sink_state = MakeUnique<QueueSinkState>(fragment_ptr_->FragmentID(), task_id);
+
+                tasks_[task_id]->sink_state_ = Move(sink_state);
+            }
+            break;
+        }
         case PhysicalOperatorType::kMergeParallelAggregate:
         case PhysicalOperatorType::kMergeHash:
         case PhysicalOperatorType::kMergeLimit:
diff --git a/src/scheduler/fragment_context.cppm b/src/scheduler/fragment_context.cppm
index 807c5caf83..1ff604e58c 100644
--- a/src/scheduler/fragment_context.cppm
+++ b/src/scheduler/fragment_context.cppm
@@ -47,6 +47,8 @@ export enum class FragmentType {
     kParallelStream,
 };
 
+class PlanFragment;
+
 export class FragmentContext {
 public:
     static void
@@ -78,6 +80,9 @@ public:
 
     inline Vector<UniquePtr<FragmentTask>> &Tasks() { return tasks_; }
 
+    [[nodiscard]] inline bool IsMaterialize() const { return fragment_type_ == FragmentType::kSerialMaterialize || fragment_type_ == FragmentType::kParallelMaterialize; }
+
     inline SharedPtr<DataTable> GetResult() {
         UniqueLock<Mutex> lk(locker_);
         cv_.wait(lk, [&] { return completed_; });
diff --git a/src/scheduler/fragment_data.cppm b/src/scheduler/fragment_data.cppm
index d8dfcebbdb..bbda608f7a 100644
--- a/src/scheduler/fragment_data.cppm
+++ b/src/scheduler/fragment_data.cppm
@@ -45,7 +45,7 @@ export struct FragmentError : public FragmentDataBase {
 export struct FragmentData : public FragmentDataBase {
     UniquePtr<DataBlock> data_block_{};
     i64 task_id_{-1};
-    SizeT data_idx_{u64_max};
+    Optional<SizeT> data_idx_{};
     SizeT data_count_{u64_max};
 
     FragmentData(u64 fragment_id, UniquePtr<DataBlock> data_block, i64 task_id, SizeT data_idx, SizeT data_count)
diff --git a/src/scheduler/fragment_task.cpp b/src/scheduler/fragment_task.cpp
index 377ef8b7bc..a7e544d2bd 100644
--- a/src/scheduler/fragment_task.cpp
+++ b/src/scheduler/fragment_task.cpp
@@ -100,7 +100,7 @@ void FragmentTask::OnExecute(i64) {
 
     if (execute_success or sink_state_->error_message_.get() != nullptr) {
         PhysicalSink *sink_op = fragment_context->GetSinkOperator();
-        sink_op->Execute(query_context, sink_state_.get());
+        sink_op->Execute(query_context, fragment_context, sink_state_.get());
     }
 }
diff --git a/src/storage/data_block.cpp b/src/storage/data_block.cpp
index df1f66d570..d505726d4e 100644
--- a/src/storage/data_block.cpp
+++ b/src/storage/data_block.cpp
@@ -271,7 +271,7 @@ void DataBlock::AppendWith(const DataBlock *other) {
     }
 }
 
-void DataBlock::AppendWith(const SharedPtr<DataBlock> &other, SizeT from, SizeT count) {
+void DataBlock::AppendWith(const DataBlock *other, SizeT from, SizeT count) {
     if (other->column_count() != this->column_count()) {
         Error(
             Format("Attempt merge block with column count {} into block with column count {}", other->column_count(), this->column_count()));
diff --git a/src/storage/data_block.cppm b/src/storage/data_block.cppm
index 5e2a3c77ba..96fbbbb5bf 100644
--- a/src/storage/data_block.cppm
+++ b/src/storage/data_block.cppm
@@ -80,7 +80,7 @@ public:
 
     void AppendWith(const DataBlock *other);
 
-    void AppendWith(const SharedPtr<DataBlock> &other, SizeT from, SizeT count);
+    void AppendWith(const DataBlock *other, SizeT from, SizeT count);
 
     void InsertVector(const SharedPtr<ColumnVector> &vector, SizeT index);
diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp
index 3d3fb53fe1..8c51750f94 100644
--- a/src/storage/txn/txn_store.cpp
+++ b/src/storage/txn/txn_store.cpp
@@ -64,17 +64,17 @@ UniquePtr<String> TxnTableStore::Append(const SharedPtr<DataBlock> &input_block) {
 
     if (current_block->row_count() + input_block->row_count() > current_block->capacity()) {
         SizeT to_append = current_block->capacity() - current_block->row_count();
-        current_block->AppendWith(input_block, 0, to_append);
+        current_block->AppendWith(input_block.get(), 0, to_append);
         current_block->Finalize();
 
         blocks_.emplace_back(DataBlock::Make());
         blocks_.back()->Init(column_types);
         ++current_block_id_;
         current_block = blocks_[current_block_id_].get();
-        current_block->AppendWith(input_block, to_append, input_block->row_count() - to_append);
+        current_block->AppendWith(input_block.get(), to_append, input_block->row_count() - to_append);
     } else {
         SizeT to_append = input_block->row_count();
-        current_block->AppendWith(input_block, 0, to_append);
+        current_block->AppendWith(input_block.get(), 0, to_append);
     }
 
     current_block->Finalize();
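Switching the slice overload of `AppendWith` from `const SharedPtr<DataBlock>&` to `const DataBlock*` lets both kinds of owner call it: the limit path holds `UniquePtr<DataBlock>`, the txn path `SharedPtr<DataBlock>`, and either can lend a raw pointer. A toy model of the `(source, from, count)` contract (illustrative names, not Infinity's `DataBlock`):

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// AppendWith(other, from, count): copy `count` rows of `other` starting at `from`.
struct Block {
    std::vector<int> rows;
    std::size_t row_count() const { return rows.size(); }
    void AppendWith(const Block *other, std::size_t from, std::size_t count) {
        rows.insert(rows.end(), other->rows.begin() + from, other->rows.begin() + from + count);
    }
};

int main() {
    auto src = std::make_unique<Block>(Block{{10, 11, 12, 13}});
    Block dst;
    dst.AppendWith(src.get(), 1, 2);      // rows 11, 12: unique_ptr owners just
    std::cout << dst.row_count() << "\n"; // lend the raw pointer; prints 2
}
```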
diff --git a/test/sql/dql/limit.slt b/test/sql/dql/limit.slt
new file mode 100644
index 0000000000..eafcc55e49
--- /dev/null
+++ b/test/sql/dql/limit.slt
@@ -0,0 +1,55 @@
+statement ok
+DROP TABLE IF EXISTS test_limit;
+
+statement ok
+CREATE TABLE test_limit (c1 INTEGER, c2 INTEGER);
+
+statement ok
+INSERT INTO test_limit VALUES(0,1),(2,3),(4,5),(6,7);
+
+query II
+SELECT * FROM test_limit limit 0;
+----
+
+query II
+SELECT * FROM test_limit limit 10;
+----
+0 1
+2 3
+4 5
+6 7
+
+query II
+SELECT * FROM test_limit limit 2;
+----
+0 1
+2 3
+
+query II
+SELECT * FROM test_limit limit 2 offset 0;
+----
+0 1
+2 3
+
+query II
+SELECT * FROM test_limit where c1 > 0 limit 2 offset 1;
+----
+4 5
+6 7
+
+
+query II
+SELECT * FROM test_limit limit 2 offset 2;
+----
+4 5
+6 7
+
+query II
+SELECT * FROM test_limit order by c1 desc limit 2 offset 2;
+----
+2 3
+0 1
+
+query II
+SELECT * FROM test_limit limit 2 offset 4;
+----
\ No newline at end of file
diff --git a/tools/generate_limit.py b/tools/generate_limit.py
new file mode 100644
index 0000000000..8a87418020
--- /dev/null
+++ b/tools/generate_limit.py
@@ -0,0 +1,80 @@
+import numpy as np
+import random
+import os
+import argparse
+
+
+def generate(generate_if_exists: bool, copy_dir: str):
+    row_n = 9000
+    limit = 8500
+    offset = 20
+    limit_dir = "./test/data/csv"
+    slt_dir = "./test/sql/dql"
+
+    table_name = "test_big_limit"
+    limit_path = limit_dir + "/test_big_limit.csv"
+    slt_path = slt_dir + "/big_limit.slt"
+    copy_path = copy_dir + "/test_big_limit.csv"
+
+    os.makedirs(limit_dir, exist_ok=True)
+    os.makedirs(slt_dir, exist_ok=True)
+    if os.path.exists(limit_path) and os.path.exists(slt_path) and generate_if_exists:
+        print(
+            "File {} and {} already exist. Skip generating.".format(
+                slt_path, limit_path
+            )
+        )
+        return
+    with open(limit_path, "w") as limit_file, open(slt_path, "w") as slt_file:
+        slt_file.write("statement ok\n")
+        slt_file.write("DROP TABLE IF EXISTS {};\n".format(table_name))
+        slt_file.write("\n")
+        slt_file.write("statement ok\n")
+        slt_file.write(
+            "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name)
+        )
+        slt_file.write("\n")
+        slt_file.write("query I\n")
+        slt_file.write(
+            "COPY {} FROM '{}' WITH ( DELIMITER ',' );\n".format(
+                table_name, copy_path
+            )
+        )
+        slt_file.write("----\n")
+        slt_file.write("\n")
+        slt_file.write("query I\n")
+        slt_file.write("SELECT * FROM {} limit {} offset {};\n".format(table_name, limit, offset))
+        slt_file.write("----\n")
+
+        for _ in range(row_n):
+            limit_file.write("0,0")
+            limit_file.write("\n")
+
+        for _ in range(limit):
+            slt_file.write("0 0")
+            slt_file.write("\n")
+
+        slt_file.write("\n")
+        slt_file.write("statement ok\n")
+        slt_file.write("DROP TABLE {};\n".format(table_name))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Generate limit data for test")
+
+    parser.add_argument(
+        "-g",
+        "--generate",
+        type=bool,
+        default=False,
+        dest="generate_if_exists",
+    )
+    parser.add_argument(
+        "-c",
+        "--copy",
+        type=str,
+        default="/tmp/infinity/test_data",
+        dest="copy_dir",
+    )
+    args = parser.parse_args()
+    generate(args.generate_if_exists, args.copy_dir)
diff --git a/tools/generate_sort.py b/tools/generate_sort.py
index 256536df2c..f605438d2c 100644
--- a/tools/generate_sort.py
+++ b/tools/generate_sort.py
@@ -6,7 +6,6 @@
 
 def generate(generate_if_exists: bool, copy_dir: str):
     row_n = 9000
-    dim = 128
     sort_dir = "./test/data/csv"
     slt_dir = "./test/sql/dql"
 
@@ -30,7 +29,7 @@ def generate(generate_if_exists: bool, copy_dir: str):
         slt_file.write("\n")
         slt_file.write("statement ok\n")
         slt_file.write(
-            "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name, dim)
+            "CREATE TABLE {} (c1 int, c2 int);\n".format(table_name)
         )
         slt_file.write("\n")
         slt_file.write("query I\n")
diff --git a/tools/sqllogictest.py b/tools/sqllogictest.py
index f6bbf53a81..5c49dccfb1 100644
--- a/tools/sqllogictest.py
+++ b/tools/sqllogictest.py
@@ -5,6 +5,7 @@
 from generate_big import generate as generate1
 from generate_fvecs import generate as generate2
 from generate_sort import generate as generate3
+from generate_limit import generate as generate4
 
 
 def python_skd_test(python_test_dir: str):
@@ -106,6 +107,7 @@ def copy_all(data_dir, copy_dir):
     generate1(args.generate_if_exists, args.copy)
     generate2(args.generate_if_exists, args.copy)
     generate3(args.generate_if_exists, args.copy)
+    generate4(args.generate_if_exists, args.copy)
     print("Generate file finshed.")
     python_skd_test(python_test_dir)
     test_process(args.path, args.test, args.data, args.copy)