Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement physical_limit.cpp for support limit #391

Merged
merged 7 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/common/blocking_queue.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,32 @@ class BlockingQueue {
public:
explicit BlockingQueue(SizeT capacity = DEFAULT_BLOCKING_QUEUE_SIZE) : capacity_(capacity) {}

void Enqueue(T& task) {
void NotAllowEnqueue() {
allow_enqueue_ = false;
}

bool Enqueue(T& task) {
if (!allow_enqueue_) {
return false;
}

UniqueLock<Mutex> lock(queue_mutex_);
full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
queue_.push_back(task);
empty_cv_.notify_one();
return true;
}

void Enqueue(T&& task) {
bool Enqueue(T&& task) {
if (!allow_enqueue_) {
return false;
}

UniqueLock<Mutex> lock(queue_mutex_);
full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
queue_.push_back(Forward<T>(task));
empty_cv_.notify_one();
return true;
}

void EnqueueBulk(List<T> &input_queue) {
Expand Down Expand Up @@ -99,6 +113,7 @@ public:
}

protected:
atomic_bool allow_enqueue_{true};
mutable Mutex queue_mutex_{};
CondVar full_cv_{};
CondVar empty_cv_{};
Expand Down
3 changes: 1 addition & 2 deletions src/executor/operator/physical_filter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public:
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }

SizeT TaskletCount() override {
Error<NotImplementException>("TaskletCount not Implement");
return 0;
return left_->TaskletCount();
}

inline const SharedPtr<BaseExpression> &condition() const { return condition_; }
Expand Down
266 changes: 167 additions & 99 deletions src/executor/operator/physical_limit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ module;

import stl;
import txn;
import base_expression;
import default_values;
import load_meta;
import query_context;
import table_def;
import data_table;
import default_values;
import parser;
import physical_operator_type;
import operator_state;
Expand All @@ -34,130 +38,194 @@ module physical_limit;

namespace infinity {

void PhysicalLimit::Init() {}
SizeT AtomicCounter::Offset(SizeT row_count) {
auto success = false;
SizeT result = 0;

bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
while (!success) {
i64 current_offset = offset_;
if (current_offset <= 0) {
return 0;
}
i64 last_offset = current_offset - row_count;

#if 0
// output table definition is same as input
input_table_ = left_->output();
Assert<ExecutorException>(input_table_.get() != nullptr, "No input");
if (last_offset > 0) {
success = offset_.compare_exchange_strong(current_offset, last_offset);
result = row_count;
} else {
success = offset_.compare_exchange_strong(current_offset, 0);
result = current_offset;
}
}

Assert<ExecutorException>(limit_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression");
return result;
}

i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;
Assert<ExecutorException>(limit > 0, "Limit should be larger than 0");
SizeT AtomicCounter::Limit(SizeT row_count) {
auto success = false;
SizeT result = 0;

while (!success) {
i64 current_limit = limit_;
if (current_limit <= 0) {
return 0;
}
i64 last_limit = current_limit - row_count;

if (last_limit > 0) {
success = limit_.compare_exchange_strong(current_limit, last_limit);
result = row_count;
} else {
success = limit_.compare_exchange_strong(current_limit, 0);
result = current_limit;
}
}

return result;
}

bool AtomicCounter::IsLimitOver() {
if (limit_ < 0) {
Error<ExecutorException>("limit is not allowed to be smaller than 0");
}
return limit_ == 0;
}

SizeT UnSyncCounter::Offset(SizeT row_count) {
SizeT result = 0;

if (offset_ <= 0) {
return 0;
}
i64 last_offset = offset_ - row_count;

if (last_offset > 0) {
result = row_count;
offset_ = last_offset;
} else {
result = offset_;
offset_ = 0;
}

return result;
}

SizeT UnSyncCounter::Limit(SizeT row_count) {
SizeT result = 0;

if (limit_ <= 0) {
return 0;
}
i64 last_limit = limit_ - row_count;

if (last_limit > 0) {
result = row_count;
limit_ = last_limit;
} else {
result = limit_;
limit_ = 0;
}

return result;
}

bool UnSyncCounter::IsLimitOver() {
if (limit_ < 0) {
Error<ExecutorException>("limit is not allowed to be smaller than 0");
}
return limit_ == 0;
}

PhysicalLimit::PhysicalLimit(u64 id,
UniquePtr<PhysicalOperator> left,
SharedPtr<BaseExpression> limit_expr,
SharedPtr<BaseExpression> offset_expr,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kLimit, Move(left), nullptr, id, load_metas), limit_expr_(Move(limit_expr)),
offset_expr_(Move(offset_expr)) {
i64 offset = 0;
i64 limit = (static_pointer_cast<ValueExpression>(limit_expr_))->GetValue().value_.big_int;

if (offset_expr_ != nullptr) {
Assert<ExecutorException>(offset_expr_->type() == ExpressionType::kValue, "Currently, only support constant limit expression");
offset = (static_pointer_cast<ValueExpression>(offset_expr_))->GetValue().value_.big_int;
Assert<ExecutorException>(offset >= 0 && offset < input_table_->row_count(),
"Offset should be larger or equal than 0 and less than row number");
}

output_ = GetLimitOutput(input_table_, limit, offset);
#endif
return true;
counter_ = MakeUnique<UnSyncCounter>(offset, limit);
}

SharedPtr<DataTable> PhysicalLimit::GetLimitOutput(const SharedPtr<DataTable> &input_table, i64 limit, i64 offset) {
SizeT start_block = 0;
SizeT start_row_id = 0;
SizeT end_block = 0;
SizeT end_row_id = 0;
void PhysicalLimit::Init() {}

if (offset == 0) {
if (limit >= (i64)input_table->row_count()) {
return input_table;
} else {
start_block = 0;
start_row_id = 0;
SizeT block_count = input_table->DataBlockCount();
i64 total_row_count = limit;
for (SizeT block_id = 0; block_id < block_count; ++block_id) {
SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
if (total_row_count > (i64)block_row_count) {
total_row_count -= block_row_count;
} else {
end_block = block_id;
end_row_id = total_row_count;
break;
}
}
}
} else {
i64 total_row_count = offset;
SizeT block_count = input_table->DataBlockCount();
SizeT rest_row_count = 0;
for (SizeT block_id = 0; block_id < block_count; ++block_id) {
SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
if (total_row_count >= (i64)block_row_count) {
total_row_count -= block_row_count;
} else {
start_block = block_id;
start_row_id = total_row_count;
rest_row_count = block_row_count - total_row_count;
break;
}
}
// offset limit + offset
// left right
// | a | b | c | d | e | f
bool PhysicalLimit::Execute(QueryContext *query_context,
const Vector<UniquePtr<DataBlock>> &input_blocks,
Vector<UniquePtr<DataBlock>> &output_blocks,
LimitCounter *counter) {
SizeT input_row_count = 0;

for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) {
input_row_count += input_blocks[block_id]->row_count();
}

total_row_count = limit;
if (total_row_count <= (i64)rest_row_count) {
end_block = start_block;
end_row_id = total_row_count;
} else {
total_row_count -= rest_row_count;
for (SizeT block_id = start_block + 1; block_id < block_count; ++block_id) {
SizeT block_row_count = input_table->GetDataBlockById(block_id)->row_count();
if (total_row_count > (i64)block_row_count) {
total_row_count -= block_row_count;
} else {
end_block = block_id;
end_row_id = total_row_count;
break;
}
}
}
SizeT offset = counter->Offset(input_row_count);
if (offset == input_row_count) {
return true;
}

// Copy from input table to output table
SizeT column_count = input_table->ColumnCount();
Vector<SharedPtr<DataType>> types;
types.reserve(column_count);
Vector<SharedPtr<ColumnDef>> columns;
columns.reserve(column_count);
for (SizeT idx = 0; idx < column_count; ++idx) {
SharedPtr<DataType> col_type = input_table->GetColumnTypeById(idx);
types.emplace_back(col_type);
SizeT limit = counter->Limit(input_row_count - offset);
SizeT block_start_idx = 0;

String col_name = input_table->GetColumnNameById(idx);
for (SizeT block_id = 0; block_id < input_blocks.size(); block_id++) {
if (input_blocks[block_id]->row_count() == 0) {
continue;
}
SizeT max_offset = input_blocks[block_id]->row_count() - 1;

SharedPtr<ColumnDef> col_def = MakeShared<ColumnDef>(idx, col_type, col_name, HashSet<ConstraintType>());
columns.emplace_back(col_def);
if (offset > max_offset) {
offset -= max_offset;
} else {
block_start_idx = block_id;
break;
}
}

SharedPtr<TableDef> table_def = TableDef::Make(MakeShared<String>("default"), MakeShared<String>("limit"), columns);
SharedPtr<DataTable> output_table = DataTable::Make(table_def, TableType::kIntermediate);

const Vector<SharedPtr<DataBlock>> &input_datablocks = input_table->data_blocks_;
for (SizeT block_id = block_start_idx; block_id < input_blocks.size(); block_id++) {
auto &input_block = input_blocks[block_id];
auto row_count = input_block->row_count();
if (row_count == 0) {
continue;
}
auto block = DataBlock::MakeUniquePtr();

for (SizeT block_id = start_block; block_id <= end_block; ++block_id) {
SizeT input_start_offset = start_row_id;
SizeT input_end_offset;
if (end_block == block_id) {
input_end_offset = end_row_id;
block->Init(input_block->types());
if (limit >= row_count) {
block->AppendWith(input_block.get(), offset, row_count);
limit -= row_count;
} else {
// current input block isn't the last one.
input_end_offset = input_datablocks[block_id]->row_count();
block->AppendWith(input_block.get(), offset, limit);
limit = 0;
}
block->Finalize();
output_blocks.push_back(Move(block));
offset = 0;

SharedPtr<DataBlock> output_datablock = DataBlock::Make();
output_datablock->Init(input_datablocks[block_id], input_start_offset, input_end_offset);
output_table->Append(output_datablock);
if (limit == 0) {
break;
}
}

return true;
}

bool PhysicalLimit::Execute(QueryContext *query_context, OperatorState *operator_state) {
auto result = Execute(query_context, operator_state->prev_op_state_->data_block_array_, operator_state->data_block_array_, counter_.get());

start_row_id = 0;
operator_state->prev_op_state_->data_block_array_.clear();
if (counter_->IsLimitOver() || operator_state->prev_op_state_->Complete()) {
operator_state->SetComplete();
}
return output_table;
return result;
}

} // namespace infinity
Loading