Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable direct ingestion and production of Arrow scalars #14121

Merged
merged 59 commits into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3aa461f
Initial version of scalar
vyasr Aug 31, 2023
1c4f990
Implement basic scatter algorithm with Scalar
vyasr Aug 31, 2023
8ad8367
Build DeviceScalar around pylibcudf.Scalar
vyasr Sep 1, 2023
c147972
Fix some initialization
vyasr Sep 1, 2023
0e5e5d3
Add missing no_gc_clear
vyasr Sep 1, 2023
e300884
Implement from_libcudf
vyasr Sep 1, 2023
4a2a436
Remove mr from DeviceScalar
vyasr Sep 1, 2023
abd5550
Inline _set_value
vyasr Sep 1, 2023
e8f6847
Add a comment
vyasr Sep 5, 2023
8360f80
Implement from_arrow
vyasr Sep 6, 2023
8814772
Add to package
vyasr Sep 7, 2023
3a04947
Implement release for gpumemoryview
vyasr Sep 7, 2023
032adbb
Add proper constructor for scalar from pyarrow scalar
vyasr Sep 7, 2023
16f4528
Construct pylibcudf.Scalar for numeric types
vyasr Sep 7, 2023
b75949a
Construct pylibcudf.Scalar for string types
vyasr Sep 7, 2023
8d46046
Add comment
vyasr Sep 7, 2023
ce164a0
Stop using reduction in favor of get_element
vyasr Sep 7, 2023
a8d8cb6
Move construction from pyarrow scalar to a factory
vyasr Sep 8, 2023
3fd071e
Implement from_arrow at the C level
vyasr Sep 8, 2023
33abe2b
Enable datetime
vyasr Sep 8, 2023
c6b595d
Enable timedeltas
vyasr Sep 8, 2023
ec4b510
Enable lists
vyasr Sep 8, 2023
636242b
Enable structs
vyasr Sep 9, 2023
4c08dc3
Make list and struct code paths more parallel
vyasr Sep 9, 2023
50bbdc0
Unify builders and use recursion for all paths
vyasr Sep 9, 2023
7b34df4
Enable decimals
vyasr Sep 9, 2023
392407f
Some cleanup
vyasr Sep 9, 2023
f5def95
Combine nestrepl
vyasr Sep 9, 2023
16cef15
Unify constructor
vyasr Sep 9, 2023
edcca20
Copy to avoid overwriting inputs
vyasr Sep 9, 2023
c81e158
Add error check for nested null in scalar and fix test
vyasr Sep 11, 2023
14b2f86
Initial implementation of to_arrow
vyasr Sep 11, 2023
cd80fe0
Add from_arrow_scalar to Cython API
vyasr Sep 11, 2023
5fffc4f
Add Python wrappers for to_arrow
vyasr Sep 11, 2023
cb52d2b
Add pylibcudf converter
vyasr Sep 11, 2023
643d992
Fix bugs in impl
vyasr Sep 11, 2023
309cdfa
Make to_arrow work for all column types
vyasr Sep 11, 2023
4e4bc64
Add to_arrow overload for 32-bit decimal
vyasr Sep 12, 2023
c62c71b
Add support for list/struct up to output field naming
vyasr Sep 14, 2023
d6eee4b
Fully implement metadata
vyasr Sep 14, 2023
1591bdb
Split up detail and public APIs
vyasr Sep 14, 2023
7fff56d
Add streams to public APIs
vyasr Sep 14, 2023
00050f6
Add tests of from_arrow
vyasr Sep 14, 2023
1c2570b
Add tests of to_arrow
vyasr Sep 14, 2023
663fde0
Add stream tests and fix missing stream usage
vyasr Sep 14, 2023
cf30fef
Also add stream tests for the table APIs
vyasr Sep 15, 2023
49b2693
Revert Python changes
vyasr Sep 15, 2023
9b7f240
Switch back to older implementation of writing decimal bytes directly…
vyasr Sep 18, 2023
3d4ac77
Specify unroll
vyasr Sep 18, 2023
9311663
Remove unnecessary includes
vyasr Sep 18, 2023
4313212
Pass in precision instead of computing
vyasr Sep 18, 2023
5f5a542
Merge remote-tracking branch 'origin/branch-23.10' into feat/libcudf_…
vyasr Sep 19, 2023
2aade20
Address reviews
vyasr Sep 19, 2023
f80f089
Merge branch 'branch-23.10' into feat/libcudf_scalar_to_arrow
vyasr Sep 21, 2023
37376e0
PR reviews
vyasr Sep 21, 2023
11d3d12
Merge remote-tracking branch 'origin/branch-23.10' into feat/libcudf_…
vyasr Sep 21, 2023
a46db3f
Merge branch 'branch-23.10' into feat/libcudf_scalar_to_arrow
vyasr Sep 22, 2023
e42f04d
Merge remote-tracking branch 'origin/branch-23.10' into feat/libcudf_…
vyasr Sep 22, 2023
637e0a4
Update cpp/include/cudf/interop.hpp
vyasr Sep 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 74 additions & 6 deletions cpp/include/cudf/detail/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,95 @@ std::shared_ptr<arrow::Array> to_arrow_array(cudf::type_id id, Ts&&... args)
}
}

