Skip to content

Commit

Permalink
perf: Implement MergeLimit to end the upstream operator early
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 27, 2023
1 parent d2dc462 commit f1356d3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
19 changes: 17 additions & 2 deletions src/common/blocking_queue.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,32 @@ class BlockingQueue {
public:
explicit BlockingQueue(SizeT capacity = DEFAULT_BLOCKING_QUEUE_SIZE) : capacity_(capacity) {}

void Enqueue(T& task) {
void NotAllowEnqueue() {
allow_enqueue_ = false;
}

bool Enqueue(T& task) {
UniqueLock<Mutex> lock(queue_mutex_);

if (!allow_enqueue_) {
return false;
}
full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
queue_.push_back(task);
empty_cv_.notify_one();
return true;
}

void Enqueue(T&& task) {
bool Enqueue(T&& task) {
UniqueLock<Mutex> lock(queue_mutex_);

if (!allow_enqueue_) {
return false;
}
full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
queue_.push_back(Forward<T>(task));
empty_cv_.notify_one();
return true;
}

void EnqueueBulk(List<T> &input_queue) {
Expand Down Expand Up @@ -99,6 +113,7 @@ public:
}

protected:
atomic_bool allow_enqueue_{true};
mutable Mutex queue_mutex_{};
CondVar full_cv_{};
CondVar empty_cv_{};
Expand Down
11 changes: 8 additions & 3 deletions src/executor/operator/physical_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate
case PhysicalOperatorType::kSort: {
SortOperatorState *sort_output_state = static_cast<SortOperatorState *>(task_op_state);
if (sort_output_state->data_block_array_.empty()) {
if(materialize_sink_state->Error()) {
if (materialize_sink_state->Error()) {
materialize_sink_state->empty_result_ = true;
} else {
Error<ExecutorException>("Empty sort output");
Expand All @@ -131,7 +131,7 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate
case PhysicalOperatorType::kAggregate: {
AggregateOperatorState *agg_output_state = static_cast<AggregateOperatorState *>(task_op_state);
if (agg_output_state->data_block_array_.empty()) {
if(materialize_sink_state->Error()) {
if (materialize_sink_state->Error()) {
materialize_sink_state->empty_result_ = true;
} else {
Error<ExecutorException>("Empty agg output");
Expand Down Expand Up @@ -378,7 +378,12 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(QueueSinkState *queue_sink
}

for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) {
next_fragment_queue->Enqueue(fragment_data);
// when the Enqueue returns false,
// it means that the downstream has collected enough data,
// preventing the Queue from Enqueue in data again to avoid redundant calculations.
if (!next_fragment_queue->Enqueue(fragment_data)) {
task_operator_state->SetComplete();
}
}
}
task_operator_state->data_block_array_.clear();
Expand Down
7 changes: 6 additions & 1 deletion src/executor/operator_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ bool QueueSourceState::GetData() {
auto *fragment_data = static_cast<FragmentData *>(fragment_data_base.get());
MergeLimitOperatorState *limit_op_state = (MergeLimitOperatorState *)next_op_state;
limit_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_));
limit_op_state->input_complete_ = completed;
if (!limit_op_state->input_complete_) {
limit_op_state->input_complete_ = completed;
}
if (limit_op_state->input_complete_) {
source_queue_.NotAllowEnqueue();
}
break;
}
default: {
Expand Down

0 comments on commit f1356d3

Please sign in to comment.