From 1e141d72484ade0e310754584c4bb346427d32af Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Sun, 29 Sep 2024 18:23:09 +0800 Subject: [PATCH 01/54] WIP --- cpp/src/arrow/CMakeLists.txt | 1 + .../arrow/compute/kernels/vector_gather.cc | 49 +++++++++++++++++++ .../compute/kernels/vector_gather_internal.h | 39 +++++++++++++++ 3 files changed, 89 insertions(+) create mode 100644 cpp/src/arrow/compute/kernels/vector_gather.cc create mode 100644 cpp/src/arrow/compute/kernels/vector_gather_internal.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index c911f0f4e9481..b9d542fce8482 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -786,6 +786,7 @@ if(ARROW_COMPUTE) compute/kernels/scalar_validity.cc compute/kernels/vector_array_sort.cc compute/kernels/vector_cumulative_ops.cc + compute/kernels/vector_gather.cc compute/kernels/vector_pairwise.cc compute/kernels/vector_nested.cc compute/kernels/vector_rank.cc diff --git a/cpp/src/arrow/compute/kernels/vector_gather.cc b/cpp/src/arrow/compute/kernels/vector_gather.cc new file mode 100644 index 0000000000000..46a271099c90a --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_gather.cc @@ -0,0 +1,49 @@ +// #include "arrow/compute/kernels/vector_gather_internal.h" +#include "arrow/compute/function.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/registry.h" +#include "arrow/util/logging.h" + +namespace arrow::compute::internal { + +namespace { + +struct GatherKernelSignature { + InputType indices_type; + InputType value_type; + ArrayKernelExec exec; +}; + +std::unique_ptr MakeGatherFunction( + std::string name, int min_args, std::vector&& signatures, + FunctionDoc doc) { + auto func = std::make_unique(std::move(name), Arity::VarArgs(min_args), + std::move(doc)); + for (auto& signature : signatures) { + auto kernel = VectorKernel{}; + kernel.signature = KernelSignature::Make( + {std::move(signature.indices_type), std::move(signature.value_type)}, + OutputType(LastType), /*is_varargs=*/true); + kernel.exec = signature.exec; + kernel.can_execute_chunkwise = false; + DCHECK_OK(func->AddKernel(std::move(kernel))); + } + return func; +} + +const FunctionDoc gather_doc( + "Gather values from a list of inputs with an indices vector", + "The output is populated with values selected from the inputs, where each value is " + "chosen based on the corresponding index from the indices that specifies which input " + "in the list to use", + {"indices", "*inputs"}); + +} // namespace + +void RegisterVectorGather(FunctionRegistry* registry) { + std::vector signatures = {}; + DCHECK_OK(registry->AddFunction( + MakeGatherFunction("gather", /*min_args=*/2, std::move(signatures), gather_doc))); +} + +} // namespace arrow::compute::internal diff --git a/cpp/src/arrow/compute/kernels/vector_gather_internal.h b/cpp/src/arrow/compute/kernels/vector_gather_internal.h new file mode 100644 index 0000000000000..32988b1e148b8 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_gather_internal.h @@ -0,0 +1,39 @@ +#pragma once + +#include "arrow/compute/function.h" + +namespace arrow::compute::internal { + +// class GatherMetaFunction : public MetaFunction { +// public: +// GatherMetaFunction() : MetaFunction("gather", Arity::VarArgs(3), gather_doc) {} + +// Result ExecuteImpl(const std::vector& args, +// const FunctionOptions* options, +// ExecContext* ctx) const override { +// for (const auto& arg : args) { +// if (arg.kind() != Datum::ARRAY && arg.kind() != Datum::CHUNKED_ARRAY) { +// return Status::TypeError("Gather arguments should be array-like"); +// } +// } + +// if (!is_integer(*args[0].type())) { +// return Status::NotImplemented("Indices to gather must be integer type"); +// } + +// if (args[0].kind() == Datum::RECORD_BATCH) { +// ARROW_ASSIGN_OR_RAISE( +// std::shared_ptr out_batch, +// FilterRecordBatch(*args[0].record_batch(), args[1], options, ctx)); +// return Datum(out_batch); +// } else if (args[0].kind() == Datum::TABLE) { +// ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_table, +// FilterTable(*args[0].table(), args[1], options, ctx)); +// return Datum(out_table); +// } else { +// return CallFunction("array_filter", args, options, ctx); +// } +// } +// }; + +} // namespace arrow::compute::internal From 216e217991967338a9db4b6036f4b3630764bc80 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 30 Sep 2024 17:43:37 +0800 Subject: [PATCH 02/54] WIP --- cpp/src/arrow/CMakeLists.txt | 2 +- .../arrow/compute/kernels/vector_gather.cc | 49 ----- .../compute/kernels/vector_gather_internal.h | 39 ---- .../arrow/compute/kernels/vector_permute.cc | 175 ++++++++++++++++++ 4 files changed, 176 insertions(+), 89 deletions(-) delete mode 100644 cpp/src/arrow/compute/kernels/vector_gather.cc delete mode 100644 cpp/src/arrow/compute/kernels/vector_gather_internal.h create mode 100644 cpp/src/arrow/compute/kernels/vector_permute.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index b9d542fce8482..428eef04007c2 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -786,8 +786,8 @@ if(ARROW_COMPUTE) compute/kernels/scalar_validity.cc compute/kernels/vector_array_sort.cc compute/kernels/vector_cumulative_ops.cc - compute/kernels/vector_gather.cc compute/kernels/vector_pairwise.cc + compute/kernels/vector_permute.cc compute/kernels/vector_nested.cc compute/kernels/vector_rank.cc compute/kernels/vector_replace.cc diff --git a/cpp/src/arrow/compute/kernels/vector_gather.cc b/cpp/src/arrow/compute/kernels/vector_gather.cc deleted file mode 100644 index 46a271099c90a..0000000000000 --- a/cpp/src/arrow/compute/kernels/vector_gather.cc +++ /dev/null @@ -1,49 +0,0 @@ -// #include "arrow/compute/kernels/vector_gather_internal.h" -#include "arrow/compute/function.h" -#include "arrow/compute/kernels/codegen_internal.h" -#include "arrow/compute/registry.h" -#include "arrow/util/logging.h" - -namespace arrow::compute::internal { - -namespace { - -struct GatherKernelSignature { - InputType indices_type; - InputType value_type; - ArrayKernelExec exec; -}; - -std::unique_ptr MakeGatherFunction( - std::string name, int min_args, std::vector&& signatures, - FunctionDoc doc) { - auto func = std::make_unique(std::move(name), Arity::VarArgs(min_args), - std::move(doc)); - for (auto& signature : signatures) { - auto kernel = VectorKernel{}; - kernel.signature = KernelSignature::Make( - {std::move(signature.indices_type), std::move(signature.value_type)}, - OutputType(LastType), /*is_varargs=*/true); - kernel.exec = signature.exec; - kernel.can_execute_chunkwise = false; - DCHECK_OK(func->AddKernel(std::move(kernel))); - } - return func; -} - -const FunctionDoc gather_doc( - "Gather values from a list of inputs with an indices vector", - "The output is populated with values selected from the inputs, where each value is " - "chosen based on the corresponding index from the indices that specifies which input " - "in the list to use", - {"indices", "*inputs"}); - -} // namespace - -void RegisterVectorGather(FunctionRegistry* registry) { - std::vector signatures = {}; - DCHECK_OK(registry->AddFunction( - MakeGatherFunction("gather", /*min_args=*/2, std::move(signatures), gather_doc))); -} - -} // namespace arrow::compute::internal diff --git a/cpp/src/arrow/compute/kernels/vector_gather_internal.h b/cpp/src/arrow/compute/kernels/vector_gather_internal.h deleted file mode 100644 index 32988b1e148b8..0000000000000 --- a/cpp/src/arrow/compute/kernels/vector_gather_internal.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include "arrow/compute/function.h" - -namespace arrow::compute::internal { - -// class GatherMetaFunction : public MetaFunction { -// public: -// GatherMetaFunction() : MetaFunction("gather", Arity::VarArgs(3), gather_doc) {} - -// Result ExecuteImpl(const std::vector& args, -// const FunctionOptions* options, -// ExecContext* ctx) const override { -// for (const auto& arg : args) { -// if (arg.kind() != Datum::ARRAY && arg.kind() != Datum::CHUNKED_ARRAY) { -// return Status::TypeError("Gather arguments should be array-like"); -// } -// } - -// if (!is_integer(*args[0].type())) { -// return Status::NotImplemented("Indices to gather must be integer type"); -// } - -// if (args[0].kind() == Datum::RECORD_BATCH) { -// ARROW_ASSIGN_OR_RAISE( -// std::shared_ptr out_batch, -// FilterRecordBatch(*args[0].record_batch(), args[1], options, ctx)); -// return Datum(out_batch); -// } else if (args[0].kind() == Datum::TABLE) { -// ARROW_ASSIGN_OR_RAISE(std::shared_ptr
out_table, -// FilterTable(*args[0].table(), args[1], options, ctx)); -// return Datum(out_table); -// } else { -// return CallFunction("array_filter", args, options, ctx); -// } -// } -// }; - -} // namespace arrow::compute::internal diff --git a/cpp/src/arrow/compute/kernels/vector_permute.cc b/cpp/src/arrow/compute/kernels/vector_permute.cc new file mode 100644 index 0000000000000..0c5559c6e7d91 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_permute.cc @@ -0,0 +1,175 @@ +// #include "arrow/compute/kernels/vector_gather_internal.h" +#include "arrow/compute/function.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/registry.h" +#include "arrow/util/fixed_width_internal.h" +#include "arrow/util/logging.h" + +namespace arrow::compute::internal { + +namespace { + +template +struct FixedWidthTakeImpl { + static constexpr int kValueWidthInBits = ValueBitWidthConstant::value; + + static Status Exec(KernelContext* ctx, const ArraySpan& values, + const ArraySpan& indices, ArrayData* out_arr, int64_t factor) { +#ifndef NDEBUG + int64_t bit_width = util::FixedWidthInBits(*values.type); + DCHECK(WithFactor::value || (kValueWidthInBits == bit_width && factor == 1)); + DCHECK(!WithFactor::value || + (factor > 0 && kValueWidthInBits == 8 && // factors are used with bytes + static_cast(factor * kValueWidthInBits) == bit_width)); +#endif + const bool out_has_validity = values.MayHaveNulls() || indices.MayHaveNulls(); + + const uint8_t* src; + int64_t src_offset; + std::tie(src_offset, src) = util::OffsetPointerOfFixedBitWidthValues(values); + uint8_t* out = util::MutableFixedWidthValuesPointer(out_arr); + int64_t valid_count = 0; + arrow::internal::Gather gather{ + /*src_length=*/values.length, + src, + src_offset, + /*idx_length=*/indices.length, + /*idx=*/indices.GetValues(1), + out, + factor}; + if (out_has_validity) { + DCHECK_EQ(out_arr->offset, 0); + // out_is_valid must be zero-initiliazed, because Gather::Execute + // saves time by not having to ClearBit on every null element. + auto out_is_valid = out_arr->GetMutableValues(0); + memset(out_is_valid, 0, bit_util::BytesForBits(out_arr->length)); + valid_count = gather.template Execute( + /*src_validity=*/values, /*idx_validity=*/indices, out_is_valid); + } else { + valid_count = gather.Execute(); + } + out_arr->null_count = out_arr->length - valid_count; + return Status::OK(); + } +}; + +template