Skip to content

Commit

Permalink
Remove the need for ChunkResolver
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 25, 2024
1 parent 132b89a commit f69b3b8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
23 changes: 12 additions & 11 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,25 @@ ChunkedIndexMapper::LogicalToPhysical() {
}
}

constexpr int64_t kMaxBatchSize = 512;
std::array<TypedChunkLocation<uint64_t>, kMaxBatchSize> batch;

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
ResolvedChunkIndex* physical_begin =
reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<ResolvedChunkIndex*>(indices_end_));

for (int64_t i = 0; i < num_indices; i += kMaxBatchSize) {
const int64_t batch_size = std::min(kMaxBatchSize, num_indices - i);
[[maybe_unused]] bool ok =
resolver_.ResolveMany(batch_size, indices_begin_ + i, batch.data());
DCHECK(ok) << "ResolveMany unexpectedly failed (invalid logical index?)";
for (int64_t j = 0; j < batch_size; ++j) {
const auto loc = batch[j];
physical_begin[i + j] = ResolvedChunkIndex{loc.chunk_index, loc.index_in_chunk};
int64_t chunk_offset = 0;
for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(chunk_lengths_.size());
++chunk_index) {
const int64_t chunk_length = chunk_lengths_[chunk_index];
for (int64_t i = 0; i < chunk_length; ++i) {
DCHECK_GE(indices_begin_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
DCHECK_LT(indices_begin_[chunk_offset + i],
static_cast<uint64_t>(chunk_offset + chunk_length));
physical_begin[chunk_offset + i] = ResolvedChunkIndex{
static_cast<uint64_t>(chunk_index),
indices_begin_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
}
chunk_offset += chunk_length;
}

return std::pair{physical_begin, physical_begin + num_indices};
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,20 @@ class ChunkedIndexMapper {
: ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: resolver_(chunks),
chunk_lengths_(GetChunkLengths(chunks)),
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: resolver_(chunks),
chunk_lengths_(GetChunkLengths(chunks)),
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}

// Turn the original uint64_t logical indices into physical. This reuses the
// same memory area, so the logical indices cannot be used anymore until
// PhysicalToLogical() is called.
//
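// This assumes that the logical indices are originally chunk-partitioned,
// i.e. the indices stored at the positions covered by a given chunk only
// refer to elements of that same chunk.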
Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>> LogicalToPhysical();

// Turn the physical indices back into logical, making the uint64_t indices
Expand All @@ -157,7 +157,6 @@ class ChunkedIndexMapper {
static std::vector<int64_t> GetChunkLengths(util::span<const Array* const> chunks);
static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);

ChunkResolver resolver_;
std::vector<int64_t> chunk_lengths_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
Expand Down

0 comments on commit f69b3b8

Please sign in to comment.