Skip to content

Commit

Permalink
Hash Aggregation parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Dec 19, 2024
1 parent cc63190 commit 1590c35
Show file tree
Hide file tree
Showing 15 changed files with 388 additions and 133 deletions.
1 change: 1 addition & 0 deletions src/function/aggregate/collect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ static void combine(uint8_t* state_, uint8_t* otherState_, MemoryManager* /*memo
state->factorizedTable->merge(*otherState->factorizedTable);
}
otherState->factorizedTable.reset();
otherState->isNull = true;
}

static std::unique_ptr<FunctionBindData> bindFunc(ScalarBindFuncInput input) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct BufferPoolConstants {
#else
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = static_cast<uint64_t>(1) << 43; // (8TB)
#endif
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 28; // (256MB)
};

struct StorageConstants {
Expand Down
67 changes: 63 additions & 4 deletions src/include/processor/operator/aggregate/aggregate_hash_table.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
#pragma once

#include <cstdint>

#include "aggregate_input.h"
#include "common/vector/value_vector.h"
#include "function/aggregate_function.h"
#include "processor/result/base_hash_table.h"
#include "processor/result/factorized_table.h"
#include "processor/result/factorized_table_schema.h"
#include "storage/buffer_manager/memory_manager.h"

namespace kuzu {
Expand Down Expand Up @@ -81,7 +86,8 @@ class AggregateHashTable : public BaseHashTable {
common::ValueVector* aggregateVector);

//! merge aggregate hash table by combining aggregate states under the same key
void merge(AggregateHashTable& other);
void merge(const FactorizedTable& other);
void merge(const AggregateHashTable& other) { merge(*other.factorizedTable); }

void finalizeAggregateStates();

Expand Down Expand Up @@ -110,7 +116,7 @@ class AggregateHashTable : public BaseHashTable {
const std::vector<common::ValueVector*>& dependentKeyVectors,
common::DataChunkState* leadingState);

private:
protected:
void initializeFT(const std::vector<function::AggregateFunction>& aggregateFunctions,
FactorizedTableSchema tableSchema);

Expand Down Expand Up @@ -152,7 +158,7 @@ class AggregateHashTable : public BaseHashTable {
const std::vector<common::ValueVector*>& unFlatKeyVectors,
const std::vector<AggregateInput>& aggregateInputs, uint64_t resultSetMultiplicity);

void fillEntryWithInitialNullAggregateState(uint8_t* entry);
void fillEntryWithInitialNullAggregateState(FactorizedTable& factorizedTable, uint8_t* entry);

//! find an uninitialized hash slot for given hash and fill hash slot with block id and offset
void fillHashSlot(common::hash_t hash, uint8_t* groupByKeysAndAggregateStateBuffer);
Expand Down Expand Up @@ -200,14 +206,17 @@ class AggregateHashTable : public BaseHashTable {
function::AggregateFunction& aggregateFunction, common::ValueVector* aggVector,
uint64_t multiplicity, uint32_t aggStateOffset);

//! Returns the factorized table used for an entry with the given hash.
//! This base implementation ignores the hash and always returns `*factorizedTable`;
//! PartitioningAggregateHashTable overrides it to select a table based on the hash.
virtual FactorizedTable& getFactorizedTable(common::hash_t /*entryHash*/) {
return *factorizedTable;
}

protected:
uint32_t hashColIdxInFT{};
std::unique_ptr<uint64_t[]> mayMatchIdxes;
std::unique_ptr<uint64_t[]> noMatchIdxes;
std::unique_ptr<uint64_t[]> entryIdxesToInitialize;
std::unique_ptr<HashSlot*[]> hashSlotsToUpdateAggState;

private:
std::vector<common::LogicalType> payloadTypes;
std::vector<function::AggregateFunction> aggregateFunctions;

Expand All @@ -231,5 +240,55 @@ struct AggregateHashTableUtils {
const common::LogicalType& distinctKeyType);
};

class HashAggregateSharedState;
//! Aggregate hash table whose tuples are spread over NUM_PARTITIONS separate factorized tables.
//! The table used for an entry is chosen from the entry's hash (see getFactorizedTable()).
class PartitioningAggregateHashTable : public AggregateHashTable {
public:
    static constexpr size_t NUM_PARTITIONS = 128;
    // Partition indices are passed around as uint8_t (see appendEmptyTuple()), so every valid
    // index must be representable in that type; otherwise indices would silently truncate.
    static_assert(NUM_PARTITIONS > 0 && NUM_PARTITIONS - 1 <= UINT8_MAX,
        "NUM_PARTITIONS must fit in a uint8_t partition index");

