Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue to make KvikIO a shared library by moving code from hpp to cpp #581

Merged
20 changes: 18 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Copyright (c) 2021-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -131,7 +131,23 @@ include(cmake/thirdparty/get_thread_pool.cmake)
# ##################################################################################################
# * library targets --------------------------------------------------------------------------------

set(SOURCES "src/file_handle.cpp")
set(SOURCES
"src/batch.cpp"
"src/bounce_buffer.cpp"
"src/buffer.cpp"
"src/cufile/config.cpp"
"src/cufile/driver.cpp"
"src/defaults.cpp"
"src/error.cpp"
"src/file_handle.cpp"
"src/posix_io.cpp"
"src/shim/cuda.cpp"
"src/shim/cufile.cpp"
"src/shim/libcurl.cpp"
Copy link
Contributor

@bdice bdice Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think libcurl.cpp should only be built if we have an option like CUDF_KVIKIO_REMOTE_IO=ON to enable it. Perhaps it should be named KVIKIO_REMOTE_IO? See NVIDIA/spark-rapids-jni#2766. No curl dependency should be required if that CMake option is off.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option I am looking for may already exist: KvikIO_REMOTE_SUPPORT

"src/shim/utils.cpp"
"src/stream.cpp"
"src/utils.cpp"
)

if(KvikIO_REMOTE_SUPPORT)
list(APPEND SOURCES "src/remote_handle.cpp")
Expand Down
84 changes: 17 additions & 67 deletions cpp/include/kvikio/batch.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,68 +73,30 @@ class BatchHandle {
*
* @param max_num_events The maximum number of operations supported by this instance.
*/
BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events}
{
CUFILE_TRY(cuFileAPI::instance().BatchIOSetUp(&_handle, max_num_events));
}
BatchHandle(int max_num_events);

/**
* @brief BatchHandle supports move semantics but isn't copyable
*/
BatchHandle(const BatchHandle&) = delete;
BatchHandle& operator=(BatchHandle const&) = delete;
BatchHandle(BatchHandle&& o) noexcept
: _initialized{std::exchange(o._initialized, false)},
_max_num_events{std::exchange(o._max_num_events, 0)}
{
_handle = std::exchange(o._handle, CUfileBatchHandle_t{});
}
~BatchHandle() noexcept { close(); }
BatchHandle(BatchHandle&& o) noexcept;
~BatchHandle() noexcept;

[[nodiscard]] bool closed() const noexcept { return !_initialized; }
[[nodiscard]] bool closed() const noexcept;

/**
* @brief Destroy the batch handle and free up resources
*/
void close() noexcept
{
if (closed()) { return; }
_initialized = false;

cuFileAPI::instance().BatchIODestroy(_handle);
}
void close() noexcept;

/**
* @brief Submit a vector of batch operations
*
* @param operations The vector of batch operations, whose size must not exceed
* `max_num_events`.
*/
void submit(const std::vector<BatchOp>& operations)
{
if (convert_size2ssize(operations.size()) > _max_num_events) {
throw CUfileException("Cannot submit more than the max_num_events)");
}
std::vector<CUfileIOParams_t> io_batch_params;
io_batch_params.reserve(operations.size());
for (const auto& op : operations) {
if (op.file_handle.is_compat_mode_preferred()) {
throw CUfileException("Cannot submit a FileHandle opened in compatibility mode");
}

io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH,
.u = {.batch = {.devPtr_base = op.devPtr_base,
.file_offset = op.file_offset,
.devPtr_offset = op.devPtr_offset,
.size = op.size}},
.fh = op.file_handle.handle(),
.opcode = op.opcode,
.cookie = nullptr});
}

CUFILE_TRY(cuFileAPI::instance().BatchIOSubmit(
_handle, io_batch_params.size(), io_batch_params.data(), 0));
}
void submit(const std::vector<BatchOp>& operations);

/**
* @brief Get status of submitted operations
Expand All @@ -148,16 +110,9 @@ class BatchHandle {
*/
std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
std::vector<CUfileIOEvents_t> ret;
ret.resize(_max_num_events);
CUFILE_TRY(cuFileAPI::instance().BatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout));
ret.resize(max_nr);
return ret;
}

void cancel() { CUFILE_TRY(cuFileAPI::instance().BatchIOCancel(_handle)); }
struct timespec* timeout = nullptr);

void cancel();
};

#else
Expand All @@ -166,24 +121,19 @@ class BatchHandle {
public:
BatchHandle() noexcept = default;

BatchHandle(int max_num_events)
{
throw CUfileException("BatchHandle requires cuFile's batch API, please build with CUDA v12.1+");
}
BatchHandle(int max_num_events);

[[nodiscard]] bool closed() const noexcept { return true; }
[[nodiscard]] bool closed() const noexcept;

void close() noexcept {}
void close() noexcept;

void submit(const std::vector<BatchOp>& operations) {}
void submit(const std::vector<BatchOp>& operations);

std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
return std::vector<CUfileIOEvents_t>{};
}
void cancel() {}
struct timespec* timeout = nullptr);

void cancel();
};

#endif
Expand Down
84 changes: 14 additions & 70 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@
*/
#pragma once

#include <mutex>
#include <stack>

