Parquet: Clean up Parquet generic and internal readers #12102

Open · wants to merge 5 commits into main (changes from all commits)
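
The changes below remove the unused parallel types list from createStructReader and the struct readers built on it, move time and timestamp unit detection out of the reader factory signatures (the readers now inspect the column's logical type annotation directly), and replace direct reader instantiation with ParquetValueReaders factory methods such as strings, unboxed, and byteBuffers.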
----------------------------------------------------------------------
@@ -116,7 +116,6 @@ public ParquetValueReader<RowData> struct(
expected != null ? expected.fields() : ImmutableList.of();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
@@ -128,32 +127,26 @@ public ParquetValueReader<RowData> struct(
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
types.add(null);
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
types.add(null);
} else if (reader != null) {
reorderedFields.add(reader);
types.add(typesById.get(id));
} else if (field.initialDefault() != null) {
reorderedFields.add(
ParquetValueReaders.constant(
RowDataUtil.convertConstant(field.type(), field.initialDefault()),
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
types.add(typesById.get(id));
} else if (field.isOptional()) {
reorderedFields.add(ParquetValueReaders.nulls());
types.add(null);
} else {
throw new IllegalArgumentException(
String.format("Missing required field: %s", field.name()));
}
}

return new RowDataReader(types, reorderedFields);
return new RowDataReader(reorderedFields);
}

@Override
@@ -662,8 +655,8 @@ private static class RowDataReader
extends ParquetValueReaders.StructReader<RowData, GenericRowData> {
private final int numFields;

RowDataReader(List<Type> types, List<ParquetValueReader<?>> readers) {
super(types, readers);
RowDataReader(List<ParquetValueReader<?>> readers) {
super(readers);
this.numFields = readers.size();
}

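As the review comment further down notes, the types list was never used by the underlying StructReader, so the Flink reader no longer needs to maintain a null-padded list in lockstep with reorderedFields.
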
----------------------------------------------------------------------
@@ -27,15 +27,25 @@
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -67,8 +77,18 @@ protected ParquetValueReader<T> createReader(
}
}

/**
* @deprecated will be removed in 1.9.0; use {@link #createStructReader(List, Types.StructType)}
* instead.
*/
@Deprecated
protected ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType) {
return createStructReader(fieldReaders, structType);
}

protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);
List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);
Review comment from a Contributor on lines 90 to +91:

RevAPI is failing due to the new abstract method that implementations will need to implement. I understand the rationale (the previous types argument was not used), but I think we'll need to add the breaking change to RevAPI:

./gradlew :iceberg-parquet:revapiAcceptBreak \
  --justification "Implementations of ParquetValueReader.createStructReader should not have to pass in explicit types" \
  --code "java.method.abstractMethodAdded" \
  --new "method org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.BaseParquetReaders<T>::createStructReader(java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>, org.apache.iceberg.types.Types.StructType)"


protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new GenericParquetReaders.FixedReader(desc);
@@ -78,8 +98,12 @@ protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new GenericParquetReaders.DateReader(desc);
}

protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation();
Preconditions.checkArgument(
time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time);

LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit();
switch (unit) {
case MICROS:
return new GenericParquetReaders.TimeReader(desc);
@@ -90,12 +114,17 @@
}
}

protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return new GenericParquetReaders.TimestampInt96Reader(desc);
}

LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation();
Preconditions.checkArgument(
timestamp instanceof TimestampLogicalTypeAnnotation,
"Invalid timestamp logical type: " + timestamp);

LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit();
switch (unit) {
case MICROS:
return isAdjustedToUTC
@@ -148,96 +177,81 @@ public ParquetValueReader<?> struct(
}
}

private class LogicalTypeAnnotationParquetValueReaderVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
private class LogicalTypeReadBuilder
implements LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {

private final ColumnDescriptor desc;
private final org.apache.iceberg.types.Type.PrimitiveType expected;
private final PrimitiveType primitive;

LogicalTypeAnnotationParquetValueReaderVisitor(
ColumnDescriptor desc,
org.apache.iceberg.types.Type.PrimitiveType expected,
PrimitiveType primitive) {
LogicalTypeReadBuilder(
ColumnDescriptor desc, org.apache.iceberg.types.Type.PrimitiveType expected) {
this.desc = desc;
this.expected = expected;
this.primitive = primitive;
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
public Optional<ParquetValueReader<?>> visit(StringLogicalTypeAnnotation stringLogicalType) {
return Optional.of(ParquetValueReaders.strings(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
public Optional<ParquetValueReader<?>> visit(EnumLogicalTypeAnnotation enumLogicalType) {
return Optional.of(ParquetValueReaders.strings(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return Optional.of(
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT64:
return Optional.of(
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT32:
return Optional.of(
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale()));
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
return Optional.of(ParquetValueReaders.bigDecimals(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
public Optional<ParquetValueReader<?>> visit(DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(dateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
public Optional<ParquetValueReader<?>> visit(TimeLogicalTypeAnnotation timeLogicalType) {
return Optional.of(timeReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
TimestampLogicalTypeAnnotation timestampLogicalType) {
return Optional.of(
timestampReader(
desc,
timestampLogicalType.getUnit(),
((Types.TimestampType) expected).shouldAdjustToUTC()));
timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC()));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
public Optional<ParquetValueReader<?>> visit(IntLogicalTypeAnnotation intLogicalType) {
if (intLogicalType.getBitWidth() == 64) {
if (intLogicalType.isSigned()) {
// this will throw an UnsupportedOperationException
return Optional.empty();
}

return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG)
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc))
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));

if (expected.typeId() == TypeID.LONG) {
return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
}

Preconditions.checkArgument(
intLogicalType.isSigned() || intLogicalType.getBitWidth() < 32,
"Cannot read UINT32 as an int value");

return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
public Optional<ParquetValueReader<?>> visit(JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(ParquetValueReaders.strings(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
return Optional.of(ParquetValueReaders.byteBuffers(desc));
}

@Override
@@ -388,7 +402,7 @@ public ParquetValueReader<?> primitive(
if (primitive.getLogicalTypeAnnotation() != null) {
return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
.accept(new LogicalTypeReadBuilder(desc, expected))
.orElseThrow(
() ->
new UnsupportedOperationException(
@@ -399,31 +413,31 @@
case FIXED_LEN_BYTE_ARRAY:
return fixedReader(desc);
case BINARY:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
return new ParquetValueReaders.StringReader(desc);
if (expected.typeId() == TypeID.STRING) {
return ParquetValueReaders.strings(desc);
} else {
return new ParquetValueReaders.BytesReader(desc);
return ParquetValueReaders.byteBuffers(desc);
}
case INT32:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
return new ParquetValueReaders.IntAsLongReader(desc);
if (expected.typeId() == TypeID.LONG) {
return ParquetValueReaders.intsAsLongs(desc);
} else {
return new ParquetValueReaders.UnboxedReader<>(desc);
return ParquetValueReaders.unboxed(desc);
}
case FLOAT:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) {
return new ParquetValueReaders.FloatAsDoubleReader(desc);
if (expected.typeId() == TypeID.DOUBLE) {
return ParquetValueReaders.floatsAsDoubles(desc);
} else {
return new ParquetValueReaders.UnboxedReader<>(desc);
return ParquetValueReaders.unboxed(desc);
}
case BOOLEAN:
case INT64:
case DOUBLE:
return new ParquetValueReaders.UnboxedReader<>(desc);
return ParquetValueReaders.unboxed(desc);
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
return timestampReader(desc, true);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
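Net effect in this file: per-physical-type dispatch (for example, the old switch over BINARY, INT64, and INT32 decimal encodings) moves down into the ParquetValueReaders factories, so the visitor and primitive() only choose which logical reader they need; presumably bigDecimals(desc) now inspects the column's primitive type itself.
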
----------------------------------------------------------------------
@@ -38,7 +38,6 @@
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

public class GenericParquetReaders extends BaseParquetReaders<Record> {

@@ -58,8 +57,8 @@ public static ParquetValueReader<Record> buildReader(

@Override
protected ParquetValueReader<Record> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return ParquetValueReaders.recordReader(types, fieldReaders, structType);
List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return ParquetValueReaders.recordReader(fieldReaders, structType);
}

@Override
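The generic entry point is unchanged by this PR. For context, a typical way to wire it into a Parquet read, sketched assuming an existing schema (org.apache.iceberg.Schema) and inputFile (org.apache.iceberg.io.InputFile); this wiring is not part of the diff:

    // Sketch: reading generic Records through GenericParquetReaders.buildReader.
    // Error handling around close() is elided.
    try (CloseableIterable<Record> records =
        Parquet.read(inputFile)
            .project(schema)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
            .build()) {
      records.forEach(System.out::println);
    }
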
----------------------------------------------------------------------
@@ -26,10 +26,7 @@
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {

@@ -52,9 +49,8 @@ public static <T extends StructLike> ParquetValueReader<T> create(
@Override
@SuppressWarnings("unchecked")
protected ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return (ParquetValueReader<T>)
ParquetValueReaders.recordReader(types, fieldReaders, structType);
List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return (ParquetValueReader<T>) ParquetValueReaders.recordReader(fieldReaders, structType);
}

@Override
@@ -68,26 +64,12 @@ protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
}

@Override
protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.millisAsTimes(desc);
}

return new ParquetValueReaders.UnboxedReader<>(desc);
protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
return ParquetValueReaders.times(desc);
}

@Override
protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return ParquetValueReaders.int96Timestamps(desc);
}

if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.millisAsTimestamps(desc);
}

return new ParquetValueReaders.UnboxedReader<>(desc);
protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
return ParquetValueReaders.timestamps(desc);
}
}
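
With the unit parameters gone, ParquetValueReaders.times(desc) and ParquetValueReaders.timestamps(desc) are presumably responsible for distinguishing MILLIS from MICROS (and INT96) internally, which is what lets InternalReader shed the conditional logic above.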