Skip to content

Commit

Permalink
[BugFix] fix wrong slot index when handling iceberg delete columns (#…
Browse files Browse the repository at this point in the history
…42087)

Signed-off-by: yanz <[email protected]>
  • Loading branch information
dirtysalt authored Mar 6, 2024
1 parent cd47136 commit e0242b2
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 20 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/hdfs_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ struct HdfsScannerParams {
// The file last modification time
int64_t modification_time = 0;

TupleDescriptor* tuple_desc = nullptr;
const TupleDescriptor* tuple_desc = nullptr;

// columns read from file
std::vector<SlotDescriptor*> materialize_slots;
Expand Down
4 changes: 1 addition & 3 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,9 @@ void GroupReader::collect_io_ranges(std::vector<io::SharedBufferedInputStream::I
}

void GroupReader::_init_read_chunk() {
const auto& slots = _param.tuple_desc->slots();
std::vector<SlotDescriptor*> read_slots;
for (const auto& column : _param.read_cols) {
int chunk_index = column.idx_in_chunk;
read_slots.emplace_back(slots[chunk_index]);
read_slots.emplace_back(column.slot_desc);
}
size_t chunk_size = _param.chunk_size;
_read_chunk = ChunkHelper::new_chunk(read_slots, chunk_size);
Expand Down
5 changes: 1 addition & 4 deletions be/src/formats/parquet/group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ struct GroupReaderParam {
// column type in parquet file
tparquet::Type::type type_in_parquet;

// column index in chunk
int32_t idx_in_chunk;

const SlotDescriptor* slot_desc = nullptr;
SlotDescriptor* slot_desc = nullptr;

const TIcebergSchemaField* t_iceberg_schema_field = nullptr;

Expand Down
10 changes: 4 additions & 6 deletions be/src/formats/parquet/meta_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ void ParquetMetaHelper::prepare_read_columns(const std::vector<HdfsScannerContex
if (field_idx < 0) continue;

auto parquet_type = _file_metadata->schema().get_stored_column_by_field_idx(field_idx)->physical_type;
GroupReaderParam::Column column =
_build_column(field_idx, materialized_column.idx_in_chunk, parquet_type, materialized_column.slot_desc,
materialized_column.decode_needed);
GroupReaderParam::Column column = _build_column(field_idx, parquet_type, materialized_column.slot_desc,
materialized_column.decode_needed);
read_cols.emplace_back(column);
}
}
Expand Down Expand Up @@ -119,9 +118,8 @@ void IcebergMetaHelper::prepare_read_columns(const std::vector<HdfsScannerContex

auto parquet_type = _file_metadata->schema().get_stored_column_by_field_id(field_id)->physical_type;

GroupReaderParam::Column column =
_build_column(field_idx, materialized_column.idx_in_chunk, parquet_type, materialized_column.slot_desc,
materialized_column.decode_needed, iceberg_it->second);
GroupReaderParam::Column column = _build_column(field_idx, parquet_type, materialized_column.slot_desc,
materialized_column.decode_needed, iceberg_it->second);
read_cols.emplace_back(column);
}
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/formats/parquet/meta_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,12 @@ class MetaHelper {
}

protected:
GroupReaderParam::Column _build_column(int32_t idx_in_parquet, int32_t idx_in_chunk,
const tparquet::Type::type& type_in_parquet, const SlotDescriptor* slot_desc,
bool decode_needed,
GroupReaderParam::Column _build_column(int32_t idx_in_parquet, const tparquet::Type::type& type_in_parquet,
SlotDescriptor* slot_desc, bool decode_needed,
const TIcebergSchemaField* t_iceberg_schema_field = nullptr) const {
GroupReaderParam::Column column{};
column.idx_in_parquet = idx_in_parquet;
column.type_in_parquet = type_in_parquet;
column.idx_in_chunk = idx_in_chunk;
column.slot_desc = slot_desc;
column.t_iceberg_schema_field = t_iceberg_schema_field;
column.decode_needed = decode_needed;
Expand Down
3 changes: 1 addition & 2 deletions be/test/formats/parquet/group_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ ChunkPtr GroupReaderTest::_create_chunk(GroupReaderParam* param) {
ChunkPtr chunk = std::make_shared<Chunk>();
for (auto& column : param->read_cols) {
auto c = ColumnHelper::create_column(column.slot_type(), true);
chunk->append_column(c, column.idx_in_chunk);
chunk->append_column(c, column.slot_id());
}
return chunk;
}
Expand Down Expand Up @@ -321,7 +321,6 @@ static GroupReaderParam::Column _create_group_reader_param_of_column(int idx, tp
new SlotDescriptor(idx, fmt::format("col{}", idx), TypeDescriptor::from_logical_type(prim_type));
GroupReaderParam::Column c;
c.idx_in_parquet = idx;
c.idx_in_chunk = idx;
c.type_in_parquet = par_type;
c.slot_desc = slot;
return c;
Expand Down

0 comments on commit e0242b2

Please sign in to comment.