Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support negative preceding/following for ROW window functions #14093

Merged
merged 18 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions cpp/src/rolling/detail/rolling.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,22 @@ namespace cudf {

namespace detail {

namespace { // anonymous
/// Helper function to materialize preceding/following offsets.
template <typename Calculator>
std::unique_ptr<column> expand_to_column(Calculator const& calc,
size_type const& num_rows,
rmm::cuda_stream_view stream)
{
auto window_column = cudf::make_numeric_column(
cudf::data_type{type_to_id<size_type>()}, num_rows, cudf::mask_state::UNALLOCATED, stream);

auto begin = cudf::detail::make_counting_transform_iterator(0, calc);

thrust::copy_n(
rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data<size_type>());

return window_column;
}

/**
* @brief Operator for applying a generic (non-specialized) rolling aggregation on a single window.
Expand All @@ -91,14 +106,14 @@ struct DeviceRolling {

// operations we do support
template <typename T = InputType, aggregation::Kind O = op>
DeviceRolling(size_type _min_periods, std::enable_if_t<is_supported<T, O>()>* = nullptr)
explicit DeviceRolling(size_type _min_periods, std::enable_if_t<is_supported<T, O>()>* = nullptr)
: min_periods(_min_periods)
{
}

// operations we don't support
template <typename T = InputType, aggregation::Kind O = op>
DeviceRolling(size_type _min_periods, std::enable_if_t<!is_supported<T, O>()>* = nullptr)
explicit DeviceRolling(size_type _min_periods, std::enable_if_t<!is_supported<T, O>()>* = nullptr)
: min_periods(_min_periods)
{
CUDF_FAIL("Invalid aggregation/type pair");
Expand All @@ -111,7 +126,7 @@ struct DeviceRolling {
mutable_column_device_view& output,
size_type start_index,
size_type end_index,
size_type current_index)
size_type current_index) const
{
using AggOp = typename corresponding_operator<op>::type;
AggOp agg_op;
Expand Down Expand Up @@ -144,7 +159,7 @@ struct DeviceRolling {
template <typename InputType, aggregation::Kind op>
struct DeviceRollingArgMinMaxBase {
size_type min_periods;
DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {}
explicit DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {}

static constexpr bool is_supported()
{
Expand All @@ -162,7 +177,7 @@ struct DeviceRollingArgMinMaxBase {
*/
template <aggregation::Kind op>
struct DeviceRollingArgMinMaxString : DeviceRollingArgMinMaxBase<cudf::string_view, op> {
DeviceRollingArgMinMaxString(size_type _min_periods)
explicit DeviceRollingArgMinMaxString(size_type _min_periods)
: DeviceRollingArgMinMaxBase<cudf::string_view, op>(_min_periods)
{
}
Expand Down Expand Up @@ -461,8 +476,8 @@ struct agg_specific_empty_output {
}
};

std::unique_ptr<column> empty_output_for_rolling_aggregation(column_view const& input,
rolling_aggregation const& agg)
static std::unique_ptr<column> empty_output_for_rolling_aggregation(column_view const& input,
rolling_aggregation const& agg)
{
// TODO:
// Ideally, for UDF aggregations, the returned column would match
Expand Down Expand Up @@ -1215,8 +1230,6 @@ struct dispatch_rolling {
}
};

} // namespace

// Applies a user-defined rolling window function to the values in a column.
template <typename PrecedingWindowIterator, typename FollowingWindowIterator>
std::unique_ptr<column> rolling_window_udf(column_view const& input,
Expand Down
30 changes: 19 additions & 11 deletions cpp/src/rolling/detail/rolling_fixed_window.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,9 @@
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <cudf_test/column_utilities.hpp>

#include <thrust/extrema.h>
#include <thrust/iterator/constant_iterator.h>

namespace cudf::detail {

Expand All @@ -43,6 +44,9 @@ std::unique_ptr<column> rolling_window(column_view const& input,
CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()),
"Defaults column must be either empty or have as many rows as the input column.");

CUDF_EXPECTS(-(preceding_window - 1) <= following_window,
"Preceding window bounds must precede the following window bounds.");

if (agg.kind == aggregation::CUDA || agg.kind == aggregation::PTX) {
// TODO: In future, might need to clamp preceding/following to column boundaries.
return cudf::detail::rolling_window_udf(input,
Expand All @@ -58,18 +62,22 @@ std::unique_ptr<column> rolling_window(column_view const& input,
// Clamp preceding/following to column boundaries.
// E.g. If preceding_window == 2, then for a column of 5 elements, preceding_window will be:
// [1, 2, 2, 2, 1]
auto const preceding_window_begin = cudf::detail::make_counting_transform_iterator(
0,
[preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); });
auto const following_window_begin = cudf::detail::make_counting_transform_iterator(
0, [col_size = input.size(), following_window] __device__(size_type i) {
return thrust::min(col_size - i - 1, following_window);
});

auto const preceding_calc = [preceding_window] __device__(size_type i) {
return thrust::min(i + 1, preceding_window);
};

auto const following_calc = [col_size = input.size(),
following_window] __device__(size_type i) {
return thrust::min(col_size - i - 1, following_window);
};

auto const preceding_column = expand_to_column(preceding_calc, input.size(), stream);
auto const following_column = expand_to_column(following_calc, input.size(), stream);
return cudf::detail::rolling_window(input,
default_outputs,
preceding_window_begin,
following_window_begin,
preceding_column->view().begin<cudf::size_type>(),
following_column->view().begin<cudf::size_type>(),
min_periods,
agg,
stream,
Expand Down
Loading