Parquet: Refactor BaseParquetReaders
ajantha-bhat committed Jan 3, 2025
1 parent 701e933 commit cdf3fac
Showing 2 changed files with 276 additions and 242 deletions.
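In outline, the refactor turns BaseParquetReaders into a template: the shared Parquet schema traversal (ReadBuilder) stays in the base class, while the pieces that depend on the in-memory representation move behind three new abstract hooks. Below is a minimal sketch of the resulting shape; the hook signatures are taken from the diff, everything else is trimmed for illustration.

// Sketch only: the post-refactor shape of BaseParquetReaders, trimmed to the
// new extension points. Subclasses decide how logical types, fixed-length
// binaries, and legacy INT96 timestamps are materialized.
import java.util.List;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public abstract class BaseParquetReaders<T> {
  // Existing hook: how struct values are assembled for the target model.
  protected abstract ParquetValueReader<T> createStructReader(
      List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

  // New hook: maps logical type annotations (string, decimal, date/time, ...)
  // to readers for the target model.
  protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>>
      logicalTypeReaderVisitor(
          ColumnDescriptor desc,
          org.apache.iceberg.types.Type.PrimitiveType expected,
          PrimitiveType primitive);

  // New hooks: readers for FIXED_LEN_BYTE_ARRAY and legacy INT96 timestamps.
  protected abstract ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc);

  protected abstract ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc);
}

ReadBuilder.primitive(...) now defers to these hooks instead of instantiating concrete reader classes directly, as the hunks below show.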
BaseParquetReaders.java
@@ -18,19 +18,8 @@
*/
package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -45,7 +34,6 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -76,6 +64,16 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>>
logicalTypeReaderVisitor(
ColumnDescriptor desc,
org.apache.iceberg.types.Type.PrimitiveType expected,
PrimitiveType primitive);

protected abstract ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc);

protected abstract ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc);

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
}
Expand Down Expand Up @@ -114,113 +112,6 @@ public ParquetValueReader<?> struct(
}
}

private class LogicalTypeAnnotationParquetValueReaderVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {

private final ColumnDescriptor desc;
private final org.apache.iceberg.types.Type.PrimitiveType expected;
private final PrimitiveType primitive;

LogicalTypeAnnotationParquetValueReaderVisitor(
ColumnDescriptor desc,
org.apache.iceberg.types.Type.PrimitiveType expected,
PrimitiveType primitive) {
this.desc = desc;
this.expected = expected;
this.primitive = primitive;
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return Optional.of(
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT64:
return Optional.of(
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT32:
return Optional.of(
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale()));
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return Optional.of(new TimeReader(desc));
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return Optional.of(new TimeMillisReader(desc));
}

return Optional.empty();
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
return tsMicrosType.shouldAdjustToUTC()
? Optional.of(new TimestamptzReader(desc))
: Optional.of(new TimestampReader(desc));
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
return tsMillisType.shouldAdjustToUTC()
? Optional.of(new TimestamptzMillisReader(desc))
: Optional.of(new TimestampMillisReader(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
if (intLogicalType.getBitWidth() == 64) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG)
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc))
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, ?> idToConstant;
@@ -362,7 +253,7 @@ public ParquetValueReader<?> primitive(
if (primitive.getOriginalType() != null) {
return primitive
.getLogicalTypeAnnotation()
-        .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
+        .accept(logicalTypeReaderVisitor(desc, expected, primitive))
.orElseThrow(
() ->
new UnsupportedOperationException(
@@ -371,7 +262,7 @@

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
-        return new FixedReader(desc);
+        return fixedReader(desc);
case BINARY:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +288,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
-        return new TimestampInt96Reader(desc);
+        return int96Reader(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -407,124 +298,4 @@ MessageType type() {
return type;
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}

private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
}
}

private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}
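For completeness, here is a hedged sketch of how a concrete subclass could satisfy the new hooks, reusing the decoding logic of the FixedReader and TimestampInt96Reader classes deleted above. The subclass name, the Record value type, and the minimal string-only visitor are illustrative assumptions, not code from this commit; the deleted readers presumably land in the commit's second changed file, which is not rendered here.

// Hypothetical subclass sketch -- not the commit's actual code.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ExampleParquetReaders extends BaseParquetReaders<Record> {

  @Override
  protected ParquetValueReader<Record> createStructReader(
      List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType) {
    // A real implementation assembles Record instances; elided in this sketch.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>>
      logicalTypeReaderVisitor(
          ColumnDescriptor desc,
          org.apache.iceberg.types.Type.PrimitiveType expected,
          PrimitiveType primitive) {
    // A real implementation mirrors the full visitor deleted above; this
    // stand-in handles only string annotations and falls back to the
    // interface defaults (Optional.empty()) for everything else.
    return new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>>() {
      @Override
      public Optional<ParquetValueReader<?>> visit(
          LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
        return Optional.of(new ParquetValueReaders.StringReader(desc));
      }
    };
  }

  @Override
  protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
    // Same behavior as the deleted FixedReader: fixed-length binary as byte[].
    return new ParquetValueReaders.PrimitiveReader<byte[]>(desc) {
      @Override
      public byte[] read(byte[] reuse) {
        if (reuse != null) {
          column.nextBinary().toByteBuffer().duplicate().get(reuse);
          return reuse;
        }
        return column.nextBinary().getBytes();
      }
    };
  }

  @Override
  protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
    // Same decoding as the deleted TimestampInt96Reader: little-endian
    // nanos-of-day followed by a Julian day number.
    return new ParquetValueReaders.PrimitiveReader<OffsetDateTime>(desc) {
      @Override
      public OffsetDateTime read(OffsetDateTime reuse) {
        ByteBuffer buf = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong();
        int julianDay = buf.getInt();
        long unixEpochJulian = 2_440_588L; // Julian day of 1970-01-01
        return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - unixEpochJulian))
            .plusNanos(timeOfDayNanos)
            .atOffset(ZoneOffset.UTC);
      }
    };
  }
}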