#include <kvikio/defaults.hpp>
Expand Down Expand Up @@ -47,18 +46,15 @@ class AllocRetain {
std::size_t const _size;

public:
Alloc(AllocRetain* manager, void* alloc, std::size_t size)
: _manager(manager), _alloc{alloc}, _size{size}
{
}
Alloc(AllocRetain* manager, void* alloc, std::size_t size);
Alloc(Alloc const&) = delete;
Alloc& operator=(Alloc const&) = delete;
Alloc(Alloc&& o) = delete;
Alloc& operator=(Alloc&& o) = delete;
~Alloc() noexcept { _manager->put(_alloc, _size); }
void* get() noexcept { return _alloc; }
void* get(std::ptrdiff_t offset) noexcept { return static_cast<char*>(_alloc) + offset; }
std::size_t size() noexcept { return _size; }
~Alloc() noexcept;
void* get() noexcept;
void* get(std::ptrdiff_t offset) noexcept;
std::size_t size() noexcept;
};

AllocRetain() = default;
Expand All @@ -77,80 +73,28 @@ class AllocRetain {
*
* @return The number of bytes cleared
*/
std::size_t _clear()
{
std::size_t ret = _free_allocs.size() * _size;
while (!_free_allocs.empty()) {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
_free_allocs.pop();
}
return ret;
}
std::size_t _clear();

/**
* @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()`
*
* NB: `_mutex` must be taken prior to calling this function.
*/
void _ensure_alloc_size()
{
auto const bounce_buffer_size = defaults::bounce_buffer_size();
if (_size != bounce_buffer_size) {
_clear();
_size = bounce_buffer_size;
}
}
void _ensure_alloc_size();

public:
[[nodiscard]] Alloc get()
{
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// Check if we have an allocation available
if (!_free_allocs.empty()) {
void* ret = _free_allocs.top();
_free_allocs.pop();
return Alloc(this, ret, _size);
}

// If no available allocation, allocate and register a new one
void* alloc{};
// Allocate page-locked host memory
CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&alloc, _size, CU_MEMHOSTREGISTER_PORTABLE));
return Alloc(this, alloc, _size);
}

void put(void* alloc, std::size_t size)
{
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// If the size of `alloc` matches the sizes of the retained allocations,
// it is added to the set of free allocation otherwise it is freed.
if (size == _size) {
_free_allocs.push(alloc);
} else {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(alloc));
}
}
[[nodiscard]] Alloc get();

void put(void* alloc, std::size_t size);

/**
* @brief Free all retained allocations
*
* @return The number of bytes cleared
*/
std::size_t clear()
{
std::lock_guard const lock(_mutex);
return _clear();
}

KVIKIO_EXPORT static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}
std::size_t clear();

KVIKIO_EXPORT static AllocRetain& instance();

AllocRetain(AllocRetain const&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
Expand Down
52 changes: 10 additions & 42 deletions cpp/include/kvikio/buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,17 +15,8 @@
*/
#pragma once

#include <algorithm>
#include <iostream>
#include <map>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/shim/cufile.hpp>
#include <kvikio/shim/cufile_h_wrapper.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {

/**
Expand All @@ -44,32 +35,17 @@ namespace kvikio {
* streaming buffer that is reused across multiple cuFile IO operations.
*/
/*NOLINTNEXTLINE(readability-function-cognitive-complexity)*/
inline void buffer_register(const void* devPtr_base,
std::size_t size,
int flags = 0,
const std::vector<int>& errors_to_ignore = std::vector<int>())
{
if (defaults::is_compat_mode_preferred()) { return; }
CUfileError_t status = cuFileAPI::instance().BufRegister(devPtr_base, size, flags);
if (status.err != CU_FILE_SUCCESS) {
// Check if `status.err` is in `errors_to_ignore`
if (std::find(errors_to_ignore.begin(), errors_to_ignore.end(), status.err) ==
errors_to_ignore.end()) {
CUFILE_TRY(status);
}
}
}
void buffer_register(const void* devPtr_base,
std::size_t size,
int flags = 0,
const std::vector<int>& errors_to_ignore = std::vector<int>());

/**
* @brief Deregister already-registered device memory from cuFile
*
* @param devPtr_base device pointer to deregister
*/
inline void buffer_deregister(const void* devPtr_base)
{
if (defaults::is_compat_mode_preferred()) { return; }
CUFILE_TRY(cuFileAPI::instance().BufDeregister(devPtr_base));
}
void buffer_deregister(const void* devPtr_base);

/**
* @brief Register device memory allocation which is part of devPtr. Use this
Expand All @@ -85,23 +61,15 @@ inline void buffer_deregister(const void* devPtr_base)
* @warning This API is intended for use cases where the memory is used as a
* streaming buffer that is reused across multiple cuFile IO operations.
*/
inline void memory_register(const void* devPtr,
int flags = 0,
const std::vector<int>& errors_to_ignore = {})
{
auto [base, nbytes, offset] = get_alloc_info(devPtr);
buffer_register(base, nbytes, flags, errors_to_ignore);
}
void memory_register(const void* devPtr,
int flags = 0,
const std::vector<int>& errors_to_ignore = {});

/**
* @brief Deregister already-registered device memory from cuFile.
*
* @param devPtr device pointer to deregister
*/
inline void memory_deregister(const void* devPtr)
{
auto [base, nbytes, offset] = get_alloc_info(devPtr);
buffer_deregister(base);
}
void memory_deregister(const void* devPtr);

} // namespace kvikio
Loading
Loading