/**
* @brief Invokes an `operator()` template with the type instantiation based on
* the specified `arrow::DataType`'s `id()`.
*
* This function is analogous to libcudf's type_dispatcher, but instead applies
* to Arrow functions. Its primary use case is to leverage Arrow's
* metaprogramming facilities like arrow::TypeTraits that require translating
* the runtime dtype information into compile-time types.
*/
template <typename Functor, typename... Ts>
constexpr decltype(auto) arrow_type_dispatcher(arrow::DataType const& dtype,
Functor f,
Ts&&... args)
{
switch (dtype.id()) {
case arrow::Type::INT8:
return f.template operator()<arrow::Int8Type>(std::forward<Ts>(args)...);
case arrow::Type::INT16:
return f.template operator()<arrow::Int16Type>(std::forward<Ts>(args)...);
case arrow::Type::INT32:
return f.template operator()<arrow::Int32Type>(std::forward<Ts>(args)...);
case arrow::Type::INT64:
return f.template operator()<arrow::Int64Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT8:
return f.template operator()<arrow::UInt8Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT16:
return f.template operator()<arrow::UInt16Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT32:
return f.template operator()<arrow::UInt32Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT64:
return f.template operator()<arrow::UInt64Type>(std::forward<Ts>(args)...);
case arrow::Type::FLOAT:
return f.template operator()<arrow::FloatType>(std::forward<Ts>(args)...);
case arrow::Type::DOUBLE:
return f.template operator()<arrow::DoubleType>(std::forward<Ts>(args)...);
case arrow::Type::BOOL:
return f.template operator()<arrow::BooleanType>(std::forward<Ts>(args)...);
case arrow::Type::TIMESTAMP:
return f.template operator()<arrow::TimestampType>(std::forward<Ts>(args)...);
case arrow::Type::DURATION:
return f.template operator()<arrow::DurationType>(std::forward<Ts>(args)...);
case arrow::Type::STRING:
return f.template operator()<arrow::StringType>(std::forward<Ts>(args)...);
case arrow::Type::LIST:
return f.template operator()<arrow::ListType>(std::forward<Ts>(args)...);
case arrow::Type::DECIMAL128:
return f.template operator()<arrow::Decimal128Type>(std::forward<Ts>(args)...);
case arrow::Type::STRUCT:
return f.template operator()<arrow::StructType>(std::forward<Ts>(args)...);
default: {
CUDF_FAIL("Invalid type.");
}
}
}

// Converting arrow type to cudf type
data_type arrow_to_cudf_type(arrow::DataType const& arrow_type);

/**
* @copydoc cudf::to_arrow
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @copydoc cudf::to_arrow(table_view input, std::vector<column_metadata> const& metadata,
* rmm::cuda_stream_view stream, arrow::MemoryPool* ar_mr)
*/
std::shared_ptr<arrow::Table> to_arrow(table_view input,
std::vector<column_metadata> const& metadata,
rmm::cuda_stream_view stream,
arrow::MemoryPool* ar_mr);

/**
* @copydoc cudf::arrow_to_cudf
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @copydoc cudf::to_arrow(cudf::scalar const& input, column_metadata const& metadata,
* rmm::cuda_stream_view stream, arrow::MemoryPool* ar_mr)
*/
std::shared_ptr<arrow::Scalar> to_arrow(cudf::scalar const& input,
column_metadata const& metadata,
rmm::cuda_stream_view stream,
arrow::MemoryPool* ar_mr);
/**
* @copydoc cudf::from_arrow(arrow::Table const& input_table, rmm::cuda_stream_view stream,
* rmm::mr::device_memory_resource* mr)
*/
std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @copydoc cudf::from_arrow(arrow::Scalar const& input, rmm::cuda_stream_view stream,
* rmm::mr::device_memory_resource* mr)
*/
std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
} // namespace detail
} // namespace cudf
35 changes: 34 additions & 1 deletion cpp/include/cudf/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,56 @@ struct column_metadata {
*
* @param input table_view that needs to be converted to arrow Table
* @param metadata Contains hierarchy of names of columns and children
* @param stream CUDA stream used for device memory operations and kernel launches
* @param ar_mr arrow memory pool to allocate memory for arrow Table
* @return arrow Table generated from `input`
*/
std::shared_ptr<arrow::Table> to_arrow(table_view input,
std::vector<column_metadata> const& metadata = {},
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());
rmm::cuda_stream_view stream = cudf::get_default_stream(),
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());

