Commit 07d1ee4

Added some overloads, increased use of streams in methods

markusbattarbee committed Jan 8, 2024
1 parent 1028c5c commit 07d1ee4
Showing 4 changed files with 174 additions and 26 deletions.
33 changes: 32 additions & 1 deletion include/hashinator/hashinator.h
@@ -151,6 +151,16 @@ class Hashmap {
return *this;
}

/** Copy-assign from another map, but using a provided stream */
void overwrite(const Hashmap<KEY_TYPE,VAL_TYPE>& other, split_gpuStream_t stream = 0) {
if (this == &other) {
return;
}
SPLIT_CHECK_ERR(split_gpuMemcpyAsync(_mapInfo,other._mapInfo, sizeof(MapInfo), split_gpuMemcpyDeviceToDevice, stream));
buckets.overwrite(other.buckets, stream); // forward the stream so the bucket copy stays on it
return;
}
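// Usage sketch (hypothetical names). With the CUDA backend, split_gpuStream_t
// is a cudaStream_t; a stream-create wrapper is assumed to exist alongside the
// other split_gpu* macros:
//   split_gpuStream_t s;
//   SPLIT_CHECK_ERR(split_gpuStreamCreate(&s));
//   dst.overwrite(src, s);                          // async copy on s
//   SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); // dst is valid after sync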

Hashmap& operator=(Hashmap<KEY_TYPE,VAL_TYPE>&& other) {
if (this == &other) {
return *this;
@@ -450,12 +460,33 @@ class Hashmap {
}
#endif

#ifdef HASHINATOR_CPU_ONLY_MODE
// Try to grow our buckets until we achieve a targetLF load factor
void resize_to_lf(float targetLF = 0.5) {
while (load_factor() > targetLF) {
rehash(_mapInfo->sizePower + 1);
}
}
#else
// Try to grow our buckets until we achieve a targetLF load factor
void resize_to_lf(float targetLF = 0.5, targets t = targets::host, split_gpuStream_t s = 0) {
while (load_factor() > targetLF) {
switch (t) {
case targets::host:
rehash(_mapInfo->sizePower + 1);
break;
case targets::device:
device_rehash(_mapInfo->sizePower + 1, s);
break;
default:
std::cerr << "Defaulting to host rehashing" << std::endl;
resize(_mapInfo->sizePower + 1, targets::host);
break;
}
}
return;
}
#endif
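// Usage sketch (hypothetical names): grow the table until the load factor
// drops to 0.5, rehashing on the device on stream s:
//   hmap.resize_to_lf(0.5, targets::device, s);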

#ifdef HASHINATOR_CPU_ONLY_MODE
void resize(int newSizePower) { rehash(newSizePower); }
17 changes: 12 additions & 5 deletions include/splitvector/split_allocators.h
@@ -33,7 +33,7 @@ namespace split {
#define SPLIT_CHECK_ERR(err) (split::cuda_error(err, __FILE__, __LINE__))
static void cuda_error(cudaError_t err, const char* file, int line) {
if (err != cudaSuccess) {
printf("\n\n%s in %s at line %d\n", cudaGetErrorString(err), file, line);
std::cerr<<"\n\n"<<cudaGetErrorString(err)<<" in "<<file<<" at line "<<line<<"\n";
abort();
}
}
@@ -43,7 +43,7 @@ static void cuda_error(cudaError_t err, const char* file, int line) {
#define SPLIT_CHECK_ERR(err) (split::hip_error(err, __FILE__, __LINE__))
static void hip_error(hipError_t err, const char* file, int line) {
if (err != hipSuccess) {
printf("\n\n%s in %s at line %d\n", hipGetErrorString(err), file, line);
std::cerr<<"\n\n"<<hipGetErrorString(err)<<" in "<<file<<" at line "<<line<<"\n";
abort();
}
}
@@ -105,9 +105,16 @@ class split_unified_allocator {
return ret;
}

void deallocate(pointer p, size_type) { SPLIT_CHECK_ERR(split_gpuFree(p)); }

static void deallocate(void* p, size_type) { SPLIT_CHECK_ERR(split_gpuFree(p)); }
void deallocate(pointer p, size_type n) {
if (n != 0 && p != 0) {
SPLIT_CHECK_ERR(split_gpuFree(p));
}
}
static void deallocate(void* p, size_type n) {
if (n != 0 && p != 0) {
SPLIT_CHECK_ERR(split_gpuFree(p));
}
}
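// Note: with the size and pointer guards above, deallocate(nullptr, 0) is a
// no-op that never reaches split_gpuFree, so a default-constructed,
// never-grown container can release "nothing" without touching the GPU API.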

size_type max_size() const throw() {
size_type max = static_cast<size_type>(-1) / sizeof(value_type);
66 changes: 66 additions & 0 deletions include/splitvector/split_tools.h
@@ -651,6 +651,42 @@ uint32_t copy_if_raw(split::SplitVector<T, split::split_unified_allocator<T>>& i
return numel;
}

template <typename T, typename Rule, size_t BLOCKSIZE = 1024, size_t WARP = WARPLENGTH>
uint32_t copy_if_raw(T* input, T* output, size_t size, Rule rule,
size_t nBlocks, Cuda_mempool& mPool, split_gpuStream_t s = 0) {

uint32_t* d_counts;
uint32_t* d_offsets;
d_counts = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t));
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s));
SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_counts, 0, nBlocks * sizeof(uint32_t), s));

// Step 1 -- Calculate per-warp workload
split::tools::scan_reduce_raw<<<nBlocks, BLOCKSIZE, 0, s>>>(input, d_counts, rule, size);
d_offsets = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t));
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s));
SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t),s));

// Step 2 -- Exclusive Prefix Scan on offsets
if (nBlocks == 1) {
split_prefix_scan_raw<uint32_t, 2, WARP>(d_counts, d_offsets, mPool, nBlocks, s);
} else {
split_prefix_scan_raw<uint32_t, BLOCKSIZE, WARP>(d_counts, d_offsets, mPool, nBlocks, s);
}
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s));

// Step 3 -- Compaction
uint32_t* retval = (uint32_t*)mPool.allocate(sizeof(uint32_t));
split::tools::split_compact_raw<T, Rule, BLOCKSIZE, WARP>
<<<nBlocks, BLOCKSIZE, 2 * (BLOCKSIZE / WARP) * sizeof(unsigned int), s>>>(
input, d_counts, d_offsets, output, rule, size, nBlocks, retval);
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s));
uint32_t numel;
SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&numel, retval, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s));
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s));
return numel;
}
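// Rough per-call accounting of the mPool scratch used above (a sketch; the
// prefix-scan helper draws from the same pool):
//   d_counts  : nBlocks * sizeof(uint32_t)
//   d_offsets : nBlocks * sizeof(uint32_t)
//   retval    : sizeof(uint32_t)
// The 8 * nBlocks * sizeof(uint32_t) returned by estimateMemoryForCompaction
// below appears sized to cover these plus the scan's intermediates.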

/**
* @brief Same as copy_keys_if but using raw memory
*/
@@ -708,6 +744,20 @@ estimateMemoryForCompaction(const split::SplitVector<T, split::split_unified_all
return 8 * nBlocks * sizeof(uint32_t);
}

template <int BLOCKSIZE = 1024>
[[nodiscard]] size_t
estimateMemoryForCompaction(const size_t inputSize) noexcept {
// Figure out how many blocks to use
size_t _s = std::ceil((float(inputSize)) / (float)BLOCKSIZE);
size_t nBlocks = nextPow2(_s);
if (nBlocks == 0) {
nBlocks += 1;
}

// Scratch bytes the mempool will need
return 8 * nBlocks * sizeof(uint32_t);
}
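// Usage sketch (hypothetical names; split_gpuMalloc is assumed to wrap
// cudaMalloc/hipMalloc): size and allocate a scratch stack for compacting N
// elements, for use with the raw-pointer copy_if further below:
//   const size_t N = 1 << 20;
//   const size_t scratchBytes = split::tools::estimateMemoryForCompaction(N);
//   void* stack = nullptr;
//   SPLIT_CHECK_ERR(split_gpuMalloc((void**)&stack, scratchBytes));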

/**
* @brief Same as copy_if but only for Hashinator keys
*/
@@ -827,5 +877,21 @@ void copy_if(split::SplitVector<T, split::split_unified_allocator<T>>& input,
output.erase(&output[len], output.end());
}

template <typename T, typename Rule, size_t BLOCKSIZE = 1024, size_t WARP = WARPLENGTH>
size_t copy_if(T* input, T* output, size_t size, Rule rule, void* stack, size_t max_size,
split_gpuStream_t s = 0) {

// Figure out how many blocks to use
size_t _s = std::ceil((float(size)) / (float)BLOCKSIZE);
size_t nBlocks = nextPow2(_s);
if (nBlocks == 0) {
nBlocks += 1;
}
assert(stack && "Invalid stack!");
Cuda_mempool mPool(stack, max_size);
auto len = copy_if_raw(input, output, size, rule, nBlocks, mPool, s);
return len;
}
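// End-to-end sketch (hypothetical names, CUDA backend assumed; Rule is any
// functor whose operator() is callable on the device):
//   struct IsEven {
//      __host__ __device__ bool operator()(const uint32_t& x) const { return (x & 1) == 0; }
//   };
//   size_t kept = split::tools::copy_if(d_in, d_out, N, IsEven{}, stack, scratchBytes, s);
//   // d_out[0 .. kept) now holds the elements of d_in[0 .. N) that satisfy IsEven.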

} // namespace tools
} // namespace split
84 changes: 64 additions & 20 deletions include/splitvector/splitvec.h
@@ -359,6 +359,33 @@ class SplitVector {
_location = Residency::host;
return *this;
}

/** Copy-assign from another vector, but using a provided stream */
HOSTONLY void overwrite(const SplitVector<T, Allocator>& other, split_gpuStream_t stream = 0) {
if (this == &other) {
return;
}
// Match other's size prior to copying
resize(other.size(), true, stream);
auto copySafe = [&]() -> void {
for (size_t i = 0; i < size(); i++) {
_data[i] = other._data[i];
}
};

if constexpr (std::is_trivially_copyable<T>::value) {
if (other._location == Residency::device) {
_location = Residency::device;
optimizeGPU(stream);
SPLIT_CHECK_ERR(split_gpuMemcpyAsync(_data, other._data, size() * sizeof(T), split_gpuMemcpyDeviceToDevice,stream));
return;
}
}
copySafe();
_location = Residency::host;
return;
}
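// Usage sketch (hypothetical names): clone a device-resident vector on a
// caller-managed stream instead of going through operator=:
//   dst.overwrite(src, s);
//   SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); // dst usable after sync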

#endif

/**
@@ -616,7 +643,7 @@ class SplitVector {
*
* @param requested_space The size of the requested space.
*/
HOSTONLY void reallocate(size_t requested_space) {
HOSTONLY void reallocate(size_t requested_space, split_gpuStream_t stream = 0) {
if (requested_space == 0) {
if (_data != nullptr) {
_deallocate_and_destroy(capacity(), _data);
@@ -633,19 +660,31 @@
this->_deallocate();
throw std::bad_alloc();
}

// Copy over
for (size_t i = 0; i < size(); i++) {
_new_data[i] = _data[i];
}

// Deallocate old space
_deallocate_and_destroy(capacity(), _data);

// Store addresses
const size_t __size = *_size;
const size_t __old_capacity = *_capacity;
T* __new_data = _new_data;
T* __data = _data;
// Swap pointers & update capacity; the size itself remains unchanged
_data = _new_data;
*_capacity = requested_space;
// Perform copy on device
if (__size>0) {
int device;
SPLIT_CHECK_ERR(split_gpuGetDevice(&device));
SPLIT_CHECK_ERR(split_gpuMemPrefetchAsync(__data, __size * sizeof(T), device, stream));
SPLIT_CHECK_ERR(split_gpuMemPrefetchAsync(__new_data, requested_space * sizeof(T), device, stream));
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(stream));
SPLIT_CHECK_ERR(split_gpuMemcpy(__new_data, __data, __size * sizeof(T), split_gpuMemcpyDeviceToDevice));
SPLIT_CHECK_ERR(split_gpuStreamSynchronize(stream));
}
// for (size_t i = 0; i < size(); i++) {
// _new_data[i] = _data[i];
// }

// Deallocate old space
_deallocate_and_destroy(__old_capacity, __data);
return;
}
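// Design note on the path above: the old element-wise host copy loop (kept
// commented out above) would page unified memory back to the host; prefetching
// both buffers to the device and issuing a device-to-device copy keeps the
// whole reallocation on the GPU.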

@@ -660,8 +699,8 @@
* will be invalidated after a call.
*/
HOSTONLY
void reserve(size_t requested_space, bool eco = false) {
size_t current_space = *_capacity;
void reserve(size_t requested_space, bool eco = false, split_gpuStream_t stream = 0) {
const size_t current_space = *_capacity;
// Vector was default initialized
if (_data == nullptr) {
_deallocate();
@@ -670,9 +709,14 @@
return;
}
// Nope.
const size_t currentSize = size();
if (requested_space <= current_space) {
for (size_t i = size(); i < requested_space; ++i) {
_allocator.construct(&_data[i], T());
if (std::is_trivially_constructible<T>::value && _location == Residency::device) {
SPLIT_CHECK_ERR( split_gpuMemsetAsync(&_data[currentSize],0,(requested_space-currentSize)*sizeof(T), stream) );
} else {
for (size_t i = currentSize; i < requested_space; ++i) {
_allocator.construct(&_data[i], T());
}
}
return;
}
@@ -681,7 +725,7 @@
if (!eco) {
requested_space *= _alloc_multiplier;
}
reallocate(requested_space);
reallocate(requested_space,stream);
return;
}

@@ -697,13 +741,13 @@
* will be invalid from now on.
*/
HOSTONLY
void resize(size_t newSize, bool eco = false) {
void resize(size_t newSize, bool eco = false, split_gpuStream_t stream = 0) {
// Let's reserve some space and change our size
if (newSize <= size()) {
*_size = newSize;
return;
}
reserve(newSize, eco);
reserve(newSize, eco, stream);
*_size = newSize;
}

@@ -729,21 +773,21 @@
* @brief Increase the capacity of the SplitVector by 1.
*/
HOSTONLY
void grow() { reserve(capacity() + 1); }
void grow(split_gpuStream_t stream = 0) { reserve(capacity() + 1, false, stream); }

/**
* @brief Reduce the capacity of the SplitVector to match its size.
*/
HOSTONLY
void shrink_to_fit() {
void shrink_to_fit(split_gpuStream_t stream = 0) {
size_t curr_cap = *_capacity;
size_t curr_size = *_size;

if (curr_cap == curr_size) {
return;
}

reallocate(curr_size);
reallocate(curr_size,stream);
return;
}

Expand Down
