Skip to content

Commit

Permalink
Split hash-based groupby into multiple smaller files to reduce build …
Browse files Browse the repository at this point in the history
…time (#17089)

This work is part of splitting the original bulk shared memory groupby PR #16619.

This PR splits the hash-based groupby file into multiple translation units and uses explicit template instantiations to help reduce build time. It also includes some minor cleanups without significant functional changes.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)
  - Nghia Truong (https://github.com/ttnghia)
  - David Wendt (https://github.com/davidwendt)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: #17089
  • Loading branch information
PointKernel authored Oct 19, 2024
1 parent 1ce2526 commit 074ab74
Show file tree
Hide file tree
Showing 14 changed files with 1,012 additions and 446 deletions.
5 changes: 5 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,13 @@ add_library(
src/filling/repeat.cu
src/filling/sequence.cu
src/groupby/groupby.cu
src/groupby/hash/compute_groupby.cu
src/groupby/hash/compute_single_pass_aggs.cu
src/groupby/hash/create_sparse_results_table.cu
src/groupby/hash/flatten_single_pass_aggs.cpp
src/groupby/hash/groupby.cu
src/groupby/hash/hash_compound_agg_finalizer.cu
src/groupby/hash/sparse_to_dense_results.cu
src/groupby/sort/aggregate.cpp
src/groupby/sort/group_argmax.cu
src/groupby/sort/group_argmin.cu
Expand Down
142 changes: 142 additions & 0 deletions cpp/src/groupby/hash/compute_groupby.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "compute_groupby.hpp"
#include "compute_single_pass_aggs.hpp"
#include "helpers.cuh"
#include "sparse_to_dense_results.hpp"

#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/groupby.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>

#include <cuco/static_set.cuh>

#include <iterator>
#include <memory>

namespace cudf::groupby::detail::hash {
template <typename SetType>
rmm::device_uvector<size_type> extract_populated_keys(SetType const& key_set,
size_type num_keys,
rmm::cuda_stream_view stream)
{
rmm::device_uvector<size_type> populated_keys(num_keys, stream);
auto const keys_end = key_set.retrieve_all(populated_keys.begin(), stream.value());

populated_keys.resize(std::distance(populated_keys.begin(), keys_end), stream);
return populated_keys;
}

template <typename Equal, typename Hash>
std::unique_ptr<table> compute_groupby(table_view const& keys,
host_span<aggregation_request const> requests,
bool skip_rows_with_nulls,
Equal const& d_row_equal,
Hash const& d_row_hash,
cudf::detail::result_cache* cache,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// convert to int64_t to avoid potential overflow with large `keys`
auto const num_keys = static_cast<int64_t>(keys.num_rows());

// Cache of sparse results where the location of aggregate value in each
// column is indexed by the hash set
cudf::detail::result_cache sparse_results(requests.size());

auto const set = cuco::static_set{
num_keys,
cudf::detail::CUCO_DESIRED_LOAD_FACTOR, // 50% load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_row_equal,
probing_scheme_t{d_row_hash},
cuco::thread_scope_device,
cuco::storage<GROUPBY_WINDOW_SIZE>{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};

auto row_bitmask =
skip_rows_with_nulls
? cudf::bitmask_and(keys, stream, cudf::get_current_device_resource_ref()).first
: rmm::device_buffer{};

// Compute all single pass aggs first
compute_single_pass_aggs(num_keys,
skip_rows_with_nulls,
static_cast<bitmask_type*>(row_bitmask.data()),
set.ref(cuco::insert_and_find),
requests,
&sparse_results,
stream);

// Extract the populated indices from the hash set and create a gather map.
// Gathering using this map from sparse results will give dense results.
auto gather_map = extract_populated_keys(set, keys.num_rows(), stream);

// Compact all results from sparse_results and insert into cache
sparse_to_dense_results(requests,
&sparse_results,
cache,
gather_map,
set.ref(cuco::find),
static_cast<bitmask_type*>(row_bitmask.data()),
stream,
mr);

return cudf::detail::gather(keys,
gather_map,
out_of_bounds_policy::DONT_CHECK,
cudf::detail::negative_index_policy::NOT_ALLOWED,
stream,
mr);
}

template rmm::device_uvector<size_type> extract_populated_keys<global_set_t>(
global_set_t const& key_set, size_type num_keys, rmm::cuda_stream_view stream);

template rmm::device_uvector<size_type> extract_populated_keys<nullable_global_set_t>(
nullable_global_set_t const& key_set, size_type num_keys, rmm::cuda_stream_view stream);

template std::unique_ptr<table> compute_groupby<row_comparator_t, row_hash_t>(
table_view const& keys,
host_span<aggregation_request const> requests,
bool skip_rows_with_nulls,
row_comparator_t const& d_row_equal,
row_hash_t const& d_row_hash,
cudf::detail::result_cache* cache,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

template std::unique_ptr<table> compute_groupby<nullable_row_comparator_t, row_hash_t>(
table_view const& keys,
host_span<aggregation_request const> requests,
bool skip_rows_with_nulls,
nullable_row_comparator_t const& d_row_equal,
row_hash_t const& d_row_hash,
cudf::detail::result_cache* cache,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
} // namespace cudf::groupby::detail::hash
95 changes: 95 additions & 0 deletions cpp/src/groupby/hash/compute_groupby.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>

#include <memory>

namespace cudf::groupby::detail::hash {
/**
* @brief Computes and returns a device vector containing all populated keys in
* `key_set`.
*
* @tparam SetType Type of key hash set
*
* @param key_set Key hash set
* @param num_keys Number of input keys
* @param stream CUDA stream used for device memory operations and kernel launches
* @return An array of unique keys contained in `key_set`
*/
template <typename SetType>
rmm::device_uvector<size_type> extract_populated_keys(SetType const& key_set,
size_type num_keys,
rmm::cuda_stream_view stream);

/**
* @brief Computes groupby using hash table.
*
* First, we create a hash table that stores the indices of unique rows in
* `keys`. The upper limit on the number of values in this map is the number
* of rows in `keys`.
*
* To store the results of aggregations, we create temporary sparse columns
* which have the same size as input value columns. Using the hash map, we
* determine the location within the sparse column to write the result of the
* aggregation into.
*
* The sparse column results of all aggregations are stored into the cache
* `sparse_results`. This enables the use of previously calculated results in
* other aggregations.
*
* All the aggregations which can be computed in a single pass are computed
* first, in a combined kernel. Then using these results, aggregations that
* require multiple passes, will be computed.
*
* Finally, using the hash map, we generate a vector of indices of populated
* values in sparse result columns. Then, for each aggregation originally
* requested in `requests`, we gather sparse results into a column of dense
* results using the aforementioned index vector. Dense results are stored into
* the in/out parameter `cache`.
*
* @tparam Equal Device row comparator type
* @tparam Hash Device row hasher type
*
* @param keys Table whose rows act as the groupby keys
* @param requests The set of columns to aggregate and the aggregations to perform
* @param skip_rows_with_nulls Flag indicating whether to ignore nulls or not
* @param d_row_equal Device row comparator
* @param d_row_hash Device row hasher
* @param cache Dense aggregation results
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table
* @return Table of unique keys
*/
template <typename Equal, typename Hash>
std::unique_ptr<cudf::table> compute_groupby(table_view const& keys,
host_span<aggregation_request const> requests,
bool skip_rows_with_nulls,
Equal const& d_row_equal,
Hash const& d_row_hash,
cudf::detail::result_cache* cache,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
} // namespace cudf::groupby::detail::hash
99 changes: 99 additions & 0 deletions cpp/src/groupby/hash/compute_single_pass_aggs.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "compute_single_pass_aggs.hpp"
#include "create_sparse_results_table.hpp"
#include "flatten_single_pass_aggs.hpp"
#include "helpers.cuh"
#include "single_pass_functors.cuh"
#include "var_hash_functor.cuh"

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/groupby.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <algorithm>
#include <memory>
#include <vector>

namespace cudf::groupby::detail::hash {
/**
* @brief Computes all aggregations from `requests` that require a single pass
* over the data and stores the results in `sparse_results`
*/
template <typename SetType>
void compute_single_pass_aggs(int64_t num_keys,
bool skip_rows_with_nulls,
bitmask_type const* row_bitmask,
SetType set,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
rmm::cuda_stream_view stream)
{
// flatten the aggs to a table that can be operated on by aggregate_row
auto const [flattened_values, agg_kinds, aggs] = flatten_single_pass_aggs(requests);

// make table that will hold sparse results
table sparse_table = create_sparse_results_table(flattened_values, agg_kinds, stream);
// prepare to launch kernel to do the actual aggregation
auto d_sparse_table = mutable_table_device_view::create(sparse_table, stream);
auto d_values = table_device_view::create(flattened_values, stream);
auto const d_aggs = cudf::detail::make_device_uvector_async(
agg_kinds, stream, cudf::get_current_device_resource_ref());

thrust::for_each_n(
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
num_keys,
hash::compute_single_pass_aggs_fn{
set, *d_values, *d_sparse_table, d_aggs.data(), row_bitmask, skip_rows_with_nulls});
// Add results back to sparse_results cache
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
// Note that the cache will make a copy of this temporary aggregation
sparse_results->add_result(
flattened_values.column(i), *aggs[i], std::move(sparse_result_cols[i]));
}
}

template void compute_single_pass_aggs<hash_set_ref_t<cuco::insert_and_find_tag>>(
int64_t num_keys,
bool skip_rows_with_nulls,
bitmask_type const* row_bitmask,
hash_set_ref_t<cuco::insert_and_find_tag> set,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
rmm::cuda_stream_view stream);

template void compute_single_pass_aggs<nullable_hash_set_ref_t<cuco::insert_and_find_tag>>(
int64_t num_keys,
bool skip_rows_with_nulls,
bitmask_type const* row_bitmask,
nullable_hash_set_ref_t<cuco::insert_and_find_tag> set,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
rmm::cuda_stream_view stream);
} // namespace cudf::groupby::detail::hash
38 changes: 38 additions & 0 deletions cpp/src/groupby/hash/compute_single_pass_aggs.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/groupby.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

namespace cudf::groupby::detail::hash {
/**
* @brief Computes all aggregations from `requests` that require a single pass
* over the data and stores the results in `sparse_results`
*/
template <typename SetType>
void compute_single_pass_aggs(int64_t num_keys,
bool skip_rows_with_nulls,
bitmask_type const* row_bitmask,
SetType set,
cudf::host_span<cudf::groupby::aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
rmm::cuda_stream_view stream);
} // namespace cudf::groupby::detail::hash
Loading

0 comments on commit 074ab74

Please sign in to comment.