    //! Constructs the base hash table with a copy of tableSchema and then creates one empty
    //! FactorizedTable per partition, each with its own copy of the schema.
    //! @param sharedState stored as a raw pointer for later use by this table.
    //! @param memoryManager used by the base table and by every partition table.
    //! @param tableSchema schema shared by the base table and all partitions.
    //! NOTE(review): NUM_PARTITIONS * 1024 is forwarded to the base constructor; presumably
    //! the initial number of hash slots/entries — confirm against AggregateHashTable.
    PartitioningAggregateHashTable(HashAggregateSharedState* sharedState,
        storage::MemoryManager& memoryManager, std::vector<common::LogicalType> keyTypes,
        std::vector<common::LogicalType> payloadTypes,
        const std::vector<function::AggregateFunction>& aggregateFunctions,
        const std::vector<common::LogicalType>& distinctAggKeyTypes,
        FactorizedTableSchema tableSchema)
        : AggregateHashTable(memoryManager, std::move(keyTypes), std::move(payloadTypes),
              aggregateFunctions, distinctAggKeyTypes, NUM_PARTITIONS * 1024, tableSchema.copy()),
          partitions{}, sharedState{sharedState}, tableSchema{std::move(tableSchema)},
          memoryManager{memoryManager} {
        // The parameter tableSchema has been moved from above, so copies are taken from the
        // member (this->tableSchema) rather than from the parameter.
        std::generate(partitions.begin(), partitions.end(), [&]() {
            return std::make_unique<FactorizedTable>(&memoryManager, this->tableSchema.copy());
        });
    }

    uint64_t append(const std::vector<common::ValueVector*>& flatKeyVectors,
        const std::vector<common::ValueVector*>& unFlatKeyVectors,
        const std::vector<common::ValueVector*>& dependentKeyVectors,
        common::DataChunkState* leadingState, const std::vector<AggregateInput>& aggregateInputs,
        uint64_t resultSetMultiplicity);

    void mergeAll();

    //! Returns the total number of tuples currently held across all partition tables.
    uint64_t getNumEntries() const {
        uint64_t numEntries = 0;
        for (const auto& partition : partitions) {
            numEntries += partition->getNumTuples();
        }
        return numEntries;
    }

protected:
    //! Selects the partition table for an entry: the hash modulo NUM_PARTITIONS.
    FactorizedTable& getFactorizedTable(common::hash_t entryHash) override {
        return *partitions[entryHash % NUM_PARTITIONS];
    }

private:
    uint8_t* appendEmptyTuple(uint8_t partitionIdx);

    // One factorized table per partition; all are created in the constructor.
    std::array<std::unique_ptr<FactorizedTable>, NUM_PARTITIONS> partitions;
    // Raw pointer to the shared state supplied at construction.
    HashAggregateSharedState* sharedState;
    // Schema from which each partition table's schema is copied.
    FactorizedTableSchema tableSchema;
    storage::MemoryManager& memoryManager;
};

} // namespace processor
} // namespace kuzu
62 changes: 51 additions & 11 deletions src/include/processor/operator/aggregate/hash_aggregate.h
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
#pragma once

#include <sys/types.h>

#include <cstdint>
#include <memory>

#include "aggregate_hash_table.h"
#include "common/copy_constructors.h"
#include "common/mpsc_queue.h"
#include "common/vector/value_vector.h"
#include "processor/operator/aggregate/base_aggregate.h"
#include "processor/result/factorized_table.h"
#include "processor/result/factorized_table_schema.h"

