Skip to content

Commit

Permalink
Properly handle the mapped and registered regions in `memory_mapped_s…
Browse files Browse the repository at this point in the history
…ource` (#16865)

Depends on #16826

Set of fixes that improve robustness on the non-GDS file input:

1. Avoid registering beyond the byte range - addresses problems when reading adjacent byte ranges from multiple threads (GH only).
2. Allow reading data outside of the memory mapped region. This prevents issues with very long rows in CSV or JSON input.
3. Copy host data when the range being read is only partially registered. This avoids errors when trying to copy the host data range to the device (GH only).

Modifies the datasource class hierarchy to avoid reuse of direct file `host_read`s

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Basit Ayantunde (https://github.com/lamarrr)
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Bradley Dice (https://github.com/bdice)

URL: #16865
  • Loading branch information
vuule authored Oct 4, 2024
1 parent a784321 commit 39342b8
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 70 deletions.
22 changes: 18 additions & 4 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,28 @@ class datasource {
/**
* @brief Creates a source from a file path.
*
* @note Parameters `offset`, `max_size_estimate` and `min_size_estimate` are hints to the
* `datasource` implementation about the expected range of the data that will be read. The
* implementation may use these hints to optimize the read operation. These parameters are usually
* based on the byte range option. In this case, `min_size_estimate` should be no greater than the
* byte range to avoid potential issues when reading adjacent ranges. `max_size_estimate` can
* include padding after the byte range, to include additional data that may be needed for
* processing.
*
@throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate
*
* @param[in] filepath Path to the file to use
* @param[in] offset Bytes from the start of the file (the default is zero)
* @param[in] size Bytes from the offset; use zero for entire file (the default is zero)
* @param[in] offset Starting byte offset from which data will be read (the default is zero)
* @param[in] max_size_estimate Upper estimate of the data range that will be read (the default is
* zero, which means the whole file after `offset`)
* @param[in] min_size_estimate Lower estimate of the data range that will be read (the default is
* zero, which means the whole file after `offset`)
* @return Constructed datasource object
*/
static std::unique_ptr<datasource> create(std::string const& filepath,
size_t offset = 0,
size_t size = 0);
size_t offset = 0,
size_t max_size_estimate = 0,
size_t min_size_estimate = 0);

/**
* @brief Creates a source from a host memory buffer.
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder(
namespace {

std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info const& info,
size_t range_offset = 0,
size_t range_size = 0)
size_t offset = 0,
size_t max_size_estimate = 0,
size_t min_size_estimate = 0)
{
switch (info.type()) {
case io_type::FILEPATH: {
auto sources = std::vector<std::unique_ptr<cudf::io::datasource>>();
for (auto const& filepath : info.filepaths()) {
sources.emplace_back(cudf::io::datasource::create(filepath, range_offset, range_size));
sources.emplace_back(
cudf::io::datasource::create(filepath, offset, max_size_estimate, min_size_estimate));
}
return sources;
}
Expand Down Expand Up @@ -211,7 +213,8 @@ table_with_metadata read_json(json_reader_options options,

auto datasources = make_datasources(options.get_source(),
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());
options.get_byte_range_size_with_padding(),
options.get_byte_range_size());

return json::detail::read_json(datasources, options, stream, mr);
}
Expand All @@ -238,7 +241,8 @@ table_with_metadata read_csv(csv_reader_options options,

auto datasources = make_datasources(options.get_source(),
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());
options.get_byte_range_size_with_padding(),
options.get_byte_range_size());

CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported.");

Expand Down
157 changes: 96 additions & 61 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <unistd.h>

#include <unordered_map>
#include <vector>

namespace cudf {
namespace io {
Expand All @@ -54,6 +55,30 @@ class file_source : public datasource {
}
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}

~file_source() override = default;

[[nodiscard]] bool supports_device_read() const override
Expand Down Expand Up @@ -138,40 +163,63 @@ class file_source : public datasource {
*/
class memory_mapped_source : public file_source {
public:
explicit memory_mapped_source(char const* filepath, size_t offset, size_t size)
explicit memory_mapped_source(char const* filepath,
size_t offset,
size_t max_size_estimate,
size_t min_size_estimate)
: file_source(filepath)
{
if (_file.size() != 0) {
map(_file.desc(), offset, size);
register_mmap_buffer();
// Memory mapping is not exclusive, so we can include the whole region we expect to read
map(_file.desc(), offset, max_size_estimate);
// Buffer registration is exclusive (can't overlap with other registered buffers) so we
// register the lower estimate; this avoids issues when reading adjacent ranges from the same
// file from multiple threads
register_mmap_buffer(offset, min_size_estimate);
}
}

~memory_mapped_source() override
{
if (_map_addr != nullptr) {
munmap(_map_addr, _map_size);
unmap();
unregister_mmap_buffer();
}
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);

// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
return file_source::host_read(offset, read_size);
}

// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
// If the requested range is only partially within the registered region, copy to a new
// host buffer to make the data safe to copy to the device
if (_reg_addr != nullptr and
(offset < _reg_offset or offset + read_size > (_reg_offset + _reg_size))) {
auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);

return std::make_unique<owning_buffer<std::vector<uint8_t>>>(
std::vector<uint8_t>(src, src + read_size));
}

return std::make_unique<non_owning_buffer>(
static_cast<uint8_t*>(_map_addr) + (offset - _map_offset), read_size);
static_cast<uint8_t*>(_map_addr) + offset - _map_offset, read_size);
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);

// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
return file_source::host_read(offset, read_size, dst);
}

auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);
std::memcpy(dst, src, read_size);
Expand All @@ -184,16 +232,18 @@ class memory_mapped_source : public file_source {
*
* Fixes nvbugs/4215160
*/
void register_mmap_buffer()
void register_mmap_buffer(size_t offset, size_t size)
{
if (_map_addr == nullptr or _map_size == 0 or not pageableMemoryAccessUsesHostPageTables()) {
return;
}
if (_map_addr == nullptr or not pageableMemoryAccessUsesHostPageTables()) { return; }

auto const result = cudaHostRegister(_map_addr, _map_size, cudaHostRegisterDefault);
if (result == cudaSuccess) {
_is_map_registered = true;
} else {
// Registered region must be within the mapped region
_reg_offset = std::max(offset, _map_offset);
_reg_size = std::min(size != 0 ? size : _map_size, (_map_offset + _map_size) - _reg_offset);

_reg_addr = static_cast<std::byte*>(_map_addr) - _map_offset + _reg_offset;
auto const result = cudaHostRegister(_reg_addr, _reg_size, cudaHostRegisterReadOnly);
if (result != cudaSuccess) {
_reg_addr = nullptr;
CUDF_LOG_WARN("cudaHostRegister failed with {} ({})",
static_cast<int>(result),
cudaGetErrorString(result));
Expand All @@ -205,10 +255,12 @@ class memory_mapped_source : public file_source {
*/
void unregister_mmap_buffer()
{
if (not _is_map_registered) { return; }
if (_reg_addr == nullptr) { return; }

auto const result = cudaHostUnregister(_map_addr);
if (result != cudaSuccess) {
auto const result = cudaHostUnregister(_reg_addr);
if (result == cudaSuccess) {
_reg_addr = nullptr;
} else {
CUDF_LOG_WARN("cudaHostUnregister failed with {} ({})",
static_cast<int>(result),
cudaGetErrorString(result));
Expand All @@ -226,52 +278,30 @@ class memory_mapped_source : public file_source {

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
if (_map_size == 0) { return; }

// Check if accessing a region within already mapped area
_map_addr = mmap(nullptr, _map_size, PROT_READ, MAP_PRIVATE, fd, _map_offset);
CUDF_EXPECTS(_map_addr != MAP_FAILED, "Cannot create memory mapping");
}

private:
size_t _map_size = 0;
size_t _map_offset = 0;
void* _map_addr = nullptr;
bool _is_map_registered = false;
};

/**
* @brief Implementation class for reading from a file using `read` calls
*
* Potentially faster than `memory_mapped_source` when only a small portion of the file is read
* through the host.
*/
class direct_read_source : public file_source {
public:
explicit direct_read_source(char const* filepath) : file_source(filepath) {}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
void unmap()
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
if (_map_addr != nullptr) {
auto const result = munmap(_map_addr, _map_size);
if (result != 0) { CUDF_LOG_WARN("munmap failed with {}", result); }
_map_addr = nullptr;
}
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);
private:
size_t _map_offset = 0;
size_t _map_size = 0;
void* _map_addr = nullptr;

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}
size_t _reg_offset = 0;
size_t _reg_size = 0;
void* _reg_addr = nullptr;
};

/**
Expand Down Expand Up @@ -431,16 +461,21 @@ class user_datasource_wrapper : public datasource {

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
size_t offset,
size_t size)
size_t max_size_estimate,
size_t min_size_estimate)
{
CUDF_EXPECTS(max_size_estimate == 0 or min_size_estimate <= max_size_estimate,
"Invalid min/max size estimates for datasource creation");

#ifdef CUFILE_FOUND
if (cufile_integration::is_always_enabled()) {
// avoid mmap as GDS is expected to be used for most reads
return std::make_unique<direct_read_source>(filepath.c_str());
return std::make_unique<file_source>(filepath.c_str());
}
#endif
// Use our own memory mapping implementation for direct file reads
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
return std::make_unique<memory_mapped_source>(
filepath.c_str(), offset, max_size_estimate, min_size_estimate);
}

std::unique_ptr<datasource> datasource::create(host_buffer const& buffer)
Expand Down
35 changes: 35 additions & 0 deletions cpp/tests/io/csv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2516,4 +2516,39 @@ TEST_F(CsvReaderTest, UTF8BOM)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result_view, expected);
}

void expect_buffers_equal(cudf::io::datasource::buffer* lhs, cudf::io::datasource::buffer* rhs)
{
ASSERT_EQ(lhs->size(), rhs->size());
EXPECT_EQ(0, std::memcmp(lhs->data(), rhs->data(), lhs->size()));
}

TEST_F(CsvReaderTest, OutOfMapBoundsReads)
{
// write a lot of data into a file
auto filepath = temp_env->get_temp_dir() + "OutOfMapBoundsReads.csv";
auto const num_rows = 1 << 20;
auto const row = std::string{"0,1,2,3,4,5,6,7,8,9\n"};
auto const file_size = num_rows * row.size();
{
std::ofstream outfile(filepath, std::ofstream::out);
for (size_t i = 0; i < num_rows; ++i) {
outfile << row;
}
}

// Only memory map the middle of the file
auto source = cudf::io::datasource::create(filepath, file_size / 2, file_size / 4);
auto full_source = cudf::io::datasource::create(filepath);
auto const all_data = source->host_read(0, file_size);
auto ref_data = full_source->host_read(0, file_size);
expect_buffers_equal(ref_data.get(), all_data.get());

auto const start_data = source->host_read(file_size / 2, file_size / 2);
expect_buffers_equal(full_source->host_read(file_size / 2, file_size / 2).get(),
start_data.get());

auto const end_data = source->host_read(0, file_size / 2 + 512);
expect_buffers_equal(full_source->host_read(0, file_size / 2 + 512).get(), end_data.get());
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit 39342b8

Please sign in to comment.