/**
* @brief Create `arrow::Scalar` from cudf scalar `input`
*
* Converts the `cudf::scalar` to `arrow::Scalar`.
*
* @param input scalar that needs to be converted to arrow Scalar
* @param metadata Contains hierarchy of names of columns and children
* @param stream CUDA stream used for device memory operations and kernel launches
* @param ar_mr arrow memory pool to allocate memory for arrow Scalar
* @return arrow Scalar generated from `input`
*/
std::shared_ptr<arrow::Scalar> to_arrow(cudf::scalar const& input,
column_metadata const& metadata = {},
rmm::cuda_stream_view stream = cudf::get_default_stream(),
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());
/**
* @brief Create `cudf::table` from given arrow Table input
*
* @param input arrow:Table that needs to be converted to `cudf::table`
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate `cudf::table`
* @return cudf table generated from given arrow Table
*/

std::unique_ptr<table> from_arrow(
arrow::Table const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::scalar` from given arrow Scalar input
*
* @param input `arrow::Scalar` that needs to be converted to `cudf::scalar`
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate `cudf::scalar`
* @return cudf scalar generated from given arrow Scalar
*/

std::unique_ptr<cudf::scalar> from_arrow(
arrow::Scalar const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
88 changes: 87 additions & 1 deletion cpp/src/interop/from_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,52 @@ std::unique_ptr<column> get_column(arrow::Array const& array,
: get_empty_type_column(array.length());
}

struct BuilderGenerator {
template <typename T,
CUDF_ENABLE_IF(!std::is_same_v<T, arrow::ListType> &&
!std::is_same_v<T, arrow::StructType>)>
std::shared_ptr<arrow::ArrayBuilder> operator()(std::shared_ptr<arrow::DataType> const& type)
{
return std::make_shared<typename arrow::TypeTraits<T>::BuilderType>(
type, arrow::default_memory_pool());
}

template <typename T,
CUDF_ENABLE_IF(std::is_same_v<T, arrow::ListType> ||
std::is_same_v<T, arrow::StructType>)>
std::shared_ptr<arrow::ArrayBuilder> operator()(std::shared_ptr<arrow::DataType> const& type)
{
CUDF_FAIL("Type not supported by BuilderGenerator");
}
};

std::shared_ptr<arrow::ArrayBuilder> make_builder(std::shared_ptr<arrow::DataType> const& type)
{
switch (type->id()) {
case arrow::Type::STRUCT: {
std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;

for (auto field : type->fields()) {
auto const vt = field->type();
if (vt->id() == arrow::Type::STRUCT || vt->id() == arrow::Type::LIST) {
field_builders.push_back(make_builder(vt));
} else {
field_builders.push_back(arrow_type_dispatcher(*vt, BuilderGenerator{}, vt));
}
}
return std::make_shared<arrow::StructBuilder>(
type, arrow::default_memory_pool(), field_builders);
}
case arrow::Type::LIST: {
return std::make_shared<arrow::ListBuilder>(arrow::default_memory_pool(),
make_builder(type->field(0)->type()));
}
default: {
return arrow_type_dispatcher(*type, BuilderGenerator{}, type);
}
}
}

} // namespace

std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
Expand Down Expand Up @@ -462,14 +508,54 @@ std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
return std::make_unique<table>(std::move(columns));
}

std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Get a builder for the scalar type
auto builder = detail::make_builder(input.type);

auto status = builder->AppendScalar(input);
if (status != arrow::Status::OK()) {
if (status.IsNotImplemented()) {
// The only known failure case here is for nulls
CUDF_FAIL("Cannot create untyped null scalars or nested types with untyped null leaf nodes",
std::invalid_argument);
}
CUDF_FAIL("Arrow ArrayBuilder::AppendScalar failed");
}

auto maybe_array = builder->Finish();
if (!maybe_array.ok()) { CUDF_FAIL("Arrow ArrayBuilder::Finish failed"); }
auto array = *maybe_array;

auto field = arrow::field("", input.type);

auto table = arrow::Table::Make(arrow::schema({field}), {array});

auto cudf_table = detail::from_arrow(*table, stream, mr);

auto cv = cudf_table->view().column(0);
return get_element(cv, 0, stream);
}

} // namespace detail

std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

return detail::from_arrow(input_table, cudf::get_default_stream(), mr);
return detail::from_arrow(input_table, stream, mr);
}

std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

return detail::from_arrow(input, stream, mr);
}
} // namespace cudf
Loading