Skip to content

Commit

Permalink
Support negative preceding/following for ROW window functions (#14093)
Browse files Browse the repository at this point in the history
This commit adds support for "offset" ROW windows, where the preceding and following
window bounds are allowed to have negative values.  This allows window definitions to
exclude the current row entirely.

Prior to this change, ROW-based windows *had* to include the current row, causing
`preceding` and `following` to support only non-negative values.  Additionally, the
inclusion of the current row would count against the `min_periods` check.

The following is an example of the new "negative" semantics.  Consider the input:
```c++
auto const row = ints_column{1, 2, 3, 4};
```
If the window bounds are specified as (preceding=3, following=-1), then the window
for the third row (`3`) is `{1, 2}`.
`following=-1` indicates a "following" row *before* the current row.

A negative value for `preceding` follows the existing convention of including the
current row.  This makes it slightly more involved:
  1. `preceding=2` indicates *one* row before the current row.
  2. `preceding=1` indicates the current row.
  3. `preceding=0` indicates one row past (i.e. after) the current row.
  4. `preceding=-1` indicates two rows after the current row.
Et cetera.

`min_periods` checks continue to be honoured as before, but the requirement for
positive `min_periods` is dropped.  `min_periods` only need be non-negative.

Authors:
  - MithunR (https://github.com/mythrocks)

Approvers:
  - Divye Gala (https://github.com/divyegala)
  - Robert Maynard (https://github.com/robertmaynard)

URL: #14093
  • Loading branch information
mythrocks authored Sep 21, 2023
1 parent 05ee260 commit ec744de
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 99 deletions.
24 changes: 22 additions & 2 deletions cpp/include/cudf/rolling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,30 @@ struct window_bounds {
* column of the same type as the input. Therefore it is suggested to convert integer column types
* (especially low-precision integers) to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`.
*
* Note: `preceding_window` and `following_window` could well have negative values. This yields
* windows where the current row might not be included at all. For instance, consider a window
* defined as (preceding=3, following=-1). This produces a window from 2 (i.e. 3-1) rows preceding
* the current row, and 1 row *preceding* the current row. For the example above, the window for
* row#3 is:
*
* [ 10, 20, 10, 50, 60, 20, 30, 80, 40 ]
* <--window--> ^
* |
* current_row
*
* Similarly, `preceding` could have a negative value, indicating that the window begins at a
* position after the current row. It differs slightly from the semantics for `following`, because
* `preceding` includes the current row. Therefore:
* 1. preceding=1 => Window starts at the current row.
* 2. preceding=0 => Window starts at 1 past the current row.
* 3. preceding=-1 => Window starts at 2 past the current row. Etc.
*
* @param[in] group_keys The (pre-sorted) grouping columns
* @param[in] input The input column (to be aggregated)
* @param[in] preceding_window The static rolling window size in the backward direction
* @param[in] following_window The static rolling window size in the forward direction
* @param[in] preceding_window The static rolling window size in the backward direction (for
* positive values), or forward direction (for negative values)
* @param[in] following_window The static rolling window size in the forward direction (for positive
* values), or backward direction (for negative values)
* @param[in] min_periods Minimum number of observations in window required to have a value,
* otherwise element `i` is null.
* @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.)
Expand Down
33 changes: 23 additions & 10 deletions cpp/src/rolling/detail/rolling.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,22 @@ namespace cudf {

namespace detail {

namespace { // anonymous
/// Helper function to materialize preceding/following offsets.
template <typename Calculator>
std::unique_ptr<column> expand_to_column(Calculator const& calc,
size_type const& num_rows,
rmm::cuda_stream_view stream)
{
auto window_column = cudf::make_numeric_column(
cudf::data_type{type_to_id<size_type>()}, num_rows, cudf::mask_state::UNALLOCATED, stream);

auto begin = cudf::detail::make_counting_transform_iterator(0, calc);

thrust::copy_n(
rmm::exec_policy(stream), begin, num_rows, window_column->mutable_view().data<size_type>());

return window_column;
}

/**
* @brief Operator for applying a generic (non-specialized) rolling aggregation on a single window.
Expand All @@ -91,14 +106,14 @@ struct DeviceRolling {

// operations we do support
template <typename T = InputType, aggregation::Kind O = op>
DeviceRolling(size_type _min_periods, std::enable_if_t<is_supported<T, O>()>* = nullptr)
explicit DeviceRolling(size_type _min_periods, std::enable_if_t<is_supported<T, O>()>* = nullptr)
: min_periods(_min_periods)
{
}

// operations we don't support
template <typename T = InputType, aggregation::Kind O = op>
DeviceRolling(size_type _min_periods, std::enable_if_t<!is_supported<T, O>()>* = nullptr)
explicit DeviceRolling(size_type _min_periods, std::enable_if_t<!is_supported<T, O>()>* = nullptr)
: min_periods(_min_periods)
{
CUDF_FAIL("Invalid aggregation/type pair");
Expand All @@ -111,7 +126,7 @@ struct DeviceRolling {
mutable_column_device_view& output,
size_type start_index,
size_type end_index,
size_type current_index)
size_type current_index) const
{
using AggOp = typename corresponding_operator<op>::type;
AggOp agg_op;
Expand Down Expand Up @@ -144,7 +159,7 @@ struct DeviceRolling {
template <typename InputType, aggregation::Kind op>
struct DeviceRollingArgMinMaxBase {
size_type min_periods;
DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {}
explicit DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {}

static constexpr bool is_supported()
{
Expand All @@ -162,7 +177,7 @@ struct DeviceRollingArgMinMaxBase {
*/
template <aggregation::Kind op>
struct DeviceRollingArgMinMaxString : DeviceRollingArgMinMaxBase<cudf::string_view, op> {
DeviceRollingArgMinMaxString(size_type _min_periods)
explicit DeviceRollingArgMinMaxString(size_type _min_periods)
: DeviceRollingArgMinMaxBase<cudf::string_view, op>(_min_periods)
{
}
Expand Down Expand Up @@ -461,8 +476,8 @@ struct agg_specific_empty_output {
}
};

std::unique_ptr<column> empty_output_for_rolling_aggregation(column_view const& input,
rolling_aggregation const& agg)
static std::unique_ptr<column> empty_output_for_rolling_aggregation(column_view const& input,
rolling_aggregation const& agg)
{
// TODO:
// Ideally, for UDF aggregations, the returned column would match
Expand Down Expand Up @@ -1215,8 +1230,6 @@ struct dispatch_rolling {
}
};

} // namespace

// Applies a user-defined rolling window function to the values in a column.
template <typename PrecedingWindowIterator, typename FollowingWindowIterator>
std::unique_ptr<column> rolling_window_udf(column_view const& input,
Expand Down
30 changes: 19 additions & 11 deletions cpp/src/rolling/detail/rolling_fixed_window.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,9 @@
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <cudf_test/column_utilities.hpp>

#include <thrust/extrema.h>
#include <thrust/iterator/constant_iterator.h>

namespace cudf::detail {

Expand All @@ -43,6 +44,9 @@ std::unique_ptr<column> rolling_window(column_view const& input,
CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()),
"Defaults column must be either empty or have as many rows as the input column.");

CUDF_EXPECTS(-(preceding_window - 1) <= following_window,
"Preceding window bounds must precede the following window bounds.");

if (agg.kind == aggregation::CUDA || agg.kind == aggregation::PTX) {
// TODO: In future, might need to clamp preceding/following to column boundaries.
return cudf::detail::rolling_window_udf(input,
Expand All @@ -58,18 +62,22 @@ std::unique_ptr<column> rolling_window(column_view const& input,
// Clamp preceding/following to column boundaries.
// E.g. If preceding_window == 2, then for a column of 5 elements, preceding_window will be:
// [1, 2, 2, 2, 1]
auto const preceding_window_begin = cudf::detail::make_counting_transform_iterator(
0,
[preceding_window] __device__(size_type i) { return thrust::min(i + 1, preceding_window); });
auto const following_window_begin = cudf::detail::make_counting_transform_iterator(
0, [col_size = input.size(), following_window] __device__(size_type i) {
return thrust::min(col_size - i - 1, following_window);
});

auto const preceding_calc = [preceding_window] __device__(size_type i) {
return thrust::min(i + 1, preceding_window);
};

auto const following_calc = [col_size = input.size(),
following_window] __device__(size_type i) {
return thrust::min(col_size - i - 1, following_window);
};

auto const preceding_column = expand_to_column(preceding_calc, input.size(), stream);
auto const following_column = expand_to_column(following_calc, input.size(), stream);
return cudf::detail::rolling_window(input,
default_outputs,
preceding_window_begin,
following_window_begin,
preceding_column->view().begin<cudf::size_type>(),
following_column->view().begin<cudf::size_type>(),
min_periods,
agg,
stream,
Expand Down
Loading

0 comments on commit ec744de

Please sign in to comment.