From fe2c208ce6789eff3d64f0cf044773c22c677e45 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 2 Jan 2025 15:57:17 +0530 Subject: [PATCH 1/4] Parquet: Refactor BaseParquetWriter --- .../data/parquet/BaseParquetWriter.java | 173 +----------------- .../data/parquet/GenericParquetWriter.java | 168 +++++++++++++++++ .../iceberg/parquet/ParquetValueWriters.java | 15 ++ 3 files changed, 191 insertions(+), 165 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 470f95e8bc99..2bbcf691d8ca 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,22 +18,13 @@ */ package org.apache.iceberg.data.parquet; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; @@ -50,6 +41,12 @@ protected ParquetValueWriter createWriter(MessageType type) { protected abstract ParquetValueWriters.StructWriter createStructWriter( List> writers); + protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> + logicalTypeWriterVisitor(ColumnDescriptor desc); + + protected abstract ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc); + private class WriteBuilder extends ParquetTypeVisitor> { private final MessageType type; @@ -120,7 +117,7 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { Optional> writer = - logicalType.accept(new LogicalTypeWriterVisitor(desc)); + logicalType.accept(logicalTypeWriterVisitor(desc)); if (writer.isPresent()) { return writer.get(); } @@ -128,7 +125,7 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { switch (primitive.getPrimitiveTypeName()) { case FIXED_LEN_BYTE_ARRAY: - return new FixedWriter(desc); + return fixedWriter(desc); case BINARY: return ParquetValueWriters.byteBuffers(desc); case BOOLEAN: @@ -146,158 +143,4 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { } } } - - private static class LogicalTypeWriterVisitor - implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< - ParquetValueWriters.PrimitiveWriter> { - private final ColumnDescriptor desc; - - private LogicalTypeWriterVisitor(ColumnDescriptor desc) { - this.desc = desc; - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - 
- @Override - public Optional> visit( - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { - switch (desc.getPrimitiveType().getPrimitiveTypeName()) { - case INT32: - return Optional.of( - ParquetValueWriters.decimalAsInteger( - desc, decimalType.getPrecision(), decimalType.getScale())); - case INT64: - return Optional.of( - ParquetValueWriters.decimalAsLong( - desc, decimalType.getPrecision(), decimalType.getScale())); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of( - ParquetValueWriters.decimalAsFixed( - desc, decimalType.getPrecision(), decimalType.getScale())); - } - return Optional.empty(); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { - return Optional.of(new DateWriter(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { - return Optional.of(new TimeWriter(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { - Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", - timestampType.getUnit()); - if (timestampType.isAdjustedToUTC()) { - return Optional.of(new TimestamptzWriter(desc)); - } else { - return Optional.of(new TimestampWriter(desc)); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { - Preconditions.checkArgument( - intType.isSigned() || intType.getBitWidth() < 64, - "Cannot read uint64: not a supported Java type"); - if (intType.getBitWidth() < 64) { - return Optional.of(ParquetValueWriters.ints(desc)); - } else { - return Optional.of(ParquetValueWriters.longs(desc)); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { - return Optional.of(ParquetValueWriters.byteBuffers(desc)); - } - } - - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - - private static class DateWriter extends ParquetValueWriters.PrimitiveWriter { - private DateWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, LocalDate value) { - column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value)); - } - } - - private static class TimeWriter extends ParquetValueWriters.PrimitiveWriter { - private TimeWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, LocalTime value) { - column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000); - } - } - - private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter { - private TimestampWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, LocalDateTime value) { - column.writeLong( - repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC))); - } - } - - private static class TimestamptzWriter - extends ParquetValueWriters.PrimitiveWriter { - private TimestamptzWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, OffsetDateTime value) { - column.writeLong(repetitionLevel, 
ChronoUnit.MICROS.between(EPOCH, value)); - } - } - - private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter { - private FixedWriter(ColumnDescriptor desc) { - super(desc); - } - - @Override - public void write(int repetitionLevel, byte[] value) { - column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value)); - } - } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index 7f2c107b8dc8..e0a83d4abff9 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -18,10 +18,22 @@ */ package org.apache.iceberg.data.parquet; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Optional; import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetValueWriter; +import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; public class GenericParquetWriter extends BaseParquetWriter { @@ -38,6 +50,19 @@ protected StructWriter createStructWriter(List> wr return new RecordWriter(writers); } + @Override + protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> + logicalTypeWriterVisitor(ColumnDescriptor desc) { + return new LogicalTypeWriterVisitor(desc); + } + + @Override + protected ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc) { + // accepts byte[] and internally writes as binary. 
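+    // (InternalWriter overrides this hook with ParquetValueWriters.byteBuffers(desc) instead,
+    // since the internal object model carries fixed values as ByteBuffer rather than byte[].)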
+ return ParquetValueWriters.fixed(desc); + } + private static class RecordWriter extends StructWriter { private RecordWriter(List> writers) { super(writers); @@ -48,4 +73,147 @@ protected Object get(Record struct, int index) { return struct.get(index); } } + + private static class LogicalTypeWriterVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of( + ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of( + ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return Optional.of(new DateWriter(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + return Optional.of(new TimeWriter(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS is supported", + timestampType.getUnit()); + if (timestampType.isAdjustedToUTC()) { + return Optional.of(new TimestamptzWriter(desc)); + } else { + return Optional.of(new TimestampWriter(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + } + + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + private static class DateWriter extends ParquetValueWriters.PrimitiveWriter { + private DateWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, LocalDate value) { + column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value)); + } + } + + private static class TimeWriter extends 
ParquetValueWriters.PrimitiveWriter { + private TimeWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, LocalTime value) { + column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000); + } + } + + private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter { + private TimestampWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, LocalDateTime value) { + column.writeLong( + repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC))); + } + } + + private static class TimestamptzWriter + extends ParquetValueWriters.PrimitiveWriter { + private TimestamptzWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, OffsetDateTime value) { + column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value)); + } + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 90766983d8c8..70fde738f645 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -106,6 +106,10 @@ public static PrimitiveWriter byteBuffers(ColumnDescriptor desc) { return new BytesWriter(desc); } + public static PrimitiveWriter fixed(ColumnDescriptor desc) { + return new FixedWriter(desc); + } + public static CollectionWriter collections(int dl, int rl, ParquetValueWriter writer) { return new CollectionWriter<>(dl, rl, writer); } @@ -313,6 +317,17 @@ public void write(int repetitionLevel, ByteBuffer buffer) { } } + private static class FixedWriter extends PrimitiveWriter { + private FixedWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, byte[] value) { + column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value)); + } + } + private static class StringWriter extends PrimitiveWriter { private StringWriter(ColumnDescriptor desc) { super(desc); From a2b449b80bdcd72b4ac36e525b4dd8031c96913d Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 3 Jan 2025 19:46:55 +0530 Subject: [PATCH 2/4] Parquet: Refactor BaseParquetReaders --- .../data/parquet/BaseParquetReaders.java | 255 +---------------- .../data/parquet/GenericParquetReaders.java | 263 ++++++++++++++++++ 2 files changed, 276 insertions(+), 242 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index efdf9cc9b01d..65ff78513350 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,19 +18,8 @@ */ package org.apache.iceberg.data.parquet; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; @@ -45,7 +34,6 @@ import org.apache.parquet.column.ColumnDescriptor; import 
org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -76,6 +64,16 @@ protected ParquetValueReader createReader( protected abstract ParquetValueReader createStructReader( List types, List> fieldReaders, Types.StructType structType); + protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> + logicalTypeReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive); + + protected abstract ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc); + + protected abstract ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc); + protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; } @@ -114,113 +112,6 @@ public ParquetValueReader struct( } } - private class LogicalTypeAnnotationParquetValueReaderVisitor - implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { - - private final ColumnDescriptor desc; - private final org.apache.iceberg.types.Type.PrimitiveType expected; - private final PrimitiveType primitive; - - LogicalTypeAnnotationParquetValueReaderVisitor( - ColumnDescriptor desc, - org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive) { - this.desc = desc; - this.expected = expected; - this.primitive = primitive; - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit(DecimalLogicalTypeAnnotation decimalLogicalType) { - switch (primitive.getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of( - new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); - case INT64: - return Optional.of( - new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); - case INT32: - return Optional.of( - new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - return Optional.of(new DateReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - return Optional.of(new TimeReader(desc)); - } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - return Optional.of(new TimeMillisReader(desc)); - } - - return Optional.empty(); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - Types.TimestampType tsMicrosType = (Types.TimestampType) expected; - return tsMicrosType.shouldAdjustToUTC() - ? 
Optional.of(new TimestamptzReader(desc)) - : Optional.of(new TimestampReader(desc)); - } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - Types.TimestampType tsMillisType = (Types.TimestampType) expected; - return tsMillisType.shouldAdjustToUTC() - ? Optional.of(new TimestamptzMillisReader(desc)) - : Optional.of(new TimestampMillisReader(desc)); - } - - return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - if (intLogicalType.getBitWidth() == 64) { - return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) - ? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) - : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - return Optional.of(new ParquetValueReaders.BytesReader(desc)); - } - } - private class ReadBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private final Map idToConstant; @@ -362,7 +253,7 @@ public ParquetValueReader primitive( if (primitive.getOriginalType() != null) { return primitive .getLogicalTypeAnnotation() - .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive)) + .accept(logicalTypeReaderVisitor(desc, expected, primitive)) .orElseThrow( () -> new UnsupportedOperationException( @@ -371,7 +262,7 @@ public ParquetValueReader primitive( switch (primitive.getPrimitiveTypeName()) { case FIXED_LEN_BYTE_ARRAY: - return new FixedReader(desc); + return fixedReader(desc); case BINARY: if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) { return new ParquetValueReaders.StringReader(desc); @@ -397,7 +288,7 @@ public ParquetValueReader primitive( case INT96: // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards // compatibility we try to read INT96 as timestamps. 
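+        // The INT96 representation is now delegated to subclasses: GenericParquetReaders
+        // converts the value to an OffsetDateTime, while InternalReader reads it unconverted.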
- return new TimestampInt96Reader(desc); + return int96Reader(desc); default: throw new UnsupportedOperationException("Unsupported type: " + primitive); } @@ -407,124 +298,4 @@ MessageType type() { return type; } } - - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - - private static class DateReader extends ParquetValueReaders.PrimitiveReader { - private DateReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalDate read(LocalDate reuse) { - return EPOCH_DAY.plusDays(column.nextInteger()); - } - } - - private static class TimestampReader extends ParquetValueReaders.PrimitiveReader { - private TimestampReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalDateTime read(LocalDateTime reuse) { - return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); - } - } - - private static class TimestampMillisReader - extends ParquetValueReaders.PrimitiveReader { - private TimestampMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalDateTime read(LocalDateTime reuse) { - return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime(); - } - } - - private static class TimestampInt96Reader - extends ParquetValueReaders.PrimitiveReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; - - private TimestampInt96Reader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public OffsetDateTime read(OffsetDateTime reuse) { - final ByteBuffer byteBuffer = - column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN)) - .plusNanos(timeOfDayNanos) - .atOffset(ZoneOffset.UTC); - } - } - - private static class TimestamptzReader - extends ParquetValueReaders.PrimitiveReader { - private TimestamptzReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public OffsetDateTime read(OffsetDateTime reuse) { - return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); - } - } - - private static class TimestamptzMillisReader - extends ParquetValueReaders.PrimitiveReader { - private TimestamptzMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public OffsetDateTime read(OffsetDateTime reuse) { - return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS); - } - } - - private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader { - private TimeMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalTime read(LocalTime reuse) { - return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); - } - } - - private static class TimeReader extends ParquetValueReaders.PrimitiveReader { - private TimeReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public LocalTime read(LocalTime reuse) { - return LocalTime.ofNanoOfDay(column.nextLong() * 1000L); - } - } - - private static class FixedReader extends ParquetValueReaders.PrimitiveReader { - private FixedReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public byte[] read(byte[] reuse) { - if (reuse != null) { - column.nextBinary().toByteBuffer().duplicate().get(reuse); - return reuse; - } else { - return column.nextBinary().getBytes(); - } - } - } } diff --git 
a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 8023cef71dae..9ab086a8ae80 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -18,16 +18,32 @@ */ package org.apache.iceberg.data.parquet; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericDataUtil; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; public class GenericParquetReaders extends BaseParquetReaders { @@ -52,6 +68,25 @@ protected ParquetValueReader createStructReader( return new RecordReader(types, fieldReaders, structType); } + @Override + protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> + logicalTypeReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive); + } + + @Override + protected ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc) { + return new FixedReader(desc); + } + + @Override + protected ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc) { + return new TimestampInt96Reader(desc); + } + @Override protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return GenericDataUtil.internalToGeneric(type, value); @@ -92,4 +127,232 @@ protected void set(Record struct, int pos, Object value) { struct.set(pos, value); } } + + private class LogicalTypeAnnotationParquetValueReaderVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + private final PrimitiveType primitive; + + LogicalTypeAnnotationParquetValueReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + this.desc = desc; + this.expected = expected; + this.primitive = primitive; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { 
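+      // Decimals map to different Parquet physical types depending on precision, so the
+      // reader is chosen from the column's primitive type, not the logical annotation alone.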
+ switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT64: + return Optional.of( + new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT32: + return Optional.of( + new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(new DateReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { + return Optional.of(new TimeReader(desc)); + } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return Optional.of(new TimeMillisReader(desc)); + } + + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { + Types.TimestampType tsMicrosType = (Types.TimestampType) expected; + return tsMicrosType.shouldAdjustToUTC() + ? Optional.of(new TimestamptzReader(desc)) + : Optional.of(new TimestampReader(desc)); + } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + Types.TimestampType tsMillisType = (Types.TimestampType) expected; + return tsMillisType.shouldAdjustToUTC() + ? Optional.of(new TimestamptzMillisReader(desc)) + : Optional.of(new TimestampMillisReader(desc)); + } + + return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + if (intLogicalType.getBitWidth() == 64) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) + ? 
Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) + : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { + return Optional.of(new ParquetValueReaders.BytesReader(desc)); + } + } + + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + private static class DateReader extends ParquetValueReaders.PrimitiveReader { + private DateReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalDate read(LocalDate reuse) { + return EPOCH_DAY.plusDays(column.nextInteger()); + } + } + + private static class TimestampReader extends ParquetValueReaders.PrimitiveReader { + private TimestampReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalDateTime read(LocalDateTime reuse) { + return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); + } + } + + private static class TimestampMillisReader + extends ParquetValueReaders.PrimitiveReader { + private TimestampMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalDateTime read(LocalDateTime reuse) { + return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime(); + } + } + + private static class TimestampInt96Reader + extends ParquetValueReaders.PrimitiveReader { + private static final long UNIX_EPOCH_JULIAN = 2_440_588L; + + private TimestampInt96Reader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public OffsetDateTime read(OffsetDateTime reuse) { + final ByteBuffer byteBuffer = + column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + final long timeOfDayNanos = byteBuffer.getLong(); + final int julianDay = byteBuffer.getInt(); + + return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN)) + .plusNanos(timeOfDayNanos) + .atOffset(ZoneOffset.UTC); + } + } + + private static class TimestamptzReader + extends ParquetValueReaders.PrimitiveReader { + private TimestamptzReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public OffsetDateTime read(OffsetDateTime reuse) { + return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); + } + } + + private static class TimestamptzMillisReader + extends ParquetValueReaders.PrimitiveReader { + private TimestamptzMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public OffsetDateTime read(OffsetDateTime reuse) { + return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS); + } + } + + private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader { + private TimeMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalTime read(LocalTime reuse) { + return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); + } + } + + private static class TimeReader extends ParquetValueReaders.PrimitiveReader { + private TimeReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalTime read(LocalTime reuse) { + return LocalTime.ofNanoOfDay(column.nextLong() * 1000L); + } + } + + private static class FixedReader extends ParquetValueReaders.PrimitiveReader { + private FixedReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public byte[] 
read(byte[] reuse) { + if (reuse != null) { + column.nextBinary().toByteBuffer().duplicate().get(reuse); + return reuse; + } else { + return column.nextBinary().getBytes(); + } + } + } } From cd8edeac0ccaf6ee350653f22b7cbfcb902dda52 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 3 Jan 2025 18:20:11 +0530 Subject: [PATCH 3/4] Parquet: Add internal writer and reader --- .../data/parquet/BaseParquetReaders.java | 2 +- .../iceberg/data/parquet/InternalReader.java | 207 ++++++++++++++++++ .../iceberg/data/parquet/InternalWriter.java | 150 +++++++++++++ .../iceberg/parquet/ParquetValueReaders.java | 13 ++ .../iceberg/parquet/ParquetValueWriters.java | 17 ++ .../iceberg/parquet/TestInternalWriter.java | 132 +++++++++++ 6 files changed, 520 insertions(+), 1 deletion(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java create mode 100644 parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java create mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 65ff78513350..6216087d2e9e 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -250,7 +250,7 @@ public ParquetValueReader primitive( ColumnDescriptor desc = type.getColumnDescription(currentPath()); - if (primitive.getOriginalType() != null) { + if (primitive.getLogicalTypeAnnotation() != null) { return primitive .getLogicalTypeAnnotation() .accept(logicalTypeReaderVisitor(desc, expected, primitive)) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java new file mode 100644 index 000000000000..51cdef1952c0 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data.parquet; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; +import org.apache.iceberg.types.Types.StructType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public class InternalReader extends BaseParquetReaders { + + private static final InternalReader INSTANCE = new InternalReader(); + + private InternalReader() {} + + public static ParquetValueReader buildReader( + Schema expectedSchema, MessageType fileSchema) { + return INSTANCE.createReader(expectedSchema, fileSchema); + } + + public static ParquetValueReader buildReader( + Schema expectedSchema, MessageType fileSchema, Map idToConstant) { + return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); + } + + @Override + protected ParquetValueReader createStructReader( + List types, List> fieldReaders, StructType structType) { + return new ParquetStructReader(types, fieldReaders, structType); + } + + @Override + protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> + logicalTypeReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive); + } + + @Override + protected ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc) { + return new ParquetValueReaders.BytesReader(desc); + } + + @Override + protected ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc) { + // normal handling as int96 + return new ParquetValueReaders.UnboxedReader<>(desc); + } + + private static class ParquetStructReader extends StructReader { + private final GenericRecord template; + + ParquetStructReader(List types, List> readers, StructType struct) { + super(types, readers); + this.template = struct != null ? 
GenericRecord.create(struct) : null; + } + + @Override + protected StructLike newStructData(StructLike reuse) { + if (reuse != null) { + return reuse; + } else { + return template.copy(); + } + } + + @Override + protected Object getField(StructLike intermediate, int pos) { + return intermediate.get(pos, Object.class); + } + + @Override + protected StructLike buildStruct(StructLike struct) { + return struct; + } + + @Override + protected void set(StructLike struct, int pos, Object value) { + struct.set(pos, value); + } + } + + private static class LogicalTypeAnnotationParquetValueReaderVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + private final PrimitiveType primitive; + + LogicalTypeAnnotationParquetValueReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + this.desc = desc; + this.expected = expected; + this.primitive = primitive; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return Optional.of(new ParquetValueReaders.UUIDReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT64: + return Optional.of( + new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT32: + return Optional.of( + new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + if (intLogicalType.getBitWidth() == 64) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) + ? 
Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) + : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { + return Optional.of(new ParquetValueReaders.BytesReader(desc)); + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java new file mode 100644 index 000000000000..2acb184c4d3f --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import java.util.List; +import java.util.Optional; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.parquet.ParquetValueWriter; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; + +/** + * A Writer that consumes Iceberg's internal in-memory object model. + * + *
+ * <p>
Iceberg's internal in-memory object model produces the types defined in {@link + * Type.TypeID#javaClass()}. + */ +public class InternalWriter extends BaseParquetWriter { + private static final InternalWriter INSTANCE = new InternalWriter(); + + private InternalWriter() {} + + public static ParquetValueWriter buildWriter(MessageType type) { + return INSTANCE.createWriter(type); + } + + @Override + protected StructWriter createStructWriter(List> writers) { + return new ParquetStructWriter(writers); + } + + @Override + protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> + logicalTypeWriterVisitor(ColumnDescriptor desc) { + return new LogicalTypeWriterVisitor(desc); + } + + @Override + protected ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc) { + // accepts ByteBuffer and internally writes as binary. + return ParquetValueWriters.byteBuffers(desc); + } + + private static class ParquetStructWriter extends StructWriter { + private ParquetStructWriter(List> writers) { + super(writers); + } + + @Override + protected Object get(StructLike struct, int index) { + return struct.get(index, Object.class); + } + } + + private static class LogicalTypeWriterVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of( + ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of( + ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return Optional.of(ParquetValueWriters.uuids(desc)); + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index b055a139fa59..517d50887940 
100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -27,9 +27,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.api.Binary; @@ -401,6 +403,17 @@ public ByteBuffer read(ByteBuffer reuse) { } } + public static class UUIDReader extends PrimitiveReader { + public UUIDReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public UUID read(UUID reuse) { + return UUIDUtil.convert(column.nextBinary().toByteBuffer()); + } + } + public static class ByteArrayReader extends ParquetValueReaders.PrimitiveReader { public ByteArrayReader(ColumnDescriptor desc) { super(desc); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 70fde738f645..857dc7ad19c2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -38,6 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.io.api.Binary; @@ -87,6 +89,10 @@ public static PrimitiveWriter strings(ColumnDescriptor desc) { return new StringWriter(desc); } + public static PrimitiveWriter uuids(ColumnDescriptor desc) { + return new UUIDWriter(desc); + } + public static PrimitiveWriter decimalAsInteger( ColumnDescriptor desc, int precision, int scale) { return new IntegerDecimalWriter(desc, precision, scale); @@ -345,6 +351,17 @@ public void write(int repetitionLevel, CharSequence value) { } } + private static class UUIDWriter extends PrimitiveWriter { + private UUIDWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, UUID value) { + column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(UUIDUtil.convert(value))); + } + } + static class OptionWriter implements ParquetValueWriter { private final int definitionLevel; private final ParquetValueWriter writer; diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java new file mode 100644 index 000000000000..3b21aca611de --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.UUID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestInternalWriter { + @TempDir private Path temp; + + @Test + public void testDataWriter() throws IOException { + Schema schema = + new Schema( + required(100, "b", Types.BooleanType.get()), + optional(101, "i", Types.IntegerType.get()), + required(102, "l", Types.LongType.get()), + optional(103, "f", Types.FloatType.get()), + required(104, "d", Types.DoubleType.get()), + optional(105, "date", Types.DateType.get()), + required(106, "time", Types.TimeType.get()), + required(107, "ts", Types.TimestampType.withoutZone()), + required(108, "ts_tz", Types.TimestampType.withZone()), + required(109, "s", Types.StringType.get()), + required(110, "uuid", Types.UUIDType.get()), + required(111, "fixed", Types.FixedType.ofLength(7)), + optional(112, "bytes", Types.BinaryType.get()), + required(113, "dec_38_10", Types.DecimalType.of(38, 10))); + + // Consuming the data as per Type.java + GenericRecord record = GenericRecord.create(schema); + record.set(0, true); + record.set(1, 42); + record.set(2, 42L); + record.set(3, 3.14f); + record.set(4, 3.141592653589793); + record.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); + record.set(6, Literal.of("10:10:10").to(Types.TimeType.get()).value()); + record.set( + 7, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); + record.set( + 8, + Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()).value()); + record.set(9, "string"); + record.set(10, UUID.randomUUID()); + record.set(11, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); + record.set(12, 
ByteBuffer.wrap(new byte[] {1, 2, 3})); + record.set( + 13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value()); + + StructProjection structProjection = StructProjection.create(schema, schema); + StructProjection row = structProjection.wrap(record); + + OutputFile file = Files.localOutput(createTempFile(temp)); + + DataWriter dataWriter = + Parquet.writeData(file) + .schema(schema) + .createWriterFunc(InternalWriter::buildWriter) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + + try { + dataWriter.write(row); + } finally { + dataWriter.close(); + } + + DataFile dataFile = dataWriter.toDataFile(); + + assertThat(dataFile.format()).as("Format should be Parquet").isEqualTo(FileFormat.PARQUET); + assertThat(dataFile.content()).as("Should be data file").isEqualTo(FileContent.DATA); + assertThat(dataFile.partition().size()).as("Partition should be empty").isEqualTo(0); + assertThat(dataFile.keyMetadata()).as("Key metadata should be null").isNull(); + + List writtenRecords; + try (CloseableIterable reader = + Parquet.read(file.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> InternalReader.buildReader(schema, fileSchema)) + .build()) { + writtenRecords = Lists.newArrayList(reader); + } + assertThat(writtenRecords).hasSize(1); + assertThat(writtenRecords.get(0)).isEqualTo(record); + } +} From 3eaf3bc451a4ed9888a428b2ba04770c3d11724a Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 3 Jan 2025 22:11:36 +0530 Subject: [PATCH 4/4] Address comments --- .../org/apache/iceberg/util/RandomUtil.java | 8 +- .../data/parquet/TestInternalData.java | 94 +++++++++++ .../data/parquet/BaseParquetReaders.java | 132 ++++++++++++++- .../data/parquet/BaseParquetWriter.java | 126 +++++++++++++- .../data/parquet/GenericParquetReaders.java | 154 ++++-------------- .../data/parquet/GenericParquetWriter.java | 117 ++----------- .../iceberg/data/parquet/InternalReader.java | 145 ++++------------- .../iceberg/data/parquet/InternalWriter.java | 94 +---------- .../iceberg/parquet/ParquetValueReaders.java | 36 ++++ .../iceberg/parquet/ParquetValueWriters.java | 31 ++++ .../iceberg/parquet/TestInternalWriter.java | 132 --------------- .../spark/data/SparkParquetReaders.java | 41 +---- 12 files changed, 503 insertions(+), 607 deletions(-) create mode 100644 data/src/test/java/org/apache/iceberg/data/parquet/TestInternalData.java delete mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index 7414a457c520..694dd1b630a6 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) { } public static List generateList( - Random random, Types.ListType list, Supplier elementResult) { + Random random, Types.ListType list, Supplier elements) { int numElements = random.nextInt(20); List result = Lists.newArrayListWithExpectedSize(numElements); @@ -246,7 +246,7 @@ public static List generateList( if (list.isElementOptional() && random.nextInt(20) == 1) { result.add(null); } else { - result.add(elementResult.get()); + result.add(elements.get()); } } @@ -254,7 +254,7 @@ public static List generateList( } public static Map generateMap( - Random random, Types.MapType map, Supplier keyResult, Supplier valueResult) { + Random random, 
Types.MapType map, Supplier keyResult, Supplier values) { int numEntries = random.nextInt(20); Map result = Maps.newLinkedHashMap(); @@ -279,7 +279,7 @@ public static Map generateMap( if (map.isValueOptional() && random.nextInt(20) == 1) { result.put(key, null); } else { - result.put(key, valueResult.get()); + result.put(key, values.get()); } } diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestInternalData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestInternalData.java new file mode 100644 index 000000000000..c6b698d5ec83 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestInternalData.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.Files; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RandomInternalData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.DataTest; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestInternalData extends DataTest { + @Override + protected void writeAndValidate(Schema schema) throws IOException { + writeAndValidate(schema, schema); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expected = RandomInternalData.generate(writeSchema, 100, 42L); + + File testFile = File.createTempFile("junit", null, temp.toFile()); + assertThat(testFile.delete()).isTrue(); + + OutputFile outputFile = Files.localOutput(testFile); + + try (DataWriter dataWriter = + Parquet.writeData(outputFile) + .schema(writeSchema) + .createWriterFunc(InternalWriter::buildWriter) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build()) { + for (StructLike record : expected) { + dataWriter.write(record); + } + } + + List rows; + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + .project(expectedSchema) + .createReaderFunc(fileSchema -> InternalReader.buildReader(expectedSchema, fileSchema)) + .build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + InternalTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i)); + } + + // test reuseContainers + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + 
.project(expectedSchema) + .reuseContainers() + .createReaderFunc(fileSchema -> InternalReader.buildReader(expectedSchema, fileSchema)) + .build()) { + int index = 0; + for (StructLike actualRecord : reader) { + InternalTestHelpers.assertEquals( + expectedSchema.asStruct(), expected.get(index), actualRecord); + index += 1; + } + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 6216087d2e9e..afaeb313a07d 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; @@ -34,6 +35,8 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -64,15 +67,30 @@ protected ParquetValueReader createReader( protected abstract ParquetValueReader createStructReader( List types, List> fieldReaders, Types.StructType structType); - protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> - logicalTypeReaderVisitor( - ColumnDescriptor desc, - org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive); + protected ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc) { + return null; + } + + protected ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc) { + return null; + } - protected abstract ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc); + protected Optional> dateReader(ColumnDescriptor desc) { + return Optional.empty(); + } - protected abstract ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc); + protected Optional> timeReader(ColumnDescriptor desc, TimeUnit unit) { + return Optional.empty(); + } + + protected Optional> timestampReader( + ColumnDescriptor desc, TimeUnit unit, boolean isAdjustedToUTC) { + return Optional.empty(); + } + + protected Optional> uuidReader(ColumnDescriptor desc) { + return Optional.empty(); + } protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -112,6 +130,104 @@ public ParquetValueReader struct( } } + private class LogicalTypeAnnotationParquetValueReaderVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + private final PrimitiveType primitive; + + LogicalTypeAnnotationParquetValueReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + this.desc = desc; + this.expected = expected; + this.primitive = primitive; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + 
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimalLogicalType) { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT64: + return Optional.of( + new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT32: + return Optional.of( + new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + return dateReader(desc); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + return timeReader(desc, timeLogicalType.getUnit()); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + return timestampReader( + desc, + timestampLogicalType.getUnit(), + ((Types.TimestampType) expected).shouldAdjustToUTC()); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + if (intLogicalType.getBitWidth() == 64) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) + ? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) + : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { + return Optional.of(new ParquetValueReaders.BytesReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return uuidReader(desc); + } + } + private class ReadBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private final Map idToConstant; @@ -253,7 +369,7 @@ public ParquetValueReader primitive( if (primitive.getLogicalTypeAnnotation() != null) { return primitive .getLogicalTypeAnnotation() - .accept(logicalTypeReaderVisitor(desc, expected, primitive)) + .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive)) .orElseThrow( () -> new UnsupportedOperationException( diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 2bbcf691d8ca..be21fdbf5916 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -23,6 +23,7 @@ import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.parquet.column.ColumnDescriptor; import 
org.apache.parquet.schema.GroupType; @@ -41,11 +42,26 @@ protected ParquetValueWriter createWriter(MessageType type) { protected abstract ParquetValueWriters.StructWriter createStructWriter( List> writers); - protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< - ParquetValueWriters.PrimitiveWriter> - logicalTypeWriterVisitor(ColumnDescriptor desc); + protected ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc) { + return ParquetValueWriters.fixed(desc); + } + + protected Optional> dateWriter(ColumnDescriptor desc) { + return Optional.empty(); + } + + protected Optional> timeWriter(ColumnDescriptor desc) { + return Optional.empty(); + } + + protected Optional> timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC) { + return Optional.empty(); + } - protected abstract ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc); + protected Optional> uuidWriter(ColumnDescriptor desc) { + return Optional.empty(); + } private class WriteBuilder extends ParquetTypeVisitor> { private final MessageType type; @@ -117,7 +133,7 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { Optional> writer = - logicalType.accept(logicalTypeWriterVisitor(desc)); + logicalType.accept(new LogicalTypeWriterVisitor(desc)); if (writer.isPresent()) { return writer.get(); } @@ -143,4 +159,104 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { } } } + + private class LogicalTypeWriterVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of( + ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of( + ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return dateWriter(desc); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()), + "Cannot write time in %s, only MICROS is supported", + timeType.getUnit()); + return timeWriter(desc); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS is supported", + timestampType.getUnit()); + return 
timestampWriter(desc, timestampType.isAdjustedToUTC()); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return uuidWriter(desc); + } + } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 9ab086a8ae80..af6e77d7615d 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -38,12 +38,10 @@ import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; -import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; public class GenericParquetReaders extends BaseParquetReaders { @@ -68,15 +66,6 @@ protected ParquetValueReader createStructReader( return new RecordReader(types, fieldReaders, structType); } - @Override - protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> - logicalTypeReaderVisitor( - ColumnDescriptor desc, - org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive) { - return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive); - } - @Override protected ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc) { return new FixedReader(desc); @@ -87,6 +76,41 @@ protected ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor de return new TimestampInt96Reader(desc); } + @Override + protected Optional> dateReader(ColumnDescriptor desc) { + return Optional.of(new DateReader(desc)); + } + + @Override + protected Optional> timeReader( + ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) { + switch (unit) { + case MICROS: + return Optional.of(new TimeReader(desc)); + case MILLIS: + return Optional.of(new TimeMillisReader(desc)); + default: + return Optional.empty(); + } + } + + @Override + protected Optional> timestampReader( + ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) { + switch (unit) { + case MICROS: + return isAdjustedToUTC + ? Optional.of(new TimestamptzReader(desc)) + : Optional.of(new TimestampReader(desc)); + case MILLIS: + return isAdjustedToUTC + ? 
Optional.of(new TimestamptzMillisReader(desc)) + : Optional.of(new TimestampMillisReader(desc)); + default: + return Optional.empty(); + } + } + @Override protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return GenericDataUtil.internalToGeneric(type, value); @@ -128,114 +152,6 @@ protected void set(Record struct, int pos, Object value) { } } - private class LogicalTypeAnnotationParquetValueReaderVisitor - implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { - - private final ColumnDescriptor desc; - private final org.apache.iceberg.types.Type.PrimitiveType expected; - private final PrimitiveType primitive; - - LogicalTypeAnnotationParquetValueReaderVisitor( - ColumnDescriptor desc, - org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive) { - this.desc = desc; - this.expected = expected; - this.primitive = primitive; - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { - switch (primitive.getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of( - new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); - case INT64: - return Optional.of( - new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); - case INT32: - return Optional.of( - new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - return Optional.of(new DateReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - return Optional.of(new TimeReader(desc)); - } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - return Optional.of(new TimeMillisReader(desc)); - } - - return Optional.empty(); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - Types.TimestampType tsMicrosType = (Types.TimestampType) expected; - return tsMicrosType.shouldAdjustToUTC() - ? Optional.of(new TimestamptzReader(desc)) - : Optional.of(new TimestampReader(desc)); - } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - Types.TimestampType tsMillisType = (Types.TimestampType) expected; - return tsMillisType.shouldAdjustToUTC() - ? 
Optional.of(new TimestamptzMillisReader(desc)) - : Optional.of(new TimestampMillisReader(desc)); - } - - return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - if (intLogicalType.getBitWidth() == 64) { - return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) - ? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) - : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - return Optional.of(new ParquetValueReaders.BytesReader(desc)); - } - } - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index e0a83d4abff9..6725e5f99460 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -31,9 +31,7 @@ import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; public class GenericParquetWriter extends BaseParquetWriter { @@ -51,16 +49,23 @@ protected StructWriter createStructWriter(List> wr } @Override - protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< - ParquetValueWriters.PrimitiveWriter> - logicalTypeWriterVisitor(ColumnDescriptor desc) { - return new LogicalTypeWriterVisitor(desc); + protected Optional> dateWriter(ColumnDescriptor desc) { + return Optional.of(new DateWriter(desc)); } @Override - protected ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc) { - // accepts byte[] and internally writes as binary. 
- return ParquetValueWriters.fixed(desc); + protected Optional> timeWriter(ColumnDescriptor desc) { + return Optional.of(new TimeWriter(desc)); + } + + @Override + protected Optional> timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC) { + if (isAdjustedToUTC) { + return Optional.of(new TimestamptzWriter(desc)); + } else { + return Optional.of(new TimestampWriter(desc)); + } } private static class RecordWriter extends StructWriter { @@ -74,100 +79,6 @@ protected Object get(Record struct, int index) { } } - private static class LogicalTypeWriterVisitor - implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< - ParquetValueWriters.PrimitiveWriter> { - private final ColumnDescriptor desc; - - private LogicalTypeWriterVisitor(ColumnDescriptor desc) { - this.desc = desc; - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { - switch (desc.getPrimitiveType().getPrimitiveTypeName()) { - case INT32: - return Optional.of( - ParquetValueWriters.decimalAsInteger( - desc, decimalType.getPrecision(), decimalType.getScale())); - case INT64: - return Optional.of( - ParquetValueWriters.decimalAsLong( - desc, decimalType.getPrecision(), decimalType.getScale())); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of( - ParquetValueWriters.decimalAsFixed( - desc, decimalType.getPrecision(), decimalType.getScale())); - } - return Optional.empty(); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { - return Optional.of(new DateWriter(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { - return Optional.of(new TimeWriter(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { - Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", - timestampType.getUnit()); - if (timestampType.isAdjustedToUTC()) { - return Optional.of(new TimestamptzWriter(desc)); - } else { - return Optional.of(new TimestampWriter(desc)); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { - Preconditions.checkArgument( - intType.isSigned() || intType.getBitWidth() < 64, - "Cannot read uint64: not a supported Java type"); - if (intType.getBitWidth() < 64) { - return Optional.of(ParquetValueWriters.ints(desc)); - } else { - return Optional.of(ParquetValueWriters.longs(desc)); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { - return Optional.of(ParquetValueWriters.byteBuffers(desc)); - } - } - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java 
b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 51cdef1952c0..2e176b4f2da1 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -31,7 +31,6 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; public class InternalReader extends BaseParquetReaders { @@ -53,16 +52,7 @@ public static ParquetValueReader buildReader( @Override protected ParquetValueReader createStructReader( List types, List> fieldReaders, StructType structType) { - return new ParquetStructReader(types, fieldReaders, structType); - } - - @Override - protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> - logicalTypeReaderVisitor( - ColumnDescriptor desc, - org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive) { - return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive); + return new StructLikeReader(types, fieldReaders, structType); } @Override @@ -72,14 +62,43 @@ protected ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor de @Override protected ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc) { - // normal handling as int96 - return new ParquetValueReaders.UnboxedReader<>(desc); + return new ParquetValueReaders.TimestampInt96Reader(desc); + } + + @Override + protected Optional> dateReader(ColumnDescriptor desc) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + protected Optional> timeReader( + ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) { + if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return Optional.of(new ParquetValueReaders.TimestampMillisReader(desc)); + } + + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + protected Optional> timestampReader( + ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) { + if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return Optional.of(new ParquetValueReaders.TimestampMillisReader(desc)); + } + + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); } - private static class ParquetStructReader extends StructReader { + @Override + protected Optional> uuidReader(ColumnDescriptor desc) { + return Optional.of(new ParquetValueReaders.UUIDReader(desc)); + } + + private static class StructLikeReader extends StructReader { private final GenericRecord template; - ParquetStructReader(List types, List> readers, StructType struct) { + StructLikeReader(List types, List> readers, StructType struct) { super(types, readers); this.template = struct != null ? 
GenericRecord.create(struct) : null; } @@ -108,100 +127,4 @@ protected void set(StructLike struct, int pos, Object value) { struct.set(pos, value); } } - - private static class LogicalTypeAnnotationParquetValueReaderVisitor - implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { - - private final ColumnDescriptor desc; - private final org.apache.iceberg.types.Type.PrimitiveType expected; - private final PrimitiveType primitive; - - LogicalTypeAnnotationParquetValueReaderVisitor( - ColumnDescriptor desc, - org.apache.iceberg.types.Type.PrimitiveType expected, - PrimitiveType primitive) { - this.desc = desc; - this.expected = expected; - this.primitive = primitive; - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { - return Optional.of(new ParquetValueReaders.UUIDReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { - switch (primitive.getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of( - new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); - case INT64: - return Optional.of( - new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); - case INT32: - return Optional.of( - new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { - return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - if (intLogicalType.getBitWidth() == 64) { - return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) - ? 
Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) - : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(new ParquetValueReaders.StringReader(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { - return Optional.of(new ParquetValueReaders.BytesReader(desc)); - } - } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java index 2acb184c4d3f..839ff77b84de 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java @@ -23,11 +23,10 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter; import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; /** @@ -47,24 +46,21 @@ public static ParquetValueWriter buildWriter(MessageType type) { @Override protected StructWriter createStructWriter(List> writers) { - return new ParquetStructWriter(writers); + return new StructLikeWriter(writers); } @Override - protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< - ParquetValueWriters.PrimitiveWriter> - logicalTypeWriterVisitor(ColumnDescriptor desc) { - return new LogicalTypeWriterVisitor(desc); + protected PrimitiveWriter fixedWriter(ColumnDescriptor desc) { + return ParquetValueWriters.fixedBuffer(desc); } @Override - protected ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc) { - // accepts ByteBuffer and internally writes as binary. 
- return ParquetValueWriters.byteBuffers(desc); + protected Optional> uuidWriter(ColumnDescriptor desc) { + return Optional.of(ParquetValueWriters.uuids(desc)); } - private static class ParquetStructWriter extends StructWriter { - private ParquetStructWriter(List> writers) { + private static class StructLikeWriter extends StructWriter { + private StructLikeWriter(List> writers) { super(writers); } @@ -73,78 +69,4 @@ protected Object get(StructLike struct, int index) { return struct.get(index, Object.class); } } - - private static class LogicalTypeWriterVisitor - implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< - ParquetValueWriters.PrimitiveWriter> { - private final ColumnDescriptor desc; - - private LogicalTypeWriterVisitor(ColumnDescriptor desc) { - this.desc = desc; - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { - switch (desc.getPrimitiveType().getPrimitiveTypeName()) { - case INT32: - return Optional.of( - ParquetValueWriters.decimalAsInteger( - desc, decimalType.getPrecision(), decimalType.getScale())); - case INT64: - return Optional.of( - ParquetValueWriters.decimalAsLong( - desc, decimalType.getPrecision(), decimalType.getScale())); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return Optional.of( - ParquetValueWriters.decimalAsFixed( - desc, decimalType.getPrecision(), decimalType.getScale())); - } - return Optional.empty(); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { - Preconditions.checkArgument( - intType.isSigned() || intType.getBitWidth() < 64, - "Cannot read uint64: not a supported Java type"); - if (intType.getBitWidth() < 64) { - return Optional.of(ParquetValueWriters.ints(desc)); - } else { - return Optional.of(ParquetValueWriters.longs(desc)); - } - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { - return Optional.of(ParquetValueWriters.strings(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { - return Optional.of(ParquetValueWriters.byteBuffers(desc)); - } - - @Override - public Optional> visit( - LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { - return Optional.of(ParquetValueWriters.uuids(desc)); - } - } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index 517d50887940..f575e0687a73 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -425,6 +426,41 @@ public byte[] read(byte[] ignored) { } } + public static class TimestampInt96Reader extends UnboxedReader { + + public TimestampInt96Reader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public Long read(Long ignored) { + return readLong(); + } + + @Override + public long 
readLong() { + final ByteBuffer byteBuffer = + column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + return ParquetUtil.extractTimestampInt96(byteBuffer); + } + } + + public static class TimestampMillisReader extends UnboxedReader { + public TimestampMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public Long read(Long ignored) { + return readLong(); + } + + @Override + public long readLong() { + return 1000L * column.nextLong(); + } + } + private static class OptionReader implements ParquetValueReader { private final int definitionLevel; private final ParquetValueReader reader; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 857dc7ad19c2..384a986e59b7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -112,6 +112,10 @@ public static PrimitiveWriter byteBuffers(ColumnDescriptor desc) { return new BytesWriter(desc); } + public static PrimitiveWriter fixedBuffer(ColumnDescriptor desc) { + return new FixedBufferWriter(desc); + } + public static PrimitiveWriter fixed(ColumnDescriptor desc) { return new FixedWriter(desc); } @@ -323,13 +327,40 @@ public void write(int repetitionLevel, ByteBuffer buffer) { } } + private static class FixedBufferWriter extends PrimitiveWriter { + private final int length; + + private FixedBufferWriter(ColumnDescriptor desc) { + super(desc); + this.length = desc.getPrimitiveType().getTypeLength(); + } + + @Override + public void write(int repetitionLevel, ByteBuffer buffer) { + Preconditions.checkArgument( + buffer.remaining() == length, + "Cannot write byte buffer of length %s as fixed[%s]", + buffer.remaining(), + length); + column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer)); + } + } + private static class FixedWriter extends PrimitiveWriter { + private final int length; + private FixedWriter(ColumnDescriptor desc) { super(desc); + this.length = desc.getPrimitiveType().getTypeLength(); } @Override public void write(int repetitionLevel, byte[] value) { + Preconditions.checkArgument( + value.length == length, + "Cannot write byte array of length %s as fixed[%s]", + value.length, + length); column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value)); } } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java deleted file mode 100644 index 3b21aca611de..000000000000 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.parquet; - -import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.util.List; -import java.util.UUID; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Files; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.InternalReader; -import org.apache.iceberg.data.parquet.InternalWriter; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.StructProjection; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -public class TestInternalWriter { - @TempDir private Path temp; - - @Test - public void testDataWriter() throws IOException { - Schema schema = - new Schema( - required(100, "b", Types.BooleanType.get()), - optional(101, "i", Types.IntegerType.get()), - required(102, "l", Types.LongType.get()), - optional(103, "f", Types.FloatType.get()), - required(104, "d", Types.DoubleType.get()), - optional(105, "date", Types.DateType.get()), - required(106, "time", Types.TimeType.get()), - required(107, "ts", Types.TimestampType.withoutZone()), - required(108, "ts_tz", Types.TimestampType.withZone()), - required(109, "s", Types.StringType.get()), - required(110, "uuid", Types.UUIDType.get()), - required(111, "fixed", Types.FixedType.ofLength(7)), - optional(112, "bytes", Types.BinaryType.get()), - required(113, "dec_38_10", Types.DecimalType.of(38, 10))); - - // Consuming the data as per Type.java - GenericRecord record = GenericRecord.create(schema); - record.set(0, true); - record.set(1, 42); - record.set(2, 42L); - record.set(3, 3.14f); - record.set(4, 3.141592653589793); - record.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); - record.set(6, Literal.of("10:10:10").to(Types.TimeType.get()).value()); - record.set( - 7, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); - record.set( - 8, - Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()).value()); - record.set(9, "string"); - record.set(10, UUID.randomUUID()); - record.set(11, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); - record.set(12, ByteBuffer.wrap(new byte[] {1, 2, 3})); - record.set( - 13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value()); - - StructProjection structProjection = StructProjection.create(schema, schema); - StructProjection row = structProjection.wrap(record); - - OutputFile file = Files.localOutput(createTempFile(temp)); - - DataWriter dataWriter = - Parquet.writeData(file) - .schema(schema) - .createWriterFunc(InternalWriter::buildWriter) - .overwrite() - 
.withSpec(PartitionSpec.unpartitioned()) - .build(); - - try { - dataWriter.write(row); - } finally { - dataWriter.close(); - } - - DataFile dataFile = dataWriter.toDataFile(); - - assertThat(dataFile.format()).as("Format should be Parquet").isEqualTo(FileFormat.PARQUET); - assertThat(dataFile.content()).as("Should be data file").isEqualTo(FileContent.DATA); - assertThat(dataFile.partition().size()).as("Partition should be empty").isEqualTo(0); - assertThat(dataFile.keyMetadata()).as("Key metadata should be null").isNull(); - - List writtenRecords; - try (CloseableIterable reader = - Parquet.read(file.toInputFile()) - .project(schema) - .createReaderFunc(fileSchema -> InternalReader.buildReader(schema, fileSchema)) - .build()) { - writtenRecords = Lists.newArrayList(reader); - } - assertThat(writtenRecords).hasSize(1); - assertThat(writtenRecords.get(0)).isEqualTo(record); - } -} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 3ce54d2d9ffa..d75542958aba 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -21,14 +21,12 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; -import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; @@ -265,7 +263,7 @@ public ParquetValueReader primitive( case TIMESTAMP_MICROS: return new UnboxedReader<>(desc); case TIMESTAMP_MILLIS: - return new TimestampMillisReader(desc); + return new ParquetValueReaders.TimestampMillisReader(desc); case DECIMAL: DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation(); @@ -315,7 +313,7 @@ public ParquetValueReader primitive( case INT96: // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards // compatibility we try to read INT96 as timestamps. - return new TimestampInt96Reader(desc); + return new ParquetValueReaders.TimestampInt96Reader(desc); default: throw new UnsupportedOperationException("Unsupported type: " + primitive); } @@ -373,41 +371,6 @@ public Decimal read(Decimal ignored) { } } - private static class TimestampMillisReader extends UnboxedReader { - TimestampMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public Long read(Long ignored) { - return readLong(); - } - - @Override - public long readLong() { - return 1000 * column.nextLong(); - } - } - - private static class TimestampInt96Reader extends UnboxedReader { - - TimestampInt96Reader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public Long read(Long ignored) { - return readLong(); - } - - @Override - public long readLong() { - final ByteBuffer byteBuffer = - column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - return ParquetUtil.extractTimestampInt96(byteBuffer); - } - } - private static class StringReader extends PrimitiveReader { StringReader(ColumnDescriptor desc) { super(desc);
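
Note: taken together, these patches turn BaseParquetWriter and BaseParquetReaders into template classes. GenericParquetWriter and GenericParquetReaders keep the java.time representations, InternalWriter and InternalReader keep the engine-internal ones (epoch-based longs and ints, ByteBuffer for fixed, UUID objects), and SparkParquetReaders reuses the shared TimestampInt96Reader and TimestampMillisReader instead of private copies. A new integration implements the struct accessor and overrides only the hooks whose in-memory representation differs; any hook that returns Optional.empty() falls back to the writer or reader chosen for the physical Parquet type. The sketch below is illustrative only: UtcOnlyParquetWriter and UtcStructWriter are hypothetical names that are not part of this patch, and the class is placed in org.apache.iceberg.data.parquet on the assumption that BaseParquetWriter stays package-private.

    package org.apache.iceberg.data.parquet;

    import java.util.List;
    import java.util.Optional;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.parquet.ParquetValueWriter;
    import org.apache.iceberg.parquet.ParquetValueWriters;
    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.MessageType;

    /** Hypothetical writer for StructLike rows that only accepts timestamps with zone. */
    class UtcOnlyParquetWriter extends BaseParquetWriter<StructLike> {
      private static final UtcOnlyParquetWriter INSTANCE = new UtcOnlyParquetWriter();

      private UtcOnlyParquetWriter() {}

      static ParquetValueWriter<StructLike> buildWriter(MessageType type) {
        // createWriter is the inherited template method that walks the file schema
        return INSTANCE.createWriter(type);
      }

      @Override
      protected ParquetValueWriters.StructWriter<StructLike> createStructWriter(
          List<ParquetValueWriter<?>> writers) {
        return new UtcStructWriter(writers);
      }

      @Override
      protected Optional<ParquetValueWriters.PrimitiveWriter<?>> timestampWriter(
          ColumnDescriptor desc, boolean isAdjustedToUTC) {
        // the base visitor has already checked that the unit is MICROS; rows carry
        // timestamps as epoch micros, so the plain INT64 writer is enough here
        Preconditions.checkArgument(isAdjustedToUTC, "Cannot write timestamp without zone");
        return Optional.of(ParquetValueWriters.longs(desc));
      }

      // dateWriter, timeWriter, and uuidWriter are not overridden, so dates and
      // times fall back to the plain INT32/INT64 writers, and uuid/fixed columns
      // use the default byte[] fixedWriter from the base class.

      private static class UtcStructWriter extends ParquetValueWriters.StructWriter<StructLike> {
        private UtcStructWriter(List<ParquetValueWriter<?>> writers) {
          super(writers);
        }

        @Override
        protected Object get(StructLike struct, int index) {
          return struct.get(index, Object.class);
        }
      }
    }

The reader side mirrors this shape: a BaseParquetReaders subclass implements createStructReader and overrides dateReader, timeReader, timestampReader, or uuidReader only where its value representation departs from the physical type, exactly as InternalReader and GenericParquetReaders do in the patches above.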