From 69ea6f9f13506d60934e9cf3e5297a2584d88c0d Mon Sep 17 00:00:00 2001 From: xiyang Date: Thu, 2 Jan 2025 16:14:22 +0800 Subject: [PATCH] Improve parallel kcore --- src/function/gds/CMakeLists.txt | 2 +- src/function/gds/K_Core_Decomposition.cpp | 333 ---------------- src/function/gds/k_core_decomposition.cpp | 368 ++++++++++++++++++ src/function/gds/page_rank.cpp | 11 - src/function/gds/variable_length_path.cpp | 10 +- .../gds/weakly_connected_components.cpp | 34 +- src/include/function/gds/gds_frontier.h | 3 + src/include/function/gds/rec_joins.h | 9 +- test/test_files/function/gds/basic.test | 1 - 9 files changed, 387 insertions(+), 384 deletions(-) delete mode 100644 src/function/gds/K_Core_Decomposition.cpp create mode 100644 src/function/gds/k_core_decomposition.cpp diff --git a/src/function/gds/CMakeLists.txt b/src/function/gds/CMakeLists.txt index e58add2b5f4..8e856fcabab 100644 --- a/src/function/gds/CMakeLists.txt +++ b/src/function/gds/CMakeLists.txt @@ -11,7 +11,7 @@ add_library(kuzu_function_algorithm output_writer.cpp variable_length_path.cpp weakly_connected_components.cpp - K_Core_Decomposition.cpp) + k_core_decomposition.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/function/gds/K_Core_Decomposition.cpp b/src/function/gds/K_Core_Decomposition.cpp deleted file mode 100644 index 15f9944ffef..00000000000 --- a/src/function/gds/K_Core_Decomposition.cpp +++ /dev/null @@ -1,333 +0,0 @@ -#include "binder/binder.h" -#include "common/types/types.h" -#include "function/gds/gds_frontier.h" -#include "function/gds/gds_function_collection.h" -#include "function/gds/gds_object_manager.h" -#include "function/gds/gds_utils.h" -#include "function/gds/output_writer.h" -#include "function/gds_function.h" -#include "graph/graph.h" -#include "processor/execution_context.h" -#include "processor/result/factorized_table.h" - -using namespace kuzu::binder; -using namespace kuzu::common; -using namespace kuzu::processor; -using namespace kuzu::storage; -using namespace kuzu::graph; - -namespace kuzu { -namespace function { - -class KCoreFrontierPair : public FrontierPair { - friend struct KCoreEdgeCompute; - -public: - KCoreFrontierPair(std::shared_ptr curFrontier, - std::shared_ptr nextFrontier, uint64_t maxThreads, - table_id_map_t numNodesMap, storage::MemoryManager* mm) - : FrontierPair(curFrontier, nextFrontier, maxThreads), numNodesMap{numNodesMap} { - for (const auto& [tableID, curNumNodes] : numNodesMap) { - vertexValueMap.allocate(tableID, curNumNodes, mm); - nextVertexValueMap.allocate(tableID, curNumNodes, mm); - auto data = vertexValueMap.getData(tableID); - auto nextData = nextVertexValueMap.getData(tableID); - for (auto i = 0u; i < curNumNodes; ++i) { - data[i].store(0, std::memory_order_relaxed); - nextData[i].store(0, std::memory_order_relaxed); - } - } - } - - void initRJFromSource(common::nodeID_t /* source */) override{}; - - void pinCurrFrontier(common::table_id_t tableID) override { - FrontierPair::pinCurrFrontier(tableID); - curDenseFrontier->ptrCast()->pinCurFrontierTableID(tableID); - } - - void pinNextFrontier(common::table_id_t tableID) override { - FrontierPair::pinNextFrontier(tableID); - nextDenseFrontier->ptrCast()->pinNextFrontierTableID(tableID); - } - - void pinVertexValues(common::table_id_t tableID) { - curVertexValue = vertexValueMap.getData(tableID); - nextVertexValue = nextVertexValueMap.getData(tableID); - } - - void beginFrontierComputeBetweenTables(table_id_t curTableID, table_id_t nextTableID) override { - FrontierPair::beginFrontierComputeBetweenTables(curTableID, nextTableID); - pinVertexValues(nextTableID); - } - - void beginNewIterationInternalNoLock() override { - // Update currentFrontier and nextFrontier to be the same - for (const auto& [tableID, curNumNodes] : numNodesMap) { - for (auto i = 0u; i < curNumNodes; ++i) { - auto temp = vertexValueMap.getData(tableID)[i].load(std::memory_order_relaxed); - vertexValueMap.getData(tableID)[i].compare_exchange_strong(temp, - nextVertexValueMap.getData(tableID)[i].load(std::memory_order_relaxed), - std::memory_order_relaxed); - } - } - updateSmallestDegree(); - std::swap(curDenseFrontier, nextDenseFrontier); - curDenseFrontier->ptrCast()->incrementCurIter(); - nextDenseFrontier->ptrCast()->incrementCurIter(); - } - - uint64_t addToVertexDegree(common::offset_t offset, uint64_t degreeToAdd) { - nextVertexValue[offset].fetch_add(degreeToAdd, std::memory_order_relaxed); - return curVertexValue[offset].fetch_add(degreeToAdd, std::memory_order_relaxed); - } - - // Called to remove degrees from a neighbouring vertex - // Returns whether the neighbouring vertex should be set as active or not - bool removeFromVertex(common::nodeID_t nodeID) { - int curSmallest = curSmallestDegree.load(std::memory_order_relaxed); - int nextVertexDegree = nextVertexValueMap.getData(nodeID.tableID)[nodeID.offset].load( - std::memory_order_relaxed); - int curVertexDegree = - vertexValueMap.getData(nodeID.tableID)[nodeID.offset].load(std::memory_order_relaxed); - // The vertex should be set as active if it will be considered in a future iteration - // The vertex should be set as inactive if it will be processed this iteration - if (nextVertexDegree > curSmallest) { - nextVertexValueMap.getData(nodeID.tableID)[nodeID.offset].fetch_sub(1, - std::memory_order_relaxed); - } - if (curVertexDegree <= curSmallest) { - return false; - } - return true; - } - - void updateSmallestDegree() { - uint64_t nextSmallestDegree = UINT64_MAX; - for (const auto& [tableID, curNumNodes] : numNodesMap) { - curDenseFrontier->pinTableID(tableID); - for (auto i = 0u; i < curNumNodes; ++i) { - if (curDenseFrontier->isActive(i)) { - nextSmallestDegree = std::min(nextSmallestDegree, - vertexValueMap.getData(tableID)[i].load(std::memory_order_relaxed)); - } - } - } - uint64_t smallestDegree = curSmallestDegree.load(std::memory_order_relaxed); - curSmallestDegree.compare_exchange_strong(smallestDegree, nextSmallestDegree, - std::memory_order_relaxed); - } - - uint64_t getSmallestDegree() { return curSmallestDegree.load(std::memory_order_relaxed); } - - uint64_t getVertexValue(common::offset_t offset) { - return curVertexValue[offset].load(std::memory_order_relaxed); - } - -private: - std::atomic curSmallestDegree{UINT64_MAX}; - common::table_id_map_t numNodesMap; - std::atomic* curVertexValue = nullptr; - std::atomic* nextVertexValue = nullptr; - ObjectArraysMap> vertexValueMap; - ObjectArraysMap> nextVertexValueMap; -}; - -struct KCoreInitEdgeCompute : public EdgeCompute { - KCoreFrontierPair* frontierPair; - - explicit KCoreInitEdgeCompute(KCoreFrontierPair* frontierPair) : frontierPair{frontierPair} {} - - std::vector edgeCompute(common::nodeID_t boundNodeID, - graph::NbrScanState::Chunk& chunk, bool) override { - std::vector result; - result.push_back(boundNodeID); - frontierPair->addToVertexDegree(boundNodeID.offset, chunk.size()); - return result; - } - - std::unique_ptr copy() override { - return std::make_unique(frontierPair); - } -}; - -struct KCoreEdgeCompute : public EdgeCompute { - KCoreFrontierPair* frontierPair; - - explicit KCoreEdgeCompute(KCoreFrontierPair* frontierPair) : frontierPair{frontierPair} {} - - std::vector edgeCompute(common::nodeID_t boundNodeID, - graph::NbrScanState::Chunk& chunk, bool) override { - std::vector result; - uint64_t vertexDegree = frontierPair->getVertexValue(boundNodeID.offset); - uint64_t smallestDegree = frontierPair->getSmallestDegree(); - if (vertexDegree <= smallestDegree) { - chunk.forEach([&](auto nbrNodeID, auto) { - if (frontierPair->curDenseFrontier->isActive(nbrNodeID.offset)) { - auto shouldBeSetActive = frontierPair->removeFromVertex(nbrNodeID); - if (shouldBeSetActive) { - result.push_back(nbrNodeID); - } - } - }); - } - // If the node hasn't been considered this iteration, it will need to still be active in - // future iterations. - else { - result.push_back(boundNodeID); - } - return result; - } - - std::unique_ptr copy() override { - return std::make_unique(frontierPair); - } -}; - -class KCoreOutputWriter : GDSOutputWriter { -public: - explicit KCoreOutputWriter(main::ClientContext* context, - processor::NodeOffsetMaskMap* outputNodeMask, KCoreFrontierPair* frontierPair) - : GDSOutputWriter{context, outputNodeMask}, frontierPair{frontierPair} { - nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager()); - kValueVector = createVector(LogicalType::UINT64(), context->getMemoryManager()); - } - - void pinTableID(common::table_id_t tableID) override { - GDSOutputWriter::pinTableID(tableID); - frontierPair->pinVertexValues(tableID); - } - - void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID, - FactorizedTable& table) const { - for (auto i = startOffset; i < endOffset; ++i) { - auto nodeID = nodeID_t{i, tableID}; - nodeIDVector->setValue(0, nodeID); - kValueVector->setValue(0, frontierPair->getVertexValue(i)); - table.append(vectors); - } - } - - std::unique_ptr copy() const { - return std::make_unique(context, outputNodeMask, frontierPair); - } - -private: - std::unique_ptr nodeIDVector; - std::unique_ptr kValueVector; - KCoreFrontierPair* frontierPair; -}; - -class KCoreVertexCompute : public VertexCompute { -public: - KCoreVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState, - std::unique_ptr outputWriter) - : mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} { - localFT = sharedState->claimLocalTable(mm); - } - ~KCoreVertexCompute() override { sharedState->returnLocalTable(localFT); } - - bool beginOnTable(common::table_id_t tableID) override { - outputWriter->pinTableID(tableID); - return true; - } - - void vertexCompute(common::offset_t startOffset, common::offset_t endOffset, - common::table_id_t tableID) override { - outputWriter->materialize(startOffset, endOffset, tableID, *localFT); - } - - std::unique_ptr copy() override { - return std::make_unique(mm, sharedState, outputWriter->copy()); - } - -private: - storage::MemoryManager* mm; - processor::GDSCallSharedState* sharedState; - std::unique_ptr outputWriter; - processor::FactorizedTable* localFT; -}; - -class KCoreDecomposition final : public GDSAlgorithm { - static constexpr char GROUP_ID_COLUMN_NAME[] = "k_degree"; - -public: - KCoreDecomposition() = default; - KCoreDecomposition(const KCoreDecomposition& other) : GDSAlgorithm{other} {} - - /* - * Inputs are - * - * graph::ANY - */ - std::vector getParameterTypeIDs() const override { - return std::vector{LogicalTypeID::ANY}; - } - /* - * Outputs are - * - * _node._id::INTERNAL_ID - * group_id::INT64 - */ - binder::expression_vector getResultColumns(binder::Binder* binder) const override { - expression_vector columns; - auto& outputNode = bindData->getNodeOutput()->constCast(); - columns.push_back(outputNode.getInternalID()); - columns.push_back(binder->createVariable(GROUP_ID_COLUMN_NAME, LogicalType::INT64())); - return columns; - } - void bind(const GDSBindInput& input, main::ClientContext&) override { - auto nodeOutput = bindNodeOutput(input.binder, input.graphEntry.nodeEntries); - bindData = std::make_unique(nodeOutput); - } - - void exec(processor::ExecutionContext* context) override { - auto clientContext = context->clientContext; - auto graph = sharedState->graph.get(); - auto numNodesMap = graph->getNumNodesMap(clientContext->getTx()); - auto numThreads = clientContext->getMaxNumThreadForExec(); - auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED); - auto nextFrontier = getPathLengthsFrontier(context, 0); - auto frontierPair = std::make_unique(currentFrontier, nextFrontier, - numThreads, numNodesMap, clientContext->getMemoryManager()); - - frontierPair->setActiveNodesForNextIter(); - frontierPair->getNextSparseFrontier().disable(); - - auto initEdgeCompute = std::make_unique(frontierPair.get()); - auto writer = std::make_unique(clientContext, - sharedState->getOutputNodeMaskMap(), frontierPair.get()); - auto computeState = GDSComputeState(std::move(frontierPair), std::move(initEdgeCompute), - sharedState->getOutputNodeMaskMap()); - - GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH, - 1); - - auto edgeCompute = std::make_unique( - computeState.frontierPair->ptrCast()); - - computeState.edgeCompute = std::move(edgeCompute); - computeState.frontierPair->setActiveNodesForNextIter(); - - GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH, - UINT16_MAX); - auto vertexCompute = std::make_unique(clientContext->getMemoryManager(), - sharedState.get(), std::move(writer)); - GDSUtils::runVertexCompute(context, sharedState->graph.get(), *vertexCompute); - sharedState->mergeLocalTables(); - } - - std::unique_ptr copy() const override { - return std::make_unique(*this); - } -}; -function_set KCoreDecompositionFunction::getFunctionSet() { - function_set result; - auto algo = std::make_unique(); - auto function = - std::make_unique(name, algo->getParameterTypeIDs(), std::move(algo)); - result.push_back(std::move(function)); - return result; -} -} // namespace function -} // namespace kuzu \ No newline at end of file diff --git a/src/function/gds/k_core_decomposition.cpp b/src/function/gds/k_core_decomposition.cpp new file mode 100644 index 00000000000..2bc07208cbf --- /dev/null +++ b/src/function/gds/k_core_decomposition.cpp @@ -0,0 +1,368 @@ +#include "binder/binder.h" +#include "common/types/types.h" +#include "function/gds/gds_frontier.h" +#include "function/gds/gds_function_collection.h" +#include "function/gds/gds_object_manager.h" +#include "function/gds/gds_utils.h" +#include "function/gds/output_writer.h" +#include "function/gds_function.h" +#include "graph/graph.h" +#include "processor/execution_context.h" +#include "processor/result/factorized_table.h" +#include "binder/expression/expression_util.h" + +using namespace kuzu::binder; +using namespace kuzu::common; +using namespace kuzu::processor; +using namespace kuzu::storage; +using namespace kuzu::graph; + +namespace kuzu { +namespace function { + +using degree_t = uint64_t; +static constexpr degree_t INVALID_DEGREE = UINT64_MAX; + +class Degrees { +public: + Degrees(const table_id_map_t& numNodesMap, MemoryManager* mm) { + init(numNodesMap, mm); + } + + void pinTable(table_id_t tableID) { degreeValues = degreeValuesMap.getData(tableID); } + + void addDegree(offset_t offset, uint64_t degree) { + degreeValues[offset].fetch_add(degree, std::memory_order_relaxed); + } + + void decreaseDegreeByOne(offset_t offset) { + degreeValues[offset].fetch_sub(1, std::memory_order_relaxed); + } + + degree_t getValue(offset_t offset) { + return degreeValues[offset].load(std::memory_order_relaxed); + } + +private: + void init(const table_id_map_t& numNodesMap, MemoryManager* mm) { + for (const auto& [tableID, numNodes] : numNodesMap) { + degreeValuesMap.allocate(tableID, numNodes, mm); + pinTable(tableID); + for (auto i = 0u; i < numNodes; ++i) { + degreeValues[i].store(0, std::memory_order_relaxed); + } + } + } + +private: + std::atomic* degreeValues = nullptr; + ObjectArraysMap> degreeValuesMap; +}; + +class CoreValues { +public: + CoreValues(const table_id_map_t& numNodesMap, MemoryManager* mm) { + init(numNodesMap, mm); + } + + void pinTable(table_id_t tableID) { coreValues = coreValuesMap.getData(tableID);} + + bool isValid(offset_t offset) { + return coreValues[offset].load(std::memory_order_relaxed) != INVALID_DEGREE; + } + degree_t getValue(offset_t offset) { + return coreValues[offset].load(std::memory_order_relaxed); + } + void setCoreValue(offset_t offset, degree_t value) { + coreValues[offset].store(value, std::memory_order_relaxed); + } + +private: + void init(const table_id_map_t& numNodesMap, MemoryManager* mm) { + for (const auto& [tableID, numNodes] : numNodesMap) { + coreValuesMap.allocate(tableID, numNodes, mm); + pinTable(tableID); + for (auto i = 0u; i < numNodes; ++i) { + coreValues[i].store(INVALID_DEGREE, std::memory_order_relaxed); + } + } + } + +private: + std::atomic* coreValues = nullptr; + ObjectArraysMap> coreValuesMap; +}; + +class KCoreFrontierPair : public FrontierPair { +public: + KCoreFrontierPair(std::shared_ptr curFrontier, + std::shared_ptr nextFrontier, Degrees* degrees, + CoreValues* coreValues) + : FrontierPair(curFrontier, nextFrontier), degrees{degrees}, + coreValues{coreValues} {} + + void initRJFromSource(nodeID_t /* source */) override{}; + + void pinCurrFrontier(table_id_t tableID) override { + FrontierPair::pinCurrFrontier(tableID); + curDenseFrontier->ptrCast()->pinCurFrontierTableID(tableID); + } + + void pinNextFrontier(table_id_t tableID) override { + FrontierPair::pinNextFrontier(tableID); + nextDenseFrontier->ptrCast()->pinNextFrontierTableID(tableID); + } + + void beginFrontierComputeBetweenTables(table_id_t curTableID, table_id_t nextTableID) override { + FrontierPair::beginFrontierComputeBetweenTables(curTableID, nextTableID); + degrees->pinTable(nextTableID); + coreValues->pinTable(nextTableID); + } + + void beginNewIterationInternalNoLock() override { + std::swap(curDenseFrontier, nextDenseFrontier); + curDenseFrontier->ptrCast()->incrementCurIter(); + nextDenseFrontier->ptrCast()->incrementCurIter(); + } + +private: + Degrees* degrees; + CoreValues* coreValues; +}; + +struct DegreeEdgeCompute : public EdgeCompute { + Degrees* degrees; + + explicit DegreeEdgeCompute(Degrees* degrees) : degrees{degrees} {} + + std::vector edgeCompute(nodeID_t boundNodeID, + graph::NbrScanState::Chunk& chunk, bool) override { + degrees->addDegree(boundNodeID.offset, chunk.size()); + return {}; + } + + std::unique_ptr copy() override { + return std::make_unique(degrees); + } +}; + +struct RemoveVertexEdgeCompute : public EdgeCompute { + Degrees* degrees; + + explicit RemoveVertexEdgeCompute(Degrees* degrees) : degrees{degrees} {} + + std::vector edgeCompute(nodeID_t, + graph::NbrScanState::Chunk& chunk, bool) override { + chunk.forEach([&](auto nbrNodeID, auto) { + degrees->decreaseDegreeByOne(nbrNodeID.offset); + }); + return {}; + } + + std::unique_ptr copy() override { + return std::make_unique(degrees); + } +}; + +class KCoreOutputWriter : GDSOutputWriter { +public: + KCoreOutputWriter(main::ClientContext* context, + processor::NodeOffsetMaskMap* outputNodeMask, CoreValues* coreValues) + : GDSOutputWriter{context, outputNodeMask}, coreValues{coreValues} { + nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager()); + kValueVector = createVector(LogicalType::UINT64(), context->getMemoryManager()); + } + + void pinTableID(table_id_t tableID) override { + GDSOutputWriter::pinTableID(tableID); + coreValues->pinTable(tableID); + } + + void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID, + FactorizedTable& table) const { + for (auto i = startOffset; i < endOffset; ++i) { + auto nodeID = nodeID_t{i, tableID}; + nodeIDVector->setValue(0, nodeID); + kValueVector->setValue(0, coreValues->getValue(i)); + table.append(vectors); + } + } + + std::unique_ptr copy() const { + return std::make_unique(context, outputNodeMask, coreValues); + } + +private: + std::unique_ptr nodeIDVector; + std::unique_ptr kValueVector; + CoreValues* coreValues; +}; + +class OutputVertexCompute : public VertexCompute { +public: + OutputVertexCompute(MemoryManager* mm, processor::GDSCallSharedState* sharedState, + std::unique_ptr outputWriter) + : mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} { + localFT = sharedState->claimLocalTable(mm); + } + ~OutputVertexCompute() override { sharedState->returnLocalTable(localFT); } + + bool beginOnTable(table_id_t tableID) override { + outputWriter->pinTableID(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override { + outputWriter->materialize(startOffset, endOffset, tableID, *localFT); + } + + std::unique_ptr copy() override { + return std::make_unique(mm, sharedState, outputWriter->copy()); + } + +private: + MemoryManager* mm; + processor::GDSCallSharedState* sharedState; + std::unique_ptr outputWriter; + processor::FactorizedTable* localFT; +}; + +class DegreeLessThanCoreVertexCompute : public VertexCompute { +public: + DegreeLessThanCoreVertexCompute(Degrees* degrees, CoreValues* coreValues, + FrontierPair* frontierPair, degree_t coreValue, std::atomic& numActiveNodes) + : degrees{degrees}, coreValues{coreValues}, frontierPair{frontierPair}, + coreValue{coreValue}, numActiveNodes{numActiveNodes} {} + + bool beginOnTable(table_id_t tableID) override { + degrees->pinTable(tableID); + coreValues->pinTable(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override { + for (auto i = startOffset; i < endOffset; ++i) { + if (coreValues->isValid(i)) { // Core has been computed + continue; + } + auto degree = degrees->getValue(i); + if (degree <= coreValue) { + frontierPair->addNodeToNextDenseFrontier(nodeID_t{i, tableID}); + coreValues->setCoreValue(i, coreValue); + numActiveNodes.fetch_add(1, std::memory_order_relaxed); + } + } + } + + std::unique_ptr copy() override { + return std::make_unique(degrees, coreValues, + frontierPair, coreValue, numActiveNodes); + } + +private: + Degrees* degrees; + CoreValues* coreValues; + FrontierPair* frontierPair; + degree_t coreValue; + std::atomic& numActiveNodes; +}; + +class KCoreDecomposition final : public GDSAlgorithm { + static constexpr char GROUP_ID_COLUMN_NAME[] = "k_degree"; + +public: + KCoreDecomposition() = default; + KCoreDecomposition(const KCoreDecomposition& other) : GDSAlgorithm{other} {} + + std::vector getParameterTypeIDs() const override { + return std::vector{LogicalTypeID::ANY}; + } + + binder::expression_vector getResultColumns(binder::Binder* binder) const override { + expression_vector columns; + auto& outputNode = bindData->getNodeOutput()->constCast(); + columns.push_back(outputNode.getInternalID()); + columns.push_back(binder->createVariable(GROUP_ID_COLUMN_NAME, LogicalType::INT64())); + return columns; + } + + void bind(const GDSBindInput& input, main::ClientContext& context) override { + auto graphName = binder::ExpressionUtil::getLiteralValue(*input.getParam(0)); + auto graphEntry = bindGraphEntry(context, graphName); + auto nodeOutput = bindNodeOutput(input.binder, graphEntry.nodeEntries); + bindData = std::make_unique(std::move(graphEntry), nodeOutput); + } + + void exec(processor::ExecutionContext* context) override { + auto clientContext = context->clientContext; + auto mm = clientContext->getMemoryManager(); + auto graph = sharedState->graph.get(); + auto numNodesMap = graph->getNumNodesMap(clientContext->getTransaction()); + auto degrees = Degrees(numNodesMap, mm); + auto coreValues = CoreValues(numNodesMap, mm); + auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED); + auto nextFrontier = getPathLengthsFrontier(context, 0); + auto frontierPair = std::make_unique(currentFrontier, nextFrontier, + °rees, &coreValues); + // Initialize starting nodes (all nodes) in the next frontier. + // When beginNewIteration, next frontier will become current frontier + frontierPair->setActiveNodesForNextIter(); + frontierPair->getNextSparseFrontier().disable(); + // Compute Degree + auto degreeEdgeCompute = std::make_unique(°rees); + auto computeState = GDSComputeState(std::move(frontierPair), std::move(degreeEdgeCompute), + sharedState->getOutputNodeMaskMap()); + GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH, + 1 /* maxIters */); + // Compute Core values + auto removeVertexEdgeCompute = std::make_unique(°rees); + computeState.edgeCompute = std::move(removeVertexEdgeCompute); + auto coreValue = 0u; + auto numNodes = graph->getNumNodes(clientContext->getTransaction()); + auto numNodesComputed = 0u; + while (numNodes != numNodesComputed) { + // Compute current core value + while (true) { + std::atomic numActiveNodes; + numActiveNodes.store(0); + // Find nodes with degree less than current core. + auto vc = DegreeLessThanCoreVertexCompute(°rees, &coreValues, computeState.frontierPair.get(), coreValue, numActiveNodes); + GDSUtils::runVertexCompute(context, sharedState->graph.get(), vc); + numNodesComputed += numActiveNodes.load(); + if (numActiveNodes.load() == 0) { + break; + } + // Remove found nodes by decreasing their nbrs degree by one. + computeState.frontierPair->setActiveNodesForNextIter(); + computeState.frontierPair->getNextSparseFrontier().disable(); + GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH, + computeState.frontierPair->getCurrentIter() + 1 /* maxIters */); + // Repeat until all remaining nodes has degree greater than current core. + } + coreValue++; + } + // Write output + auto writer = std::make_unique(clientContext, + sharedState->getOutputNodeMaskMap(), &coreValues); + auto vertexCompute = OutputVertexCompute(clientContext->getMemoryManager(), + sharedState.get(), std::move(writer)); + GDSUtils::runVertexCompute(context, sharedState->graph.get(), vertexCompute); + sharedState->mergeLocalTables(); + } + + std::unique_ptr copy() const override { + return std::make_unique(*this); + } +}; + +function_set KCoreDecompositionFunction::getFunctionSet() { + function_set result; + auto algo = std::make_unique(); + auto function = + std::make_unique(name, algo->getParameterTypeIDs(), std::move(algo)); + result.push_back(std::move(function)); + return result; +} + +} // namespace function +} // namespace kuzu diff --git a/src/function/gds/page_rank.cpp b/src/function/gds/page_rank.cpp index dd115c63a85..21c9ee0211b 100644 --- a/src/function/gds/page_rank.cpp +++ b/src/function/gds/page_rank.cpp @@ -73,21 +73,10 @@ class PageRank final : public GDSAlgorithm { PageRank() = default; PageRank(const PageRank& other) : GDSAlgorithm{other} {} - /* - * Inputs are - * - * graph::ANY - */ std::vector getParameterTypeIDs() const override { return {LogicalTypeID::ANY}; } - /* - * Outputs are - * - * node_id::INTERNAL_ID - * rank::DOUBLE - */ binder::expression_vector getResultColumns(binder::Binder* binder) const override { expression_vector columns; auto& outputNode = bindData->getNodeOutput()->constCast(); diff --git a/src/function/gds/variable_length_path.cpp b/src/function/gds/variable_length_path.cpp index 949248230ce..e6deda1a350 100644 --- a/src/function/gds/variable_length_path.cpp +++ b/src/function/gds/variable_length_path.cpp @@ -80,15 +80,7 @@ class VarLenJoinsAlgorithm final : public RJAlgorithm { VarLenJoinsAlgorithm() = default; VarLenJoinsAlgorithm(const VarLenJoinsAlgorithm& other) : RJAlgorithm(other) {} - /* - * Inputs include the following: - * - * graph::ANY - * srcNode::NODE - * lowerBound::INT64 - * upperBound::INT64 - * direction::STRING - */ + // Inputs are: graph, srcNode, lowerBound, upperBound, direction std::vector getParameterTypeIDs() const override { return {LogicalTypeID::ANY, LogicalTypeID::NODE, LogicalTypeID::INT64, LogicalTypeID::INT64, LogicalTypeID::STRING}; diff --git a/src/function/gds/weakly_connected_components.cpp b/src/function/gds/weakly_connected_components.cpp index 3c8a9225703..ef1afb61cff 100644 --- a/src/function/gds/weakly_connected_components.cpp +++ b/src/function/gds/weakly_connected_components.cpp @@ -24,16 +24,8 @@ class WCCFrontierPair : public FrontierPair { public: WCCFrontierPair(std::shared_ptr curFrontier, std::shared_ptr nextFrontier, table_id_map_t numNodesMap, - storage::MemoryManager* mm) - : FrontierPair(curFrontier, nextFrontier) { - for (const auto& [tableID, numNodes] : numNodesMap) { - vertexValueMap.allocate(tableID, numNodes, mm); - auto data = vertexValueMap.getData(tableID); - // Cast a unique number to each node - for (auto i = 0u; i < numNodes; ++i) { - data[i].store(i, std::memory_order_relaxed); - } - } + storage::MemoryManager* mm) : FrontierPair(curFrontier, nextFrontier) { + initVertexValues(numNodesMap, mm); } void initRJFromSource(common::nodeID_t) override { setActiveNodesForNextIter(); }; @@ -77,6 +69,17 @@ class WCCFrontierPair : public FrontierPair { return vertexValues[offset].load(std::memory_order_relaxed); } +private: + void initVertexValues(table_id_map_t numNodesMap, storage::MemoryManager* mm) { + for (const auto& [tableID, numNodes] : numNodesMap) { + vertexValueMap.allocate(tableID, numNodes, mm); + auto data = vertexValueMap.getData(tableID); + for (auto i = 0u; i < numNodes; ++i) { + data[i].store(i, std::memory_order_relaxed); + } + } + } + private: std::atomic* vertexValues = nullptr; ObjectArraysMap> vertexValueMap; @@ -175,21 +178,10 @@ class WeaklyConnectedComponent final : public GDSAlgorithm { WeaklyConnectedComponent() = default; WeaklyConnectedComponent(const WeaklyConnectedComponent& other) : GDSAlgorithm{other} {} - /* - * Inputs are - * - * graph::ANY - */ std::vector getParameterTypeIDs() const override { return std::vector{LogicalTypeID::ANY}; } - /* - * Outputs are - * - * _node._id::INTERNAL_ID - * group_id::INT64 - */ binder::expression_vector getResultColumns(binder::Binder* binder) const override { expression_vector columns; auto& outputNode = bindData->getNodeOutput()->constCast(); diff --git a/src/include/function/gds/gds_frontier.h b/src/include/function/gds/gds_frontier.h index 13f4aee9c06..71163783604 100644 --- a/src/include/function/gds/gds_frontier.h +++ b/src/include/function/gds/gds_frontier.h @@ -277,6 +277,9 @@ class KUZU_API FrontierPair { SparseFrontier& getNextSparseFrontier() const { return *nextSparseFrontier; } SparseFrontier& getVertexComputeCandidates() const { return *vertexComputeCandidates; } + bool hasActiveNodes() { + return hasActiveNodesForNextIter_.load(std::memory_order_relaxed); + } bool continueNextIter(uint16_t maxIter); void addNodeToNextDenseFrontier(common::nodeID_t nodeID); diff --git a/src/include/function/gds/rec_joins.h b/src/include/function/gds/rec_joins.h index 83bc5e397c8..65a9bdf0f8a 100644 --- a/src/include/function/gds/rec_joins.h +++ b/src/include/function/gds/rec_joins.h @@ -116,14 +116,7 @@ class SPAlgorithm : public RJAlgorithm { SPAlgorithm() = default; SPAlgorithm(const SPAlgorithm& other) : RJAlgorithm{other} {} - /* - * Inputs include the following: - * - * graph::ANY - * srcNode::NODE - * upperBound::INT64 - * direction::STRING - */ + // Inputs are graph, srcNode, upperBound, direction std::vector getParameterTypeIDs() const override { return {common::LogicalTypeID::ANY, common::LogicalTypeID::NODE, common::LogicalTypeID::INT64, common::LogicalTypeID::STRING}; diff --git a/test/test_files/function/gds/basic.test b/test/test_files/function/gds/basic.test index 2805a3d0dc6..2bccfb558bb 100644 --- a/test/test_files/function/gds/basic.test +++ b/test/test_files/function/gds/basic.test @@ -3,7 +3,6 @@ -- -CASE BasicAlgorithm - -STATEMENT CALL create_project_graph('PK', ['person'], ['knows']) ---- ok -STATEMENT CALL create_project_graph('PK', [], [])