Skip to content

Commit

Permalink
EXP: apacheGH-44084: [C++] Improve merge step in chunked sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 24, 2024
1 parent 83f35de commit 45566ce
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 101 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ set(ARROW_COMPUTE_SRCS
compute/light_array_internal.cc
compute/ordering.cc
compute/registry.cc
compute/kernels/chunked_internal.cc
compute/kernels/codegen_internal.cc
compute/kernels/ree_util_internal.cc
compute/kernels/scalar_cast_boolean.cc
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

namespace arrow::internal {

using ::arrow::util::span;

namespace {
template <typename T>
int64_t GetLength(const T& array) {
Expand All @@ -42,7 +44,7 @@ int64_t GetLength<std::shared_ptr<RecordBatch>>(
}

template <typename T>
inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
inline std::vector<int64_t> MakeChunksOffsets(span<T> chunks) {
std::vector<int64_t> offsets(chunks.size() + 1);
int64_t offset = 0;
std::transform(chunks.begin(), chunks.end(), offsets.begin(),
Expand Down Expand Up @@ -112,13 +114,13 @@ void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(chunks))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks) noexcept
ChunkResolver::ChunkResolver(span<const Array* const> chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const RecordBatchVector& batches) noexcept
: offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(batches))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)),
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/span.h"

namespace arrow::internal {

Expand Down Expand Up @@ -76,7 +77,7 @@ struct ARROW_EXPORT ChunkResolver {

public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;
explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;
explicit ChunkResolver(::arrow::util::span<const Array* const> chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;

/// \brief Construct a ChunkResolver from a vector of chunks.size() + 1 offsets.
Expand Down
108 changes: 108 additions & 0 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/kernels/chunked_internal.h"

#include <algorithm>

#include "arrow/util/logging.h"

namespace arrow::compute::internal {

using ::arrow::internal::TypedChunkLocation;

std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}

ChunkedIndexMapper::ChunkedIndexMapper(util::span<const Array* const> chunks,
uint64_t* indices_begin, uint64_t* indices_end)
: resolver_(chunks),
chunks_(chunks),
indices_begin_(indices_begin),
indices_end_(indices_end) {}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
ChunkedIndexMapper::LogicalToPhysical() {
// Check that indices would fall in bounds for CompressedChunkLocation
if (ARROW_PREDICT_FALSE(static_cast<int64_t>(chunks_.size()) >
CompressedChunkLocation::kMaxChunkIndex + 1)) {
return Status::NotImplemented("Chunked array has more than ",
CompressedChunkLocation::kMaxChunkIndex + 1, " chunks");
}
for (const Array* chunk : chunks_) {
if (ARROW_PREDICT_FALSE(chunk->length() >
CompressedChunkLocation::kMaxIndexInChunk + 1)) {
return Status::NotImplemented("Individual chunk in chunked array has more than ",
CompressedChunkLocation::kMaxIndexInChunk + 1,
" elements");
}
}

constexpr int64_t kMaxBatchSize = 512;
std::array<TypedChunkLocation<uint64_t>, kMaxBatchSize> batch;

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<CompressedChunkLocation*>(indices_end_));

for (int64_t i = 0; i < num_indices; i += kMaxBatchSize) {
const int64_t batch_size = std::min(kMaxBatchSize, num_indices - i);
[[maybe_unused]] bool ok =
resolver_.ResolveMany(batch_size, indices_begin_ + i, batch.data());
DCHECK(ok) << "ResolveMany unexpectedly failed (invalid logical index?)";
for (int64_t j = 0; j < batch_size; ++j) {
const auto loc = batch[j];
physical_begin[i + j] = CompressedChunkLocation{
static_cast<uint32_t>(loc.chunk_index), loc.index_in_chunk};
}
}

return std::pair{physical_begin, physical_begin + num_indices};
}

Status ChunkedIndexMapper::PhysicalToLogical() {
std::vector<int64_t> chunk_offsets(chunks_.size());
{
int64_t offset = 0;
for (int64_t i = 0; i < static_cast<int64_t>(chunks_.size()); ++i) {
chunk_offsets[i] = offset;
offset += chunks_[i]->length();
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
for (int64_t i = 0; i < num_indices; ++i) {
const auto loc = physical_begin[i];
DCHECK_LT(loc.chunk_index, chunk_offsets.size());
DCHECK_LT(static_cast<int64_t>(loc.index_in_chunk),
chunks_[loc.chunk_index]->length());
indices_begin_[i] =
chunk_offsets[loc.chunk_index] + static_cast<int64_t>(loc.index_in_chunk);
}

return Status::OK();
}

} // namespace arrow::compute::internal
82 changes: 62 additions & 20 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,32 @@
#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/chunk_resolver.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/util/span.h"

namespace arrow {
namespace compute {
namespace internal {
namespace arrow::compute::internal {

// The target chunk in a chunked array.
struct ResolvedChunk {
// The target array in chunked array.
const Array* array;
// The index in the target array.
const int64_t index;
int64_t index;

ResolvedChunk(const Array* array, int64_t index) : array(array), index(index) {}

public:
friend bool operator==(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array == right.array && left.index == right.index;
}
friend bool operator!=(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array != right.array || left.index != right.index;
}

bool IsNull() const { return array->IsNull(index); }

template <typename ArrowType, typename ViewType = GetViewType<ArrowType>>
Expand All @@ -50,34 +56,70 @@ struct ResolvedChunk {
}
};

struct CompressedChunkLocation {
static constexpr int kChunkIndexBits = 24;
static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;

static constexpr int64_t kMaxChunkIndex = (1LL << kChunkIndexBits) - 1;
static constexpr int64_t kMaxIndexInChunk = (1LL << KIndexInChunkBits) - 1;

uint32_t chunk_index : kChunkIndexBits;
uint64_t index_in_chunk : KIndexInChunkBits;
};

static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));

class ChunkedArrayResolver {
private:
::arrow::internal::ChunkResolver resolver_;
std::vector<const Array*> chunks_;
util::span<const Array* const> chunks_;
std::vector<const Array*> owned_chunks_;

public:
explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
explicit ChunkedArrayResolver(std::vector<const Array*>&& chunks)
: resolver_(chunks), chunks_(chunks), owned_chunks_(std::move(chunks)) {}
explicit ChunkedArrayResolver(util::span<const Array* const> chunks)
: resolver_(chunks), chunks_(chunks) {}

ChunkedArrayResolver(ChunkedArrayResolver&& other) = default;
ChunkedArrayResolver& operator=(ChunkedArrayResolver&& other) = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ChunkedArrayResolver);

ChunkedArrayResolver(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver(const ChunkedArrayResolver& other)
: resolver_(other.resolver_), owned_chunks_(other.owned_chunks_) {
// Rebind span to owned_chunks_ if necessary
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
}
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) {
resolver_ = other.resolver_;
owned_chunks_ = other.owned_chunks_;
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
return *this;
}

ResolvedChunk Resolve(int64_t index) const {
const auto loc = resolver_.Resolve(index);
return {chunks_[loc.chunk_index], loc.index_in_chunk};
}
};

inline std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}
std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);

class ChunkedIndexMapper {
public:
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end);
ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
LogicalToPhysical();
Status PhysicalToLogical();

private:
::arrow::internal::ChunkResolver resolver_;
util::span<const Array* const> chunks_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
};

} // namespace internal
} // namespace compute
} // namespace arrow
} // namespace arrow::compute::internal
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_rank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace arrow::compute::internal {

using ::arrow::util::span;

namespace {

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -237,7 +239,7 @@ class Ranker<ChunkedArray> : public RankerMixin<ChunkedArray, Ranker<ChunkedArra
physical_chunks_, order_, null_placement_));

const auto arrays = GetArrayPointers(physical_chunks_);
auto value_selector = [resolver = ChunkedArrayResolver(arrays)](int64_t index) {
auto value_selector = [resolver = ChunkedArrayResolver(span(arrays))](int64_t index) {
return resolver.Resolve(index).Value<InType>();
};
ARROW_ASSIGN_OR_RAISE(*output_, CreateRankings(ctx_, sorted, null_placement_,
Expand Down
Loading

0 comments on commit 45566ce

Please sign in to comment.