Skip to content

Commit

Permalink
Async I/O using by-value arguments (#275)
Browse files Browse the repository at this point in the history
Implements a version of `read_async` and `write_async` that takes by-value arguments and returns a `StreamFuture`. 

This is to make the common case easy where the user knows the file and buffer size.

cc. @tell-rebanta

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - https://github.com/tell-rebanta
  - Lawrence Mitchell (https://github.com/wence-)

URL: #275
  • Loading branch information
madsbk authored Sep 21, 2023
1 parent 6d5bf9d commit 71c2858
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 0 deletions.
21 changes: 21 additions & 0 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,25 @@ int main()
cout << "File async read: " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
{
cout << "Performing async I/O using by-value arguments" << endl;

// Let's create a new stream and submit an async write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/tmp/test-file", "w+");
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);

// Notice, we get a handle `res`, which will synchronize the CUDA stream on destruction
kvikio::StreamFuture res = f_handle.write_async(a_dev, SIZE, 0, 0, stream);
// But we can also trigger the synchronization and get the bytes written by calling
// `check_bytes_done()`.
check(res.check_bytes_done() == SIZE);
cout << "File async write: " << res.check_bytes_done() << endl;

// Let's async read the data back into device memory
res = f_handle.read_async(c_dev, SIZE, 0, 0, stream);
check(res.check_bytes_done() == SIZE);
cout << "File async read: " << res.check_bytes_done() << endl;
}
}
77 changes: 77 additions & 0 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <kvikio/parallel_operation.hpp>
#include <kvikio/posix_io.hpp>
#include <kvikio/shim/cufile.hpp>
#include <kvikio/stream.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {
Expand Down Expand Up @@ -551,6 +552,44 @@ class FileHandle {
static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
* @brief Reads specified bytes from the file into the device memory asynchronously.
*
* This is an asynchronous version of `.read()`, which will be executed in sequence
* for the specified stream.
*
* When running CUDA v12.1 or older, this function falls back to use `.read()` after
* `stream` has been synchronized.
*
* The arguments have the same meaning as in `.read()` but returns a `StreamFuture` object
* that the caller must keep alive until all data has been read from disk. One way to do this,
* is by calling `StreamFuture.check_bytes_done()`, which will synchronize the associated stream
* and return the number of bytes read.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into.
* This parameter should be used only with registered buffers.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
* @return A future object that must be kept alive until all data has been read to disk e.g.
* by synchronizing `stream`.
*/
[[nodiscard]] StreamFuture read_async(void* devPtr_base,
std::size_t size,
off_t file_offset = 0,
off_t devPtr_offset = 0,
CUstream stream = nullptr)
{
StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_] =
ret.get_args();
read_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream_);
return ret;
}

/**
* @brief Writes specified bytes from the device memory into the file asynchronously.
*
Expand Down Expand Up @@ -606,6 +645,44 @@ class FileHandle {
static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
* @brief Writes specified bytes from the device memory into the file asynchronously.
*
* This is an asynchronous version of `.write()`, which will be executed in sequence
* for the specified stream.
*
* When running CUDA v12.1 or older, this function falls back to use `.read()` after
* `stream` has been synchronized.
*
* The arguments have the same meaning as in `.write()` but returns a `StreamFuture` object
* that the caller must keep alive until all data has been written to disk. One way to do this,
* is by calling `StreamFuture.check_bytes_done()`, which will synchronize the associated stream
* and return the number of bytes written.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from.
* This parameter should be used only with registered buffers.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
* @return A future object that must be kept alive until all data has been written to disk e.g.
* by synchronizing `stream`.
*/
[[nodiscard]] StreamFuture write_async(void* devPtr_base,
std::size_t size,
off_t file_offset = 0,
off_t devPtr_offset = 0,
CUstream stream = nullptr)
{
StreamFuture ret(devPtr_base, size, file_offset, devPtr_offset, stream);
auto [devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_] =
ret.get_args();
write_async(devPtr_base_, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream_);
return ret;
}