namespace kuzu {
namespace processor {

struct HashAggregateInfo;

// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor): This is a final class.
class HashAggregateSharedState final : public BaseAggregateSharedState {

public:
explicit HashAggregateSharedState(
const std::vector<function::AggregateFunction>& aggregateFunctions)
: BaseAggregateSharedState{aggregateFunctions}, limitCounter{0},
limitNumber{common::INVALID_LIMIT} {}
const std::vector<function::AggregateFunction>& aggregateFunctions);

void appendAggregateHashTable(std::unique_ptr<AggregateHashTable> aggregateHashTable);
void initPartitions(main::ClientContext* context, std::vector<common::LogicalType> keyDataTypes,
std::vector<common::LogicalType> payloadDataTypes, HashAggregateInfo& info,
std::vector<common::LogicalType> types);

void combineAggregateHashTable(storage::MemoryManager& memoryManager);
// Will return either the original factorized table for reuse, or a nullptr if the table
// cannot be handed back (presumably because it was queued for a later merge; TODO confirm).
std::unique_ptr<FactorizedTable> mergeTable(uint8_t partitionIdx,
std::unique_ptr<FactorizedTable> aggregateHashTable);

void tryMergeQueue();
void finalizeAggregateHashTable();

std::pair<uint64_t, uint64_t> getNextRangeToRead() override;

inline uint8_t* getRow(uint64_t idx) { return globalAggregateHashTable->getEntry(idx); }
void scan(std::span<uint8_t*> entries, std::vector<common::ValueVector*>& keyVectors,
common::offset_t startOffset, common::offset_t numRowsToScan,
std::vector<uint32_t>& columnIndices);

FactorizedTable* getFactorizedTable() { return globalAggregateHashTable->getFactorizedTable(); }
uint64_t getNumTuples() const;

uint64_t getCurrentOffset() const { return currentOffset; }

Expand All @@ -34,11 +51,33 @@ class HashAggregateSharedState final : public BaseAggregateSharedState {

void setLimitNumber(uint64_t num) { limitNumber = num; }

// Returns the schema of the factorized table behind the first global partition's hash table.
// NOTE(review): assumes globalPartitions[0].hashTable is already set (presumably by
// initPartitions()) and that all partitions share the same schema — confirm.
const FactorizedTableSchema& getTableSchema() const {
return *globalPartitions[0].hashTable->getFactorizedTable()->getTableSchema();
}

// Increments the counter of threads that have finished producing.
void setThreadFinishedProducing() { numThreadsFinishedProducing++; }
// True once at least as many threads have finished producing as have called registerThread().
// The two atomic counters are read separately, not as a single snapshot.
bool allThreadsFinishedProducing() const { return numThreadsFinishedProducing >= numThreads; }

// Increments the counter of registered threads that allThreadsFinishedProducing() compares against.
void registerThread() { numThreads++; }

void assertFinalized() const;

protected:
std::tuple<const FactorizedTable*, common::offset_t> getPartitionForOffset(
common::offset_t offset) const;

private:
std::vector<std::unique_ptr<AggregateHashTable>> localAggregateHashTables;
std::unique_ptr<AggregateHashTable> globalAggregateHashTable;
// State kept for each global partition (one per PartitioningAggregateHashTable::NUM_PARTITIONS).
struct Partition {
// Aggregate hash table for this partition; getTableSchema() reads its factorized table.
std::unique_ptr<AggregateHashTable> hashTable;
// NOTE(review): presumably guards hashTable while tables are merged into it — confirm
// against mergeTable()/tryMergeQueue().
std::mutex mtx;
// Factorized tables queued for this partition; presumably drained by tryMergeQueue() — confirm.
common::MPSCQueue<std::unique_ptr<FactorizedTable>> queuedTuples;
// NOTE(review): presumably set once this partition's hash table has been finalized — confirm.
bool finalized = false;
};
std::array<Partition, PartitioningAggregateHashTable::NUM_PARTITIONS> globalPartitions;
std::atomic_uint64_t limitCounter;
uint64_t limitNumber;
std::atomic<size_t> numThreadsFinishedProducing;
std::atomic<size_t> numThreads;
};

struct HashAggregateInfo {
Expand All @@ -57,9 +96,10 @@ struct HashAggregateLocalState {
std::vector<common::ValueVector*> unFlatKeyVectors;
std::vector<common::ValueVector*> dependentKeyVectors;
common::DataChunkState* leadingState = nullptr;
std::unique_ptr<AggregateHashTable> aggregateHashTable;
std::unique_ptr<PartitioningAggregateHashTable> aggregateHashTable;

void init(ResultSet& resultSet, main::ClientContext* context, HashAggregateInfo& info,
void init(HashAggregateSharedState* sharedState, ResultSet& resultSet,
main::ClientContext* context, HashAggregateInfo& info,
std::vector<function::AggregateFunction>& aggregateFunctions,
std::vector<common::LogicalType> types);
uint64_t append(const std::vector<AggregateInput>& aggregateInputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class HashAggregateScan : public BaseAggregateScan {
std::vector<common::ValueVector*> groupByKeyVectors;
std::shared_ptr<HashAggregateSharedState> sharedState;
std::vector<uint32_t> groupByKeyVectorsColIdxes;
std::vector<uint8_t*> entries;
};

} // namespace processor
Expand Down
3 changes: 3 additions & 0 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DataBlock {
}

uint8_t* getData() const { return block->getBuffer().data(); }
std::span<uint8_t> getSizedData() const { return block->getBuffer(); }
uint8_t* getWritableData() const { return block->getBuffer().last(freeSize).data(); }
void resetNumTuplesAndFreeSize() {
freeSize = block->getBuffer().size();
Expand Down Expand Up @@ -104,6 +105,8 @@ class KUZU_API FactorizedTable {
void scan(std::vector<common::ValueVector*>& vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan, std::vector<uint32_t>& colIdxToScan) const;
// TODO(Guodong): Unify these two interfaces along with `readUnflatCol`.
// startPos is the starting position in the tuplesToRead, not the starting position in the
// factorizedTable
void lookup(std::vector<common::ValueVector*>& vectors, std::vector<uint32_t>& colIdxesToScan,
uint8_t** tuplesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
void lookup(std::vector<common::ValueVector*>& vectors,
Expand Down
Loading

0 comments on commit 1590c35

Please sign in to comment.