Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove offset from table func #4648

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/binder/bind/bind_table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ BoundTableFunction Binder::bindTableFunc(std::string tableFuncName,
bindInput.binder = this;
auto bindData = tableFunc->bindFunc(clientContext, &bindInput);
columns = bindData->columns;
auto offset = expressionBinder.createVariableExpression(LogicalType::INT64(),
std::string(InternalKeyword::ROW_OFFSET));
return BoundTableFunction{tableFunc->copy(), std::move(bindData), std::move(offset)};
return BoundTableFunction{tableFunc->copy(), std::move(bindData)};
}

} // namespace binder
Expand Down
9 changes: 4 additions & 5 deletions src/binder/bind/copy/bind_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statem
evaluateTypes.push_back(evaluateType);
}
columns.insert(columns.end(), warningDataExprs.begin(), warningDataExprs.end());
// TODO(Guodong): Should remove this expression.
auto offset = expressionBinder.createVariableExpression(LogicalType::INT64(),
std::string(InternalKeyword::ANONYMOUS));
auto offset =
createInvisibleVariable(std::string(InternalKeyword::ROW_OFFSET), LogicalType::INT64());
auto boundCopyFromInfo = BoundCopyFromInfo(nodeTableEntry, std::move(boundSource),
std::move(offset), std::move(columns), std::move(evaluateTypes), nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
Expand All @@ -124,8 +123,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement&
copyStatement.getParsingOptionsRef(), expectedColumnNames, expectedColumnTypes);
expression_vector warningDataExprs = boundSource->getWarningColumns();
auto columns = boundSource->getColumns();
auto offset = expressionBinder.createVariableExpression(LogicalType::INT64(),
std::string(InternalKeyword::ROW_OFFSET));
auto offset =
createInvisibleVariable(std::string(InternalKeyword::ROW_OFFSET), LogicalType::INT64());
auto srcTableID = relTableEntry->getSrcTableID();
auto dstTableID = relTableEntry->getDstTableID();
auto srcKey = columns[0];
Expand Down
8 changes: 4 additions & 4 deletions src/include/binder/bound_standalone_call_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ namespace kuzu {
namespace binder {

class BoundStandaloneCallFunction : public BoundStatement {
static constexpr common::StatementType statementType =
common::StatementType::STANDALONE_CALL_FUNCTION;

public:
explicit BoundStandaloneCallFunction(BoundTableFunction tableFunc)
: BoundStatement{common::StatementType::STANDALONE_CALL_FUNCTION,
BoundStatementResult::createEmptyResult()},
: BoundStatement{statementType, BoundStatementResult::createEmptyResult()},
tableFunc{std::move(tableFunc)} {}

const function::TableFunction& getTableFunction() const { return *tableFunc.tableFunction; }

function::TableFuncBindData* getBindData() const { return tableFunc.bindData.get(); }

std::shared_ptr<Expression> getOffset() const { return tableFunc.offset; }

private:
BoundTableFunction tableFunc;
};
Expand Down
1 change: 0 additions & 1 deletion src/include/binder/bound_table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace binder {
struct BoundTableFunction {
std::unique_ptr<function::TableFunction> tableFunction;
std::unique_ptr<function::TableFuncBindData> bindData;
std::shared_ptr<binder::Expression> offset;
};

} // namespace binder
Expand Down
4 changes: 2 additions & 2 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ struct ExtraBoundCopyFromInfo {
struct BoundCopyFromInfo {
// Table entry to copy into.
catalog::TableCatalogEntry* tableEntry;
// Data source
// Data source.
std::unique_ptr<BoundBaseScanSource> source;
// Row offset of input data to generate internal ID.
// Row offset.
std::shared_ptr<Expression> offset;
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
expression_vector columnExprs;
std::vector<common::ColumnEvaluateType> columnEvaluateTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class BoundTableFunctionCall : public BoundReadingClause {

const function::TableFunction& getTableFunc() const { return *tableFunc.tableFunction; }
const function::TableFuncBindData* getBindData() const { return tableFunc.bindData.get(); }
std::shared_ptr<Expression> getOffset() const { return tableFunc.offset; }
expression_vector getColumns() const { return columns; }

private:
Expand Down
17 changes: 6 additions & 11 deletions src/include/planner/operator/logical_accumulate.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace kuzu {
namespace planner {

class LogicalAccumulate final : public LogicalOperator {
static constexpr LogicalOperatorType type = LogicalOperatorType::ACCUMULATE;

public:
LogicalAccumulate(common::AccumulateType accumulateType, binder::expression_vector flatExprs,
std::shared_ptr<binder::Expression> offset, std::shared_ptr<binder::Expression> mark,
std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::ACCUMULATE, std::move(child)},
accumulateType{accumulateType}, flatExprs{std::move(flatExprs)},
offset{std::move(offset)}, mark{std::move(mark)} {}
std::shared_ptr<binder::Expression> mark, std::shared_ptr<LogicalOperator> child)
: LogicalOperator{type, std::move(child)}, accumulateType{accumulateType},
flatExprs{std::move(flatExprs)}, mark{std::move(mark)} {}

void computeFactorizedSchema() override;
void computeFlatSchema() override;
Expand All @@ -26,21 +26,16 @@ class LogicalAccumulate final : public LogicalOperator {
binder::expression_vector getPayloads() const {
return children[0]->getSchema()->getExpressionsInScope();
}
std::shared_ptr<binder::Expression> getOffset() const { return offset; }
bool hasMark() const { return mark != nullptr; }
std::shared_ptr<binder::Expression> getMark() const { return mark; }

std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalAccumulate>(accumulateType, flatExprs, offset, mark,
children[0]->copy());
return make_unique<LogicalAccumulate>(accumulateType, flatExprs, mark, children[0]->copy());
}

private:
common::AccumulateType accumulateType;
binder::expression_vector flatExprs;
// Accumulate may be used as a source operator for COPY pipeline. In such case, row offset needs
// to be provided in order to generate internal ID.
std::shared_ptr<binder::Expression> offset;
// Accumulate may be used for optional match, e.g. OPTIONAL MATCH (a). In such case, we use
// mark to determine if at least one pattern is found.
std::shared_ptr<binder::Expression> mark;
Expand Down
6 changes: 4 additions & 2 deletions src/include/planner/operator/logical_partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ struct LogicalPartitionerInfo {
};

class LogicalPartitioner final : public LogicalOperator {
static constexpr LogicalOperatorType type = LogicalOperatorType::PARTITIONER;

public:
LogicalPartitioner(LogicalPartitionerInfo info, binder::BoundCopyFromInfo copyFromInfo,
std::shared_ptr<LogicalOperator> child)
: LogicalOperator{LogicalOperatorType::PARTITIONER, std::move(child)},
info{std::move(info)}, copyFromInfo{std::move(copyFromInfo)} {}
: LogicalOperator{type, std::move(child)}, info{std::move(info)},
copyFromInfo{std::move(copyFromInfo)} {}

void computeFactorizedSchema() override;
void computeFlatSchema() override;
Expand Down
11 changes: 3 additions & 8 deletions src/include/planner/operator/logical_table_function_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ class LogicalTableFunctionCall : public LogicalOperator {

public:
LogicalTableFunctionCall(function::TableFunction tableFunc,
std::unique_ptr<function::TableFuncBindData> bindData, binder::expression_vector columns,
std::shared_ptr<binder::Expression> offset)
std::unique_ptr<function::TableFuncBindData> bindData, binder::expression_vector columns)
: LogicalOperator{operatorType_}, tableFunc{tableFunc}, bindData{std::move(bindData)},
columns{std::move(columns)}, offset{std::move(offset)} {
columns{std::move(columns)} {
cardinality = this->bindData->cardinality;
}

Expand All @@ -34,20 +33,16 @@ class LogicalTableFunctionCall : public LogicalOperator {

binder::expression_vector getColumns() const { return columns; }

std::shared_ptr<binder::Expression> getOffset() const { return offset; }

std::string getExpressionsForPrinting() const override { return tableFunc.name; }

std::unique_ptr<LogicalOperator> copy() override {
return std::make_unique<LogicalTableFunctionCall>(tableFunc, bindData->copy(), columns,
offset);
return std::make_unique<LogicalTableFunctionCall>(tableFunc, bindData->copy(), columns);
}

private:
function::TableFunction tableFunc;
std::unique_ptr<function::TableFuncBindData> bindData;
binder::expression_vector columns;
std::shared_ptr<binder::Expression> offset;
};

} // namespace planner
Expand Down
11 changes: 4 additions & 7 deletions src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,14 @@ class Planner {
void tryAppendAccumulate(LogicalPlan& plan);
// Accumulate everything.
void appendAccumulate(LogicalPlan& plan);
// Accumulate everything. Append
// Accumulate everything. Append mark.
void appendOptionalAccumulate(std::shared_ptr<binder::Expression> mark, LogicalPlan& plan);
// Append accumulate with a set of expressions being flattened first.
void appendAccumulate(const binder::expression_vector& flatExprs, LogicalPlan& plan);
// Append accumulate with a set of expressions being flattened first.
// Additionally, scan table with row offset.
// Append accumulate with a set of expressions being flattened first. Append mark.
void appendAccumulate(common::AccumulateType accumulateType,
const binder::expression_vector& flatExprs, std::shared_ptr<binder::Expression> offset,
std::shared_ptr<binder::Expression> mark, LogicalPlan& plan);
const binder::expression_vector& flatExprs, std::shared_ptr<binder::Expression> mark,
LogicalPlan& plan);

void appendDummyScan(LogicalPlan& plan);

Expand All @@ -318,8 +317,6 @@ class Planner {
void appendFilter(const std::shared_ptr<binder::Expression>& predicate, LogicalPlan& plan);

void appendTableFunctionCall(const binder::BoundTableScanSourceInfo& info, LogicalPlan& plan);
void appendTableFunctionCall(const binder::BoundTableScanSourceInfo& info,
std::shared_ptr<binder::Expression> offset, LogicalPlan& plan);

void appendDistinct(const binder::expression_vector& keys, LogicalPlan& plan);

Expand Down
2 changes: 0 additions & 2 deletions src/include/processor/operator/table_function_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ struct TableFunctionCallInfo {
function::TableFunction function{};
std::unique_ptr<function::TableFuncBindData> bindData;
std::vector<DataPos> outPosV;
DataPos rowOffsetPos;
TableScanOutputType outputType = TableScanOutputType::EMPTY;

TableFunctionCallInfo() = default;
Expand All @@ -42,7 +41,6 @@ struct TableFunctionCallInfo {
function = other.function;
bindData = other.bindData->copy();
outPosV = other.outPosV;
rowOffsetPos = other.rowOffsetPos;
outputType = other.outputType;
}
};
Expand Down
15 changes: 1 addition & 14 deletions src/include/processor/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,11 @@ class PlanMapper {
const binder::expression_vector& expressions, planner::Schema* schema,
std::unique_ptr<PhysicalOperator> prevOperator);

// Scan fTable with row offset.
std::unique_ptr<PhysicalOperator> createFTableScan(const binder::expression_vector& exprs,
std::vector<ft_col_idx_t> colIndices, std::shared_ptr<binder::Expression> offset,
planner::Schema* schema, std::shared_ptr<FactorizedTable> table, uint64_t maxMorselSize,
physical_op_vector_t children);
// Scan fTable without row offset.
// Scan fTable
std::unique_ptr<PhysicalOperator> createFTableScan(const binder::expression_vector& exprs,
std::vector<ft_col_idx_t> colIndices, planner::Schema* schema,
std::shared_ptr<FactorizedTable> table, uint64_t maxMorselSize,
physical_op_vector_t children);
// Scan fTable without row offset.
// Scan is the leaf operator of physical plan.
std::unique_ptr<PhysicalOperator> createFTableScan(const binder::expression_vector& exprs,
std::vector<ft_col_idx_t> colIndices, planner::Schema* schema,
Expand All @@ -149,13 +143,6 @@ class PlanMapper {
std::unique_ptr<PhysicalOperator> createEmptyFTableScan(std::shared_ptr<FactorizedTable> table,
uint64_t maxMorselSize);
// Assume scans all columns of table in the same order as given expressions.
// Scan fTable with row offset.
std::unique_ptr<PhysicalOperator> createFTableScanAligned(
const binder::expression_vector& exprs, planner::Schema* schema,
std::shared_ptr<binder::Expression> offset, std::shared_ptr<FactorizedTable> table,
uint64_t maxMorselSize, physical_op_vector_t children);
// Assume scans all columns of table in the same order as given expressions.
// Scan fTable without row offset.
std::unique_ptr<PhysicalOperator> createFTableScanAligned(
const binder::expression_vector& exprs, planner::Schema* schema,
std::shared_ptr<FactorizedTable> table, uint64_t maxMorselSize,
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/acc_hash_join_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace optimizer {

static std::shared_ptr<LogicalOperator> appendAccumulate(std::shared_ptr<LogicalOperator> child) {
auto accumulate = std::make_shared<LogicalAccumulate>(AccumulateType::REGULAR,
expression_vector{}, nullptr /* offset */, nullptr /* mark */, std::move(child));
expression_vector{}, nullptr /* mark */, std::move(child));
accumulate->computeFlatSchema();
return accumulate;
}
Expand Down
9 changes: 0 additions & 9 deletions src/planner/operator/logical_accumulate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ void LogicalAccumulate::computeFactorizedSchema() {
createEmptySchema();
auto childSchema = children[0]->getSchema();
SinkOperatorUtil::recomputeSchema(*childSchema, getPayloads(), *schema);
if (offset != nullptr) {
// If we need to generate row offset. Then all expressions must have been flattened and
// accumulated. So the new schema should just have one group.
KU_ASSERT(schema->getNumGroups() == 1);
schema->insertToGroupAndScope(offset, 0);
}
if (mark != nullptr) {
auto groupPos = schema->createGroup();
schema->setGroupAsSingleState(groupPos);
Expand All @@ -25,9 +19,6 @@ void LogicalAccumulate::computeFactorizedSchema() {

void LogicalAccumulate::computeFlatSchema() {
copyChildSchema(0);
if (offset != nullptr) {
schema->insertToGroupAndScope(offset, 0);
}
if (mark != nullptr) {
schema->insertToGroupAndScope(mark, 0);
}
Expand Down
16 changes: 16 additions & 0 deletions src/planner/operator/logical_partitioner.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
#include "planner/operator/logical_partitioner.h"

#include "binder/expression/expression_util.h"
#include "common/exception/runtime.h"

namespace kuzu {
namespace planner {

static void validateSingleGroup(const Schema& schema) {
if (schema.getNumGroups() != 1) {
throw common::RuntimeException(
"Try to partition multiple factorization group. This should not happen.");
}
}

void LogicalPartitioner::computeFactorizedSchema() {
copyChildSchema(0);
// LCOV_EXCL_START
validateSingleGroup(*schema);
// LCOV_EXCL_STOP
schema->insertToGroupAndScope(info.offset, 0);
}

void LogicalPartitioner::computeFlatSchema() {
copyChildSchema(0);
// LCOV_EXCL_START
validateSingleGroup(*schema);
// LCOV_EXCL_STOP
schema->insertToGroupAndScope(info.offset, 0);
}

std::string LogicalPartitioner::getExpressionsForPrinting() const {
Expand Down
6 changes: 0 additions & 6 deletions src/planner/operator/logical_table_function_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ void LogicalTableFunctionCall::computeFlatSchema() {
for (auto& expr : columns) {
schema->insertToGroupAndScope(expr, groupPos);
}
if (offset != nullptr) {
schema->insertToGroupAndScope(offset, groupPos);
}
}

void LogicalTableFunctionCall::computeFactorizedSchema() {
Expand All @@ -20,9 +17,6 @@ void LogicalTableFunctionCall::computeFactorizedSchema() {
for (auto& expr : columns) {
schema->insertToGroupAndScope(expr, groupPos);
}
if (offset != nullptr) {
schema->insertToGroupAndScope(offset, groupPos);
}
}

} // namespace planner
Expand Down
15 changes: 6 additions & 9 deletions src/planner/plan/append_accumulate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,21 @@ void Planner::tryAppendAccumulate(LogicalPlan& plan) {
}

void Planner::appendAccumulate(LogicalPlan& plan) {
appendAccumulate(AccumulateType::REGULAR, expression_vector{}, nullptr /* offset */,
nullptr /* mark */, plan);
appendAccumulate(AccumulateType::REGULAR, expression_vector{}, nullptr /* mark */, plan);
}

void Planner::appendOptionalAccumulate(std::shared_ptr<Expression> mark, LogicalPlan& plan) {
appendAccumulate(AccumulateType::OPTIONAL_, expression_vector{}, nullptr /* offset */, mark,
plan);
appendAccumulate(AccumulateType::OPTIONAL_, expression_vector{}, mark, plan);
}

void Planner::appendAccumulate(const expression_vector& flatExprs, LogicalPlan& plan) {
appendAccumulate(AccumulateType::REGULAR, flatExprs, nullptr /* offset */, nullptr /* mark */,
plan);
appendAccumulate(AccumulateType::REGULAR, flatExprs, nullptr /* mark */, plan);
}

void Planner::appendAccumulate(AccumulateType accumulateType, const expression_vector& flatExprs,
std::shared_ptr<Expression> offset, std::shared_ptr<Expression> mark, LogicalPlan& plan) {
auto op = make_shared<LogicalAccumulate>(accumulateType, flatExprs, offset, mark,
plan.getLastOperator());
std::shared_ptr<Expression> mark, LogicalPlan& plan) {
auto op =
make_shared<LogicalAccumulate>(accumulateType, flatExprs, mark, plan.getLastOperator());
appendFlattens(op->getGroupPositionsToFlatten(), plan);
op->setChild(0, plan.getLastOperator());
op->computeFactorizedSchema();
Expand Down
3 changes: 1 addition & 2 deletions src/planner/plan/append_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ void Planner::appendStandaloneCallFunction(const BoundStatement& statement, Logi
auto& standaloneCallFunctionClause = statement.constCast<BoundStandaloneCallFunction>();
auto op =
std::make_shared<LogicalTableFunctionCall>(standaloneCallFunctionClause.getTableFunction(),
standaloneCallFunctionClause.getBindData()->copy(), binder::expression_vector{},
standaloneCallFunctionClause.getOffset());
standaloneCallFunctionClause.getBindData()->copy(), binder::expression_vector{});
plan.setLastOperator(std::move(op));
}

Expand Down
Loading