diff --git a/cpp/examples/basic_io.cpp b/cpp/examples/basic_io.cpp index ae82632779..abc5fc5110 100644 --- a/cpp/examples/basic_io.cpp +++ b/cpp/examples/basic_io.cpp @@ -230,4 +230,25 @@ int main() cout << "File async read: " << *bytes_done_p << endl; check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess); } + { + cout << "Performing async I/O using by-value arguments" << endl; + + // Let's create a new stream and submit an async write + CUstream stream{}; + check(cudaStreamCreate(&stream) == cudaSuccess); + kvikio::FileHandle f_handle("/tmp/test-file", "w+"); + check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess); + + // Notice, we get a handle `res`, which will synchronize the CUDA stream on destruction + kvikio::StreamFuture res = f_handle.write_async(a_dev, SIZE, 0, 0, stream); + // But we can also trigger the synchronization and get the bytes written by calling + // `check_bytes_done()`. + check(res.check_bytes_done() == SIZE); + cout << "File async write: " << res.check_bytes_done() << endl; + + // Let's async read the data back into device memory + res = f_handle.read_async(c_dev, SIZE, 0, 0, stream); + check(res.check_bytes_done() == SIZE); + cout << "File async read: " << res.check_bytes_done() << endl; + } } diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index 4aaa719f9c..9cd74b2319 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #include namespace kvikio { @@ -551,6 +552,44 @@ class FileHandle { static_cast(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p)); } + /** + * @brief Reads specified bytes from the file into the device memory asynchronously. + * + * This is an asynchronous version of `.read()`, which will be executed in sequence + * for the specified stream. + * + * When running CUDA v12.1 or older, this function falls back to use `.read()` after + * `stream` has been synchronized. + * + * The arguments have the same meaning as in `.read()` but returns a `StreamFuture` object + * that the caller must keep alive until all data has been read from disk. One way to do this, + * is by calling `StreamFuture.check_bytes_done()`, which will synchronize the associated stream + * and return the number of bytes read. + * + * @param devPtr_base Base address of buffer in device memory. For registered buffers, + * `devPtr_base` must remain set to the base address used in the `buffer_register` call. + * @param size Size in bytes to read. + * @param file_offset Offset in the file to read from. + * @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into. + * This parameter should be used only with registered buffers. + * @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation + * synchronous. + * @return A future object that must be kept alive until all data has been read to disk e.g. + * by synchronizing `stream`. + */ + [[nodiscard]] StreamFuture read_async(void* devPtr_base, + std::size_t size, + off_t file_offset = 0, + off_t devPtr_offset = 0, + CUstream stream = nullptr) + { + StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream); + auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] = + ret.get_args(); + read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_); + return ret; + } + /** * @brief Writes specified bytes from the device memory into the file asynchronously. * @@ -606,6 +645,44 @@ class FileHandle { static_cast(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p)); } + /** + * @brief Writes specified bytes from the device memory into the file asynchronously. + * + * This is an asynchronous version of `.write()`, which will be executed in sequence + * for the specified stream. + * + * When running CUDA v12.1 or older, this function falls back to use `.read()` after + * `stream` has been synchronized. + * + * The arguments have the same meaning as in `.write()` but returns a `StreamFuture` object + * that the caller must keep alive until all data has been written to disk. One way to do this, + * is by calling `StreamFuture.check_bytes_done()`, which will synchronize the associated stream + * and return the number of bytes written. + * + * @param devPtr_base Base address of buffer in device memory. For registered buffers, + * `devPtr_base` must remain set to the base address used in the `buffer_register` call. + * @param size Size in bytes to write. + * @param file_offset Offset in the file to write from. + * @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from. + * This parameter should be used only with registered buffers. + * @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation + * synchronous. + * @return A future object that must be kept alive until all data has been written to disk e.g. + * by synchronizing `stream`. + */ + [[nodiscard]] StreamFuture write_async(void* devPtr_base, + std::size_t size, + off_t file_offset = 0, + off_t devPtr_offset = 0, + CUstream stream = nullptr) + { + StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream); + auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] = + ret.get_args(); + write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_); + return ret; + } + /** * @brief Returns `true` if the compatibility mode has been enabled for this file. * diff --git a/cpp/include/kvikio/stream.hpp b/cpp/include/kvikio/stream.hpp new file mode 100644 index 0000000000..6b573fcb06 --- /dev/null +++ b/cpp/include/kvikio/stream.hpp @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace kvikio { + +/** + * @brief Future of an asynchronous IO operation + * + * This class shouldn't be used directly, instead some stream operations such as + * `FileHandle.read_async` and `FileHandle.write_async` returns an instance of this class. Use + * `.check_bytes_done()` to synchronize the associated CUDA stream and return the number of bytes + * read or written by the operation. + * + * The goal of this class is twofold: + * - Have `read_async` and `write_async` return an object that clearly associates the function + * arguments with the CUDA stream used. This is useful because the current validity of the + * arguments depends on the stream. + * - Support of by-value arguments. In many cases, a user will use `read_async` and `write_async` + * like most other asynchronous CUDA functions that take by-value arguments. + * + * To support by-value arguments, we allocate the arguments on the heap (malloc `ArgByVal`) and have + * the by-reference arguments points into `ArgByVal`. This way, the `read_async` and `write_async` + * can call `.get_args()` to get the by-reference arguments required by cuFile's stream API. + */ +class StreamFuture { + private: + struct ArgByVal { + std::size_t size; + off_t file_offset; + off_t devPtr_offset; + ssize_t bytes_done; + }; + + void* _devPtr_base{nullptr}; + CUstream _stream{nullptr}; + ArgByVal* _val{nullptr}; + bool _stream_synchronized{false}; + + public: + StreamFuture() noexcept = default; + + StreamFuture( + void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream) + : _devPtr_base{devPtr_base}, _stream{stream} + { + // Notice, we allocate the arguments using malloc() as specified in the cuFile docs: + // + if ((_val = static_cast(std::malloc(sizeof(ArgByVal)))) == nullptr) { + throw std::bad_alloc{}; + } + *_val = { + .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0}; + } + + /** + * @brief StreamFuture support move semantic but isn't copyable + */ + StreamFuture(const StreamFuture&) = delete; + StreamFuture& operator=(StreamFuture& o) = delete; + StreamFuture(StreamFuture&& o) noexcept + : _devPtr_base{std::exchange(o._devPtr_base, nullptr)}, + _stream{std::exchange(o._stream, nullptr)}, + _val{std::exchange(o._val, nullptr)}, + _stream_synchronized{o._stream_synchronized} + { + } + StreamFuture& operator=(StreamFuture&& o) noexcept + { + _devPtr_base = std::exchange(o._devPtr_base, nullptr); + _stream = std::exchange(o._stream, nullptr); + _val = std::exchange(o._val, nullptr); + _stream_synchronized = o._stream_synchronized; + return *this; + } + + /** + * @brief Return the arguments of the future call + * + * @return Tuple of the arguments in the order matching `FileHandle.read()` and + * `FileHandle.write()` + */ + std::tuple get_args() const + { + if (_val == nullptr) { + throw kvikio::CUfileException("cannot get arguments from an uninitialized StreamFuture"); + } + return {_devPtr_base, + &_val->size, + &_val->file_offset, + &_val->devPtr_offset, + &_val->bytes_done, + _stream}; + } + + /** + * @brief Return the number of bytes read or written by the future operation. + * + * Synchronize the associated CUDA stream. + * + * @return Number of bytes read or written by the future operation. + */ + std::size_t check_bytes_done() + { + if (_val == nullptr) { + throw kvikio::CUfileException("cannot check bytes done on an uninitialized StreamFuture"); + } + + if (!_stream_synchronized) { + _stream_synchronized = true; + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); + } + + CUFILE_CHECK_STREAM_IO(&_val->bytes_done); + // At this point, we know `*_val->bytes_done` is a positive value otherwise + // CUFILE_CHECK_STREAM_IO() would have raised an exception. + return static_cast(_val->bytes_done); + } + + /** + * @brief Free the by-value arguments and make sure the associated CUDA stream has been + * synchronized. + */ + ~StreamFuture() noexcept + { + if (_val != nullptr) { + try { + check_bytes_done(); + } catch (const kvikio::CUfileException& e) { + std::cerr << e.what() << std::endl; + } + std::free(_val); + } + } +}; + +} // namespace kvikio