diff --git a/src/common/blocking_queue.cppm b/src/common/blocking_queue.cppm index d4a203db00..bc933c4eaa 100644 --- a/src/common/blocking_queue.cppm +++ b/src/common/blocking_queue.cppm @@ -26,18 +26,32 @@ class BlockingQueue { public: explicit BlockingQueue(SizeT capacity = DEFAULT_BLOCKING_QUEUE_SIZE) : capacity_(capacity) {} - void Enqueue(T& task) { + void NotAllowEnqueue() { + allow_enqueue_ = false; + } + + bool Enqueue(T& task) { UniqueLock lock(queue_mutex_); + + if (!allow_enqueue_) { + return false; + } full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push_back(task); empty_cv_.notify_one(); + return true; } - void Enqueue(T&& task) { + bool Enqueue(T&& task) { UniqueLock lock(queue_mutex_); + + if (!allow_enqueue_) { + return false; + } full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); queue_.push_back(Forward(task)); empty_cv_.notify_one(); + return true; } void EnqueueBulk(List &input_queue) { @@ -99,6 +113,7 @@ public: } protected: + atomic_bool allow_enqueue_{true}; mutable Mutex queue_mutex_{}; CondVar full_cv_{}; CondVar empty_cv_{}; diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index b60a06c560..3ab60fe5bd 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -115,7 +115,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate case PhysicalOperatorType::kSort: { SortOperatorState *sort_output_state = static_cast(task_op_state); if (sort_output_state->data_block_array_.empty()) { - if(materialize_sink_state->Error()) { + if (materialize_sink_state->Error()) { materialize_sink_state->empty_result_ = true; } else { Error("Empty sort output"); @@ -131,7 +131,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate case PhysicalOperatorType::kAggregate: { AggregateOperatorState *agg_output_state = static_cast(task_op_state); if (agg_output_state->data_block_array_.empty()) { - if(materialize_sink_state->Error()) { + if (materialize_sink_state->Error()) { materialize_sink_state->empty_result_ = true; } else { Error("Empty agg output"); @@ -378,7 +378,12 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink } for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { - next_fragment_queue->Enqueue(fragment_data); + // when the Enqueue returns false, + // it means that the downstream has collected enough data, + // preventing the Queue from Enqueue in data again to avoid redundant calculations. + if (!next_fragment_queue->Enqueue(fragment_data)) { + task_operator_state->SetComplete(); + } } } task_operator_state->data_block_array_.clear(); diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp index ebb4e60925..7f8967dabd 100644 --- a/src/executor/operator_state.cpp +++ b/src/executor/operator_state.cpp @@ -106,7 +106,12 @@ bool QueueSourceState::GetData() { auto *fragment_data = static_cast(fragment_data_base.get()); MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state; limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); - limit_op_state->input_complete_ = completed; + if (!limit_op_state->input_complete_) { + limit_op_state->input_complete_ = completed; + } + if (limit_op_state->input_complete_) { + source_queue_.NotAllowEnqueue(); + } break; } default: {