/**
* @brief Returns `true` if the compatibility mode has been enabled for this file.
*
Expand Down
158 changes: 158 additions & 0 deletions cpp/include/kvikio/stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <sys/types.h>
#include <algorithm>
#include <cstdlib>
#include <kvikio/error.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/shim/cufile.hpp>
#include <tuple>

namespace kvikio {

/**
* @brief Future of an asynchronous IO operation
*
* This class shouldn't be used directly, instead some stream operations such as
* `FileHandle.read_async` and `FileHandle.write_async` returns an instance of this class. Use
* `.check_bytes_done()` to synchronize the associated CUDA stream and return the number of bytes
* read or written by the operation.
*
* The goal of this class is twofold:
* - Have `read_async` and `write_async` return an object that clearly associates the function
* arguments with the CUDA stream used. This is useful because the current validity of the
* arguments depends on the stream.
* - Support of by-value arguments. In many cases, a user will use `read_async` and `write_async`
* like most other asynchronous CUDA functions that take by-value arguments.
*
* To support by-value arguments, we allocate the arguments on the heap (malloc `ArgByVal`) and have
* the by-reference arguments points into `ArgByVal`. This way, the `read_async` and `write_async`
* can call `.get_args()` to get the by-reference arguments required by cuFile's stream API.
*/
class StreamFuture {
private:
struct ArgByVal {
std::size_t size;
off_t file_offset;
off_t devPtr_offset;
ssize_t bytes_done;
};

void* _devPtr_base{nullptr};
CUstream _stream{nullptr};
ArgByVal* _val{nullptr};
bool _stream_synchronized{false};

public:
StreamFuture() noexcept = default;

StreamFuture(
void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
: _devPtr_base{devPtr_base}, _stream{stream}
{
// Notice, we allocate the arguments using malloc() as specified in the cuFile docs:
// <https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilewriteasync>
if ((_val = static_cast<ArgByVal*>(std::malloc(sizeof(ArgByVal)))) == nullptr) {
throw std::bad_alloc{};
}
*_val = {
.size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0};
}

/**
* @brief StreamFuture support move semantic but isn't copyable
*/
StreamFuture(const StreamFuture&) = delete;
StreamFuture& operator=(StreamFuture& o) = delete;
StreamFuture(StreamFuture&& o) noexcept
: _devPtr_base{std::exchange(o._devPtr_base, nullptr)},
_stream{std::exchange(o._stream, nullptr)},
_val{std::exchange(o._val, nullptr)},
_stream_synchronized{o._stream_synchronized}
{
}
StreamFuture& operator=(StreamFuture&& o) noexcept
{
_devPtr_base = std::exchange(o._devPtr_base, nullptr);
_stream = std::exchange(o._stream, nullptr);
_val = std::exchange(o._val, nullptr);
_stream_synchronized = o._stream_synchronized;
return *this;
}

/**
* @brief Return the arguments of the future call
*
* @return Tuple of the arguments in the order matching `FileHandle.read()` and
* `FileHandle.write()`
*/
std::tuple<void*, std::size_t*, off_t*, off_t*, ssize_t*, CUstream> get_args() const
{
if (_val == nullptr) {
throw kvikio::CUfileException("cannot get arguments from an uninitialized StreamFuture");
}
return {_devPtr_base,
&_val->size,
&_val->file_offset,
&_val->devPtr_offset,
&_val->bytes_done,
_stream};
}

/**
* @brief Return the number of bytes read or written by the future operation.
*
* Synchronize the associated CUDA stream.
*
* @return Number of bytes read or written by the future operation.
*/
std::size_t check_bytes_done()
{
if (_val == nullptr) {
throw kvikio::CUfileException("cannot check bytes done on an uninitialized StreamFuture");
}

if (!_stream_synchronized) {
_stream_synchronized = true;
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream));
}

CUFILE_CHECK_STREAM_IO(&_val->bytes_done);
// At this point, we know `*_val->bytes_done` is a positive value otherwise
// CUFILE_CHECK_STREAM_IO() would have raised an exception.
return static_cast<std::size_t>(_val->bytes_done);
}

/**
* @brief Free the by-value arguments and make sure the associated CUDA stream has been
* synchronized.
*/
~StreamFuture() noexcept
{
if (_val != nullptr) {
try {
check_bytes_done();
} catch (const kvikio::CUfileException& e) {
std::cerr << e.what() << std::endl;
}
std::free(_val);
}
}
};

} // namespace kvikio

0 comments on commit 71c2858

Please sign in to comment.