Skip to content

Commit

Permalink
KCore Parallel implementation (#4661)
Browse files Browse the repository at this point in the history
* current draft implementation

---------

Co-authored-by: CI Bot <[email protected]>
  • Loading branch information
2 people authored and andyfengHKU committed Jan 9, 2025
1 parent 6c78434 commit 6e4cad2
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 5 deletions.
7 changes: 4 additions & 3 deletions src/function/function_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,10 @@ FunctionCollection* FunctionCollection::getFunctions() {

// Algorithm functions
ALGORITHM_FUNCTION(WeaklyConnectedComponentsFunction),
ALGORITHM_FUNCTION(VarLenJoinsFunction), ALGORITHM_FUNCTION(AllSPDestinationsFunction),
ALGORITHM_FUNCTION(AllSPPathsFunction), ALGORITHM_FUNCTION(SingleSPDestinationsFunction),
ALGORITHM_FUNCTION(SingleSPPathsFunction), ALGORITHM_FUNCTION(PageRankFunction),
ALGORITHM_FUNCTION(KCoreDecompositionFunction), ALGORITHM_FUNCTION(VarLenJoinsFunction),
ALGORITHM_FUNCTION(AllSPDestinationsFunction), ALGORITHM_FUNCTION(AllSPPathsFunction),
ALGORITHM_FUNCTION(SingleSPDestinationsFunction), ALGORITHM_FUNCTION(SingleSPPathsFunction),
ALGORITHM_FUNCTION(PageRankFunction),

// Export functions
EXPORT_FUNCTION(ExportCSVFunction), EXPORT_FUNCTION(ExportParquetFunction),
Expand Down
3 changes: 2 additions & 1 deletion src/function/gds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ add_library(kuzu_function_algorithm
gds_utils.cpp
output_writer.cpp
variable_length_path.cpp
weakly_connected_components.cpp)
weakly_connected_components.cpp
K_Core_Decomposition.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_function_algorithm>
Expand Down
333 changes: 333 additions & 0 deletions src/function/gds/K_Core_Decomposition.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
#include "binder/binder.h"
#include "common/types/types.h"
#include "function/gds/gds_frontier.h"
#include "function/gds/gds_function_collection.h"
#include "function/gds/gds_object_manager.h"
#include "function/gds/gds_utils.h"
#include "function/gds/output_writer.h"
#include "function/gds_function.h"
#include "graph/graph.h"
#include "processor/execution_context.h"
#include "processor/result/factorized_table.h"

using namespace kuzu::binder;
using namespace kuzu::common;
using namespace kuzu::processor;
using namespace kuzu::storage;
using namespace kuzu::graph;

namespace kuzu {
namespace function {

class KCoreFrontierPair : public FrontierPair {
friend struct KCoreEdgeCompute;

public:
KCoreFrontierPair(std::shared_ptr<GDSFrontier> curFrontier,
std::shared_ptr<GDSFrontier> nextFrontier, uint64_t maxThreads,
table_id_map_t<offset_t> numNodesMap, storage::MemoryManager* mm)
: FrontierPair(curFrontier, nextFrontier, maxThreads), numNodesMap{numNodesMap} {
for (const auto& [tableID, curNumNodes] : numNodesMap) {
vertexValueMap.allocate(tableID, curNumNodes, mm);
nextVertexValueMap.allocate(tableID, curNumNodes, mm);
auto data = vertexValueMap.getData(tableID);
auto nextData = nextVertexValueMap.getData(tableID);
for (auto i = 0u; i < curNumNodes; ++i) {
data[i].store(0, std::memory_order_relaxed);
nextData[i].store(0, std::memory_order_relaxed);
}
}
}

void initRJFromSource(common::nodeID_t /* source */) override{};

void pinCurrFrontier(common::table_id_t tableID) override {
FrontierPair::pinCurrFrontier(tableID);
curDenseFrontier->ptrCast<PathLengths>()->pinCurFrontierTableID(tableID);
}

void pinNextFrontier(common::table_id_t tableID) override {
FrontierPair::pinNextFrontier(tableID);
nextDenseFrontier->ptrCast<PathLengths>()->pinNextFrontierTableID(tableID);
}

void pinVertexValues(common::table_id_t tableID) {
curVertexValue = vertexValueMap.getData(tableID);
nextVertexValue = nextVertexValueMap.getData(tableID);
}

void beginFrontierComputeBetweenTables(table_id_t curTableID, table_id_t nextTableID) override {
FrontierPair::beginFrontierComputeBetweenTables(curTableID, nextTableID);
pinVertexValues(nextTableID);
}

void beginNewIterationInternalNoLock() override {
// Update currentFrontier and nextFrontier to be the same
for (const auto& [tableID, curNumNodes] : numNodesMap) {
for (auto i = 0u; i < curNumNodes; ++i) {
auto temp = vertexValueMap.getData(tableID)[i].load(std::memory_order_relaxed);
vertexValueMap.getData(tableID)[i].compare_exchange_strong(temp,
nextVertexValueMap.getData(tableID)[i].load(std::memory_order_relaxed),
std::memory_order_relaxed);
}
}
updateSmallestDegree();
std::swap(curDenseFrontier, nextDenseFrontier);
curDenseFrontier->ptrCast<PathLengths>()->incrementCurIter();
nextDenseFrontier->ptrCast<PathLengths>()->incrementCurIter();
}

uint64_t addToVertexDegree(common::offset_t offset, uint64_t degreeToAdd) {
nextVertexValue[offset].fetch_add(degreeToAdd, std::memory_order_relaxed);
return curVertexValue[offset].fetch_add(degreeToAdd, std::memory_order_relaxed);
}

// Called to remove degrees from a neighbouring vertex
// Returns whether the neighbouring vertex should be set as active or not
bool removeFromVertex(common::nodeID_t nodeID) {
int curSmallest = curSmallestDegree.load(std::memory_order_relaxed);
int nextVertexDegree = nextVertexValueMap.getData(nodeID.tableID)[nodeID.offset].load(
std::memory_order_relaxed);
int curVertexDegree =
vertexValueMap.getData(nodeID.tableID)[nodeID.offset].load(std::memory_order_relaxed);
// The vertex should be set as active if it will be considered in a future iteration
// The vertex should be set as inactive if it will be processed this iteration
if (nextVertexDegree > curSmallest) {
nextVertexValueMap.getData(nodeID.tableID)[nodeID.offset].fetch_sub(1,
std::memory_order_relaxed);
}
if (curVertexDegree <= curSmallest) {
return false;
}
return true;
}

void updateSmallestDegree() {
uint64_t nextSmallestDegree = UINT64_MAX;
for (const auto& [tableID, curNumNodes] : numNodesMap) {
curDenseFrontier->pinTableID(tableID);
for (auto i = 0u; i < curNumNodes; ++i) {
if (curDenseFrontier->isActive(i)) {
nextSmallestDegree = std::min(nextSmallestDegree,
vertexValueMap.getData(tableID)[i].load(std::memory_order_relaxed));
}
}
}
uint64_t smallestDegree = curSmallestDegree.load(std::memory_order_relaxed);
curSmallestDegree.compare_exchange_strong(smallestDegree, nextSmallestDegree,
std::memory_order_relaxed);
}

uint64_t getSmallestDegree() { return curSmallestDegree.load(std::memory_order_relaxed); }

uint64_t getVertexValue(common::offset_t offset) {
return curVertexValue[offset].load(std::memory_order_relaxed);
}

private:
std::atomic<uint64_t> curSmallestDegree{UINT64_MAX};
common::table_id_map_t<common::offset_t> numNodesMap;
std::atomic<uint64_t>* curVertexValue = nullptr;
std::atomic<uint64_t>* nextVertexValue = nullptr;
ObjectArraysMap<std::atomic<uint64_t>> vertexValueMap;
ObjectArraysMap<std::atomic<uint64_t>> nextVertexValueMap;
};

struct KCoreInitEdgeCompute : public EdgeCompute {
KCoreFrontierPair* frontierPair;

explicit KCoreInitEdgeCompute(KCoreFrontierPair* frontierPair) : frontierPair{frontierPair} {}

std::vector<common::nodeID_t> edgeCompute(common::nodeID_t boundNodeID,
graph::NbrScanState::Chunk& chunk, bool) override {
std::vector<common::nodeID_t> result;
result.push_back(boundNodeID);
frontierPair->addToVertexDegree(boundNodeID.offset, chunk.size());
return result;
}

std::unique_ptr<EdgeCompute> copy() override {
return std::make_unique<KCoreInitEdgeCompute>(frontierPair);
}
};

struct KCoreEdgeCompute : public EdgeCompute {
KCoreFrontierPair* frontierPair;

explicit KCoreEdgeCompute(KCoreFrontierPair* frontierPair) : frontierPair{frontierPair} {}

std::vector<common::nodeID_t> edgeCompute(common::nodeID_t boundNodeID,
graph::NbrScanState::Chunk& chunk, bool) override {
std::vector<common::nodeID_t> result;
uint64_t vertexDegree = frontierPair->getVertexValue(boundNodeID.offset);
uint64_t smallestDegree = frontierPair->getSmallestDegree();
if (vertexDegree <= smallestDegree) {
chunk.forEach([&](auto nbrNodeID, auto) {
if (frontierPair->curDenseFrontier->isActive(nbrNodeID.offset)) {
auto shouldBeSetActive = frontierPair->removeFromVertex(nbrNodeID);
if (shouldBeSetActive) {
result.push_back(nbrNodeID);
}
}
});
}
// If the node hasn't been considered this iteration, it will need to still be active in
// future iterations.
else {
result.push_back(boundNodeID);
}
return result;
}

std::unique_ptr<EdgeCompute> copy() override {
return std::make_unique<KCoreEdgeCompute>(frontierPair);
}
};

class KCoreOutputWriter : GDSOutputWriter {
public:
explicit KCoreOutputWriter(main::ClientContext* context,
processor::NodeOffsetMaskMap* outputNodeMask, KCoreFrontierPair* frontierPair)
: GDSOutputWriter{context, outputNodeMask}, frontierPair{frontierPair} {
nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager());
kValueVector = createVector(LogicalType::UINT64(), context->getMemoryManager());
}

void pinTableID(common::table_id_t tableID) override {
GDSOutputWriter::pinTableID(tableID);
frontierPair->pinVertexValues(tableID);
}

void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID,
FactorizedTable& table) const {
for (auto i = startOffset; i < endOffset; ++i) {
auto nodeID = nodeID_t{i, tableID};
nodeIDVector->setValue<nodeID_t>(0, nodeID);
kValueVector->setValue<uint64_t>(0, frontierPair->getVertexValue(i));
table.append(vectors);
}
}

std::unique_ptr<KCoreOutputWriter> copy() const {
return std::make_unique<KCoreOutputWriter>(context, outputNodeMask, frontierPair);
}

private:
std::unique_ptr<ValueVector> nodeIDVector;
std::unique_ptr<ValueVector> kValueVector;
KCoreFrontierPair* frontierPair;
};

class KCoreVertexCompute : public VertexCompute {
public:
KCoreVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState,
std::unique_ptr<KCoreOutputWriter> outputWriter)
: mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} {
localFT = sharedState->claimLocalTable(mm);
}
~KCoreVertexCompute() override { sharedState->returnLocalTable(localFT); }

bool beginOnTable(common::table_id_t tableID) override {
outputWriter->pinTableID(tableID);
return true;
}

void vertexCompute(common::offset_t startOffset, common::offset_t endOffset,
common::table_id_t tableID) override {
outputWriter->materialize(startOffset, endOffset, tableID, *localFT);
}

std::unique_ptr<VertexCompute> copy() override {
return std::make_unique<KCoreVertexCompute>(mm, sharedState, outputWriter->copy());
}

private:
storage::MemoryManager* mm;
processor::GDSCallSharedState* sharedState;
std::unique_ptr<KCoreOutputWriter> outputWriter;
processor::FactorizedTable* localFT;
};

