diff --git a/include/hashinator/hashers.h b/include/hashinator/hashers.h
index a32aa88..86e96de 100644
--- a/include/hashinator/hashers.h
+++ b/include/hashinator/hashers.h
@@ -152,10 +152,12 @@ class Hasher {
    }

    // Reset wrapper for all elements
-   static void reset_all(hash_pair<KEY_TYPE, VAL_TYPE>* dst, Hashinator::Info* info, size_t len, split_gpuStream_t s = 0) {
+   static void reset_all(hash_pair<KEY_TYPE, VAL_TYPE>* dst, Hashinator::Info* info, size_t len,
+                         split_gpuStream_t s = 0) {
       // fast ceil for positive ints
       size_t blocksNeeded = len / defaults::MAX_BLOCKSIZE + (len % defaults::MAX_BLOCKSIZE != 0);
-      reset_all_to_empty<KEY_TYPE, VAL_TYPE, EMPTYBUCKET, TOMBSTONE><<<blocksNeeded, defaults::MAX_BLOCKSIZE, 0, s>>>(dst,info, len);
+      reset_all_to_empty<KEY_TYPE, VAL_TYPE, EMPTYBUCKET, TOMBSTONE>
+          <<<blocksNeeded, defaults::MAX_BLOCKSIZE, 0, s>>>(dst,info, len);
       SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s));
    }

diff --git a/include/hashinator/hashinator.h b/include/hashinator/hashinator.h
index 81e6798..b5fe8a0 100644
--- a/include/hashinator/hashinator.h
+++ b/include/hashinator/hashinator.h
@@ -85,13 +85,14 @@ class Hashmap {
    void preallocate_device_handles() {
 #ifndef HASHINATOR_CPU_ONLY_MODE
       SPLIT_CHECK_ERR(split_gpuMalloc((void**)&device_map, sizeof(Hashmap)));
-      device_buckets=reinterpret_cast<split::SplitVector<hash_pair<KEY_TYPE, VAL_TYPE>>*>(reinterpret_cast<char*>(device_map)+offsetof(Hashmap,buckets));
+      device_buckets = reinterpret_cast<split::SplitVector<hash_pair<KEY_TYPE, VAL_TYPE>>*>(
+          reinterpret_cast<char*>(device_map) + offsetof(Hashmap, buckets));
 #endif
    }

    // Deallocates the bookeepping info and the device pointer
    void deallocate_device_handles() {
-      if (device_map==nullptr){
+      if (device_map == nullptr) {
          return;
       }
 #ifndef HASHINATOR_CPU_ONLY_MODE
@@ -111,9 +112,9 @@ class Hashmap {
       *_mapInfo = MapInfo(5);
       buckets = split::SplitVector<hash_pair<KEY_TYPE, VAL_TYPE>>(
           1 << _mapInfo->sizePower, hash_pair<KEY_TYPE, VAL_TYPE>(EMPTYBUCKET, VAL_TYPE()));
-   #ifndef HASHINATOR_CPU_ONLY_MODE
+#ifndef HASHINATOR_CPU_ONLY_MODE
       SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice));
-   #endif
+#endif
    };

    Hashmap(int sizepower) {
@@ -122,9 +123,9 @@ class Hashmap {
       *_mapInfo = MapInfo(sizepower);
       buckets = split::SplitVector<hash_pair<KEY_TYPE, VAL_TYPE>>(
           1 << _mapInfo->sizePower, hash_pair<KEY_TYPE, VAL_TYPE>(EMPTYBUCKET, VAL_TYPE()));
-   #ifndef HASHINATOR_CPU_ONLY_MODE
+#ifndef HASHINATOR_CPU_ONLY_MODE
       SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice));
-   #endif
+#endif
    };

    Hashmap(const Hashmap& other) {
@@ -132,57 +133,58 @@ class Hashmap {
       _mapInfo = _metaAllocator.allocate(1);
       *_mapInfo = *(other._mapInfo);
       buckets = other.buckets;
-   #ifndef HASHINATOR_CPU_ONLY_MODE
+#ifndef HASHINATOR_CPU_ONLY_MODE
       SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice));
-   #endif
+#endif
    };

    Hashmap(Hashmap&& other) {
       preallocate_device_handles();
       _mapInfo = other._mapInfo;
-      other._mapInfo=nullptr;
+      other._mapInfo = nullptr;
       buckets = std::move(other.buckets);
-   #ifndef HASHINATOR_CPU_ONLY_MODE
+#ifndef HASHINATOR_CPU_ONLY_MODE
       SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice));
-   #endif
+#endif
    };

-   Hashmap& operator=(const Hashmap& other) {
+   Hashmap& operator=(const Hashmap& other) {
       if (this == &other) {
          return *this;
       }
       *_mapInfo = *(other._mapInfo);
       buckets = other.buckets;
-   #ifndef HASHINATOR_CPU_ONLY_MODE
+#ifndef HASHINATOR_CPU_ONLY_MODE
       SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice));
-   #endif
+#endif
       return *this;
    }

-   #ifndef HASHINATOR_CPU_ONLY_MODE
+#ifndef HASHINATOR_CPU_ONLY_MODE
    /** Copy assign but using a provided stream */
-   void overwrite(const Hashmap& other, split_gpuStream_t stream = 0) {
      if (this
== &other) { return; } - SPLIT_CHECK_ERR(split_gpuMemcpyAsync(_mapInfo,other._mapInfo, sizeof(MapInfo), split_gpuMemcpyDeviceToDevice, stream)); + SPLIT_CHECK_ERR( + split_gpuMemcpyAsync(_mapInfo, other._mapInfo, sizeof(MapInfo), split_gpuMemcpyDeviceToDevice, stream)); buckets.overwrite(other.buckets, stream); SPLIT_CHECK_ERR(split_gpuMemcpyAsync(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice, stream)); return; } - #endif +#endif - Hashmap& operator=(Hashmap&& other) { + Hashmap& operator=(Hashmap&& other) { if (this == &other) { return *this; } _metaAllocator.deallocate(_mapInfo, 1); _mapInfo = other._mapInfo; - other._mapInfo=nullptr; + other._mapInfo = nullptr; buckets = std::move(other.buckets); - #ifndef HASHINATOR_CPU_ONLY_MODE +#ifndef HASHINATOR_CPU_ONLY_MODE SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice)); - #endif +#endif return *this; } @@ -271,9 +273,9 @@ class Hashmap { buckets = newBuckets; _mapInfo->currentMaxBucketOverflow = Hashinator::defaults::BUCKET_OVERFLOW; _mapInfo->tombstoneCounter = 0; - #ifndef HASHINATOR_CPU_ONLY_MODE +#ifndef HASHINATOR_CPU_ONLY_MODE SPLIT_CHECK_ERR(split_gpuMemcpy(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice)); - #endif +#endif } #ifndef HASHINATOR_CPU_ONLY_MODE @@ -317,11 +319,11 @@ class Hashmap { if (newSizePower == _mapInfo->sizePower) { // Just clear the current contents clear(targets::device, s, 1 << newSizePower); - //DeviceHasher::reset_all(buckets.data(),_mapInfo, buckets.size(), s); + // DeviceHasher::reset_all(buckets.data(),_mapInfo, buckets.size(), s); } else { // Need new buckets buckets = std::move(split::SplitVector>( - 1 << newSizePower, hash_pair(EMPTYBUCKET, VAL_TYPE()))); + 1 << newSizePower, hash_pair(EMPTYBUCKET, VAL_TYPE()))); SPLIT_CHECK_ERR(split_gpuMemcpyAsync(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice, s)); optimizeGPU(s); } @@ -478,16 +480,16 @@ class Hashmap { break; case targets::device: - if constexpr(prefetches) { + if constexpr (prefetches) { optimizeGPU(s); } - if (len==0) { // If size is provided, no need to page fault size information. + if (len == 0) { // If size is provided, no need to page fault size information. len = buckets.size(); } - DeviceHasher::reset_all(buckets.data(),_mapInfo, len, s); - #ifdef HASHINATOR_DEBUG + DeviceHasher::reset_all(buckets.data(), _mapInfo, len, s); +#ifdef HASHINATOR_DEBUG set_status((_mapInfo->fill == 0) ? 
success : fail); - #endif +#endif break; default: @@ -499,7 +501,7 @@ class Hashmap { #endif #ifdef HASHINATOR_CPU_ONLY_MODE -// Try to grow our buckets until we achieve a targetLF load factor + // Try to grow our buckets until we achieve a targetLF load factor void resize_to_lf(float targetLF = 0.5) { while (load_factor() > targetLF) { rehash(_mapInfo->sizePower + 1); @@ -510,16 +512,16 @@ class Hashmap { void resize_to_lf(float targetLF = 0.5, targets t = targets::host, split_gpuStream_t s = 0) { while (load_factor() > targetLF) { switch (t) { - case targets::host: - rehash(_mapInfo->sizePower + 1); - break; - case targets::device: - device_rehash(_mapInfo->sizePower + 1, s); - break; - default: - std::cerr << "Defaulting to host rehashing" << std::endl; - resize(_mapInfo->sizePower + 1, targets::host); - break; + case targets::host: + rehash(_mapInfo->sizePower + 1); + break; + case targets::device: + device_rehash(_mapInfo->sizePower + 1, s); + break; + default: + std::cerr << "Defaulting to host rehashing" << std::endl; + resize(_mapInfo->sizePower + 1, targets::host); + break; } } return; @@ -887,13 +889,13 @@ class Hashmap { if (w_tid == winner) { KEY_TYPE old = split::s_atomicCAS(&buckets[probingindex].first, EMPTYBUCKET, candidateKey); if (old == EMPTYBUCKET) { - threadOverflow =(probingindex < optimalindex) ? (1 << sizePower) : (probingindex - optimalindex+1); + threadOverflow = (probingindex < optimalindex) ? (1 << sizePower) : (probingindex - optimalindex + 1); split::s_atomicExch(&buckets[probingindex].second, candidateVal); warpDone = 1; split::s_atomicAdd(&_mapInfo->fill, 1); if (threadOverflow > _mapInfo->currentMaxBucketOverflow) { split::s_atomicExch((unsigned long long*)(&_mapInfo->currentMaxBucketOverflow), - (unsigned long long)nextOverflow(threadOverflow,defaults::WARPSIZE)); + (unsigned long long)nextOverflow(threadOverflow, defaults::WARPSIZE)); } } else if (old == candidateKey) { // Parallel stuff are fun. Major edge case! @@ -971,14 +973,14 @@ class Hashmap { if (w_tid == winner) { KEY_TYPE old = split::s_atomicCAS(&buckets[probingindex].first, EMPTYBUCKET, candidateKey); if (old == EMPTYBUCKET) { - threadOverflow = (probingindex < optimalindex) ? (1 << sizePower) : (probingindex - optimalindex+1); + threadOverflow = (probingindex < optimalindex) ? (1 << sizePower) : (probingindex - optimalindex + 1); split::s_atomicExch(&buckets[probingindex].second, candidateVal); warpDone = 1; localCount = 1; split::s_atomicAdd(&_mapInfo->fill, 1); if (threadOverflow > _mapInfo->currentMaxBucketOverflow) { split::s_atomicExch((unsigned long long*)(&_mapInfo->currentMaxBucketOverflow), - (unsigned long long)nextOverflow(threadOverflow,defaults::WARPSIZE)); + (unsigned long long)nextOverflow(threadOverflow, defaults::WARPSIZE)); } } else if (old == candidateKey) { // Parallel stuff are fun. Major edge case! 
@@ -1128,7 +1130,7 @@ class Hashmap { size_t extractPattern(split::SplitVector>& elements, Rule rule, split_gpuStream_t s = 0) { elements.resize(_mapInfo->fill + 1, true); - if constexpr(prefetches) { + if constexpr (prefetches) { elements.optimizeGPU(s); } // Extract elements matching the Pattern Rule(element)==true; @@ -1157,10 +1159,11 @@ class Hashmap { return retval; } template - void extractPatternLoop(split::SplitVector>& elements, Rule rule, split_gpuStream_t s = 0) { + void extractPatternLoop(split::SplitVector>& elements, Rule rule, + split_gpuStream_t s = 0) { // Extract elements matching the Pattern Rule(element)==true; - split::tools::copy_if_loop, Rule, defaults::MAX_BLOCKSIZE, - defaults::WARPSIZE>(*device_buckets, elements, rule, s); + split::tools::copy_if_loop, Rule, defaults::MAX_BLOCKSIZE, defaults::WARPSIZE>( + *device_buckets, elements, rule, s); } void extractLoop(split::SplitVector>& elements, split_gpuStream_t s = 0) { // Extract all valid elements @@ -1173,23 +1176,24 @@ class Hashmap { template size_t extractKeysByPattern(split::SplitVector& elements, Rule rule, split_gpuStream_t s = 0) { elements.resize(_mapInfo->fill + 1, true); - if constexpr(prefetches) { + if constexpr (prefetches) { elements.optimizeGPU(s); } // Extract element **keys** matching the Pattern Rule(element)==true; split::tools::copy_keys_if, KEY_TYPE, Rule, defaults::MAX_BLOCKSIZE, defaults::WARPSIZE>(buckets, elements, rule, s); - //FIXME: there is an issue where paging to host occurs and following calls to hashmap operations take a hit. - //temp fix: call optimizeGPU() here + // FIXME: there is an issue where paging to host occurs and following calls to hashmap operations take a hit. + // temp fix: call optimizeGPU() here if constexpr (prefetches) { optimizeGPU(s); } return elements.size(); } template - size_t extractKeysByPattern(split::SplitVector& elements, Rule rule, void *stack, size_t max_size, split_gpuStream_t s = 0) { + size_t extractKeysByPattern(split::SplitVector& elements, Rule rule, void* stack, size_t max_size, + split_gpuStream_t s = 0) { elements.resize(_mapInfo->fill + 1, true); - if constexpr(prefetches) { + if constexpr (prefetches) { elements.optimizeGPU(s); } // Extract element **keys** matching the Pattern Rule(element)==true; @@ -1201,7 +1205,7 @@ class Hashmap { void extractKeysByPatternLoop(split::SplitVector& elements, Rule rule, split_gpuStream_t s = 0) { // Extract element **keys** matching the Pattern Rule(element)==true; split::tools::copy_if_keys_loop, KEY_TYPE, Rule, defaults::MAX_BLOCKSIZE, - defaults::WARPSIZE>(*device_buckets, elements, rule, s); + defaults::WARPSIZE>(*device_buckets, elements, rule, s); } template @@ -1213,7 +1217,8 @@ class Hashmap { return extractKeysByPattern(elements, rule, s); } template - size_t extractAllKeys(split::SplitVector& elements, void *stack, size_t max_size, split_gpuStream_t s = 0) { + size_t extractAllKeys(split::SplitVector& elements, void* stack, size_t max_size, + split_gpuStream_t s = 0) { // Extract all keys auto rule = [] __host__ __device__(const hash_pair& kval) -> bool { return kval.first != EMPTYBUCKET && kval.first != TOMBSTONE; @@ -1244,7 +1249,7 @@ class Hashmap { SPLIT_CHECK_ERR(split_gpuMallocAsync((void**)&overflownElements, (1 << _mapInfo->sizePower) * sizeof(hash_pair), s)); - if constexpr(prefetches) { + if constexpr (prefetches) { optimizeGPU(s); } SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); @@ -1276,8 +1281,7 @@ class Hashmap { } // If we do have overflown elements we put them back in the 
buckets SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); - DeviceHasher::reset(overflownElements, buckets.data(), _mapInfo, - nOverflownElements, s); + DeviceHasher::reset(overflownElements, buckets.data(), _mapInfo, nOverflownElements, s); DeviceHasher::insert(overflownElements, buckets.data(), _mapInfo, nOverflownElements, s); @@ -1294,7 +1298,7 @@ class Hashmap { set_status(status::success); return; } - if constexpr(prefetches) { + if constexpr (prefetches) { buckets.optimizeGPU(s); } int64_t neededPowerSize = std::ceil(std::log2((_mapInfo->fill + len) * (1.0 / targetLF))); @@ -1315,7 +1319,7 @@ class Hashmap { set_status(status::success); return; } - if constexpr(prefetches) { + if constexpr (prefetches) { buckets.optimizeGPU(s); } int64_t neededPowerSize = std::ceil(std::log2((_mapInfo->fill + len) * (1.0 / targetLF))); @@ -1334,7 +1338,7 @@ class Hashmap { set_status(status::success); return; } - if constexpr(prefetches) { + if constexpr (prefetches) { buckets.optimizeGPU(s); } // Here we do some calculations to estimate how much if any we need to grow our buckets @@ -1349,7 +1353,7 @@ class Hashmap { // Uses Hasher's retrieve_kernel to read all elements template void retrieve(KEY_TYPE* keys, VAL_TYPE* vals, size_t len, split_gpuStream_t s = 0) { - if constexpr(prefetches){ + if constexpr (prefetches) { buckets.optimizeGPU(s); } DeviceHasher::retrieve(keys, vals, buckets.data(), _mapInfo, len, s); @@ -1359,7 +1363,7 @@ class Hashmap { // Uses Hasher's retrieve_kernel to read all elements template void retrieve(hash_pair* src, size_t len, split_gpuStream_t s = 0) { - if constexpr(prefetches){ + if constexpr (prefetches) { buckets.optimizeGPU(s); } DeviceHasher::retrieve(src, buckets.data(), _mapInfo, len, s); @@ -1369,7 +1373,7 @@ class Hashmap { // Uses Hasher's erase_kernel to delete all elements template void erase(KEY_TYPE* keys, size_t len, split_gpuStream_t s = 0) { - if constexpr(prefetches){ + if constexpr (prefetches) { buckets.optimizeGPU(s); } // Remember the last numeber of tombstones @@ -1388,11 +1392,12 @@ class Hashmap { */ template Hashmap* upload(split_gpuStream_t stream = 0) { - if constexpr(prefetches) { + if constexpr (prefetches) { optimizeGPU(stream); } - // device_buckets = (split::SplitVector>*)((char*)device_map + offsetof(Hashmap, buckets)); - // SPLIT_CHECK_ERR(split_gpuMemcpyAsync(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice, stream)); + // device_buckets = (split::SplitVector>*)((char*)device_map + offsetof(Hashmap, + // buckets)); SPLIT_CHECK_ERR(split_gpuMemcpyAsync(device_map, this, sizeof(Hashmap), split_gpuMemcpyHostToDevice, + // stream)); return device_map; } @@ -1448,7 +1453,7 @@ class Hashmap { public: HASHINATOR_DEVICEONLY - device_iterator(Hashmap& hashtable, size_t index) : index(index),hashtable(&hashtable) {} + device_iterator(Hashmap& hashtable, size_t index) : index(index), hashtable(&hashtable) {} HASHINATOR_DEVICEONLY size_t getIndex() { return index; } @@ -1495,7 +1500,7 @@ class Hashmap { public: HASHINATOR_DEVICEONLY explicit const_device_iterator(const Hashmap& hashtable, size_t index) - : index(index), hashtable(&hashtable){} + : index(index), hashtable(&hashtable) {} HASHINATOR_DEVICEONLY size_t getIndex() { return index; } @@ -1706,7 +1711,8 @@ class Hashmap { void set_element(const KEY_TYPE& key, VAL_TYPE val) { size_t thread_overflowLookup = 0; insert_element(key, val, thread_overflowLookup); - atomicMax((unsigned long long*)&(_mapInfo->currentMaxBucketOverflow), 
nextOverflow(thread_overflowLookup,defaults::WARPSIZE/defaults::elementsPerWarp)); + atomicMax((unsigned long long*)&(_mapInfo->currentMaxBucketOverflow), + nextOverflow(thread_overflowLookup, defaults::WARPSIZE / defaults::elementsPerWarp)); } HASHINATOR_DEVICEONLY diff --git a/include/splitvector/split_allocators.h b/include/splitvector/split_allocators.h index ccff591..2527400 100644 --- a/include/splitvector/split_allocators.h +++ b/include/splitvector/split_allocators.h @@ -33,7 +33,7 @@ namespace split { #define SPLIT_CHECK_ERR(err) (split::cuda_error(err, __FILE__, __LINE__)) static void cuda_error(cudaError_t err, const char* file, int line) { if (err != cudaSuccess) { - std::cerr<<"\n\n"<> LOG_NUM_BANKS) @@ -44,7 +44,7 @@ #ifdef __HIP__ #define SPLIT_VOTING_MASK 0xFFFFFFFFFFFFFFFFull // 64-bit wide for amd warps #define WARPLENGTH 64 -#define MASKTYPE uint64_t +#define MASKTYPE uint64_t #define ONE 1ul #endif @@ -425,8 +425,7 @@ void split_prefix_scan(split::SplitVector>& * Modified from (http://www-graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2) to support 64-bit uints * Included here as well for standalone use of splitvec outside of hashintor */ -__host__ __device__ -constexpr inline size_t nextPow2(size_t v) noexcept { +__host__ __device__ constexpr inline size_t nextPow2(size_t v) noexcept { v--; v |= v >> 1; v |= v >> 2; @@ -554,7 +553,8 @@ __global__ void scan_reduce_raw(T* input, uint32_t* output, Rule rule, size_t si * @brief Same as split_prefix_scan but with raw memory */ template -void split_prefix_scan_raw(T* input, T* output, splitStackArena& mPool, const size_t input_size, split_gpuStream_t s = 0) { +void split_prefix_scan_raw(T* input, T* output, splitStackArena& mPool, const size_t input_size, + split_gpuStream_t s = 0) { // Scan is performed in half Blocksizes size_t scanBlocksize = BLOCKSIZE / 2; @@ -601,9 +601,8 @@ void split_prefix_scan_raw(T* input, T* output, splitStackArena& mPool, const si } } -template -__global__ void block_compact(T* input,T* output,size_t inputSize,Rule rule,uint32_t *retval) -{ +template +__global__ void block_compact(T* input, T* output, size_t inputSize, Rule rule, uint32_t* retval) { // This must be equal to at least both WARPLENGTH and MAX_BLOCKSIZE/WARPLENGTH __shared__ uint32_t warpSums[WARPLENGTH]; const size_t tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -611,23 +610,23 @@ __global__ void block_compact(T* input,T* output,size_t inputSize,Rule rule,uint const size_t w_tid = tid % WARPLENGTH; const uint warpsPerBlock = BLOCKSIZE / WARPLENGTH; // zero init shared buffer - if (wid==0) { + if (wid == 0) { warpSums[w_tid] = 0; } __syncthreads(); - //full warp votes for rule-> mask = [01010101010101010101010101010101] - const int active = (tid mask = [01010101010101010101010101010101] + const int active = (tid < inputSize) ? 
rule(input[tid]) : false; + const auto mask = split::s_warpVote(active == 1, SPLIT_VOTING_MASK); const auto warpCount = s_pop_count(mask); - if (w_tid==0) { + if (w_tid == 0) { warpSums[wid] = warpCount; } __syncthreads(); - //Figure out the total here because we overwrite shared mem later - if (wid==0) { + // Figure out the total here because we overwrite shared mem later + if (wid == 0) { // ceil int division - int activeWARPS = nextPow2( 1 + ((inputSize - 1) / WARPLENGTH)); - auto reduceCounts = [activeWARPS](int localCount)->int{ + int activeWARPS = nextPow2(1 + ((inputSize - 1) / WARPLENGTH)); + auto reduceCounts = [activeWARPS](int localCount) -> int { for (int i = activeWARPS / 2; i > 0; i = i / 2) { localCount += split::s_shuffle_down(localCount, i, SPLIT_VOTING_MASK); } @@ -635,33 +634,32 @@ __global__ void block_compact(T* input,T* output,size_t inputSize,Rule rule,uint }; auto localCount = warpSums[w_tid]; int totalCount = reduceCounts(localCount); - if (w_tid==0) { - *retval=(uint32_t)(totalCount); + if (w_tid == 0) { + *retval = (uint32_t)(totalCount); } } - //Prefix scan WarpSums on the first warp - if (wid==0) { + // Prefix scan WarpSums on the first warp + if (wid == 0) { auto value = warpSums[w_tid]; - for (int d=1; d= d) { - value+= res; + value += res; } - } - warpSums[w_tid] = value; + } + warpSums[w_tid] = value; } __syncthreads(); - auto offset = (wid==0) ? 0 : warpSums[wid-1]; - auto pp = s_pop_count(mask&((ONE< -__global__ void block_compact_keys(T* input,U* output,size_t inputSize,Rule rule,uint32_t *retval) -{ +template +__global__ void block_compact_keys(T* input, U* output, size_t inputSize, Rule rule, uint32_t* retval) { // This must be equal to at least both WARPLENGTH and MAX_BLOCKSIZE/WARPLENGTH __shared__ uint32_t warpSums[WARPLENGTH]; const size_t tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -669,23 +667,23 @@ __global__ void block_compact_keys(T* input,U* output,size_t inputSize,Rule rule const size_t w_tid = tid % WARPLENGTH; const uint warpsPerBlock = BLOCKSIZE / WARPLENGTH; // zero init shared buffer - if (wid==0) { + if (wid == 0) { warpSums[w_tid] = 0; } __syncthreads(); - //full warp votes for rule-> mask = [01010101010101010101010101010101] - const int active = (tid mask = [01010101010101010101010101010101] + const int active = (tid < inputSize) ? 
rule(input[tid]) : false; + const auto mask = split::s_warpVote(active == 1, SPLIT_VOTING_MASK); const auto warpCount = s_pop_count(mask); - if (w_tid==0) { + if (w_tid == 0) { warpSums[wid] = warpCount; } __syncthreads(); - //Figure out the total here because we overwrite shared mem later - if (wid==0) { + // Figure out the total here because we overwrite shared mem later + if (wid == 0) { // ceil int division - int activeWARPS = nextPow2( 1 + ((inputSize - 1) / WARPLENGTH)); - auto reduceCounts = [activeWARPS](int localCount)->int{ + int activeWARPS = nextPow2(1 + ((inputSize - 1) / WARPLENGTH)); + auto reduceCounts = [activeWARPS](int localCount) -> int { for (int i = activeWARPS / 2; i > 0; i = i / 2) { localCount += split::s_shuffle_down(localCount, i, SPLIT_VOTING_MASK); } @@ -693,49 +691,47 @@ __global__ void block_compact_keys(T* input,U* output,size_t inputSize,Rule rule }; auto localCount = warpSums[w_tid]; int totalCount = reduceCounts(localCount); - if (w_tid==0) { - *retval=(uint32_t)(totalCount); + if (w_tid == 0) { + *retval = (uint32_t)(totalCount); } } - //Prefix scan WarpSums on the first warp - if (wid==0) { + // Prefix scan WarpSums on the first warp + if (wid == 0) { auto value = warpSums[w_tid]; - for (int d=1; d= d) { - value+= res; + value += res; } - } - warpSums[w_tid] = value; + } + warpSums[w_tid] = value; } __syncthreads(); - auto offset = (wid==0) ? 0 : warpSums[wid-1]; - auto pp = s_pop_count(mask&((ONE< -__global__ void loop_compact( - split::SplitVector>& inputVec, - split::SplitVector>& outputVec, - Rule rule) { +template +__global__ void loop_compact(split::SplitVector>& inputVec, + split::SplitVector>& outputVec, Rule rule) { // This must be equal to at least both WARPLENGTH and MAX_BLOCKSIZE/WARPLENGTH __shared__ uint32_t warpSums[WARPLENGTH]; __shared__ uint32_t outputCount; // blockIdx.x is always 0 for this kernel - const size_t tid = threadIdx.x;// + blockIdx.x * blockDim.x; + const size_t tid = threadIdx.x; // + blockIdx.x * blockDim.x; const size_t wid = tid / WARPLENGTH; const size_t w_tid = tid % WARPLENGTH; const uint warpsPerBlock = BLOCKSIZE / WARPLENGTH; // zero init shared buffer - if (wid==0) { + if (wid == 0) { warpSums[w_tid] = 0; } __syncthreads(); - //full warp votes for rule-> mask = [01010101010101010101010101010101] + // full warp votes for rule-> mask = [01010101010101010101010101010101] int64_t remaining = inputVec.size(); const uint capacity = outputVec.capacity(); uint32_t outputSize = 0; @@ -746,46 +742,46 @@ __global__ void loop_compact( while (remaining > 0) { int current = remaining > blockDim.x ? 
blockDim.x : remaining; __syncthreads(); - const int active = (tidint{ - for (int i = activeWARPS / 2; i > 0; i = i / 2) { - localCount += split::s_shuffle_down(localCount, i, SPLIT_VOTING_MASK); - } - return localCount; - }; + int activeWARPS = nextPow2(1 + ((current - 1) / WARPLENGTH)); + auto reduceCounts = [activeWARPS](int localCount) -> int { + for (int i = activeWARPS / 2; i > 0; i = i / 2) { + localCount += split::s_shuffle_down(localCount, i, SPLIT_VOTING_MASK); + } + return localCount; + }; auto localCount = warpSums[w_tid]; int totalCount = reduceCounts(localCount); - if (w_tid==0) { + if (w_tid == 0) { outputCount = totalCount; outputSize += totalCount; assert((outputSize <= capacity) && "loop_compact ran out of capacity!"); outputVec.device_resize(outputSize); } } - //Prefix scan WarpSums on the first warp - if (wid==0) { + // Prefix scan WarpSums on the first warp + if (wid == 0) { auto value = warpSums[w_tid]; - for (int d=1; d= d) { - value+= res; + for (int d = 1; d < warpsPerBlock; d = 2 * d) { + int res = split::s_shuffle_up(value, d, SPLIT_VOTING_MASK); + if (tid % warpsPerBlock >= d) { + value += res; } } warpSums[w_tid] = value; } __syncthreads(); - auto offset = (wid==0) ? 0 : warpSums[wid-1]; - auto pp = s_pop_count(mask&((ONE< -__global__ void loop_compact_keys( - split::SplitVector>& inputVec, - split::SplitVector>& outputVec, - Rule rule) { +template +__global__ void loop_compact_keys(split::SplitVector>& inputVec, + split::SplitVector>& outputVec, Rule rule) { // This must be equal to at least both WARPLENGTH and MAX_BLOCKSIZE/WARPLENGTH __shared__ uint32_t warpSums[WARPLENGTH]; __shared__ uint32_t outputCount; // blockIdx.x is always 0 for this kernel - const size_t tid = threadIdx.x;// + blockIdx.x * blockDim.x; + const size_t tid = threadIdx.x; // + blockIdx.x * blockDim.x; const size_t wid = tid / WARPLENGTH; const size_t w_tid = tid % WARPLENGTH; const uint warpsPerBlock = BLOCKSIZE / WARPLENGTH; // zero init shared buffer - if (wid==0) { + if (wid == 0) { warpSums[w_tid] = 0; } __syncthreads(); - //full warp votes for rule-> mask = [01010101010101010101010101010101] + // full warp votes for rule-> mask = [01010101010101010101010101010101] int64_t remaining = inputVec.size(); const uint capacity = outputVec.capacity(); uint32_t outputSize = 0; @@ -830,46 +824,46 @@ __global__ void loop_compact_keys( while (remaining > 0) { int current = remaining > blockDim.x ? 
blockDim.x : remaining; __syncthreads(); - const int active = (tidint{ - for (int i = activeWARPS / 2; i > 0; i = i / 2) { - localCount += split::s_shuffle_down(localCount, i, SPLIT_VOTING_MASK); - } - return localCount; - }; + int activeWARPS = nextPow2(1 + ((current - 1) / WARPLENGTH)); + auto reduceCounts = [activeWARPS](int localCount) -> int { + for (int i = activeWARPS / 2; i > 0; i = i / 2) { + localCount += split::s_shuffle_down(localCount, i, SPLIT_VOTING_MASK); + } + return localCount; + }; auto localCount = warpSums[w_tid]; int totalCount = reduceCounts(localCount); - if (w_tid==0) { + if (w_tid == 0) { outputCount = totalCount; outputSize += totalCount; assert((outputSize <= capacity) && "loop_compact ran out of capacity!"); outputVec.device_resize(outputSize); } } - //Prefix scan WarpSums on the first warp - if (wid==0) { + // Prefix scan WarpSums on the first warp + if (wid == 0) { auto value = warpSums[w_tid]; - for (int d=1; d= d) { - value+= res; + for (int d = 1; d < warpsPerBlock; d = 2 * d) { + int res = split::s_shuffle_up(value, d, SPLIT_VOTING_MASK); + if (tid % warpsPerBlock >= d) { + value += res; } } warpSums[w_tid] = value; } __syncthreads(); - auto offset = (wid==0) ? 0 : warpSums[wid-1]; - auto pp = s_pop_count(mask&((ONE< size_t copy_if_block(T* input, T* output, size_t size, Rule rule, void* stack, size_t max_size, - split_gpuStream_t s = 0) { + split_gpuStream_t s = 0) { assert(stack && "Invalid stack!"); splitStackArena mPool(stack, max_size); - uint32_t *dlen = (uint32_t*)mPool.allocate(sizeof(uint32_t)); - split::tools::block_compact<<<1,std::min(BLOCKSIZE,nextPow2(size))>>>(input,output,size,rule,dlen); - uint32_t len=0; + uint32_t* dlen = (uint32_t*)mPool.allocate(sizeof(uint32_t)); + split::tools::block_compact<<<1, std::min(BLOCKSIZE, nextPow2(size))>>>(input, output, size, rule, dlen); + uint32_t len = 0; SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&len, dlen, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s)); SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); return len; } template -size_t copy_if_block(T* input, T* output, size_t size, Rule rule, splitStackArena& mPool, - split_gpuStream_t s = 0) { - uint32_t *dlen = (uint32_t*)mPool.allocate(sizeof(uint32_t)); - split::tools::block_compact<<<1,std::min(BLOCKSIZE,nextPow2(size)),0,s>>>(input,output,size,rule,dlen); - uint32_t len=0; +size_t copy_if_block(T* input, T* output, size_t size, Rule rule, splitStackArena& mPool, split_gpuStream_t s = 0) { + uint32_t* dlen = (uint32_t*)mPool.allocate(sizeof(uint32_t)); + split::tools::block_compact<<<1, std::min(BLOCKSIZE, nextPow2(size)), 0, s>>>(input, output, size, rule, dlen); + uint32_t len = 0; SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&len, dlen, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s)); SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); return len; } -template +template size_t copy_if_keys_block(T* input, U* output, size_t size, Rule rule, splitStackArena& mPool, - split_gpuStream_t s = 0) { - uint32_t *dlen = (uint32_t*)mPool.allocate(sizeof(uint32_t)); - split::tools::block_compact_keys<<<1,std::min(BLOCKSIZE,nextPow2(size)),0,s>>>(input,output,size,rule,dlen); - uint32_t len=0; - SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&len, dlen, sizeof(uint32_t), split_gpuMemcpyDeviceToHost,s)); + split_gpuStream_t s = 0) { + uint32_t* dlen = (uint32_t*)mPool.allocate(sizeof(uint32_t)); + split::tools::block_compact_keys<<<1, std::min(BLOCKSIZE, nextPow2(size)), 0, s>>>(input, output, size, rule, dlen); + uint32_t len = 0; + SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&len, 
dlen, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s)); SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); return len; } @@ -929,18 +922,18 @@ uint32_t copy_if_raw(split::SplitVector>& i size_t nBlocks, splitStackArena& mPool, split_gpuStream_t s = 0) { size_t _size = input.size(); - if (_size<=BLOCKSIZE){ - return copy_if_block(input.data(),output,_size,rule,mPool,s); + if (_size <= BLOCKSIZE) { + return copy_if_block(input.data(), output, _size, rule, mPool, s); } uint32_t* d_counts; uint32_t* d_offsets; d_counts = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t)); - SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_counts, 0, nBlocks * sizeof(uint32_t),s)); + SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_counts, 0, nBlocks * sizeof(uint32_t), s)); // Phase 1 -- Calculate per warp workload split::tools::scan_reduce_raw<<>>(input.data(), d_counts, rule, _size); d_offsets = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t)); - SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t),s)); + SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t), s)); // Step 2 -- Exclusive Prefix Scan on offsets if (nBlocks == 1) { @@ -952,8 +945,8 @@ uint32_t copy_if_raw(split::SplitVector>& i // Step 3 -- Compaction uint32_t* retval = (uint32_t*)mPool.allocate(sizeof(uint32_t)); split::tools::split_compact_raw - <<>>( - input.data(), d_counts, d_offsets, output, rule, _size, nBlocks, retval); + <<>>(input.data(), d_counts, d_offsets, + output, rule, _size, nBlocks, retval); uint32_t numel; SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&numel, retval, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s)); SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); @@ -961,11 +954,11 @@ uint32_t copy_if_raw(split::SplitVector>& i } template -uint32_t copy_if_raw(T* input, T* output, size_t size, Rule rule, - size_t nBlocks, splitStackArena& mPool, split_gpuStream_t s = 0) { +uint32_t copy_if_raw(T* input, T* output, size_t size, Rule rule, size_t nBlocks, splitStackArena& mPool, + split_gpuStream_t s = 0) { - if (size<=BLOCKSIZE){ - return copy_if_block(input,output,size,rule,mPool,s); + if (size <= BLOCKSIZE) { + return copy_if_block(input, output, size, rule, mPool, s); } uint32_t* d_counts; uint32_t* d_offsets; @@ -975,7 +968,7 @@ uint32_t copy_if_raw(T* input, T* output, size_t size, Rule rule, // Phase 1 -- Calculate per warp workload split::tools::scan_reduce_raw<<>>(input, d_counts, rule, size); d_offsets = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t)); - SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t),s)); + SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t), s)); // Step 2 -- Exclusive Prefix Scan on offsets if (nBlocks == 1) { @@ -987,8 +980,8 @@ uint32_t copy_if_raw(T* input, T* output, size_t size, Rule rule, // Step 3 -- Compaction uint32_t* retval = (uint32_t*)mPool.allocate(sizeof(uint32_t)); split::tools::split_compact_raw - <<>>( - input, d_counts, d_offsets, output, rule, size, nBlocks, retval); + <<>>(input, d_counts, d_offsets, output, + rule, size, nBlocks, retval); uint32_t numel; SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&numel, retval, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s)); SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); @@ -1001,29 +994,29 @@ uint32_t copy_if_raw(T* input, T* output, size_t size, Rule rule, */ template -void copy_if_loop( - split::SplitVector>& input, - split::SplitVector>& output, - Rule rule, split_gpuStream_t s = 0) { - #ifdef HASHINATOR_DEBUG - bool input_ok = isDeviceAccessible( 
reinterpret_cast(&input)); - bool output_ok= isDeviceAccessible( reinterpret_cast(&output)); - assert( (input_ok && output_ok) && "This method supports splitvectors dynamically allocated on device or unified memory!"); - #endif - split::tools::loop_compact<<<1,BLOCKSIZE,0,s>>>(input,output,rule); +void copy_if_loop(split::SplitVector>& input, + split::SplitVector>& output, Rule rule, + split_gpuStream_t s = 0) { +#ifdef HASHINATOR_DEBUG + bool input_ok = isDeviceAccessible(reinterpret_cast(&input)); + bool output_ok = isDeviceAccessible(reinterpret_cast(&output)); + assert((input_ok && output_ok) && + "This method supports splitvectors dynamically allocated on device or unified memory!"); +#endif + split::tools::loop_compact<<<1, BLOCKSIZE, 0, s>>>(input, output, rule); } -template -void copy_if_keys_loop( - split::SplitVector>& input, - split::SplitVector>& output, - Rule rule, split_gpuStream_t s = 0) { - #ifdef HASHINATOR_DEBUG - bool input_ok = isDeviceAccessible( reinterpret_cast(&input)); - bool output_ok= isDeviceAccessible( reinterpret_cast(&output)); - assert( (input_ok && output_ok) && "This method supports splitvectors dynamically allocated on device or unified memory!"); - #endif - split::tools::loop_compact_keys<<<1,BLOCKSIZE,0,s>>>(input,output,rule); +template +void copy_if_keys_loop(split::SplitVector>& input, + split::SplitVector>& output, Rule rule, + split_gpuStream_t s = 0) { +#ifdef HASHINATOR_DEBUG + bool input_ok = isDeviceAccessible(reinterpret_cast(&input)); + bool output_ok = isDeviceAccessible(reinterpret_cast(&output)); + assert((input_ok && output_ok) && + "This method supports splitvectors dynamically allocated on device or unified memory!"); +#endif + split::tools::loop_compact_keys<<<1, BLOCKSIZE, 0, s>>>(input, output, rule); } /** @@ -1034,18 +1027,18 @@ size_t copy_keys_if_raw(split::SplitVector> size_t nBlocks, splitStackArena& mPool, split_gpuStream_t s = 0) { size_t _size = input.size(); - if (_size<=BLOCKSIZE){ - return copy_if_keys_block(input.data(),output,_size,rule,mPool,s); + if (_size <= BLOCKSIZE) { + return copy_if_keys_block(input.data(), output, _size, rule, mPool, s); } uint32_t* d_counts; uint32_t* d_offsets; d_counts = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t)); - SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_counts, 0, nBlocks * sizeof(uint32_t),s)); + SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_counts, 0, nBlocks * sizeof(uint32_t), s)); // Phase 1 -- Calculate per warp workload split::tools::scan_reduce_raw<<>>(input.data(), d_counts, rule, _size); d_offsets = (uint32_t*)mPool.allocate(nBlocks * sizeof(uint32_t)); - SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t),s)); + SPLIT_CHECK_ERR(split_gpuMemsetAsync(d_offsets, 0, nBlocks * sizeof(uint32_t), s)); // Step 2 -- Exclusive Prefix Scan on offsets if (nBlocks == 1) { @@ -1057,8 +1050,8 @@ size_t copy_keys_if_raw(split::SplitVector> // Step 3 -- Compaction uint32_t* retval = (uint32_t*)mPool.allocate(sizeof(uint32_t)); split::tools::split_compact_keys_raw - <<>>( - input.data(), d_counts, d_offsets, output, rule, _size, nBlocks, retval); + <<>>(input.data(), d_counts, d_offsets, + output, rule, _size, nBlocks, retval); uint32_t numel; SPLIT_CHECK_ERR(split_gpuMemcpyAsync(&numel, retval, sizeof(uint32_t), split_gpuMemcpyDeviceToHost, s)); SPLIT_CHECK_ERR(split_gpuStreamSynchronize(s)); @@ -1083,8 +1076,7 @@ estimateMemoryForCompaction(const split::SplitVector -[[nodiscard]] size_t -estimateMemoryForCompaction(const size_t inputSize) noexcept { +[[nodiscard]] 
size_t estimateMemoryForCompaction(const size_t inputSize) noexcept { // Figure out Blocks to use size_t _s = std::ceil((float(inputSize)) / (float)BLOCKSIZE); size_t nBlocks = nextPow2(_s); @@ -1216,8 +1208,7 @@ void copy_if(split::SplitVector>& input, } template -size_t copy_if(T* input, T* output, size_t size, Rule rule, void* stack, size_t max_size, - split_gpuStream_t s = 0) { +size_t copy_if(T* input, T* output, size_t size, Rule rule, void* stack, size_t max_size, split_gpuStream_t s = 0) { // Figure out Blocks to use size_t _s = std::ceil((float(size)) / (float)BLOCKSIZE); diff --git a/include/splitvector/splitvec.h b/include/splitvector/splitvec.h index da49a69..fbbb08f 100644 --- a/include/splitvector/splitvec.h +++ b/include/splitvector/splitvec.h @@ -479,8 +479,7 @@ class SplitVector { * @return Pointer to the uploaded SplitVector on the GPU. */ template - HOSTONLY - SplitVector* upload(split_gpuStream_t stream = 0) { + HOSTONLY SplitVector* upload(split_gpuStream_t stream = 0) { if (!d_vec) { SPLIT_CHECK_ERR(split_gpuMallocAsync((void**)&d_vec, sizeof(SplitVector), stream)); }
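Reviewer note on the launch-size arithmetic touched above: reset_all sizes its grid with the "fast ceil for positive ints" idiom, and the splitvector tools round block counts up with nextPow2. A minimal host-only sketch of both idioms follows; ceilDiv and the main() driver are illustrative names only, while the nextPow2 body mirrors the 64-bit variant already present in split_tools.

// Illustrative sketch (not part of the patch): the two integer idioms used to size kernel launches.
#include <cassert>
#include <cstddef>

// Ceiling division for positive integers, as in "len / MAX_BLOCKSIZE + (len % MAX_BLOCKSIZE != 0)".
constexpr size_t ceilDiv(size_t len, size_t block) { return len / block + (len % block != 0); }

// Round up to the next power of two (bit-smearing trick, mirroring nextPow2 in split_tools).
constexpr size_t nextPow2(size_t v) noexcept {
   v--;
   v |= v >> 1;
   v |= v >> 2;
   v |= v >> 4;
   v |= v >> 8;
   v |= v >> 16;
   v |= v >> 32; // assumes 64-bit size_t
   return ++v;
}

int main() {
   assert(ceilDiv(1025, 1024) == 2); // one extra block for the 1-element tail
   assert(ceilDiv(1024, 1024) == 1); // exact multiples do not over-allocate
   assert(nextPow2(1000) == 1024);
   return 0;
}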
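The block_compact / loop_compact kernels reformatted in this diff all follow the same warp-level compaction recipe: each thread votes on its element with a warp ballot, a popcount of the ballot gives the per-warp count, a small prefix scan over warp counts gives each warp its output offset, and each surviving thread writes at offset + popcount(mask & lanes_below_me). The single-warp CUDA sketch below shows only that core write-position computation; warpCompact, the keep-positive rule, and the fixed 32-wide mask are assumptions for illustration, not the library's kernels (which use the portable s_warpVote / s_pop_count wrappers and handle multi-warp blocks).

// Illustrative single-warp compaction (assumes one warp of 32 threads and a "keep positive" rule).
#include <cstdio>

__global__ void warpCompact(const int* in, int* out, unsigned n, int* outCount) {
   const unsigned lane = threadIdx.x;                        // one warp: lanes 0..31
   const bool keep = (lane < n) && (in[lane] > 0);           // each lane votes on its own element
   const unsigned mask = __ballot_sync(0xFFFFFFFFu, keep);
   const unsigned rank = __popc(mask & ((1u << lane) - 1u)); // kept lanes below me = my output slot
   if (keep) {
      out[rank] = in[lane];                                  // ordered, gap-free write
   }
   if (lane == 0) {
      *outCount = __popc(mask);                              // total number of kept elements
   }
}

int main() {
   int h_in[8] = {3, -1, 0, 7, -2, 5, 9, -4};
   int *d_in, *d_out, *d_count;
   cudaMalloc(&d_in, sizeof(h_in));
   cudaMalloc(&d_out, sizeof(h_in));
   cudaMalloc(&d_count, sizeof(int));
   cudaMemcpy(d_in, h_in, sizeof(h_in), cudaMemcpyHostToDevice);
   warpCompact<<<1, 32>>>(d_in, d_out, 8u, d_count);
   int h_out[8], h_count;
   cudaMemcpy(&h_count, d_count, sizeof(int), cudaMemcpyDeviceToHost);
   cudaMemcpy(h_out, d_out, sizeof(int) * 8, cudaMemcpyDeviceToHost);
   printf("kept %d:", h_count);
   for (int i = 0; i < h_count; ++i) {
      printf(" %d", h_out[i]);
   }
   printf("\n"); // expected: kept 4: 3 7 5 9
   cudaFree(d_in);
   cudaFree(d_out);
   cudaFree(d_count);
   return 0;
}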
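resize_to_lf, in both its CPU-only and targeted forms, is simply "keep incrementing sizePower until load_factor() drops to the target"; since fill stays constant while capacity doubles, each rehash halves the load factor. A toy host-side model of that loop, with a hypothetical ToyMap standing in for the real _mapInfo bookkeeping:

// Toy model (assumption: ToyMap is a stand-in, not Hashinator's MapInfo; a real rehash also re-inserts).
#include <cstddef>
#include <cstdio>

struct ToyMap {
   size_t fill = 1500;   // number of stored elements, unchanged by rehashing
   int sizePower = 10;   // capacity is 1 << sizePower
   double load_factor() const { return double(fill) / double(size_t(1) << sizePower); }
   void rehash(int newPower) { sizePower = newPower; }
};

int main() {
   ToyMap m;
   const double targetLF = 0.5;
   while (m.load_factor() > targetLF) { // same loop shape as resize_to_lf
      m.rehash(m.sizePower + 1);
   }
   // 1500/1024 ~ 1.46 -> 1500/2048 ~ 0.73 -> 1500/4096 ~ 0.37: stops at sizePower == 12
   printf("sizePower=%d load_factor=%.3f\n", m.sizePower, m.load_factor());
   return 0;
}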