Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement physical_limit.cpp for support limit #391

Merged
merged 7 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/common/blocking_queue.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,32 @@ class BlockingQueue {
public:
explicit BlockingQueue(SizeT capacity = DEFAULT_BLOCKING_QUEUE_SIZE) : capacity_(capacity) {}

void Enqueue(T& task) {
// Clears allow_enqueue_ so that later Enqueue(T&) / Enqueue(T&&) calls return
// false on entry instead of pushing.
// This only flips the flag: no condition variable is notified here, so a
// producer that is already blocked waiting for free capacity is not woken.
void NotAllowEnqueue() { allow_enqueue_.store(false); }

// Appends a copy of `task` to the back of the queue.
//
// Returns false straight away, without taking the mutex, when enqueueing has
// been switched off through allow_enqueue_. Otherwise blocks until the queue
// holds fewer than capacity_ elements, pushes the task, wakes one thread
// waiting on empty_cv_ and returns true.
//
// NOTE(review): allow_enqueue_ is sampled only once on entry; it is not
// re-checked after the wait for free capacity finishes.
bool Enqueue(T& task) {
    const bool accepting = allow_enqueue_;
    if (!accepting) {
        return false;
    }

    UniqueLock<Mutex> guard(queue_mutex_);
    // Explicit form of the predicate wait: sleep for as long as the queue is full.
    while (!(queue_.size() < capacity_)) {
        full_cv_.wait(guard);
    }
    queue_.push_back(task);
    empty_cv_.notify_one();
    return true;
}

void Enqueue(T&& task) {
// Rvalue overload: hands `task` to the queue through Forward<T> rather than
// copying it.
//
// Returns false straight away, without taking the mutex, when enqueueing has
// been switched off through allow_enqueue_. Otherwise blocks until the queue
// holds fewer than capacity_ elements, pushes the task, wakes one thread
// waiting on empty_cv_ and returns true.
//
// NOTE(review): allow_enqueue_ is sampled only once on entry; it is not
// re-checked after the wait for free capacity finishes.
bool Enqueue(T&& task) {
    const bool accepting = allow_enqueue_;
    if (!accepting) {
        return false;
    }

    UniqueLock<Mutex> guard(queue_mutex_);
    // Explicit form of the predicate wait: sleep for as long as the queue is full.
    while (!(queue_.size() < capacity_)) {
        full_cv_.wait(guard);
    }
    queue_.push_back(Forward<T>(task));
    empty_cv_.notify_one();
    return true;
}

void EnqueueBulk(List<T> &input_queue) {
Expand Down Expand Up @@ -99,6 +113,7 @@ public:
}

protected:
atomic_bool allow_enqueue_{true};
mutable Mutex queue_mutex_{};
CondVar full_cv_{};
CondVar empty_cv_{};
Expand Down
1 change: 1 addition & 0 deletions src/executor/fragment/plan_fragment.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import stl;
import parser;
import data_table;
import fragment_context;
import operator_state;
import physical_operator;
import physical_source;
import physical_sink;
Expand Down
1 change: 1 addition & 0 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import physical_sink;
import physical_source;
import physical_explain;
import physical_knn_scan;
import operator_state;

import infinity_exception;
import parser;
Expand Down
273 changes: 174 additions & 99 deletions src/executor/operator/physical_limit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ module;

import stl;
import txn;
import base_expression;
import default_values;
import load_meta;
import query_context;
import table_def;
import data_table;
import default_values;
import parser;
import physical_operator_type;
import operator_state;
Expand All @@ -34,130 +38,201 @@ module physical_limit;

namespace infinity {

void PhysicalLimit::Init() {}
SizeT AtomicCounter::Offset(SizeT row_count) {
auto success = false;
SizeT result;
KKould marked this conversation as resolved.
Show resolved Hide resolved

bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
while (!success) {
i64 current_offset = offset_;
if (current_offset <= 0) {
return 0;
}
i64 last_offset = current_offset - row_count;

#if 0
// output table definition is same as input
input_table_ = left_->output();
Assert<ExecutorException>(input_table_.get() != nullptr, "No input");
if (last_offset > 0) {
success = offset_.compare_exchange_strong(current_offset, last_offset);
result = row_count;
} else {
success = offset_.compare_exchange_strong(current_offset, 0);
result = current_offset;
}
}

return result;
}

Assert<ExecutorException>(limit_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression");
SizeT AtomicCounter::Limit(SizeT row_count) {
auto success = false;
SizeT result;

i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;
Assert<ExecutorException>(limit > 0, "Limit should be larger than 0");
while (!success) {
i64 current_limit = limit_;
if (current_limit <= 0) {
return 0;
}
i64 last_limit = current_limit - row_count;

if (last_limit > 0) {
success = limit_.compare_exchange_strong(current_limit, last_limit);
result = row_count;
} else {
success = limit_.compare_exchange_strong(current_limit, 0);
result = current_limit;
}
}

return result;
}

bool AtomicCounter::IsLimitOver() {
if (limit_ < 0) {
Error<ExecutorException>("limit is not allowed to be smaller than 0");
}
return limit_ == 0;
}

// Consumes up to `row_count` rows from the remaining offset budget and returns
// how many rows were consumed. No synchronization is performed.
//
//  - offset_ already <= 0          : nothing to consume, returns 0.
//  - offset_ greater than row_count: offset_ shrinks by row_count, returns row_count.
//  - otherwise                     : offset_ becomes 0, returns the previous offset_.
SizeT UnSyncCounter::Offset(SizeT row_count) {
    if (offset_ <= 0) {
        return 0;
    }

    const i64 remaining = offset_ - row_count;
    if (remaining <= 0) {
        // The whole outstanding offset is consumed by this batch.
        const SizeT consumed = offset_;
        offset_ = 0;
        return consumed;
    }

    offset_ = remaining;
    return row_count;
}

// Grants up to `row_count` rows from the remaining limit budget and returns
// how many rows were granted. No synchronization is performed.
//
//  - limit_ already <= 0          : nothing left, returns 0.
//  - limit_ greater than row_count: limit_ shrinks by row_count, returns row_count.
//  - otherwise                    : limit_ becomes 0, returns the previous limit_.
SizeT UnSyncCounter::Limit(SizeT row_count) {
    if (limit_ <= 0) {
        return 0;
    }

    const i64 remaining = limit_ - row_count;
    if (remaining <= 0) {
        // This batch exhausts the limit; hand out whatever was left.
        const SizeT granted = limit_;
        limit_ = 0;
        return granted;
    }

    limit_ = remaining;
    return row_count;
}

// Reports whether the limit budget has been fully consumed.
// Returns true when limit_ is exactly 0.
// Raises ExecutorException (via Error<>) if limit_ is negative.
bool UnSyncCounter::IsLimitOver() {
    const auto remaining = limit_;
    const bool is_negative = remaining < 0;
    if (is_negative) {
        Error<ExecutorException>("limit is not allowed to be smaller than 0");
    }
    return remaining == 0;
}

PhysicalLimit::PhysicalLimit(u64 id,
UniquePtr<PhysicalOperator> left,
SharedPtr<BaseExpression> limit_expr,
SharedPtr<BaseExpression> offset_expr,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)),
offset_expr_(Move(offset_expr)) {
i64 offset = 0;
i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;

if (offset_expr_ != nullptr) {
Assert<ExecutorException>(offset_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression");
offset = (static_pointer_cast<ValueExpression>(offset_expr_))->GetValue().value_.big_int;
Assert<ExecutorException>(offset >= 0 && offset < input_table_->row_count(),
"Offset should be larger or equal than 0 and less than row number");
}

output_ = GetLimitOutput(input_table_, limit, offset);
#endif
return true;
counter_ = MakeShared<UnSyncCounter>(offset, limit);
}

SharedPtr<DataTable> PhysicalLimit::GetLimitOutput(const SharedPtr<DataTable> &input_table, i64 limit, i64 offset) {
SizeT start_block = 0;
SizeT start_row_id = 0;
SizeT end_block = 0;
SizeT end_row_id = 0;
// Intentionally empty: this operator performs no work in Init().
void PhysicalLimit::Init() {}

if (offset == 0) {
if (limit >= (i64)input_table->row_count()) {
return input_table;
} else {
start_block = 0;
start_row_id = 0;
SizeT block_count = input_table->DataBlockCount();
i64 total_row_count = limit;
for (SizeT block_id = 0; block_id < block_count; ++block_id) {
SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
if (total_row_count > (i64)block_row_count) {
total_row_count -= block_row_count;
} else {
end_block = block_id;
end_row_id = total_row_count;
break;
}
}
}
} else {
i64 total_row_count = offset;
SizeT block_count = input_table->DataBlockCount();
SizeT rest_row_count = 0;
for (SizeT block_id = 0; block_id < block_count; ++block_id) {
SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
if (total_row_count >= (i64)block_row_count) {
total_row_count -= block_row_count;
} else {
start_block = block_id;
start_row_id = total_row_count;
rest_row_count = block_row_count - total_row_count;
break;
}
}
SizeT PhysicalLimit::TaskletCount() {
i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;
KKould marked this conversation as resolved.
Show resolved Hide resolved

total_row_count = limit;
if (total_row_count <= (i64)rest_row_count) {
end_block = start_block;
end_row_id = total_row_count;
} else {
total_row_count -= rest_row_count;
for (SizeT block_id = start_block + 1; block_id < block_count; ++block_id) {
SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
if (total_row_count > (i64)block_row_count) {
total_row_count -= block_row_count;
} else {
end_block = block_id;
end_row_id = total_row_count;
break;
}
}
}
return limit / DEFAULT_BLOCK_CAPACITY;
}

// offset limit + offset
// left right
// | a | b | c | d | e | f
bool PhysicalLimit::Execute(QueryContext *query_context,
const Vector<UniquePtr<DataBlock>> &input_blocks,
Vector<UniquePtr<DataBlock>> &output_blocks,
SharedPtr<LimitCounter> counter) {
SizeT input_row_count = 0;

for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) {
input_row_count += input_blocks[block_id]->row_count();
}

// Copy from input table to output table
SizeT column_count = input_table->ColumnCount();
Vector<SharedPtr<DataType>> types;
types.reserve(column_count);
Vector<SharedPtr<ColumnDef>> columns;
columns.reserve(column_count);
for (SizeT idx = 0; idx < column_count; ++idx) {
SharedPtr<DataType> col_type = input_table->GetColumnTypeById(idx);
types.emplace_back(col_type);
SizeT offset = counter->Offset(input_row_count);
if (offset == input_row_count) {
return true;
}

String col_name = input_table->GetColumnNameById(idx);
SizeT limit = counter->Limit(input_row_count - offset);
SizeT block_start_idx = 0;

SharedPtr<ColumnDef> col_def = MakeShared<ColumnDef>(idx, col_type, col_name, HashSet<ConstraintType>());
columns.emplace_back(col_def);
}
for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) {
if (input_blocks[block_id]->row_count() == 0) {
continue;
}
SizeT max_offset = input_blocks[block_id]->row_count() - 1;

SharedPtr<TableDef> table_def = TableDef::Make(MakeShared<String>("default"), MakeShared<String>("limit"), columns);
SharedPtr<DataTable> output_table = DataTable::Make(table_def, TableType::kIntermediate);
if (offset > max_offset) {
offset -= max_offset;
continue;
KKould marked this conversation as resolved.
Show resolved Hide resolved
} else {
block_start_idx = block_id;
break;
}
}

const Vector<SharedPtr<DataBlock>> &input_datablocks = input_table->data_blocks_;
for (SizeT block_id = block_start_idx; block_id < input_blocks.size(); block_id++) {
auto &input_block = input_blocks[block_id];
auto row_count = input_block->row_count();
if (row_count == 0) {
continue;
}
auto block = DataBlock::MakeUniquePtr();

for (SizeT block_id = start_block; block_id <= end_block; ++block_id) {
SizeT input_start_offset = start_row_id;
SizeT input_end_offset;
if (end_block == block_id) {
input_end_offset = end_row_id;
block->Init(input_block->types());
if (limit >= row_count) {
block->AppendWith(input_block.get(), offset, row_count);
limit -= row_count;
} else {
// current input block isn't the last one.
input_end_offset = input_datablocks[block_id]->row_count();
block->AppendWith(input_block.get(), offset, limit);
limit = 0;
}
block->Finalize();
output_blocks.push_back(Move(block));
offset = 0;

if (limit == 0) {
break;
}
}

SharedPtr<DataBlock> output_datablock = DataBlock::Make();
output_datablock->Init(input_datablocks[block_id], input_start_offset, input_end_offset);
output_table->Append(output_datablock);
return true;
}

bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
auto result = Execute(query_context, operator_state->prev_op_state_->data_block_array_, operator_state->data_block_array_, counter_);

start_row_id = 0;
operator_state->prev_op_state_->data_block_array_.clear();
if (counter_->IsLimitOver() || operator_state->prev_op_state_->Complete()) {
operator_state->SetComplete();
}
return output_table;
return result;
}

} // namespace infinity
Loading
Loading