Fix batching of the BulkProcessing
mconcas committed Oct 1, 2024
1 parent 8a86d08 commit 1a33857
Showing 5 changed files with 132 additions and 65 deletions.
5 changes: 3 additions & 2 deletions Common/DCAFitter/GPU/DeviceInterface/GPUInterface.h
@@ -49,15 +49,16 @@ class GPUInterface
void unregisterBuffer(void* addr);
void allocDevice(void**, size_t);
void freeDevice(void*);
Stream& getStream(short N = 0);
Stream& getStream(unsigned short N = 0);
Stream& getNextStream();

protected:
GPUInterface(size_t N = 1);
~GPUInterface();

void resize(size_t);
unsigned short getNextCursor();

std::atomic<unsigned short> mLastUsedStream{0};
static GPUInterface* sGPUInterface;
std::vector<std::thread> mPool{};
std::vector<Stream> mStreams{};
130 changes: 85 additions & 45 deletions Common/DCAFitter/GPU/cuda/DCAFitterN.cu
@@ -61,10 +61,10 @@ GPUg() void processKernel(Fitter* fitter, int* res, Tr*... tracks)
}

template <typename Fitter, typename... Tr>
GPUg() void processBulkKernel(Fitter* fitters, int* results, unsigned int N, Tr*... tracks)
GPUg() void processBatchKernel(Fitter* fitters, int* results, size_t off, size_t N, Tr*... tracks)
{
for (auto iThread{blockIdx.x * blockDim.x + threadIdx.x}; iThread < N; iThread += blockDim.x * gridDim.x) {
results[iThread] = fitters[iThread].process(tracks[iThread]...);
results[iThread + off] = fitters[iThread + off].process(tracks[iThread + off]...);
}
}

@@ -131,71 +131,111 @@ int process(const int nBlocks,
}

template <typename Fitter, class... Tr>
std::vector<int> processBulk(const int nBlocks,
const int nThreads,
std::vector<Fitter>& fitters,
std::vector<Tr>&... args)
void processBulk(const int nBlocks,
const int nThreads,
const int nStreams,
std::vector<Fitter>& fitters,
std::vector<int>& results,
std::vector<Tr>&... args)
{
auto* gpuInterface = GPUInterface::Instance();
kernel::warmUpGpuKernel<<<1, 1>>>();

cudaEvent_t start, stop;
gpuCheckError(cudaEventCreate(&start));
gpuCheckError(cudaEventCreate(&stop));
const auto nFits{fitters.size()}; // for clarity: size of all the vectors needs to be equal, not enforcing it here yet.
std::vector<int> results(nFits);
int* results_device;
Fitter* fitters_device;
// Benchmarking events
// std::vector<cudaEvent_t> start(nStreams), stop(nStreams);
// cudaEvent_t totalStart, totalStop;
// gpuCheckError(cudaEventCreate(&totalStart));
// gpuCheckError(cudaEventCreate(&totalStop));
// for (int iBatch{0}; iBatch < nStreams; ++iBatch) {
// gpuCheckError(cudaEventCreate(&start[iBatch]));
// gpuCheckError(cudaEventCreate(&stop[iBatch]));
// }

// Tracks
std::array<o2::track::TrackParCov*, Fitter::getNProngs()> tracks_device;
auto* gpuInterface = GPUInterface::Instance();

int iArg{0};
([&] {
gpuInterface->registerBuffer(reinterpret_cast<void*>(args.data()), sizeof(Tr) * args.size());
gpuInterface->allocDevice(reinterpret_cast<void**>(&(tracks_device[iArg])), sizeof(Tr) * args.size());
gpuCheckError(cudaMemcpyAsync(tracks_device[iArg], args.data(), sizeof(Tr) * args.size(), cudaMemcpyHostToDevice, gpuInterface->getStream(iArg)));
++iArg;
}(),
...);
gpuInterface->registerBuffer(reinterpret_cast<void*>(fitters.data()), sizeof(Fitter) * nFits);
gpuInterface->registerBuffer(reinterpret_cast<void*>(results.data()), sizeof(int) * nFits);
gpuInterface->allocDevice(reinterpret_cast<void**>(&results_device), sizeof(int) * nFits);
gpuInterface->allocDevice(reinterpret_cast<void**>(&fitters_device), sizeof(Fitter) * nFits);
gpuCheckError(cudaMemcpy(fitters_device, fitters.data(), sizeof(Fitter) * nFits, cudaMemcpyHostToDevice));

gpuCheckError(cudaEventRecord(start));
std::apply([&](auto&&... args) { kernel::processBulkKernel<<<nBlocks, nThreads>>>(fitters_device, results_device, nFits, args...); }, tracks_device);
gpuCheckError(cudaEventRecord(stop));

gpuCheckError(cudaPeekAtLastError());
gpuCheckError(cudaDeviceSynchronize());
// Fitters
gpuInterface->registerBuffer(reinterpret_cast<void*>(fitters.data()), sizeof(Fitter) * fitters.size());
Fitter* fitters_device;
gpuInterface->allocDevice(reinterpret_cast<void**>(&fitters_device), sizeof(Fitter) * fitters.size());

gpuCheckError(cudaMemcpy(results.data(), results_device, sizeof(int) * results.size(), cudaMemcpyDeviceToHost));
gpuCheckError(cudaMemcpy(fitters.data(), fitters_device, sizeof(Fitter) * nFits, cudaMemcpyDeviceToHost));
// Results
gpuInterface->registerBuffer(reinterpret_cast<void*>(results.data()), sizeof(int) * fitters.size());
int* results_device;
gpuInterface->allocDevice(reinterpret_cast<void**>(&results_device), sizeof(int) * fitters.size());

// gpuCheckError(cudaEventRecord(totalStart));
int totalSize = fitters.size();
int batchSize = totalSize / nStreams;
int remainder = totalSize % nStreams;

for (int iBatch{0}; iBatch < nStreams; ++iBatch) {
auto& stream = gpuInterface->getNextStream();
auto offset = iBatch * batchSize + std::min(iBatch, remainder);
auto nFits = batchSize + (iBatch < remainder ? 1 : 0);

gpuCheckError(cudaMemcpyAsync(fitters_device + offset, fitters.data() + offset, sizeof(Fitter) * nFits, cudaMemcpyHostToDevice, stream));
iArg = 0;
([&] {
gpuCheckError(cudaMemcpyAsync(tracks_device[iArg] + offset, args.data() + offset, sizeof(Tr) * nFits, cudaMemcpyHostToDevice, stream));
++iArg;
}(),
...);
// gpuCheckError(cudaEventRecord(start[iBatch]));
std::apply([&](auto&&... args) { kernel::processBatchKernel<<<nBlocks, nThreads, 0, stream>>>(fitters_device, results_device, offset, nFits, args...); }, tracks_device);
// gpuCheckError(cudaEventRecord(stop[iBatch]));

gpuCheckError(cudaPeekAtLastError());
gpuCheckError(cudaStreamSynchronize(stream));
iArg = 0;
([&] {
gpuCheckError(cudaMemcpyAsync(args.data() + offset, tracks_device[iArg] + offset, sizeof(Tr) * nFits, cudaMemcpyDeviceToHost, stream));
++iArg;
}(),
...);
gpuCheckError(cudaMemcpyAsync(fitters.data() + offset, fitters_device + offset, sizeof(Fitter) * nFits, cudaMemcpyDeviceToHost, stream));
gpuCheckError(cudaMemcpyAsync(results.data() + offset, results_device + offset, sizeof(int) * nFits, cudaMemcpyDeviceToHost, stream));
}
([&] { gpuInterface->unregisterBuffer(args.data()); }(), ...);
// gpuCheckError(cudaEventRecord(totalStop));

iArg = 0;
([&] {
gpuCheckError(cudaMemcpyAsync(args.data(), tracks_device[iArg], sizeof(Tr) * args.size(), cudaMemcpyDeviceToHost, gpuInterface->getStream(iArg)));
gpuInterface->freeDevice(tracks_device[iArg]);
gpuInterface->unregisterBuffer(args.data());
++iArg;
}(),
...);
for (auto* tracksD : tracks_device) {
gpuInterface->freeDevice(tracksD);
}

gpuInterface->freeDevice(fitters_device);
gpuInterface->freeDevice(results_device);
gpuInterface->unregisterBuffer(fitters.data());
gpuInterface->unregisterBuffer(results.data());
gpuCheckError(cudaEventSynchronize(stop));

float milliseconds = 0;
gpuCheckError(cudaEventElapsedTime(&milliseconds, start, stop));
// float milliseconds = 0;
// gpuCheckError(cudaEventElapsedTime(&milliseconds, start, stop));

LOGP(info, "Kernel run in: {} ms using {} blocks and {} threads.", milliseconds, nBlocks, nThreads);
return results;
// LOGP(info, "Kernel run in: {} ms using {} blocks and {} threads.", milliseconds, nBlocks, nThreads);
// return results;
}

template std::vector<int> processBulk(const int, const int, std::vector<o2::vertexing::DCAFitterN<2>>&, std::vector<o2::track::TrackParCov>&, std::vector<o2::track::TrackParCov>&);
template std::vector<int> processBulk(const int, const int, std::vector<o2::vertexing::DCAFitterN<3>>&, std::vector<o2::track::TrackParCov>&, std::vector<o2::track::TrackParCov>&, std::vector<o2::track::TrackParCov>&);
template void processBulk(const int,
const int,
const int,
std::vector<o2::vertexing::DCAFitterN<2>>&,
std::vector<int>&,
std::vector<o2::track::TrackParCov>&,
std::vector<o2::track::TrackParCov>&);
template void processBulk(const int,
const int,
const int,
std::vector<o2::vertexing::DCAFitterN<3>>&,
std::vector<int>&,
std::vector<o2::track::TrackParCov>&,
std::vector<o2::track::TrackParCov>&,
std::vector<o2::track::TrackParCov>&);
template int process(const int, const int, o2::vertexing::DCAFitterN<2>&, o2::track::TrackParCov&, o2::track::TrackParCov&);
template int process(const int, const int, o2::vertexing::DCAFitterN<3>&, o2::track::TrackParCov&, o2::track::TrackParCov&, o2::track::TrackParCov&);
template void print(const int, const int, o2::vertexing::DCAFitterN<2>&);
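Note on the batching scheme above (a minimal host-side sketch, not part of the commit): processBulk splits the fitters.size() fits into nStreams contiguous batches, giving the first remainder batches one extra element so the offsets tile the whole range with no gaps or overlap. The check below mirrors that arithmetic; the function name sketchPartition is illustrative only.

#include <algorithm>
#include <cassert>
#include <cstddef>

// Mirrors the offset/nFits computation in processBulk for one full pass.
inline void sketchPartition(std::size_t totalSize, int nStreams)
{
  const std::size_t batchSize = totalSize / nStreams;
  const std::size_t remainder = totalSize % nStreams;
  std::size_t covered = 0;
  for (int iBatch = 0; iBatch < nStreams; ++iBatch) {
    const std::size_t offset = iBatch * batchSize + std::min<std::size_t>(iBatch, remainder);
    const std::size_t nFits = batchSize + (static_cast<std::size_t>(iBatch) < remainder ? 1 : 0);
    assert(offset == covered); // batches are contiguous
    covered += nFits;          // e.g. 100001 fits over 8 streams -> one batch of 12501, seven of 12500
  }
  assert(covered == totalSize); // every fit is handled exactly once
}
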
12 changes: 11 additions & 1 deletion Common/DCAFitter/GPU/cuda/GPUInterface.cu
@@ -23,6 +23,10 @@
{ \
gpuAssert((x), __FILE__, __LINE__); \
}
#define gpuCheckErrorSoft(x) \
{ \
gpuAssert((x), __FILE__, __LINE__, false); \
}
inline void gpuAssert(cudaError_t code, const char* file, int line, bool abort = true)
{
if (code != cudaSuccess) {
@@ -86,8 +90,14 @@ void GPUInterface::freeDevice(void* addr)
gpuCheckError(cudaFree(addr));
}

Stream& GPUInterface::getStream(short N)
Stream& GPUInterface::getStream(unsigned short N)
{
return mStreams[N % mStreams.size()];
}

Stream& GPUInterface::getNextStream()
{
unsigned short next = mLastUsedStream.fetch_add(1) % mStreams.size(); // round-robin index into the stream pool; the unsigned short counter itself wraps automatically past 65535
return mStreams[next];
}
} // namespace o2::vertexing::device
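
For context, a standalone sketch of the lock-free round-robin selection that getNextStream() implements above: an atomic fetch_add hands each caller a unique ticket and the modulo maps it into the fixed stream pool. StreamStub and RoundRobinPool are stand-ins for illustration only, not O2 types.

#include <atomic>
#include <cstdio>
#include <vector>

struct StreamStub { int id; }; // placeholder for a CUDA stream handle

class RoundRobinPool
{
 public:
  explicit RoundRobinPool(unsigned short n)
  {
    for (unsigned short i = 0; i < n; ++i) {
      mStreams.push_back({i});
    }
  }
  StreamStub& next()
  {
    // fetch_add is atomic, so concurrent callers receive distinct tickets;
    // the unsigned short counter wraps past 65535, the modulo keeps the index in range.
    const unsigned short ticket = mCursor.fetch_add(1);
    return mStreams[ticket % mStreams.size()];
  }

 private:
  std::atomic<unsigned short> mCursor{0};
  std::vector<StreamStub> mStreams;
};

int main()
{
  RoundRobinPool pool(3);
  for (int i = 0; i < 7; ++i) {
    std::printf("dispatch %d -> stream %d\n", i, pool.next().id); // prints 0 1 2 0 1 2 0
  }
  return 0;
}
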
48 changes: 32 additions & 16 deletions Common/DCAFitter/GPU/cuda/test/testDCAFitterNGPU.cxx
@@ -25,7 +25,8 @@

#define nBlocks 30
#define nThreads 256
#define NTest 100000
#define nStreams 8
#define NTest 100001

namespace o2
{
@@ -612,7 +613,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
}

swAb.Start(false);
auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -628,7 +630,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(true);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swAWb.Start(false);
auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -644,7 +647,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(false);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swWb.Start(false);
auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -708,7 +712,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
}

swAb.Start(false);
auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -724,7 +729,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(true);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swAWb.Start(false);
auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -740,7 +746,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(false);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swWb.Start(false);
auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -806,7 +813,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
std::fill(fitters_host.begin(), fitters_host.end(), ft);

swAb.Start(false);
auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -822,7 +830,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(true);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swAWb.Start(false);
auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -838,7 +847,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(false);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swWb.Start(false);
auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -903,7 +913,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
std::fill(fitters_host.begin(), fitters_host.end(), ft);

swAb.Start(false);
auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -919,7 +930,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setWeightedFinalPCA(true);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swAWb.Start(false);
auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncAWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swAWb.Stop();
for (int iev = 0; iev < NTest; iev++) {
LOG(debug) << "fit abs.dist " << iev << " NC: " << ncAWb[iev] << " Chi2: " << (ncAWb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
@@ -935,7 +947,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
std::fill(fitters_host.begin(), fitters_host.end(), ft);

swWb.Start(false);
auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
std::vector<int> ncWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1]); // HERE WE FIT THE VERTICES
swWb.Stop();

for (int iev = 0; iev < NTest; iev++) {
@@ -1000,7 +1013,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
ft.setUseAbsDCA(true);
std::fill(fitters_host.begin(), fitters_host.end(), ft);
swAb.Start(false);
auto ncAb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
std::vector<int> ncAb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAb, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
swAb.Stop();
for (int iev = 0; iev < NTest; iev++) {
LOG(debug) << "fit abs.dist " << iev << " NC: " << ncAb[iev] << " Chi2: " << (ncAb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
@@ -1016,7 +1030,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
std::fill(fitters_host.begin(), fitters_host.end(), ft);

swAWb.Start(false);
auto ncAWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
std::vector<int> ncAWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncAWb, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
swAWb.Stop();
for (int iev = 0; iev < NTest; iev++) {
LOG(debug) << "fit abs.dist " << iev << " NC: " << ncAWb[iev] << " Chi2: " << (ncAWb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
@@ -1032,7 +1047,8 @@ BOOST_AUTO_TEST_CASE(DCAFitterNProngsBulk)
std::fill(fitters_host.begin(), fitters_host.end(), ft);

swWb.Start(false);
auto ncWb = device::processBulk(nBlocks, nThreads, fitters_host, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
std::vector<int> ncWb(NTest, 0);
device::processBulk(nBlocks, nThreads, nStreams, fitters_host, ncWb, vctracks[0], vctracks[1], vctracks[2]); // HERE WE FIT THE VERTICES
swWb.Stop();
for (int iev = 0; iev < NTest; iev++) {
LOG(debug) << "fit wgh.dist " << iev << " NC: " << ncWb[iev] << " Chi2: " << (ncWb[iev] ? fitters_host[iev].getChi2AtPCACandidate(0) : -1);
2 changes: 1 addition & 1 deletion Common/DCAFitter/include/DCAFitter/DCAFitterN.h
@@ -1142,7 +1142,7 @@ template <typename Fitter, class... Tr>
int process(const int nBlocks, const int nThreads, Fitter&, Tr&... args);

template <class Fitter, class... Tr>
std::vector<int> processBulk(const int nBlocks, const int nThreads, std::vector<Fitter>& fitters, std::vector<Tr>&... args);
void processBulk(const int nBlocks, const int nThreads, const int nBatches, std::vector<Fitter>& fitters, std::vector<int>& results, std::vector<Tr>&... args);
} // namespace device

} // namespace vertexing
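A hedged usage sketch of the updated interface declared above: the caller now owns the results vector and passes the number of batches (streams) explicitly, instead of receiving a vector by value. The sizes of fitters, results and every track vector are assumed equal; fillTrackPairs is a hypothetical helper, not an O2 function.

// Sketch only, assuming a CUDA-enabled build of the DCAFitter GPU backend.
const size_t nVertices = 100001;
std::vector<o2::vertexing::DCAFitterN<2>> fitters(nVertices);
std::vector<o2::track::TrackParCov> trk0(nVertices), trk1(nVertices);
std::vector<int> nCandidates(nVertices, 0); // results are written in place
// fillTrackPairs(trk0, trk1);              // hypothetical: populate the prongs
o2::vertexing::device::processBulk(30 /*blocks*/, 256 /*threads*/, 8 /*streams*/,
                                   fitters, nCandidates, trk0, trk1);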
