Implement improved cudf::io::datasource::create() functionality.
Introduce new `datasource_kind` and `datasource_params` data types, and
update the cudf::io::datasource::create() signature to allow
parameterized datasource creation.

Additionally, implement new datasources:

    - host_source: base class that does simple host-based pread() calls
    - odirect_source: derivation of above that uses O_DIRECT
    - kvikio_source: simple Kvikio-based class (that does not fall back
      to mmap)
tpn committed Oct 21, 2024
1 parent 074ab74 commit fe981b1
Showing 2 changed files with 834 additions and 23 deletions.
267 changes: 266 additions & 1 deletion cpp/include/cudf/io/datasource.hpp
@@ -25,6 +25,7 @@

#include <future>
#include <memory>
#include <variant>

namespace CUDF_EXPORT cudf {
//! IO interfaces
@@ -36,6 +37,212 @@ namespace io {
* @file
*/

/**
* @brief Kind of data source to create when calling
* `cudf::io::datasource::create()`.
*
* @see cudf::io::datasource::create()
* @see cudf::io::datasource_params
*
* N.B. GDS = GPUDirect Storage
*/
enum class datasource_kind {
/**
* @brief Kvikio-based data source (default).
*
* This data source is the default for cuDF, and should be the most performant
* option for most use cases. It supports GDS where possible, falling back to
* multi-threaded host-based reads when GDS is not available.
*
* It supports asynchronous reads, and will use the provided CUDA stream for
* all I/O operations when possible.
*/
KVIKIO = 0,
DEFAULT = KVIKIO,

/**
* @brief Kvikio-based data source that does not attempt to use GDS, instead
* falling back to multi-threaded host-based reads.
*
* It supports asynchronous reads, but does not do any stream synchronization,
* as the reads are all performed on the host.
*/
KVIKIO_COMPAT,

/**
* @brief Kvikio-based data source that will fail if GDS is not available.
* Specifically, `cudf::io::datasource::create()` when called with this kind
* of data source will throw a `cudf::logic_error` if GDS is not available.
*/
KVIKIO_GDS,

/**
* @brief Host-based data source that does not support any device or async
* operations.
*
* All reads are performed via standard POSIX pread() calls. No
* multi-threading or asynchronous operations are supported.
*
* The primary purpose of this datasource type is to be a base class for the
* `O_DIRECT` implementation, which needs to issue pread() calls against a
* file descriptor that *hasn't* been opened with `O_DIRECT` if certain
* constraints aren't met (specifically: when reading the final bytes of a
* file that isn't perfectly aligned to a sector-size boundary).
*
* The time required to service reads from this data source will be affected
* by the presence or absence of the desired data in the Linux page cache.
* Thus, back-to-back runs of the same file will have significantly different
* performance characteristics, depending on whether the data is in the page
* cache or not.
*
* Generally, this data source should be avoided in favor of the `KVIKIO`
* data source, which will be more performant in most cases. It is still
* useful as a baseline against which `KVIKIO` performance improvements can
* be empirically measured.
*/
HOST,

/**
* @brief Host-based data source that issues reads against a file descriptor
* opened with `O_DIRECT`, where possible, bypassing the Linux page cache.
*
* This data source will always result in the slowest possible read times,
* as all reads are serviced directly from the underlying device. However,
* it will be consistently slow, and that consistency can be critical when
* benchmarking or profiling changes purporting to improve performance in
* unrelated areas.
*
* Thus, the primary use case for this data source is for benchmarking and
* profiling purposes, where you want to eliminate any runtime variance in
* back-to-back runs that would be caused by the presence or absence of data
* in the host's page cache.
*
* A secondary use case for this data source is when you specifically do not
* want to pollute the host's page cache with the data being read, either
* because it won't be read again soon, or you want to remove the memory
* pressure (or small but non-trivial amount of compute overhead) that would
* otherwise be introduced by servicing I/O through the page cache. In some
* scenarios, this can yield a net performance improvement, despite a higher
* per-read latency.
*
* A real-life example of how this can manifest is when doing very large TPC-H
* or TPC-DS runs, where the data set is orders of magnitude larger than the
* available host memory, e.g. 30TB or 100TB runs on hosts with <= 4TB of RAM.
*
* For certain queries--typically read-heavy, join-heavy ones--doing `O_DIRECT`
* reads can result in a net performance improvement, as the host's page cache
* won't be polluted with data that will never be read again, and compute
* overhead associated with cache thrashing when memory is tight is
* eliminated.
*/
ODIRECT,

/**
* @brief Host-based data source that uses memory mapped files to satisfy
* read requests.
*
* Note that this can result in pathological performance problems in certain
* environments, such as when small reads are done against files residing on
* a network file system (including accelerated file systems like WekaFS).
*/
HOST_MMAP,

/**
* @brief This is a special sentinel value that is used to indicate the
* datasource is not one of the publicly available types above.
*
* N.B. You cannot create a datasource of this kind directly via create().
*/
OTHER,
};

/**
* @brief Parameters for the kvikio data source.
*/
struct kvikio_datasource_params {
/**
* @brief When set, explicitly disables any attempts at using GPUDirect
* Storage, resulting in kvikio falling back to its "compat" mode using
* multi-threaded host-based reads.
*
* Defaults to false.
*
* N.B. Compat mode will still be used if GDS isn't available, regardless
* of the value of this parameter.
*/
bool use_compat_mode{false};

/**
* @brief The threshold at which the data source will switch from using
* host-based reads to device-based (i.e. GPUDirect) reads, if GPUDirect is
* available.
*
* This parameter should represent the read size above which a GDS read is
* faster than a POSIX read() plus the overhead of a host-to-device memcpy.
*
* Defaults to 128KB.
*/
size_t device_read_threshold{128 << 10};

/**
* @brief The number of threads in the kvikio thread pool.
*
* This parameter only applies to the kvikio data source when GDS is not
* available and it is in compat mode.
*
* Defaults to 0, which defers the thread pool sizing to kvikio.
*/
uint16_t num_threads{0};

/**
* @brief The size in bytes into which I/O operations will be split.
*
* Defaults to 1MB.
*/
size_t task_size{1 << 20};
};
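
As a rough illustration of how `device_read_threshold` and `task_size` might be applied, here is a minimal standalone sketch. It does not call kvikio or cuDF. The names `read_plan_params`, `prefers_device_read`, and `num_tasks` are hypothetical, and the rule that a read of at least the threshold takes the device path is an assumption based on the comment above, not something this diff confirms.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical mirror of two kvikio_datasource_params fields, using the
// default values shown in the diff.
struct read_plan_params {
  std::size_t device_read_threshold{128 << 10};  // 128 KiB
  std::size_t task_size{1 << 20};                // 1 MiB
};

// Assumption: a read goes down the device (GDS) path only when GDS is
// available and the read is at least device_read_threshold bytes.
inline bool prefers_device_read(read_plan_params const& p, std::size_t size, bool gds_available)
{
  return gds_available && size >= p.device_read_threshold;
}

// Number of task_size-sized pieces needed to cover `size` bytes
// (ceiling division; zero bytes need zero tasks).
inline std::size_t num_tasks(read_plan_params const& p, std::size_t size)
{
  assert(p.task_size > 0);
  return (size + p.task_size - 1) / p.task_size;
}
```

With the defaults, a 64 KiB read would stay on the host path under this assumption, and a read of 5 MiB plus one byte would be split into six tasks.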

/**
* @brief Parameters for the `O_DIRECT` data source.
*/
struct odirect_datasource_params {
/**
* @brief The sector size, in bytes, to use for alignment when issuing
* `O_DIRECT` reads. This size dictates the alignment used for three things:
* the file offset, the buffer address, and the buffer size. It *must* be a
* multiple of the underlying device's sector size, which is typically 512
* bytes. A larger size is fine as long as it's a multiple of the device's
* sector size.
*
* Defaults to 4096.
*
* N.B. On Linux, you can determine the sector size of a device with the
* `blockdev` command, e.g.: `sudo blockdev --getss /dev/sda`.
*/
size_t sector_size{4096};

/**
* @brief The minimum permissible sector size. All sector sizes must be a
* multiple of this value. This is hardcoded to 512 bytes as a simple means
* to catch misconfigurations. The underlying device's sector size may be
* larger, but it will certainly be a multiple of this value.
*/
static constexpr size_t min_sector_size{512};

/**
* @brief Returns true iff the sector size is nonzero and a multiple of the
* minimum sector size.
*/
[[nodiscard]] bool is_valid_sector_size() const {
return ((sector_size > 0) && ((sector_size % min_sector_size) == 0));
}
};
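
The alignment rule described for `sector_size` (file offset, buffer address, and buffer size must all be sector-aligned) can be sketched with plain arithmetic. This is a standalone illustration, not the implementation from this commit: `aligned_window`, `align_down`, `align_up`, and `make_aligned_window` are hypothetical names, and the sketch covers only the offset and size, not buffer-address alignment.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical description of the sector-aligned window that covers a
// requested byte range.
struct aligned_window {
  std::size_t offset;  // file offset rounded down to a sector boundary
  std::size_t size;    // byte count covering the request, in whole sectors
  std::size_t skip;    // bytes to skip at the front of the aligned buffer
};

// Rounds `value` down to the nearest multiple of `alignment`.
inline std::size_t align_down(std::size_t value, std::size_t alignment)
{
  assert(alignment > 0);
  return value - (value % alignment);
}

// Rounds `value` up to the nearest multiple of `alignment`.
inline std::size_t align_up(std::size_t value, std::size_t alignment)
{
  assert(alignment > 0);
  return align_down(value + alignment - 1, alignment);
}

// Expands [offset, offset + size) outward to sector boundaries.
inline aligned_window make_aligned_window(std::size_t offset,
                                          std::size_t size,
                                          std::size_t sector_size)
{
  std::size_t const start = align_down(offset, sector_size);
  std::size_t const end   = align_up(offset + size, sector_size);
  return aligned_window{start, end - start, offset - start};
}
```

For example, a 100-byte read at offset 5000 with a 4096-byte sector size would become one 4096-byte read at offset 4096, with the requested bytes starting 904 bytes into the buffer. The destination buffer would also need a sector-aligned address, for instance from `posix_memalign`.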

/**
* @brief Union of parameters for different data sources.
*/
using datasource_params = std::variant<kvikio_datasource_params, odirect_datasource_params>;
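
Because `datasource_params` is a `std::variant`, a consumer has to check which alternative it holds before using it. The sketch below shows one way that check could look, using trimmed-down stand-in types (`kind_sketch`, `kvikio_params_sketch`, `odirect_params_sketch`, `params_sketch`) rather than the real cuDF declarations. How `create()` actually treats params that do not match the requested kind is not shown in this diff, so the matching rule here is an assumption.

```cpp
#include <cstddef>
#include <optional>
#include <variant>

// Hypothetical, trimmed-down mirrors of the types in the diff.
enum class kind_sketch { KVIKIO, ODIRECT, HOST };
struct kvikio_params_sketch {
  bool use_compat_mode{false};
};
struct odirect_params_sketch {
  std::size_t sector_size{4096};
};
using params_sketch = std::variant<kvikio_params_sketch, odirect_params_sketch>;

// Returns the caller's sector size if an odirect alternative was supplied,
// otherwise the struct's default.
inline std::size_t effective_sector_size(std::optional<params_sketch> const& params)
{
  if (params.has_value()) {
    if (auto const* p = std::get_if<odirect_params_sketch>(&*params)) { return p->sector_size; }
  }
  return odirect_params_sketch{}.sector_size;
}

// True when the supplied params (if any) hold the alternative matching `kind`.
inline bool params_match_kind(kind_sketch kind, std::optional<params_sketch> const& params)
{
  if (!params.has_value()) { return true; }  // no params supplied: defaults apply
  switch (kind) {
    case kind_sketch::KVIKIO: return std::holds_alternative<kvikio_params_sketch>(*params);
    case kind_sketch::ODIRECT: return std::holds_alternative<odirect_params_sketch>(*params);
    default: return false;  // assumption: HOST accepts no parameters in this sketch
  }
}
```

`std::get_if` returns a null pointer for a non-matching alternative instead of throwing, which keeps the fallback-to-defaults path free of exception handling.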

/**
* @brief Interface class for providing input data to the readers.
*/
@@ -92,15 +299,23 @@ class datasource {
* this case, `max_size_estimate` can include padding after the byte range, to include additional
* data that may be needed for processing.
*
* @throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate
* @throws cudf::logic_error if `KVIKIO_GDS` is specified as the desired kind of data source,
* and GDS is not available for the file.
*
* @param[in] filepath Path to the file to use
* @param[in] offset Starting byte offset from which data will be read (the default is zero)
* @param[in] max_size_estimate Upper estimate of the data range that will be read (the default is
* zero, which means the whole file after `offset`)
* @param[in] kind Optionally supplies the kind of data source to create
* @param[in] params Optionally supplies parameters for the data source
* @return Constructed datasource object
*/
static std::unique_ptr<datasource> create(std::string const& filepath,
size_t offset = 0,
size_t max_size_estimate = 0,
datasource_kind kind = datasource_kind::DEFAULT,
std::optional<const datasource_params> params = std::nullopt);

/**
* @brief Creates a source from a host memory buffer.
@@ -291,6 +506,37 @@ class datasource {
*/
[[nodiscard]] virtual bool is_empty() const { return size() == 0; }

/**
* @brief Returns the appropriate size, in bytes, of a read request, given
* the supplied requested size and offset.
*
* The returned size is clamped to ensure it does not exceed the total size
* of the data source, once the requested size and offset are taken into
* account.
*
* @param[in] requested_size Supplies the desired size of the read request,
* in bytes.
*
* @param[in] offset Supplies the offset, in bytes, from the start of the
* data.
*
* @return The size of the read request in bytes. This will be the minimum
* of the requested size and the remaining size of the data source after the
* offset. If the offset is beyond the end of the data source, this will
* return 0.
*/
[[nodiscard]] size_t get_read_size(size_t requested_size, size_t offset) const
{
return std::min(requested_size, size() > offset ? size() - offset : 0);
}
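
The clamping behaviour of `get_read_size()` can be checked in isolation. The sketch below restates the same expression as a free function with the source size passed in explicitly. `clamped_read_size` is a hypothetical name used only for this illustration.

```cpp
#include <algorithm>
#include <cstddef>

// Free-function version of the clamping logic. The member function in the
// diff calls size() instead of taking source_size as an argument.
inline std::size_t clamped_read_size(std::size_t source_size,
                                     std::size_t requested_size,
                                     std::size_t offset)
{
  // Compare before subtracting so an offset past the end cannot underflow
  // the unsigned subtraction.
  std::size_t const remaining = source_size > offset ? source_size - offset : 0;
  return std::min(requested_size, remaining);
}
```

For a 1000-byte source, a 100-byte request at offset 950 is clamped to 50 bytes, and any request at or beyond offset 1000 yields 0.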

/**
* @brief Returns the kind of data source.
*
* @return The kind of data source.
*/
[[nodiscard]] datasource_kind kind() const { return _kind; }

/**
* @brief Implementation for non owning buffer where datasource holds buffer until destruction.
*/
@@ -380,6 +626,25 @@ class datasource {
void const* _data_ptr;
size_t _size;
};

protected:
/**
* @brief Constructor for the datasource object.
*
* @param kind The kind of data source
*/
datasource(datasource_kind kind) : _kind(kind) {}

/**
* @brief Sets the kind of data source.
*
* @note This is intended for use by derived classes that need to change the
* kind of data source after construction.
*
* @param kind The new kind of data source
*/
void set_datasource_kind(datasource_kind kind) { _kind = kind; }

private:
datasource_kind _kind{datasource_kind::DEFAULT};
};
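
The protected constructor lets each derived class report its own kind through the base-class `kind()` accessor. Below is a minimal standalone sketch of that pattern, following the commit message's description of `odirect_source` as a derivation of `host_source`. The class names and the forwarding constructor are hypothetical stand-ins, not the classes added by this commit.

```cpp
// Hypothetical, trimmed-down stand-in for datasource_kind.
enum class kind_sketch { KVIKIO, HOST, ODIRECT };

// Stand-in for the base class: the kind is fixed by whichever derived
// constructor runs, and is exposed through a public accessor.
class source_base {
 public:
  virtual ~source_base() = default;
  [[nodiscard]] kind_sketch kind() const { return _kind; }

 protected:
  explicit source_base(kind_sketch kind) : _kind(kind) {}

 private:
  kind_sketch _kind;
};

// Host-based source: reports HOST unless a further-derived class says otherwise.
class host_source_sketch : public source_base {
 public:
  host_source_sketch() : source_base(kind_sketch::HOST) {}

 protected:
  // Forwarding constructor so subclasses can supply their own kind.
  explicit host_source_sketch(kind_sketch kind) : source_base(kind) {}
};

// O_DIRECT source: reuses the host implementation but reports ODIRECT.
class odirect_source_sketch : public host_source_sketch {
 public:
  odirect_source_sketch() : host_source_sketch(kind_sketch::ODIRECT) {}
};
```

Forwarding the kind through the constructor chain is one option. The diff also provides `set_datasource_kind()` for classes that only learn their kind after construction.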

/** @} */ // end of group