Skip to content

Commit

Permalink
Add std::exclusive_scan (#19)
Browse files Browse the repository at this point in the history
Add `std::exclusive_scan`
  • Loading branch information
alugowski authored Nov 22, 2023
1 parent 45f9c12 commit 42e56e4
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 6 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Algorithms are added on an as-needed basis. If you need one [open an issue](http
* [transform](https://en.cppreference.com/w/cpp/algorithm/transform)

### `<numeric>`
* [exclusive_scan](https://en.cppreference.com/w/cpp/algorithm/exclusive_scan) (C++17 only)
* [reduce](https://en.cppreference.com/w/cpp/algorithm/reduce)
* [transform_reduce](https://en.cppreference.com/w/cpp/algorithm/transform_reduce) (C++17 only)

Expand Down Expand Up @@ -195,6 +196,9 @@ sort(std::execution::par)/real_time 121 ms
transform()/real_time 95.0 ms 94.9 ms 7
transform(poolstl::par)/real_time 17.4 ms 0.037 ms 38
transform(std::execution::par)/real_time 15.3 ms 13.2 ms 45
exclusive_scan()/real_time 33.7 ms 33.7 ms 21
exclusive_scan(poolstl::par)/real_time 11.6 ms 0.095 ms 55
exclusive_scan(std::execution::par)/real_time 19.8 ms 15.3 ms 32
reduce()/real_time 15.2 ms 15.2 ms 46
reduce(poolstl::par)/real_time 4.06 ms 0.044 ms 169
reduce(std::execution::par)/real_time 3.38 ms 3.16 ms 214
Expand Down
24 changes: 24 additions & 0 deletions benchmark/numeric_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@
#include "utils.hpp"


////////////////////////////////

/**
 * Benchmark std::exclusive_scan.
 *
 * The input is whatever iota_vector(arr_length) produces; the output is an
 * int vector of the same length. When ExecPolicy is recognised by is_policy
 * the policy overload is timed, otherwise the plain (policy-less) overload.
 */
template <class ExecPolicy>
void exclusive_scan(benchmark::State& state) {
    auto input = iota_vector(arr_length);
    std::vector<int> output(arr_length);

    for ([[maybe_unused]] auto _ : state) {
        if constexpr (!is_policy<ExecPolicy>::value) {
            // No execution policy requested: time the policy-less overload.
            std::exclusive_scan(input.begin(), input.end(), output.begin(), 0);
        } else {
            std::exclusive_scan(policy<ExecPolicy>::get(), input.begin(), input.end(), output.begin(), 0);
        }
        // Make the output observable so the scan cannot be elided.
        benchmark::DoNotOptimize(output);
        benchmark::ClobberMemory();
    }
}

// Register one benchmark per execution flavour; the std::execution::par
// variant is only built when POOLSTL_BENCH_STD_PAR is defined.
BENCHMARK(exclusive_scan<seq>)->Name("exclusive_scan()")->UseRealTime();
BENCHMARK(exclusive_scan<poolstl_par>)->Name("exclusive_scan(poolstl::par)")->UseRealTime();
#ifdef POOLSTL_BENCH_STD_PAR
BENCHMARK(exclusive_scan<std_par>)->Name("exclusive_scan(std::execution::par)")->UseRealTime();
#endif

////////////////////////////////

template <class ExecPolicy>
Expand Down
33 changes: 27 additions & 6 deletions include/poolstl/internal/ttp_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@
namespace poolstl {
namespace internal {

#if POOLSTL_HAVE_CXX17_LIB
/**
 * Submit one pool task per entry of args_list; each task invokes
 * std::apply(op, entry).
 *
 * @param policy    execution policy; only its pool() is used here.
 * @param op        callable, captured by copy in every submitted task.
 * @param args_list container of tuple-like argument packs, one per task.
 * @return one future per submitted task, in iteration order of args_list.
 *         The tasks are not waited on here; the caller must do that.
 */
template <class ExecPolicy, class Op, class ArgContainer>
std::vector<std::future<void>>
parallel_apply(ExecPolicy &&policy, Op op, const ArgContainer& args_list) {
    auto& pool = policy.pool();
    std::vector<std::future<void>> pending;

    for (const auto& arg_pack : args_list) {
        pending.push_back(
            pool.submit([op](const auto& unpacked) { std::apply(op, unpacked); }, arg_pack));
    }

    return pending;
}
#endif

/**
* Chunk a single range.
*/
Expand All @@ -26,13 +44,14 @@ namespace poolstl {
std::vector<std::future<
decltype(std::declval<Chunk>()(std::declval<RandIt>(), std::declval<RandIt>()))
>> futures;
auto chunk_size = get_chunk_size(first, last, extra_split_factor * policy.pool().get_num_threads());
auto& task_pool = policy.pool();
auto chunk_size = get_chunk_size(first, last, extra_split_factor * task_pool.get_num_threads());

while (first < last) {
auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size);
RandIt loop_end = advanced(first, iter_chunk_size);

futures.emplace_back(policy.pool().submit(chunk, first, loop_end));
futures.emplace_back(task_pool.submit(chunk, first, loop_end));

first = loop_end;
}
Expand All @@ -54,13 +73,14 @@ namespace poolstl {
std::declval<RandIt1>(),
std::declval<RandIt2>()))
>> futures;
auto chunk_size = get_chunk_size(first1, last1, policy.pool().get_num_threads());
auto& task_pool = policy.pool();
auto chunk_size = get_chunk_size(first1, last1, task_pool.get_num_threads());

while (first1 < last1) {
auto iter_chunk_size = get_iter_chunk_size(first1, last1, chunk_size);
RandIt1 loop_end = advanced(first1, iter_chunk_size);

futures.emplace_back(policy.pool().submit(chunk, first1, loop_end, first2));
futures.emplace_back(task_pool.submit(chunk, first1, loop_end, first2));

first1 = loop_end;
std::advance(first2, iter_chunk_size);
Expand All @@ -86,13 +106,14 @@ namespace poolstl {
std::declval<RandIt2>(),
std::declval<RandIt3>()))
>> futures;
auto chunk_size = get_chunk_size(first1, last1, policy.pool().get_num_threads());
auto& task_pool = policy.pool();
auto chunk_size = get_chunk_size(first1, last1, task_pool.get_num_threads());

while (first1 < last1) {
auto iter_chunk_size = get_iter_chunk_size(first1, last1, chunk_size);
RandIt1 loop_end = advanced(first1, iter_chunk_size);

futures.emplace_back(policy.pool().submit(chunk, first1, loop_end, first2, first3));
futures.emplace_back(task_pool.submit(chunk, first1, loop_end, first2, first3));

first1 = loop_end;
std::advance(first2, iter_chunk_size);
Expand Down
62 changes: 62 additions & 0 deletions include/poolstl/numeric
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,74 @@
#define POOLSTL_NUMERIC_HPP

#include <functional>
#include <tuple>

#include "execution"
#include "internal/ttp_impl.hpp"

namespace std {

#if POOLSTL_HAVE_CXX17_LIB
/**
 * Parallel exclusive scan using a two-pass chunked algorithm.
 *
 * NOTE: Iterators are expected to be random access.
 * See std::exclusive_scan https://en.cppreference.com/w/cpp/algorithm/exclusive_scan
 *
 * Pass 1 reduces every chunk of the input to a single value, a sequential
 * exclusive scan over those per-chunk values yields the starting value of each
 * chunk, and pass 2 scans every chunk independently from its starting value.
 * All of pass 1 completes before pass 2 writes anything, and every chunk writes
 * to the destination offset matching its source offset, so dest may equal first.
 *
 * @param policy execution policy providing the thread pool.
 * @param first  start of the input range.
 * @param last   end of the input range.
 * @param dest   start of the output range; must have room for (last - first) elements.
 * @param init   initial value; it is the first value written.
 * @param binop  associative binary operation. It does not need to be commutative,
 *               and T{} does not need to be an identity element for it.
 *               Input elements must be convertible to T.
 * @return iterator one past the last element written.
 */
template <class ExecPolicy, class RandIt1, class RandIt2, class T, class BinaryOp>
poolstl::internal::enable_if_par<ExecPolicy, RandIt2>
exclusive_scan(ExecPolicy &&policy, RandIt1 first, RandIt1 last, RandIt2 dest, T init, BinaryOp binop) {
    if (first == last) {
        return dest;
    }

    // Pass 1: Chunk the input and reduce each chunk.
    // The reduction is seeded with the chunk's own first element instead of T{}:
    // T{} is an identity only for some operations (e.g. std::plus) and would
    // corrupt the result for others (e.g. std::multiplies, min, max).
    // Chunks are assumed to be non-empty, which holds for a non-empty input range
    // as long as parallel_chunk_for always advances - TODO confirm if that helper changes.
    // NOTE(review): policy is forwarded both here and in pass 2; this is fine only
    // while the helpers do nothing but call policy.pool().
    auto futures = poolstl::internal::parallel_chunk_for(std::forward<ExecPolicy>(policy), first, last,
                         [binop](RandIt1 chunk_first, RandIt1 chunk_last) {
                             T sum = std::accumulate(chunk_first + 1, chunk_last,
                                                     static_cast<T>(*chunk_first), binop);
                             return std::make_tuple(std::make_pair(chunk_first, chunk_last), std::move(sum));
                         });

    std::vector<std::pair<RandIt1, RandIt1>> ranges;
    std::vector<T> sums;
    ranges.reserve(futures.size());
    sums.reserve(futures.size());

    for (auto& future : futures) {
        auto res = future.get();
        ranges.push_back(std::get<0>(res));
        sums.push_back(std::move(std::get<1>(res)));
    }

    // Find the initial value of each chunk: init combined with the sums of all
    // preceding chunks. This is the sequential (policy-less) overload, run in place.
    std::exclusive_scan(sums.begin(), sums.end(), sums.begin(), init, binop);

    // Pass 2: perform exclusive scan of each chunk, using the sum of previous chunks as init
    std::vector<std::tuple<RandIt1, RandIt1, RandIt2, T>> args;
    args.reserve(sums.size());
    for (std::size_t i = 0; i < sums.size(); ++i) {
        auto chunk_first = ranges[i].first;
        // sums[i] is not read again, so it can be moved into the argument pack.
        args.emplace_back(chunk_first, ranges[i].second,
                          dest + (chunk_first - first),
                          std::move(sums[i]));
    }

    auto futures2 = poolstl::internal::parallel_apply(std::forward<ExecPolicy>(policy),
        [binop](RandIt1 chunk_first, RandIt1 chunk_last, RandIt2 chunk_dest, T chunk_init){
            std::exclusive_scan(chunk_first, chunk_last, chunk_dest, chunk_init, binop);
        }, args);

    poolstl::internal::get_futures(futures2);
    return dest + (last - first);
}

/**
 * Parallel exclusive scan that combines elements with std::plus<T>.
 *
 * Delegates to the BinaryOp overload; see that overload for the contract.
 *
 * NOTE: Iterators are expected to be random access.
 * See std::exclusive_scan https://en.cppreference.com/w/cpp/algorithm/exclusive_scan
 */
template <class ExecPolicy, class RandIt1, class RandIt2, class T>
poolstl::internal::enable_if_par<ExecPolicy, RandIt2>
exclusive_scan(ExecPolicy &&policy, RandIt1 first, RandIt1 last, RandIt2 dest, T init) {
    std::plus<T> add{};
    return std::exclusive_scan(std::forward<ExecPolicy>(policy), first, last, dest, init, add);
}
#endif

/**
* NOTE: Iterators are expected to be random access.
* See std::reduce https://en.cppreference.com/w/cpp/algorithm/reduce
Expand Down
1 change: 1 addition & 0 deletions include/poolstl/seq_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ namespace std {
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(transform)

#if POOLSTL_HAVE_CXX17_LIB
// These algorithms are only registered when POOLSTL_HAVE_CXX17_LIB is set.
// NOTE(review): the macro presumably generates the seq / par_if forwarding
// overloads for the named algorithm - confirm against its definition.
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(exclusive_scan)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(reduce)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(transform_reduce)
#endif
Expand Down
41 changes: 41 additions & 0 deletions tests/poolstl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,47 @@ TEST_CASE("transform_2", "[alg][algorithm]") {
}
}

#if POOLSTL_HAVE_CXX17_LIB
// Compares the pool-backed exclusive_scan against a par_if(false) reference run
// for every entry of test_thread_counts and test_arr_sizes, with two init values.
TEST_CASE("exclusive_scan", "[alg][algorithm]") {
for (auto num_threads : test_thread_counts) {
ttp::task_thread_pool pool(num_threads);

for (auto num_iters : test_arr_sizes) {
for (int init : {0, 10}) {
auto v = iota_vector(num_iters);
std::vector<int> dest1(v.size());
std::vector<int> dest2(v.size());

// dest1 receives the reference result, dest2 the result computed on the pool
auto seq_res = std::exclusive_scan(poolstl::par_if(false), v.cbegin(), v.cend(), dest1.begin(), init);
auto par_res = std::exclusive_scan(poolstl::par.on(pool), v.cbegin(), v.cend(), dest2.begin(), init);
// test return value
REQUIRE((par_res - dest2.begin()) == (seq_res - dest1.begin()));
REQUIRE(dest1 == dest2);

// test in-place (destination is the input range itself)
std::exclusive_scan(poolstl::par.on(pool), v.begin(), v.end(), v.begin(), init);
REQUIRE(v == dest2);

// test a non-commutative operation: std::string concatenation only matches
// the reference if operands are combined in their original order
{
std::vector<std::string> sv;
sv.reserve(v.size());
for (auto val : v) {
sv.emplace_back(std::to_string(val));
}
std::vector<std::string> sdest1(sv.size());
std::vector<std::string> sdest2(sv.size());

std::exclusive_scan(poolstl::par_if(false), sv.cbegin(), sv.cend(), sdest1.begin(), std::to_string(init));
std::exclusive_scan(poolstl::par.on(pool), sv.cbegin(), sv.cend(), sdest2.begin(), std::to_string(init));
REQUIRE(sdest1 == sdest2);
}
}
}
}
}
#endif

TEST_CASE("reduce", "[alg][numeric]") {
for (auto num_threads : test_thread_counts) {
ttp::task_thread_pool pool(num_threads);
Expand Down

0 comments on commit 42e56e4

Please sign in to comment.