From 1a338575cf44fe34530663f42d67145444d1bacc Mon Sep 17 00:00:00 2001
From: Matteo Concas
Date: Mon, 30 Sep 2024 18:55:40 +0200
Subject: [PATCH] Fix batching of the BulkProcessing

---
 .../GPU/DeviceInterface/GPUInterface.h        |   5 +-
 Common/DCAFitter/GPU/cuda/DCAFitterN.cu       | 130 ++++++++++++------
 Common/DCAFitter/GPU/cuda/GPUInterface.cu     |  12 +-
 .../GPU/cuda/test/testDCAFitterNGPU.cxx       |  48 ++++---
 .../DCAFitter/include/DCAFitter/DCAFitterN.h  |   2 +-
 5 files changed, 132 insertions(+), 65 deletions(-)

diff --git a/Common/DCAFitter/GPU/DeviceInterface/GPUInterface.h b/Common/DCAFitter/GPU/DeviceInterface/GPUInterface.h
index 8474a68d757b8..effd13021d538 100644
--- a/Common/DCAFitter/GPU/DeviceInterface/GPUInterface.h
+++ b/Common/DCAFitter/GPU/DeviceInterface/GPUInterface.h
@@ -49,15 +49,16 @@ class GPUInterface
   void unregisterBuffer(void* addr);
   void allocDevice(void**, size_t);
   void freeDevice(void*);
-  Stream& getStream(short N = 0);
+  Stream& getStream(unsigned short N = 0);
+  Stream& getNextStream();
 
  protected:
   GPUInterface(size_t N = 1);
   ~GPUInterface();
 
   void resize(size_t);
-  unsigned short getNextCursor();
 
+  std::atomic<unsigned short> mLastUsedStream{0};
   static GPUInterface* sGPUInterface;
   std::vector mPool{};
   std::vector<Stream> mStreams{};
diff --git a/Common/DCAFitter/GPU/cuda/DCAFitterN.cu b/Common/DCAFitter/GPU/cuda/DCAFitterN.cu
index 5a0b79007b202..28a969f25dee3 100644
--- a/Common/DCAFitter/GPU/cuda/DCAFitterN.cu
+++ b/Common/DCAFitter/GPU/cuda/DCAFitterN.cu
@@ -61,10 +61,10 @@ GPUg() void processKernel(Fitter* fitter, int* res, Tr*... tracks)
 }
 
 template <typename Fitter, typename... Tr>
-GPUg() void processBulkKernel(Fitter* fitters, int* results, unsigned int N, Tr*... tracks)
+GPUg() void processBatchKernel(Fitter* fitters, int* results, size_t off, size_t N, Tr*... tracks)
 {
   for (auto iThread{blockIdx.x * blockDim.x + threadIdx.x}; iThread < N; iThread += blockDim.x * gridDim.x) {
-    results[iThread] = fitters[iThread].process(tracks[iThread]...);
+    results[iThread + off] = fitters[iThread + off].process(tracks[iThread + off]...);
   }
 }
 
@@ -131,71 +131,111 @@ int process(const int nBlocks,
 }
 
 template <typename Fitter, class... Tr>
-std::vector<int> processBulk(const int nBlocks,
-                             const int nThreads,
-                             std::vector<Fitter>& fitters,
-                             std::vector<Tr>&... args)
+void processBulk(const int nBlocks,
+                 const int nThreads,
+                 const int nStreams,
+                 std::vector<Fitter>& fitters,
+                 std::vector<int>& results,
+                 std::vector<Tr>&... args)
 {
+  auto* gpuInterface = GPUInterface::Instance();
   kernel::warmUpGpuKernel<<<1, 1>>>();
-  cudaEvent_t start, stop;
-  gpuCheckError(cudaEventCreate(&start));
-  gpuCheckError(cudaEventCreate(&stop));
-  const auto nFits{fitters.size()}; // for clarity: size of all the vectors needs to be equal, not enforcing it here yet.
-  std::vector<int> results(nFits);
-  int* results_device;
-  Fitter* fitters_device;
+  // Benchmarking events
+  // std::vector<cudaEvent_t> start(nStreams), stop(nStreams);
+  // cudaEvent_t totalStart, totalStop;
+  // gpuCheckError(cudaEventCreate(&totalStart));
+  // gpuCheckError(cudaEventCreate(&totalStop));
+  // for (int iBatch{0}; iBatch < nStreams; ++iBatch) {
+  //   gpuCheckError(cudaEventCreate(&start[iBatch]));
+  //   gpuCheckError(cudaEventCreate(&stop[iBatch]));
+  // }
+
+  // Tracks
   std::array<Tr*, sizeof...(Tr)> tracks_device;
-  auto* gpuInterface = GPUInterface::Instance();
-
   int iArg{0};
   ([&] {
     gpuInterface->registerBuffer(reinterpret_cast<void*>(args.data()), sizeof(Tr) * args.size());
     gpuInterface->allocDevice(reinterpret_cast<void**>(&(tracks_device[iArg])), sizeof(Tr) * args.size());
-    gpuCheckError(cudaMemcpyAsync(tracks_device[iArg], args.data(), sizeof(Tr) * args.size(), cudaMemcpyHostToDevice, gpuInterface->getStream(iArg)));
     ++iArg;
   }(),
    ...);
 
-  gpuInterface->registerBuffer(reinterpret_cast<void*>(fitters.data()), sizeof(Fitter) * nFits);
-  gpuInterface->registerBuffer(reinterpret_cast<void*>(results.data()), sizeof(int) * nFits);
-  gpuInterface->allocDevice(reinterpret_cast<void**>(&results_device), sizeof(int) * nFits);
-  gpuInterface->allocDevice(reinterpret_cast<void**>(&fitters_device), sizeof(Fitter) * nFits);
-  gpuCheckError(cudaMemcpy(fitters_device, fitters.data(), sizeof(Fitter) * nFits, cudaMemcpyHostToDevice));
-  gpuCheckError(cudaEventRecord(start));
-  std::apply([&](auto&&... args) { kernel::processBulkKernel<<<nBlocks, nThreads>>>(fitters_device, results_device, nFits, args...); }, tracks_device);
-  gpuCheckError(cudaEventRecord(stop));
-
-  gpuCheckError(cudaPeekAtLastError());
-  gpuCheckError(cudaDeviceSynchronize());
+  // Fitters
+  gpuInterface->registerBuffer(reinterpret_cast<void*>(fitters.data()), sizeof(Fitter) * fitters.size());
+  Fitter* fitters_device;
+  gpuInterface->allocDevice(reinterpret_cast<void**>(&fitters_device), sizeof(Fitter) * fitters.size());
 
-  gpuCheckError(cudaMemcpy(results.data(), results_device, sizeof(int) * results.size(), cudaMemcpyDeviceToHost));
-  gpuCheckError(cudaMemcpy(fitters.data(), fitters_device, sizeof(Fitter) * nFits, cudaMemcpyDeviceToHost));
+  // Results
+  gpuInterface->registerBuffer(reinterpret_cast<void*>(results.data()), sizeof(int) * fitters.size());
+  int* results_device;
+  gpuInterface->allocDevice(reinterpret_cast<void**>(&results_device), sizeof(int) * fitters.size());
+
+  // gpuCheckError(cudaEventRecord(totalStart));
+  int totalSize = fitters.size();
+  int batchSize = totalSize / nStreams;
+  int remainder = totalSize % nStreams;
+
+  for (int iBatch{0}; iBatch < nStreams; ++iBatch) {
+    auto& stream = gpuInterface->getNextStream();
+    auto offset = iBatch * batchSize + std::min(iBatch, remainder);
+    auto nFits = batchSize + (iBatch < remainder ? 1 : 0);
+
+    gpuCheckError(cudaMemcpyAsync(fitters_device + offset, fitters.data() + offset, sizeof(Fitter) * nFits, cudaMemcpyHostToDevice, stream));
+    iArg = 0;
+    ([&] {
+      gpuCheckError(cudaMemcpyAsync(tracks_device[iArg] + offset, args.data() + offset, sizeof(Tr) * nFits, cudaMemcpyHostToDevice, stream));
+      ++iArg;
+    }(),
+     ...);
+    // gpuCheckError(cudaEventRecord(start[iBatch]));
+    std::apply([&](auto&&... args) { kernel::processBatchKernel<<<nBlocks, nThreads, 0, stream>>>(fitters_device, results_device, offset, nFits, args...); }, tracks_device);
+    // gpuCheckError(cudaEventRecord(stop[iBatch]));
+
+    gpuCheckError(cudaPeekAtLastError());
+    gpuCheckError(cudaStreamSynchronize(stream));
+    iArg = 0;
+    ([&] {
+      gpuCheckError(cudaMemcpyAsync(args.data() + offset, tracks_device[iArg] + offset, sizeof(Tr) * nFits, cudaMemcpyDeviceToHost, stream));
+      ++iArg;
+    }(),
+     ...);
+    gpuCheckError(cudaMemcpyAsync(fitters.data() + offset, fitters_device + offset, sizeof(Fitter) * nFits, cudaMemcpyDeviceToHost, stream));
+    gpuCheckError(cudaMemcpyAsync(results.data() + offset, results_device + offset, sizeof(int) * nFits, cudaMemcpyDeviceToHost, stream));
+  }
+  ([&] { gpuInterface->unregisterBuffer(args.data()); }(), ...);
+  // gpuCheckError(cudaEventRecord(totalStop));
 
-  iArg = 0;
-  ([&] {
-    gpuCheckError(cudaMemcpyAsync(args.data(), tracks_device[iArg], sizeof(Tr) * args.size(), cudaMemcpyDeviceToHost, gpuInterface->getStream(iArg)));
-    gpuInterface->freeDevice(tracks_device[iArg]);
-    gpuInterface->unregisterBuffer(args.data());
-    ++iArg;
-  }(),
-   ...);
+  for (auto* tracksD : tracks_device) {
+    gpuInterface->freeDevice(tracksD);
+  }
   gpuInterface->freeDevice(fitters_device);
   gpuInterface->freeDevice(results_device);
   gpuInterface->unregisterBuffer(fitters.data());
   gpuInterface->unregisterBuffer(results.data());
-  gpuCheckError(cudaEventSynchronize(stop));
-
-  float milliseconds = 0;
-  gpuCheckError(cudaEventElapsedTime(&milliseconds, start, stop));
+  // float milliseconds = 0;
+  // gpuCheckError(cudaEventElapsedTime(&milliseconds, start, stop));
 
-  LOGP(info, "Kernel run in: {} ms using {} blocks and {} threads.", milliseconds, nBlocks, nThreads);
-  return results;
+  // LOGP(info, "Kernel run in: {} ms using {} blocks and {} threads.", milliseconds, nBlocks, nThreads);
+  // return results;
 }
 
-template std::vector<int> processBulk(const int, const int, std::vector<o2::vertexing::DCAFitterN<2>>&, std::vector<o2::track::TrackParCov>&, std::vector<o2::track::TrackParCov>&);
-template std::vector<int> processBulk(const int, const int, std::vector<o2::vertexing::DCAFitterN<3>>&, std::vector<o2::track::TrackParCov>&, std::vector<o2::track::TrackParCov>&, std::vector<o2::track::TrackParCov>&);
+template void processBulk(const int,
+                          const int,
+                          const int,
+                          std::vector<o2::vertexing::DCAFitterN<2>>&,
+                          std::vector<int>&,
+                          std::vector<o2::track::TrackParCov>&,
+                          std::vector<o2::track::TrackParCov>&);
+template void processBulk(const int,
+                          const int,
+                          const int,
+                          std::vector<o2::vertexing::DCAFitterN<3>>&,
+                          std::vector<int>&,
+                          std::vector<o2::track::TrackParCov>&,
+                          std::vector<o2::track::TrackParCov>&,
+                          std::vector<o2::track::TrackParCov>&);
 template int process(const int, const int, o2::vertexing::DCAFitterN<2>&, o2::track::TrackParCov&, o2::track::TrackParCov&);
 template int process(const int, const int, o2::vertexing::DCAFitterN<3>&, o2::track::TrackParCov&, o2::track::TrackParCov&, o2::track::TrackParCov&);
 template void print(const int, const int, o2::vertexing::DCAFitterN<2>&);
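For reference, the batch partitioning used in the new processBulk loop above can be checked in isolation. The following standalone sketch reuses the patch's totalSize / batchSize / remainder / offset / nFits arithmetic (the values 100001 and 8 mirror NTest and nStreams from the updated test further down; the printf and assert scaffolding is illustrative only) and verifies that the batches are contiguous, non-overlapping, and cover every fit exactly once even when the division is uneven:

#include <algorithm>
#include <cassert>
#include <cstdio>

// Standalone check of the batch split used in processBulk: the first
// `remainder` batches take one extra fit, so the batches tile [0, totalSize).
int main()
{
  const int totalSize = 100001; // mirrors NTest in the updated test
  const int nStreams = 8;       // mirrors the new nStreams parameter
  const int batchSize = totalSize / nStreams;
  const int remainder = totalSize % nStreams;

  int covered = 0;
  for (int iBatch = 0; iBatch < nStreams; ++iBatch) {
    const int offset = iBatch * batchSize + std::min(iBatch, remainder);
    const int nFits = batchSize + (iBatch < remainder ? 1 : 0);
    assert(offset == covered); // batches are contiguous and non-overlapping
    covered += nFits;
    std::printf("batch %d: offset=%d nFits=%d\n", iBatch, offset, nFits);
  }
  assert(covered == totalSize); // every fit is processed exactly once
  return 0;
}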
diff --git a/Common/DCAFitter/GPU/cuda/GPUInterface.cu b/Common/DCAFitter/GPU/cuda/GPUInterface.cu
index 5054d5bbb6d87..26529d41b294b 100644
--- a/Common/DCAFitter/GPU/cuda/GPUInterface.cu
+++ b/Common/DCAFitter/GPU/cuda/GPUInterface.cu
@@ -23,6 +23,10 @@
   {                                     \
     gpuAssert((x), __FILE__, __LINE__); \
   }
+#define gpuCheckErrorSoft(x)                   \
+  {                                            \
+    gpuAssert((x), __FILE__, __LINE__, false); \
+  }
 inline void gpuAssert(cudaError_t code, const char* file, int line, bool abort = true)
 {
   if (code != cudaSuccess) {
@@ -86,8 +90,14 @@ void GPUInterface::freeDevice(void* addr)
   gpuCheckError(cudaFree(addr));
 }
 
-Stream& GPUInterface::getStream(short N)
+Stream& GPUInterface::getStream(unsigned short N)
 {
   return mStreams[N % mStreams.size()];
 }
+
+Stream& GPUInterface::getNextStream()
+{
+  unsigned short next = mLastUsedStream.fetch_add(1) % mStreams.size(); // modulo wraps over the stream pool; the unsigned short counter additionally wraps automatically beyond 65535
+  return mStreams[next];
+}
 } // namespace o2::vertexing::device
\ No newline at end of file
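As an illustration of the getNextStream() round-robin introduced above, here is a host-only sketch under stated assumptions: plain ints stand in for the CUDA Stream handles, the pool contents are made up, and only the atomic-cursor logic mirrors the patch:

#include <atomic>
#include <cstdio>
#include <vector>

// Host-only model of the round-robin cursor: fetch_add(1) returns the previous
// value, and the modulo keeps the index inside the pool. The unsigned short
// cursor itself wraps at 65536; with a pool size that divides 65536 (e.g. 8)
// the rotation stays seamless across that wrap as well.
struct StreamPool {
  std::vector<int> streams;                       // stand-in for std::vector<Stream>
  std::atomic<unsigned short> mLastUsedStream{0}; // same member name as in the patch

  int& getNextStream()
  {
    unsigned short next = mLastUsedStream.fetch_add(1) % streams.size();
    return streams[next];
  }
};

int main()
{
  StreamPool pool{{0, 1, 2, 3, 4, 5, 6, 7}};
  for (int i = 0; i < 12; ++i) {
    std::printf("%d ", pool.getNextStream()); // prints 0 1 2 3 4 5 6 7 0 1 2 3
  }
  std::printf("\n");
  return 0;
}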
diff --git a/Common/DCAFitter/GPU/cuda/test/testDCAFitterNGPU.cxx b/Common/DCAFitter/GPU/cuda/test/testDCAFitterNGPU.cxx
index 14ed8004d3126..56da8e173bcf0 100644
--- a/Common/DCAFitter/GPU/cuda/test/testDCAFitterNGPU.cxx
+++ b/Common/DCAFitter/GPU/cuda/test/testDCAFitterNGPU.cxx
@@ -25,7 +25,8 @@
 
 #define nBlocks 30
 #define nThreads 256
-#define NTest 100000
+#define nStreams 8
+#define NTest 100001
 
 namespace o2
 {
@@ -612,7 +613,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     }
 
     swAb.Start(false);
-    auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -628,7 +630,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(true);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swAWb.Start(false);
-    auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -644,7 +647,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(false);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swWb.Start(false);
-    auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -708,7 +712,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     }
 
     swAb.Start(false);
-    auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -724,7 +729,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(true);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swAWb.Start(false);
-    auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -740,7 +746,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(false);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swWb.Start(false);
-    auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -806,7 +813,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
 
     swAb.Start(false);
-    auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -822,7 +830,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(true);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swAWb.Start(false);
-    auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -838,7 +847,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(false);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swWb.Start(false);
-    auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -903,7 +913,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
 
     swAb.Start(false);
-    auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -919,7 +930,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setWeightedFinalPCA(true);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swAWb.Start(false);
-    auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swAWb.Stop();
     for (int iev = 0; iev < NTest; iev++) {
       LOG(debug) << "fit abs.dist " << iev << " NC: " << ncAWb[iev] << " Chi2: " << (ncAWb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
@@ -935,7 +947,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
 
     swWb.Start(false);
-    auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
     swWb.Stop();
 
     for (int iev = 0; iev < NTest; iev++) {
@@ -1000,7 +1013,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     ft.setUseAbsDCA(true);
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
     swAb.Start(false);
-    auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
     swAb.Stop();
     for (int iev = 0; iev < NTest; iev++) {
       LOG(debug) << "fit abs.dist " << iev << " NC: " << ncAb[iev] << " Chi2: " << (ncAb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
@@ -1016,7 +1030,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
 
     swAWb.Start(false);
-    auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncAWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
     swAWb.Stop();
     for (int iev = 0; iev < NTest; iev++) {
       LOG(debug) << "fit abs.dist " << iev << " NC: " << ncAWb[iev] << " Chi2: " << (ncAWb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
@@ -1032,7 +1047,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
     std::fill(fitters_host.begin(), fitters_host.end(), ft);
 
     swWb.Start(false);
-    auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
+    std::vector<int> ncWb(NTest, 0);
+    device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
     swWb.Stop();
     for (int iev = 0; iev < NTest; iev++) {
       LOG(debug) << "fit wgh.dist " << iev << " NC: " << ncWb[iev] << " Chi2: " << (ncWb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
diff --git a/Common/DCAFitter/include/DCAFitter/DCAFitterN.h b/Common/DCAFitter/include/DCAFitter/DCAFitterN.h
index 5a89597ad379a..2e36f7588e8be 100644
--- a/Common/DCAFitter/include/DCAFitter/DCAFitterN.h
+++ b/Common/DCAFitter/include/DCAFitter/DCAFitterN.h
@@ -1142,7 +1142,7 @@ template <typename Fitter, class... Tr>
 int process(const int nBlocks, const int nThreads, Fitter&, Tr&... args);
 template <typename Fitter, class... Tr>
-std::vector<int> processBulk(const int nBlocks, const int nThreads, std::vector<Fitter>& fitters, std::vector<Tr>&... args);
+void processBulk(const int nBlocks, const int nThreads, const int nBatches, std::vector<Fitter>& fitters, std::vector<int>& results, std::vector<Tr>&... args);
 } // namespace device
 } // namespace vertexing
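Finally, to make the overall batching pattern concrete outside of the O2 tree, here is a self-contained CUDA sketch of the same per-batch pipeline: pinned host memory, per-batch host-to-device copy, kernel launch on that batch's stream, device-to-host copy, and a stream synchronization. Everything here (the addOneKernel, the buffer contents, the omitted error checking) is illustrative and not part of the patch; only the offset / nFits split and the per-stream copies mirror processBulk:

// Self-contained CUDA sketch (illustrative names, no O2 dependencies):
// split a buffer into nStreams batches and, for each batch, do
// H2D copy -> kernel -> D2H copy on its own stream, then synchronize it,
// using the same offset / nFits arithmetic as the patched processBulk.
#include <algorithm>
#include <cstdio>
#include <vector>

__global__ void addOneKernel(int* data, size_t off, size_t n)
{
  for (auto i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += blockDim.x * gridDim.x) {
    data[i + off] += 1;
  }
}

int main()
{
  const int totalSize = 100001, nStreams = 8, nBlocks = 30, nThreads = 256;
  std::vector<int> host(totalSize, 41);

  int* d_data = nullptr;
  cudaMalloc(reinterpret_cast<void**>(&d_data), sizeof(int) * totalSize);
  // Pin the host buffer so the async copies can actually be asynchronous,
  // analogous to what registerBuffer does in the patch.
  cudaHostRegister(host.data(), sizeof(int) * totalSize, cudaHostRegisterDefault);

  std::vector<cudaStream_t> streams(nStreams);
  for (auto& s : streams) {
    cudaStreamCreate(&s);
  }

  const int batchSize = totalSize / nStreams;
  const int remainder = totalSize % nStreams;
  for (int iBatch = 0; iBatch < nStreams; ++iBatch) {
    const size_t offset = iBatch * batchSize + std::min(iBatch, remainder);
    const size_t nFits = batchSize + (iBatch < remainder ? 1 : 0);
    cudaStream_t s = streams[iBatch]; // the patch picks this via getNextStream()
    cudaMemcpyAsync(d_data + offset, host.data() + offset, sizeof(int) * nFits, cudaMemcpyHostToDevice, s);
    addOneKernel<<<nBlocks, nThreads, 0, s>>>(d_data, offset, nFits);
    cudaMemcpyAsync(host.data() + offset, d_data + offset, sizeof(int) * nFits, cudaMemcpyDeviceToHost, s);
    cudaStreamSynchronize(s);
  }

  std::printf("host[0]=%d host[%d]=%d\n", host[0], totalSize - 1, host[totalSize - 1]); // both print 42

  for (auto& s : streams) {
    cudaStreamDestroy(s);
  }
  cudaHostUnregister(host.data());
  cudaFree(d_data);
  return 0;
}

Note that the patch synchronizes each stream right after its kernel and leaves the device-to-host copies asynchronous; the sketch synchronizes after the copy-back purely to keep the example short.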