class KCoreDecomposition final : public GDSAlgorithm {
static constexpr char GROUP_ID_COLUMN_NAME[] = "k_degree";

public:
KCoreDecomposition() = default;
KCoreDecomposition(const KCoreDecomposition& other) : GDSAlgorithm{other} {}

/*
* Inputs are
*
* graph::ANY
*/
std::vector<common::LogicalTypeID> getParameterTypeIDs() const override {
return std::vector<LogicalTypeID>{LogicalTypeID::ANY};
}
/*
* Outputs are
*
* _node._id::INTERNAL_ID
* group_id::INT64
*/
binder::expression_vector getResultColumns(binder::Binder* binder) const override {
expression_vector columns;
auto& outputNode = bindData->getNodeOutput()->constCast<NodeExpression>();
columns.push_back(outputNode.getInternalID());
columns.push_back(binder->createVariable(GROUP_ID_COLUMN_NAME, LogicalType::INT64()));
return columns;
}
void bind(const GDSBindInput& input, main::ClientContext&) override {
auto nodeOutput = bindNodeOutput(input.binder, input.graphEntry.nodeEntries);
bindData = std::make_unique<GDSBindData>(nodeOutput);
}

void exec(processor::ExecutionContext* context) override {
auto clientContext = context->clientContext;
auto graph = sharedState->graph.get();
auto numNodesMap = graph->getNumNodesMap(clientContext->getTx());
auto numThreads = clientContext->getMaxNumThreadForExec();
auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED);
auto nextFrontier = getPathLengthsFrontier(context, 0);
auto frontierPair = std::make_unique<KCoreFrontierPair>(currentFrontier, nextFrontier,
numThreads, numNodesMap, clientContext->getMemoryManager());

frontierPair->setActiveNodesForNextIter();
frontierPair->getNextSparseFrontier().disable();

auto initEdgeCompute = std::make_unique<KCoreInitEdgeCompute>(frontierPair.get());
auto writer = std::make_unique<KCoreOutputWriter>(clientContext,
sharedState->getOutputNodeMaskMap(), frontierPair.get());
auto computeState = GDSComputeState(std::move(frontierPair), std::move(initEdgeCompute),
sharedState->getOutputNodeMaskMap());

GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH,
1);

