Re-size global aggregate hash tables based on the number of tuples in the queue
benjaminwinger committed Jan 9, 2025
1 parent eab22da commit 32b124c
Showing 6 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/include/common/constants.h
@@ -86,7 +86,7 @@ struct BufferPoolConstants {
#else
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = static_cast<uint64_t>(1) << 43; // (8TB)
#endif
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 28; // (256MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
};

struct StorageConstants {
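The new testing buffer pool size is 1 << 26 bytes, i.e. 64 MiB, down from 1 << 28 bytes (256 MiB); the bm_info() expectation in test/test_files/call/call.test below changes accordingly. A quick standalone check of the arithmetic (not part of the patch):

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Mirrors DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING above: 1ull << 26 bytes.
    constexpr uint64_t newSize = 1ull << 26; // 67108864 bytes = 64 MiB
    constexpr uint64_t oldSize = 1ull << 28; // 268435456 bytes = 256 MiB
    std::cout << newSize << " bytes (" << (newSize >> 20) << " MiB), was " << oldSize << " bytes ("
              << (oldSize >> 20) << " MiB)\n";
}
```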
9 changes: 6 additions & 3 deletions src/include/processor/operator/aggregate/aggregate_hash_table.h
@@ -88,6 +88,7 @@ class AggregateHashTable : public BaseHashTable {
void finalizeAggregateStates();

void resize(uint64_t newSize);
void resizeHashTableIfNecessary(uint32_t maxNumDistinctHashKeys);

AggregateHashTable createEmptyCopy() const { return AggregateHashTable(*this); }

@@ -114,8 +115,6 @@ class AggregateHashTable : public BaseHashTable {
uint64_t matchFlatVecWithFTColumn(common::ValueVector* vector, uint64_t numMayMatches,
uint64_t& numNoMatches, uint32_t colIdx);

void resizeHashTableIfNecessary(uint32_t maxNumDistinctHashKeys);

void findHashSlots(const std::vector<common::ValueVector*>& flatKeyVectors,
const std::vector<common::ValueVector*>& unFlatKeyVectors,
const std::vector<common::ValueVector*>& dependentKeyVectors,
@@ -270,6 +269,9 @@ struct AggregateHashTableUtils {

// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor): This is a final class.
class HashAggregateSharedState;

// Fixed-sized Aggregate hash table that flushes tuples into partitions in the
// HashAggregateSharedState when full
class PartitioningAggregateHashTable final : public AggregateHashTable {
public:
PartitioningAggregateHashTable(HashAggregateSharedState* sharedState,
@@ -279,7 +281,8 @@
const std::vector<common::LogicalType>& distinctAggKeyTypes,
FactorizedTableSchema tableSchema)
: AggregateHashTable(memoryManager, std::move(keyTypes), std::move(payloadTypes),
aggregateFunctions, distinctAggKeyTypes, 0 /*minimum size*/, tableSchema.copy()),
aggregateFunctions, distinctAggKeyTypes,
common::DEFAULT_VECTOR_CAPACITY /*minimum size*/, tableSchema.copy()),
sharedState{sharedState}, tableSchema{std::move(tableSchema)} {}

uint64_t append(const std::vector<common::ValueVector*>& flatKeyVectors,
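Taken together, the header changes make resizeHashTableIfNecessary part of the class's public interface (the finalize step in hash_aggregate.cpp below calls it on the per-partition tables) and give PartitioningAggregateHashTable a starting size of DEFAULT_VECTOR_CAPACITY slots instead of 0. As a rough illustration of what a resize-if-necessary check of this shape typically does — the class, member names, and the power-of-two/load-factor policy below are assumptions for the sketch, not Kuzu's actual implementation:

```cpp
#include <cstdint>
#include <vector>

// Illustrative stand-in: one hash slot holding a hash value and a pointer to tuple data.
struct Slot {
    uint64_t hash = 0;
    void* entry = nullptr;
};

class ToyAggregateHashTable {
public:
    // minSize plays the role of the "minimum size" constructor argument in the diff above.
    explicit ToyAggregateHashTable(uint64_t minSize) { resize(nextPowerOfTwo(minSize)); }

    // Grow the slot array if the expected number of distinct keys would exceed the
    // assumed maximum load factor of 0.5 (i.e. keep at least two slots per expected key).
    void resizeHashTableIfNecessary(uint64_t maxNumDistinctHashKeys) {
        uint64_t required = nextPowerOfTwo(maxNumDistinctHashKeys * 2);
        if (required > hashSlots.size()) {
            resize(required);
        }
    }

private:
    static uint64_t nextPowerOfTwo(uint64_t v) {
        uint64_t p = 1;
        while (p < v) {
            p <<= 1;
        }
        return p;
    }

    void resize(uint64_t newSize) {
        // A real implementation would rehash existing entries into the new slot array.
        hashSlots.assign(newSize, Slot{});
    }

    std::vector<Slot> hashSlots;
};

int main() {
    ToyAggregateHashTable table{2048 /*e.g. DEFAULT_VECTOR_CAPACITY*/};
    table.resizeHashTableIfNecessary(100000); // grows to 262144 slots (next power of two >= 200000)
}
```

Seeding the table with a non-zero minimum means the very first vector of keys appended to a PartitioningAggregateHashTable does not immediately force a grow.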
13 changes: 12 additions & 1 deletion src/processor/operator/aggregate/hash_aggregate.cpp
@@ -205,14 +205,25 @@ void HashAggregateSharedState::finalizeAggregateHashTable(
partition.hashTable =
std::make_unique<AggregateHashTable>(localHashTable.createEmptyCopy());
}
std::vector<Partition::TupleBlock*> partitionsToMerge;
partitionsToMerge.reserve(partition.queuedTuples.approxSize());
Partition::TupleBlock* partitionToMerge = nullptr;
// Over-estimate the total number of distinct groups using the total number of tuples
// after the per-thread pre-aggregation.
// This will probably use more memory than necessary, but will greatly reduce the need
// to resize the hash table. It will only use memory proportional to what is already
// used by the queuedTuples, and the hash slots are usually small in comparison
// to the full tuple data.
auto& ft = partition.headBlock.load()->table;
partition.hashTable->resizeHashTableIfNecessary(
partition.queuedTuples.approxSize() * ft.getNumTuplesPerBlock() +
partition.headBlock.load()->numTuplesWritten);
while (partition.queuedTuples.pop(partitionToMerge)) {
KU_ASSERT(partitionToMerge->numTuplesWritten ==
partitionToMerge->table.getNumTuplesPerBlock());
partition.hashTable->merge(std::move(partitionToMerge->table));
delete partitionToMerge;
}
auto& ft = partition.headBlock.load()->table;
if (partition.headBlock.load()->numTuplesWritten > 0) {
ft.resize(partition.headBlock.load()->numTuplesWritten);
partition.hashTable->merge(std::move(ft));
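The sizing call added above boils down to: every block already flushed to the queue is full, so it contributes getNumTuplesPerBlock() tuples, while the head block contributes only what has actually been written to it. A hedged sketch of that estimate with simplified stand-ins for the queue and block types (the real Partition/TupleBlock structures carry more state):

```cpp
#include <cstdint>

// Simplified stand-ins for the real structures.
struct ToyQueue {
    uint64_t approxSize() const { return numQueuedBlocks; }
    uint64_t numQueuedBlocks = 0;
};

struct ToyHeadBlock {
    uint64_t numTuplesWritten = 0;
};

// Every queued block is full, so the queue contributes blocks * tuplesPerBlock;
// the head block is still being filled, so only count what was actually written.
uint64_t estimateMaxDistinctKeys(const ToyQueue& queuedTuples, const ToyHeadBlock& headBlock,
    uint64_t numTuplesPerBlock) {
    return queuedTuples.approxSize() * numTuplesPerBlock + headBlock.numTuplesWritten;
}

int main() {
    ToyQueue queue{/*numQueuedBlocks=*/3};
    ToyHeadBlock head{/*numTuplesWritten=*/100};
    // 3 full blocks of 2048 tuples plus 100 in-flight tuples -> 6244 slots requested.
    return estimateMaxDistinctKeys(queue, head, /*numTuplesPerBlock=*/2048) == 6244 ? 0 : 1;
}
```

Because the per-thread tables have already combined duplicate keys locally, this count is an upper bound on the number of distinct groups, so the merged table rarely needs to grow again while the queued blocks are drained.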
2 changes: 1 addition & 1 deletion src/processor/operator/hash_join/hash_join_probe.cpp
@@ -29,7 +29,7 @@ void HashJoinProbe::initLocalStateInternal(ResultSet* resultSet, ExecutionContex
// We only need to read nonKeys from the factorizedTable. Key columns are always kept as first k
// columns in the factorizedTable, so we skip the first k columns.
KU_ASSERT(probeDataInfo.keysDataPos.size() + probeDataInfo.getNumPayloads() + 2 ==
sharedState->getHashTable()->getTableSchema().getNumColumns());
sharedState->getHashTable()->getTableSchema()->getNumColumns());
columnIdxsToReadFrom.resize(probeDataInfo.getNumPayloads());
iota(columnIdxsToReadFrom.begin(), columnIdxsToReadFrom.end(),
probeDataInfo.keysDataPos.size());
6 changes: 3 additions & 3 deletions src/processor/result/factorized_table.cpp
@@ -297,9 +297,9 @@ void FactorizedTable::setNonOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t co

void FactorizedTable::clear() {
numTuples = 0;
flatTupleBlockCollection = std::make_unique<DataBlockCollection>(
tableSchema.getNumBytesPerTuple(), numFlatTuplesPerBlock);
unFlatTupleBlockCollection = std::make_unique<DataBlockCollection>();
new (flatTupleBlockCollection.get())
DataBlockCollection(tableSchema.getNumBytesPerTuple(), numFlatTuplesPerBlock);
new (unFlatTupleBlockCollection.get()) DataBlockCollection();
inMemOverflowBuffer->resetBuffer();
}

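The clear() change above reconstructs the existing DataBlockCollection objects in place with placement new rather than allocating replacements through std::make_unique. A minimal sketch of the reset-in-place idiom with a stand-in type; the textbook form shown here runs the old object's destructor before constructing over its storage (whether that step is needed for DataBlockCollection depends on what its destructor releases):

```cpp
#include <cstdint>
#include <memory>
#include <new>
#include <vector>

// Stand-in for DataBlockCollection: just enough state for the example.
class ToyBlockCollection {
public:
    ToyBlockCollection() = default;
    ToyBlockCollection(uint32_t numBytesPerTuple, uint32_t numTuplesPerBlock)
        : numBytesPerTuple{numBytesPerTuple}, numTuplesPerBlock{numTuplesPerBlock} {}

private:
    uint32_t numBytesPerTuple = 0;
    uint32_t numTuplesPerBlock = 0;
    std::vector<std::vector<uint8_t>> blocks;
};

// Reset the heap object that `collection` owns without touching the allocation:
// destroy the old object, then construct fresh state over the same storage.
void resetInPlace(std::unique_ptr<ToyBlockCollection>& collection, uint32_t numBytesPerTuple,
    uint32_t numTuplesPerBlock) {
    collection->~ToyBlockCollection();
    new (collection.get()) ToyBlockCollection(numBytesPerTuple, numTuplesPerBlock);
}

int main() {
    auto collection = std::make_unique<ToyBlockCollection>();
    resetInPlace(collection, /*numBytesPerTuple=*/64, /*numTuplesPerBlock=*/2048);
}
```

The attraction over reassigning the unique_ptr is that clear() no longer performs a delete/new pair each time it runs.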
2 changes: 1 addition & 1 deletion test/test_files/call/call.test
@@ -274,4 +274,4 @@ Binder exception: a.fName has type PROPERTY but LITERAL,PARAMETER was expected.
-LOG BMInfo
-STATEMENT CALL bm_info() RETURN mem_limit
---- 1
268435456
67108864
