diff --git a/src/include/common/data_chunk/sel_vector.h b/src/include/common/data_chunk/sel_vector.h index 29ca58c47d2..574e6d01730 100644 --- a/src/include/common/data_chunk/sel_vector.h +++ b/src/include/common/data_chunk/sel_vector.h @@ -24,7 +24,6 @@ class SelectionVector { enum class State { DYNAMIC, STATIC, - STATIC_FILTERED, }; public: @@ -50,9 +49,12 @@ class SelectionVector { } void setRange(sel_t startPos, sel_t size) { KU_ASSERT(startPos + size <= capacity); - selectedPositions = const_cast(INCREMENTAL_SELECTED_POS.data()) + startPos; + selectedPositions = selectedPositionsBuffer.get(); + for (auto i = 0u; i < size; ++i) { + selectedPositions[i] = startPos + i; + } selectedSize = size; - state = State::STATIC_FILTERED; + state = State::DYNAMIC; } // Set to filtered is not very accurate. It sets selectedPositions to a mutable array. diff --git a/src/include/common/string_utils.h b/src/include/common/string_utils.h index 11422884763..2363d36ad77 100644 --- a/src/include/common/string_utils.h +++ b/src/include/common/string_utils.h @@ -60,7 +60,7 @@ class KUZU_API StringUtils { } static std::string_view rtrim(std::string_view input) { auto end = input.size(); - while (end > 0 && isspace(input[end - 1])) { + while (end > 0 && isSpace(input[end - 1])) { end--; } return input.substr(0, end); diff --git a/src/include/planner/join_order/cost_model.h b/src/include/planner/join_order/cost_model.h index 94e6ba19492..fb56d0dd370 100644 --- a/src/include/planner/join_order/cost_model.h +++ b/src/include/planner/join_order/cost_model.h @@ -10,8 +10,12 @@ class CostModel { static uint64_t computeExtendCost(const LogicalPlan& childPlan); static uint64_t computeRecursiveExtendCost(uint8_t upperBound, double extensionRate, const LogicalPlan& childPlan); + static uint64_t computeHashJoinCost(const std::vector& joinConditions, + const LogicalPlan& probe, const LogicalPlan& build); static uint64_t computeHashJoinCost(const binder::expression_vector& joinNodeIDs, const LogicalPlan& probe, const LogicalPlan& build); + static uint64_t computeMarkJoinCost(const std::vector& joinConditions, + const LogicalPlan& probe, const LogicalPlan& build); static uint64_t computeMarkJoinCost(const binder::expression_vector& joinNodeIDs, const LogicalPlan& probe, const LogicalPlan& build); static uint64_t computeIntersectCost(const LogicalPlan& probePlan, diff --git a/src/include/planner/planner.h b/src/include/planner/planner.h index b37059c7d2e..c3d6c4499d6 100644 --- a/src/include/planner/planner.h +++ b/src/include/planner/planner.h @@ -270,12 +270,18 @@ class Planner { void appendHashJoin(const binder::expression_vector& joinNodeIDs, common::JoinType joinType, std::shared_ptr mark, LogicalPlan& probePlan, LogicalPlan& buildPlan, LogicalPlan& resultPlan); - void appendAccHashJoin(const binder::expression_vector& joinNodeIDs, common::JoinType joinType, - std::shared_ptr mark, LogicalPlan& probePlan, LogicalPlan& buildPlan, - LogicalPlan& resultPlan); + void appendHashJoin(const std::vector& joinConditions, + common::JoinType joinType, std::shared_ptr mark, LogicalPlan& probePlan, + LogicalPlan& buildPlan, LogicalPlan& resultPlan); + void appendAccHashJoin(const std::vector& joinConditions, + common::JoinType joinType, std::shared_ptr mark, LogicalPlan& probePlan, + LogicalPlan& buildPlan, LogicalPlan& resultPlan); void appendMarkJoin(const binder::expression_vector& joinNodeIDs, const std::shared_ptr& mark, LogicalPlan& probePlan, - LogicalPlan& buildPlan); + LogicalPlan& buildPlan, LogicalPlan& resultPlan); + void appendMarkJoin(const std::vector& joinConditions, + const std::shared_ptr& mark, LogicalPlan& probePlan, + LogicalPlan& buildPlan, LogicalPlan& resultPlan); void appendIntersect(const std::shared_ptr& intersectNodeID, binder::expression_vector& boundNodeIDs, LogicalPlan& probePlan, std::vector>& buildPlans); diff --git a/src/planner/join_order/cost_model.cpp b/src/planner/join_order/cost_model.cpp index 89336d07925..aa49ef4743a 100644 --- a/src/planner/join_order/cost_model.cpp +++ b/src/planner/join_order/cost_model.cpp @@ -18,6 +18,23 @@ uint64_t CostModel::computeRecursiveExtendCost(uint8_t upperBound, double extens upperBound; } +binder::expression_vector getJoinNodeIDs( + const std::vector& joinConditions) { + binder::expression_vector joinNodeIDs; + for (auto& [left, _] : joinConditions) { + if (left->expressionType == ExpressionType::PROPERTY && + left->getDataType().getLogicalTypeID() == LogicalTypeID::INTERNAL_ID) { + joinNodeIDs.push_back(left); + } + } + return joinNodeIDs; +} + +uint64_t CostModel::computeHashJoinCost(const std::vector& joinConditions, + const LogicalPlan& probe, const LogicalPlan& build) { + return computeHashJoinCost(getJoinNodeIDs(joinConditions), probe, build); +} + uint64_t CostModel::computeHashJoinCost(const binder::expression_vector& joinNodeIDs, const LogicalPlan& probe, const LogicalPlan& build) { uint64_t cost = 0ul; @@ -29,6 +46,11 @@ uint64_t CostModel::computeHashJoinCost(const binder::expression_vector& joinNod return cost; } +uint64_t CostModel::computeMarkJoinCost(const std::vector& joinConditions, + const LogicalPlan& probe, const LogicalPlan& build) { + return computeMarkJoinCost(getJoinNodeIDs(joinConditions), probe, build); +} + uint64_t CostModel::computeMarkJoinCost(const binder::expression_vector& joinNodeIDs, const LogicalPlan& probe, const LogicalPlan& build) { return computeHashJoinCost(joinNodeIDs, probe, build); diff --git a/src/planner/plan/append_join.cpp b/src/planner/plan/append_join.cpp index eb2c11b607c..446ae673407 100644 --- a/src/planner/plan/append_join.cpp +++ b/src/planner/plan/append_join.cpp @@ -21,6 +21,12 @@ void Planner::appendHashJoin(const expression_vector& joinNodeIDs, JoinType join for (auto& joinNodeID : joinNodeIDs) { joinConditions.emplace_back(joinNodeID, joinNodeID); } + appendHashJoin(joinConditions, joinType, mark, probePlan, buildPlan, resultPlan); +} + +void Planner::appendHashJoin(const std::vector& joinConditions, JoinType joinType, + std::shared_ptr mark, LogicalPlan& probePlan, LogicalPlan& buildPlan, + LogicalPlan& resultPlan) { auto hashJoin = make_shared(joinConditions, joinType, mark, probePlan.getLastOperator(), buildPlan.getLastOperator()); // Apply flattening to probe side @@ -38,26 +44,33 @@ void Planner::appendHashJoin(const expression_vector& joinNodeIDs, JoinType join // Update cost hashJoin->setCardinality(cardinalityEstimator.estimateHashJoin(joinConditions, probePlan.getLastOperatorRef(), buildPlan.getLastOperatorRef())); - resultPlan.setCost(CostModel::computeHashJoinCost(joinNodeIDs, probePlan, buildPlan)); + resultPlan.setCost(CostModel::computeHashJoinCost(joinConditions, probePlan, buildPlan)); resultPlan.setLastOperator(std::move(hashJoin)); } -void Planner::appendAccHashJoin(const expression_vector& joinNodeIDs, JoinType joinType, - std::shared_ptr mark, LogicalPlan& probePlan, LogicalPlan& buildPlan, - LogicalPlan& resultPlan) { +void Planner::appendAccHashJoin(const std::vector& joinConditions, + JoinType joinType, std::shared_ptr mark, LogicalPlan& probePlan, + LogicalPlan& buildPlan, LogicalPlan& resultPlan) { KU_ASSERT(probePlan.hasUpdate()); tryAppendAccumulate(probePlan); - appendHashJoin(joinNodeIDs, joinType, mark, probePlan, buildPlan, resultPlan); + appendHashJoin(joinConditions, joinType, mark, probePlan, buildPlan, resultPlan); auto& sipInfo = probePlan.getLastOperator()->cast().getSIPInfoUnsafe(); sipInfo.direction = SIPDirection::PROBE_TO_BUILD; } void Planner::appendMarkJoin(const expression_vector& joinNodeIDs, - const std::shared_ptr& mark, LogicalPlan& probePlan, LogicalPlan& buildPlan) { + const std::shared_ptr& mark, LogicalPlan& probePlan, LogicalPlan& buildPlan, + LogicalPlan& resultPlan) { std::vector joinConditions; for (auto& joinNodeID : joinNodeIDs) { joinConditions.emplace_back(joinNodeID, joinNodeID); } + appendMarkJoin(joinConditions, mark, probePlan, buildPlan, resultPlan); +} + +void Planner::appendMarkJoin(const std::vector& joinConditions, + const std::shared_ptr& mark, LogicalPlan& probePlan, LogicalPlan& buildPlan, + LogicalPlan& resultPlan) { auto hashJoin = make_shared(joinConditions, JoinType::MARK, mark, probePlan.getLastOperator(), buildPlan.getLastOperator()); // Apply flattening to probe side @@ -69,8 +82,8 @@ void Planner::appendMarkJoin(const expression_vector& joinNodeIDs, hashJoin->computeFactorizedSchema(); // update cost. Mark join does not change cardinality. hashJoin->setCardinality(probePlan.getCardinality()); - probePlan.setCost(CostModel::computeMarkJoinCost(joinNodeIDs, probePlan, buildPlan)); - probePlan.setLastOperator(std::move(hashJoin)); + resultPlan.setCost(CostModel::computeMarkJoinCost(joinConditions, probePlan, buildPlan)); + resultPlan.setLastOperator(std::move(hashJoin)); } void Planner::appendIntersect(const std::shared_ptr& intersectNodeID, diff --git a/src/planner/plan/plan_subquery.cpp b/src/planner/plan/plan_subquery.cpp index 91d22fc0a3c..43402b37873 100644 --- a/src/planner/plan/plan_subquery.cpp +++ b/src/planner/plan/plan_subquery.cpp @@ -1,5 +1,4 @@ #include "binder/expression/expression_util.h" -#include "binder/expression/property_expression.h" #include "binder/expression/subquery_expression.h" #include "binder/expression_visitor.h" #include "planner/operator/factorization/flatten_resolver.h" @@ -34,42 +33,86 @@ binder::expression_vector Planner::getCorrelatedExprs(const QueryGraphCollection return ExpressionUtil::removeDuplication(result); } -void Planner::planOptionalMatch(const QueryGraphCollection& queryGraphCollection, - const expression_vector& predicates, const binder::expression_vector& corrExprs, - LogicalPlan& leftPlan) { - planOptionalMatch(queryGraphCollection, predicates, corrExprs, nullptr /* mark */, leftPlan); -} +class UnnestSubqueryAnalyzer { +public: + UnnestSubqueryAnalyzer(const Schema& schema, const QueryGraphCollection& queryGraphCollection, + expression_vector predicates) + : schema{schema}, queryGraphCollection{queryGraphCollection}, + predicates{std::move(predicates)} {} -static bool isInternalIDCorrelated(const QueryGraphCollection& queryGraphCollection, - const expression_vector& exprs) { - for (auto& expr : exprs) { - if (expr->getDataType().getLogicalTypeID() != LogicalTypeID::INTERNAL_ID) { - return false; + void analyze() { + for (auto predicate : predicates) { + if (predicate->expressionType != common::ExpressionType::EQUALS) { + unnestAsJoin_ = false; + return; + } + if (isJoinCondition(*predicate->getChild(0), *predicate->getChild(1))) { + joinConditions.emplace_back(predicate->getChild(0), predicate->getChild(1)); + } else if (isJoinCondition(*predicate->getChild(1), *predicate->getChild(0))) { + joinConditions.emplace_back(predicate->getChild(1), predicate->getChild(0)); + } else { + unnestAsJoin_ = false; + return; + } } - // Internal ID might be collected from exists subquery so we need to further check if - // it is in query graph. - if (!queryGraphCollection.contains( - expr->constCast().getVariableName())) { - return false; + if (unnestAsJoin_) { + for (auto& node : queryGraphCollection.getQueryNodes()) { + if (schema.isExpressionInScope(*node->getInternalID())) { + joinConditions.emplace_back(node->getInternalID(), node->getInternalID()); + } + } } } - return true; -} + + bool unnestAsJoin() const { return unnestAsJoin_; } + std::vector getJoinConditions() const { return joinConditions; } + + expression_vector getCorrelatedInternalIDs() const { + expression_vector exprs; + for (auto& node : queryGraphCollection.getQueryNodes()) { + if (schema.isExpressionInScope(*node->getInternalID())) { + exprs.push_back(node->getInternalID()); + } + } + return exprs; + } + +private: + bool isJoinCondition(const Expression& left, const Expression& right) { + return right.expressionType == ExpressionType::PROPERTY && + schema.isExpressionInScope(left) && !schema.isExpressionInScope(right); + } + +private: + const Schema& schema; + const QueryGraphCollection& queryGraphCollection; + expression_vector predicates; + + bool unnestAsJoin_ = true; + std::vector joinConditions; +}; void Planner::planOptionalMatch(const QueryGraphCollection& queryGraphCollection, const expression_vector& predicates, const binder::expression_vector& corrExprs, + LogicalPlan& leftPlan) { + planOptionalMatch(queryGraphCollection, predicates, corrExprs, nullptr /* mark */, leftPlan); +} + +void Planner::planOptionalMatch(const QueryGraphCollection& queryGraphCollection, + const expression_vector& predicates, const binder::expression_vector& correlatedExprs, std::shared_ptr mark, LogicalPlan& leftPlan) { auto info = QueryGraphPlanningInfo(); - info.predicates = predicates; if (leftPlan.isEmpty()) { - // Optional match is the first clause. No left plan to join. + // Optional match is the first clause, e.g. OPTIONAL MATCH RETURN * + info.predicates = predicates; auto plan = planQueryGraphCollection(queryGraphCollection, info); leftPlan.setLastOperator(plan->getLastOperator()); appendOptionalAccumulate(mark, leftPlan); return; } - if (corrExprs.empty()) { - // No join condition, apply cross product. + if (correlatedExprs.empty()) { + // Plan uncorrelated subquery (think of this as a CTE) + info.predicates = predicates; auto rightPlan = planQueryGraphCollection(queryGraphCollection, info); if (leftPlan.hasUpdate()) { appendAccOptionalCrossProduct(mark, leftPlan, *rightPlan, leftPlan); @@ -78,25 +121,33 @@ void Planner::planOptionalMatch(const QueryGraphCollection& queryGraphCollection } return; } - info.corrExprs = corrExprs; + // Plan correlated subquery info.corrExprsCard = leftPlan.getCardinality(); + auto analyzer = UnnestSubqueryAnalyzer(*leftPlan.getSchema(), queryGraphCollection, predicates); + analyzer.analyze(); + std::vector joinConditions; std::unique_ptr rightPlan; - if (isInternalIDCorrelated(queryGraphCollection, corrExprs)) { - // If all correlated expressions are node IDs. We can trivially unnest by scanning internal - // ID in both outer and inner plan as these are fast in-memory operations. For node - // properties, we only scan in the outer query. + if (analyzer.unnestAsJoin()) { + // Unnest as vanilla join info.subqueryType = SubqueryType::INTERNAL_ID_CORRELATED; + info.corrExprs = analyzer.getCorrelatedInternalIDs(); rightPlan = planQueryGraphCollectionInNewContext(queryGraphCollection, info); + joinConditions = analyzer.getJoinConditions(); } else { - // Unnest using ExpressionsScan which scans the accumulated table on probe side. + // Unnest as expression scan + distinct & inner join info.subqueryType = SubqueryType::CORRELATED; + info.corrExprs = correlatedExprs; + info.predicates = predicates; + for (auto& expr : correlatedExprs) { + joinConditions.emplace_back(expr, expr); + } rightPlan = planQueryGraphCollectionInNewContext(queryGraphCollection, info); - appendAccumulate(corrExprs, leftPlan); + appendAccumulate(correlatedExprs, leftPlan); } if (leftPlan.hasUpdate()) { - appendAccHashJoin(corrExprs, JoinType::LEFT, mark, leftPlan, *rightPlan, leftPlan); + appendAccHashJoin(joinConditions, JoinType::LEFT, mark, leftPlan, *rightPlan, leftPlan); } else { - appendHashJoin(corrExprs, JoinType::LEFT, mark, leftPlan, *rightPlan, leftPlan); + appendHashJoin(joinConditions, JoinType::LEFT, mark, leftPlan, *rightPlan, leftPlan); } } @@ -112,10 +163,10 @@ void Planner::planRegularMatch(const QueryGraphCollection& queryGraphCollection, predicatesToPullUp.push_back(predicate); } } - auto correlatedExpressions = + auto correlatedExprs = getCorrelatedExprs(queryGraphCollection, predicatesToPushDown, leftPlan.getSchema()); - auto joinNodeIDs = ExpressionUtil::getExpressionsWithDataType(correlatedExpressions, - LogicalTypeID::INTERNAL_ID); + auto joinNodeIDs = + ExpressionUtil::getExpressionsWithDataType(correlatedExprs, LogicalTypeID::INTERNAL_ID); auto info = QueryGraphPlanningInfo(); info.predicates = predicatesToPushDown; if (joinNodeIDs.empty()) { @@ -147,56 +198,74 @@ void Planner::planRegularMatch(const QueryGraphCollection& queryGraphCollection, void Planner::planSubquery(const std::shared_ptr& expression, LogicalPlan& outerPlan) { KU_ASSERT(expression->expressionType == ExpressionType::SUBQUERY); - auto subquery = static_pointer_cast(expression); - auto predicates = subquery->getPredicatesSplitOnAnd(); + auto subquery = expression->ptrCast(); auto correlatedExprs = getDependentExprs(expression, *outerPlan.getSchema()); + auto predicates = subquery->getPredicatesSplitOnAnd(); std::unique_ptr innerPlan; auto info = QueryGraphPlanningInfo(); - info.predicates = predicates; if (correlatedExprs.empty()) { + // Plan uncorrelated subquery info.subqueryType = SubqueryType::NONE; + info.predicates = predicates; innerPlan = planQueryGraphCollectionInNewContext(*subquery->getQueryGraphCollection(), info); + expression_vector emptyHashKeys; + auto projectExprs = expression_vector{subquery->getProjectionExpr()}; switch (subquery->getSubqueryType()) { case common::SubqueryType::EXISTS: { - appendAggregate(expression_vector{}, expression_vector{subquery->getCountStarExpr()}, - *innerPlan); - appendProjection(expression_vector{subquery->getProjectionExpr()}, *innerPlan); + auto aggregates = expression_vector{subquery->getCountStarExpr()}; + appendAggregate(emptyHashKeys, aggregates, *innerPlan); + appendProjection(projectExprs, *innerPlan); } break; case common::SubqueryType::COUNT: { - appendAggregate(expression_vector{}, expression_vector{subquery->getProjectionExpr()}, - *innerPlan); + appendAggregate(emptyHashKeys, projectExprs, *innerPlan); } break; default: KU_UNREACHABLE; } appendCrossProduct(outerPlan, *innerPlan, outerPlan); + return; + } + // Plan correlated subquery + info.corrExprsCard = outerPlan.getCardinality(); + auto analyzer = UnnestSubqueryAnalyzer(*outerPlan.getSchema(), + *subquery->getQueryGraphCollection(), predicates); + analyzer.analyze(); + std::vector joinConditions; + if (analyzer.unnestAsJoin()) { + // Unnest as vanilla join + info.subqueryType = SubqueryType::INTERNAL_ID_CORRELATED; + info.corrExprs = analyzer.getCorrelatedInternalIDs(); + innerPlan = + planQueryGraphCollectionInNewContext(*subquery->getQueryGraphCollection(), info); + joinConditions = analyzer.getJoinConditions(); } else { + // Unnest as expression scan + distinct & inner join + info.subqueryType = SubqueryType::CORRELATED; info.corrExprs = correlatedExprs; - info.corrExprsCard = outerPlan.getCardinality(); - if (isInternalIDCorrelated(*subquery->getQueryGraphCollection(), correlatedExprs)) { - info.subqueryType = SubqueryType::INTERNAL_ID_CORRELATED; - innerPlan = - planQueryGraphCollectionInNewContext(*subquery->getQueryGraphCollection(), info); - } else { - info.subqueryType = SubqueryType::CORRELATED; - innerPlan = - planQueryGraphCollectionInNewContext(*subquery->getQueryGraphCollection(), info); - appendAccumulate(correlatedExprs, outerPlan); + info.predicates = predicates; + for (auto& expr : correlatedExprs) { + joinConditions.emplace_back(expr, expr); } - switch (subquery->getSubqueryType()) { - case common::SubqueryType::EXISTS: { - appendMarkJoin(correlatedExprs, expression, outerPlan, *innerPlan); - } break; - case common::SubqueryType::COUNT: { - appendAggregate(correlatedExprs, expression_vector{subquery->getProjectionExpr()}, - *innerPlan); - appendHashJoin(correlatedExprs, common::JoinType::COUNT, outerPlan, *innerPlan, - outerPlan); - } break; - default: - KU_UNREACHABLE; + innerPlan = + planQueryGraphCollectionInNewContext(*subquery->getQueryGraphCollection(), info); + appendAccumulate(correlatedExprs, outerPlan); + } + switch (subquery->getSubqueryType()) { + case common::SubqueryType::EXISTS: { + appendMarkJoin(joinConditions, expression, outerPlan, *innerPlan, outerPlan); + } break; + case common::SubqueryType::COUNT: { + expression_vector hashKeys; + for (auto& joinCondition : joinConditions) { + hashKeys.push_back(joinCondition.second); } + appendAggregate(hashKeys, expression_vector{subquery->getProjectionExpr()}, *innerPlan); + appendHashJoin(joinConditions, common::JoinType::COUNT, nullptr, outerPlan, *innerPlan, + outerPlan); + } break; + default: + KU_UNREACHABLE; } }