auto edgeCompute = std::make_unique<KCoreEdgeCompute>(
computeState.frontierPair->ptrCast<KCoreFrontierPair>());

computeState.edgeCompute = std::move(edgeCompute);
computeState.frontierPair->setActiveNodesForNextIter();

GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH,
UINT16_MAX);
auto vertexCompute = std::make_unique<KCoreVertexCompute>(clientContext->getMemoryManager(),
sharedState.get(), std::move(writer));
GDSUtils::runVertexCompute(context, sharedState->graph.get(), *vertexCompute);
sharedState->mergeLocalTables();
}

std::unique_ptr<GDSAlgorithm> copy() const override {
return std::make_unique<KCoreDecomposition>(*this);
}
};
function_set KCoreDecompositionFunction::getFunctionSet() {
function_set result;
auto algo = std::make_unique<KCoreDecomposition>();
auto function =
std::make_unique<GDSFunction>(name, algo->getParameterTypeIDs(), std::move(algo));
result.push_back(std::move(function));
return result;
}
} // namespace function
} // namespace kuzu
1 change: 0 additions & 1 deletion src/function/gds/gds_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ void GDSUtils::scheduleFrontierTask(table_id_t boundTableID, table_id_t nbrTable
task->runSparse();
return;
}

// GDSUtils::runFrontiersUntilConvergence is called from a GDSCall operator, which is
// already executed by a worker thread Tm of the task scheduler. So this function is
// executed by Tm. Because this function will monitor the task and wait for it to
Expand Down
6 changes: 6 additions & 0 deletions src/include/function/gds/gds_function_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ struct WeaklyConnectedComponentsFunction {
static function_set getFunctionSet();
};

struct KCoreDecompositionFunction {
static constexpr const char* name = "K_CORE_DECOMPOSITION";

static function_set getFunctionSet();
};

struct VarLenJoinsFunction {
static constexpr const char* name = "VAR_LEN_JOINS";

Expand Down
10 changes: 10 additions & 0 deletions test/test_files/function/gds/basic.test
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|7
#|ABFsUni|0
#|CsWork|1
#|DEsWork|1
-STATEMENT CALL k_core_decomposition('PK') RETURN _node.fName, k_degree;
---- 8
Alice|6
Bob|6
Carol|6
Dan|6
Elizabeth|1
Farooq|1
Greg|1
Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0
-STATEMENT CALL page_rank('PK') RETURN _node.fName, rank;
---- 8
Alice|0.125000
Expand Down
Loading

0 comments on commit 6e4cad2

Please sign in to comment.