Properly handle the mapped and registered regions in memory_mapped_source
#16865
Merged
Changes from 20 commits (27 commits in total)
e3fd5bb don't register the padding (vuule)
47a494e Merge branch 'branch-24.10' of https://github.com/rapidsai/cudf into … (vuule)
cc4ab53 works (vuule)
879d450 mild clean up (vuule)
b7b4935 bit more clean up (vuule)
63abe6a well.. there it is (vuule)
9a26102 well.. there it is (vuule)
a1f487d Merge branch 'rework-read_csv-ingest' of https://github.com/vuule/cud… (vuule)
f5e5bae Merge branch 'branch-24.10' into rework-read_csv-ingest (vuule)
cea8b7f Merge branch 'impr-buffer_register-exact-range' into bug-fix-mmaped-s… (vuule)
315119f Merge branch 'branch-24.10' of https://github.com/rapidsai/cudf into … (vuule)
42d560a Merge branch 'branch-24.12' into rework-read_csv-ingest (vuule)
8adbbf4 reorg file sources (vuule)
3b38774 done (vuule)
ad93cec Merge branch 'rework-read_csv-ingest' of https://github.com/vuule/cud… (vuule)
0b33a4c Merge branch 'branch-24.10' of https://github.com/rapidsai/cudf into … (vuule)
f490c30 Merge branch 'branch-24.12' of https://github.com/rapidsai/cudf into … (vuule)
e6d4111 Merge branch 'rework-read_csv-ingest' into bug-fix-mmaped-source (vuule)
f5ab98a tests + fix (vuule)
2282897 Merge branch 'branch-24.12' into bug-fix-mmaped-source (vuule)
f8ae1d6 comment fix (vuule)
a6bc941 docs, max >=min check (vuule)
45e1e5e Merge branch 'bug-fix-mmaped-source' of https://github.com/rapidsai/c… (vuule)
1d0ece4 Merge branch 'branch-24.12' into bug-fix-mmaped-source (vuule)
808a22c bit more docs (vuule)
7e53ed3 Merge branch 'bug-fix-mmaped-source' of https://github.com/rapidsai/c… (vuule)
2d8363a style (vuule)
@@ -32,6 +32,7 @@
 #include <unistd.h>

 #include <unordered_map>
+#include <vector>

 namespace cudf {
 namespace io {
@@ -54,6 +55,30 @@ class file_source : public datasource {
     }
   }

+  std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
[Review comment on the line above] moved the implementation from the derived class because the memory mapped source now performs direct reads when the location is outside of the mapped range.
+  {
+    lseek(_file.desc(), offset, SEEK_SET);
+
+    // Clamp length to available data
+    ssize_t const read_size = std::min(size, _file.size() - offset);
+
+    std::vector<uint8_t> v(read_size);
+    CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
+    return buffer::create(std::move(v));
+  }
+
+  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
+  {
+    lseek(_file.desc(), offset, SEEK_SET);
+
+    // Clamp length to available data
+    auto const read_size = std::min(size, _file.size() - offset);
+
+    CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
+                 "read failed");
+    return read_size;
+  }
+
   ~file_source() override = default;

   [[nodiscard]] bool supports_device_read() const override
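The clamped direct read that this hunk moves into `file_source` can be sketched in isolation. The helper below is hypothetical (`read_clamped` is not a cudf function) and swaps in `pread` with a retry loop instead of the `lseek` + `read` pair used in the diff; it assumes a POSIX system and a regular file.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper (not part of cudf): reads up to `size` bytes starting at
// `offset`, clamping the length to the bytes actually available in the file.
std::vector<std::uint8_t> read_clamped(int fd, std::size_t offset, std::size_t size)
{
  struct stat st {};
  if (fstat(fd, &st) != 0) { throw std::runtime_error("fstat failed"); }
  auto const file_size = static_cast<std::size_t>(st.st_size);

  // Nothing to read at or past the end of the file
  if (offset >= file_size) { return {}; }

  // Clamp length to available data
  auto const read_size = std::min(size, file_size - offset);
  std::vector<std::uint8_t> out(read_size);

  // pread may return fewer bytes than requested, so loop until done
  std::size_t done = 0;
  while (done < read_size) {
    auto const n =
      pread(fd, out.data() + done, read_size - done, static_cast<off_t>(offset + done));
    if (n <= 0) { throw std::runtime_error("read failed"); }
    done += static_cast<std::size_t>(n);
  }
  return out;
}
```

Unlike the code in the diff, this sketch returns an empty buffer when `offset` is at or past the end of the file instead of relying on the caller to pass a valid offset.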
@@ -138,40 +163,59 @@ class file_source : public datasource {
  */
 class memory_mapped_source : public file_source {
  public:
-  explicit memory_mapped_source(char const* filepath, size_t offset, size_t size)
+  explicit memory_mapped_source(char const* filepath,
+                                size_t offset,
+                                size_t map_size,
+                                size_t register_size)
     : file_source(filepath)
   {
     if (_file.size() != 0) {
-      map(_file.desc(), offset, size);
-      register_mmap_buffer();
+      map(_file.desc(), offset, map_size);
+      register_mmap_buffer(offset, register_size);
     }
   }

   ~memory_mapped_source() override
   {
     if (_map_addr != nullptr) {
-      munmap(_map_addr, _map_size);
+      unmap();
       unregister_mmap_buffer();
     }
   }

   std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
   {
-    CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
+    // Clamp length to available data
+    auto const read_size = std::min(size, +_file.size() - offset);
+
+    // If the requested range is outside of the mapped region, read from the file
+    if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
+      return file_source::host_read(offset, read_size);
+    }

-    // Clamp length to available data in the mapped region
-    auto const read_size = std::min(size, _map_size - (offset - _map_offset));
+    // If the requested range is only partially within the mapped region, copy to a new
[vuule marked this conversation as resolved.]
+    // host buffer to make the data safe to copy to the device
+    if (_reg_addr != nullptr and
+        (offset < _reg_offset or offset + read_size > (_reg_offset + _reg_size))) {
+      auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);
+
+      return std::make_unique<owning_buffer<std::vector<uint8_t>>>(
+        std::vector<uint8_t>(src, src + read_size));
+    }

     return std::make_unique<non_owning_buffer>(
-      static_cast<uint8_t*>(_map_addr) + (offset - _map_offset), read_size);
+      static_cast<uint8_t*>(_map_addr) + offset - _map_offset, read_size);
   }

   size_t host_read(size_t offset, size_t size, uint8_t* dst) override
   {
-    CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
+    // Clamp length to available data
+    auto const read_size = std::min(size, +_file.size() - offset);

-    // Clamp length to available data in the mapped region
-    auto const read_size = std::min(size, _map_size - (offset - _map_offset));
+    // If the requested range is outside of the mapped region, read from the file
+    if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
+      return file_source::host_read(offset, read_size, dst);
+    }

     auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);
     std::memcpy(dst, src, read_size);
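The new `host_read` overload that returns a buffer effectively picks one of three paths depending on where the requested range falls. A minimal sketch of that decision as a pure function follows; the `read_path`, `region`, and `classify_read` names are hypothetical and only model the conditions visible in this hunk, assuming `offset` does not exceed the file size.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical names for illustration; not part of cudf
enum class read_path { direct_file_read, copy_from_mapping, view_into_mapping };

struct region {
  std::size_t offset;
  std::size_t size;

  // True if [off, off + len) lies entirely inside this region
  [[nodiscard]] bool contains(std::size_t off, std::size_t len) const
  {
    return off >= offset && off + len <= offset + size;
  }
};

read_path classify_read(std::size_t file_size,
                        region mapped,
                        region registered,
                        bool is_registered,
                        std::size_t offset,
                        std::size_t size)
{
  // Clamp length to available data (assumes offset <= file_size)
  auto const read_size = std::min(size, file_size - offset);

  // Not fully inside the mapping: fall back to reading from the file
  if (!mapped.contains(offset, read_size)) { return read_path::direct_file_read; }

  // Inside the mapping but not fully inside the registered part: copy to a new host buffer
  if (is_registered && !registered.contains(offset, read_size)) {
    return read_path::copy_from_mapping;
  }

  // Otherwise a non-owning view into the mapping is enough
  return read_path::view_into_mapping;
}
```

For example, with a 512-byte mapping at offset 0 of which the first 400 bytes are registered, a 100-byte read at offset 350 straddles the registered boundary and takes the copy path, while the same read at offset 600 goes to the file.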
@@ -184,16 +228,18 @@ class memory_mapped_source : public file_source {
    *
    * Fixes nvbugs/4215160
    */
-  void register_mmap_buffer()
+  void register_mmap_buffer(size_t offset, size_t size)
   {
-    if (_map_addr == nullptr or _map_size == 0 or not pageableMemoryAccessUsesHostPageTables()) {
-      return;
-    }
+    if (_map_addr == nullptr or not pageableMemoryAccessUsesHostPageTables()) { return; }

-    auto const result = cudaHostRegister(_map_addr, _map_size, cudaHostRegisterDefault);
-    if (result == cudaSuccess) {
-      _is_map_registered = true;
-    } else {
+    // Registered region must be within the mapped region
+    _reg_offset = std::max(offset, _map_offset);
+    _reg_size = std::min(size != 0 ? size : _map_size, (_map_offset + _map_size) - _reg_offset);
+
+    _reg_addr = static_cast<std::byte*>(_map_addr) - _map_offset + _reg_offset;
+    auto const result = cudaHostRegister(_reg_addr, _reg_size, cudaHostRegisterReadOnly);
+    if (result != cudaSuccess) {
+      _reg_addr = nullptr;
       CUDF_LOG_WARN("cudaHostRegister failed with {} ({})",
                     static_cast<int>(result),
                     cudaGetErrorString(result));
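The arithmetic that keeps the registered region inside the mapped region can be checked without CUDA. The sketch below mirrors the `std::max` / `std::min` expressions from this hunk in a standalone function; `byte_range` and `clamp_registered_range` are hypothetical names, and the function assumes the requested offset is not past the end of the mapping.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical type for illustration; not part of cudf
struct byte_range {
  std::size_t offset;
  std::size_t size;
};

// Computes the range to register, given the mapped range and the requested
// (offset, size). A requested size of zero is treated as "up to the end of the mapping".
byte_range clamp_registered_range(byte_range mapped, std::size_t offset, std::size_t size)
{
  // Registration cannot start before the mapping does
  auto const reg_offset = std::max(offset, mapped.offset);

  // Registration cannot extend past the end of the mapping
  auto const mapped_end = mapped.offset + mapped.size;
  auto const reg_size   = std::min(size != 0 ? size : mapped.size, mapped_end - reg_offset);

  return {reg_offset, reg_size};
}
```

With a mapping of 8192 bytes starting at offset 4096, a request for 3000 bytes at offset 5000 is registered as is, while a request with size 0 or an oversized request is trimmed to the 7288 bytes that remain in the mapping.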
@@ -205,10 +251,12 @@ class memory_mapped_source : public file_source {
    */
   void unregister_mmap_buffer()
   {
-    if (not _is_map_registered) { return; }
+    if (_reg_addr == nullptr) { return; }

-    auto const result = cudaHostUnregister(_map_addr);
-    if (result != cudaSuccess) {
+    auto const result = cudaHostUnregister(_reg_addr);
+    if (result == cudaSuccess) {
+      _reg_addr = nullptr;
+    } else {
       CUDF_LOG_WARN("cudaHostUnregister failed with {} ({})",
                     static_cast<int>(result),
                     cudaGetErrorString(result));
@@ -226,52 +274,30 @@ class memory_mapped_source : public file_source {

     // Size for `mmap()` needs to include the page padding
     _map_size = size + (offset - _map_offset);
+    if (_map_size == 0) { return; }

     // Check if accessing a region within already mapped area
     _map_addr = mmap(nullptr, _map_size, PROT_READ, MAP_PRIVATE, fd, _map_offset);
     CUDF_EXPECTS(_map_addr != MAP_FAILED, "Cannot create memory mapping");
   }

- private:
-  size_t _map_size = 0;
-  size_t _map_offset = 0;
-  void* _map_addr = nullptr;
-  bool _is_map_registered = false;
-};
-
-/**
- * @brief Implementation class for reading from a file using `read` calls
- *
- * Potentially faster than `memory_mapped_source` when only a small portion of the file is read
- * through the host.
- */
-class direct_read_source : public file_source {
- public:
-  explicit direct_read_source(char const* filepath) : file_source(filepath) {}
-
-  std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
+  void unmap()
   {
-    lseek(_file.desc(), offset, SEEK_SET);
-
-    // Clamp length to available data
-    ssize_t const read_size = std::min(size, _file.size() - offset);
-
-    std::vector<uint8_t> v(read_size);
-    CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
-    return buffer::create(std::move(v));
+    if (_map_addr != nullptr) {
+      auto const result = munmap(_map_addr, _map_size);
+      if (result != 0) { CUDF_LOG_WARN("munmap failed with {}", result); }
+      _map_addr = nullptr;
+    }
   }

-  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
-  {
-    lseek(_file.desc(), offset, SEEK_SET);
-
-    // Clamp length to available data
-    auto const read_size = std::min(size, _file.size() - offset);
+ private:
+  size_t _map_offset = 0;
+  size_t _map_size = 0;
+  void* _map_addr = nullptr;

-    CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
-                 "read failed");
-    return read_size;
-  }
+  size_t _reg_offset = 0;
+  size_t _reg_size = 0;
+  void* _reg_addr = nullptr;
 };

 /**
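The comment about page padding refers to the fact that `mmap()` requires a page-aligned file offset, so the mapping has to start before the requested offset and grow by the same amount. The sketch below illustrates `_map_size = size + (offset - _map_offset)` under the assumption that the mapping offset is the requested offset rounded down to a page boundary (that computation is outside the lines shown in this hunk). `mapping_params` and `page_aligned_mapping` are hypothetical names; the page size is passed in so the example stays deterministic, where real code would typically query `sysconf(_SC_PAGESIZE)`.

```cpp
#include <cstddef>

// Hypothetical type for illustration; not part of cudf
struct mapping_params {
  std::size_t map_offset;  // page-aligned offset passed to mmap()
  std::size_t map_size;    // requested size plus the leading page padding
};

// `page_size` must be a power of two
mapping_params page_aligned_mapping(std::size_t offset, std::size_t size, std::size_t page_size)
{
  // Round the offset down to the start of its page
  auto const map_offset = offset & ~(page_size - 1);

  // Size for mmap() needs to include the padding between map_offset and offset
  auto const map_size = size + (offset - map_offset);

  return {map_offset, map_size};
}
```

With 4096-byte pages, a request for 3000 bytes at offset 5000 maps 3904 bytes starting at offset 4096, so the first 904 mapped bytes are padding that precedes the requested data.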
@@ -431,16 +457,18 @@ class user_datasource_wrapper : public datasource {

 std::unique_ptr<datasource> datasource::create(std::string const& filepath,
                                                size_t offset,
-                                               size_t size)
+                                               size_t max_size_estimate,
+                                               size_t min_size_estimate)
 {
 #ifdef CUFILE_FOUND
   if (cufile_integration::is_always_enabled()) {
     // avoid mmap as GDS is expected to be used for most reads
-    return std::make_unique<direct_read_source>(filepath.c_str());
+    return std::make_unique<file_source>(filepath.c_str());
   }
 #endif
   // Use our own memory mapping implementation for direct file reads
-  return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
+  return std::make_unique<memory_mapped_source>(
+    filepath.c_str(), offset, max_size_estimate, min_size_estimate);
 }

 std::unique_ptr<datasource> datasource::create(host_buffer const& buffer)
The max includes the padding for the last row when reading a byte range. We're now also passing the byte range size without padding, for operations that should not include the padding, such as buffer registration (registering overlapping ranges fails, and this can happen when we read a CSV or a JSON file in chunks).
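To see why the padded size is unsuitable for registration, consider a file read in consecutive chunks. The sketch below uses hypothetical helpers (`byte_range`, `overlaps`, `chunk_range`) and made-up sizes; it only demonstrates that padded ranges of neighboring chunks overlap while the unpadded ranges do not.

```cpp
#include <cstddef>

// Hypothetical types and helpers for illustration; not part of cudf
struct byte_range {
  std::size_t offset;
  std::size_t size;
};

// Overlap test for the half-open intervals [offset, offset + size)
bool overlaps(byte_range a, byte_range b)
{
  return a.offset < b.offset + b.size && b.offset < a.offset + a.size;
}

// Range of chunk `index` when a file is read in `chunk_size` pieces;
// `padding` is the extra bytes read past the chunk to finish its last row
byte_range chunk_range(std::size_t index, std::size_t chunk_size, std::size_t padding)
{
  return {index * chunk_size, chunk_size + padding};
}
```

With 1000-byte chunks and 64 bytes of padding, chunk 0 covers bytes 0 to 1063 and chunk 1 starts at byte 1000, so the two padded ranges share 64 bytes. Without padding the ranges meet at byte 1000 and do not overlap.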
Should we make that use case more obvious in the docstrings? I wouldn't know the difference between padded/non-padded and registration restrictions from this docstring.
Does this need to throw if `max < min`? If so, that `@throws` would go into the docstring.
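As an illustration of what such a check and its `@throws` entry could look like, here is a minimal sketch. `validate_size_estimates` is a hypothetical helper, the exception type is an assumption, and any special meaning of a zero estimate is ignored; the check actually added in the PR is not shown on this page.

```cpp
#include <cstddef>
#include <stdexcept>

/**
 * @brief Checks that the size estimates are consistent (hypothetical helper, not cudf API).
 *
 * @param max_size_estimate Upper estimate of the bytes to read, including padding
 * @param min_size_estimate Lower estimate of the bytes to read, without padding
 * @throws std::invalid_argument if `max_size_estimate` is smaller than `min_size_estimate`
 */
void validate_size_estimates(std::size_t max_size_estimate, std::size_t min_size_estimate)
{
  if (max_size_estimate < min_size_estimate) {
    throw std::invalid_argument("max_size_estimate must be >= min_size_estimate");
  }
}
```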
Added docs here and in the `memory_mapped_source` constructor. I think they add up to a full explanation, let me know what you think :)
This is great! Thanks.