From 8c79e8e1fffa06f95173b83dfc6d55817f0ae186 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 3 Jan 2025 18:20:11 +0530 Subject: [PATCH] Parquet: Add internal writer and reader --- .../data/parquet/BaseParquetReaders.java | 2 +- .../iceberg/data/parquet/InternalReader.java | 207 ++++++++++++++++++ .../iceberg/data/parquet/InternalWriter.java | 150 +++++++++++++ .../iceberg/parquet/ParquetValueReaders.java | 13 ++ .../iceberg/parquet/ParquetValueWriters.java | 17 ++ .../iceberg/parquet/TestInternalWriter.java | 132 +++++++++++ 6 files changed, 520 insertions(+), 1 deletion(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java create mode 100644 parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java create mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 65ff78513350..6216087d2e9e 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -250,7 +250,7 @@ public ParquetValueReader primitive( ColumnDescriptor desc = type.getColumnDescription(currentPath()); - if (primitive.getOriginalType() != null) { + if (primitive.getLogicalTypeAnnotation() != null) { return primitive .getLogicalTypeAnnotation() .accept(logicalTypeReaderVisitor(desc, expected, primitive)) diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java new file mode 100644 index 000000000000..51cdef1952c0 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; +import org.apache.iceberg.types.Types.StructType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public class InternalReader extends BaseParquetReaders { + + private static final InternalReader INSTANCE = new InternalReader(); + + private InternalReader() {} + + public static ParquetValueReader buildReader( + Schema expectedSchema, MessageType fileSchema) { + return INSTANCE.createReader(expectedSchema, fileSchema); + } + + public static ParquetValueReader buildReader( + Schema expectedSchema, MessageType fileSchema, Map idToConstant) { + return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); + } + + @Override + protected ParquetValueReader createStructReader( + List types, List> fieldReaders, StructType structType) { + return new ParquetStructReader(types, fieldReaders, structType); + } + + @Override + protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> + logicalTypeReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive); + } + + @Override + protected ParquetValueReaders.PrimitiveReader fixedReader(ColumnDescriptor desc) { + return new ParquetValueReaders.BytesReader(desc); + } + + @Override + protected ParquetValueReaders.PrimitiveReader int96Reader(ColumnDescriptor desc) { + // normal handling as int96 + return new ParquetValueReaders.UnboxedReader<>(desc); + } + + private static class ParquetStructReader extends StructReader { + private final GenericRecord template; + + ParquetStructReader(List types, List> readers, StructType struct) { + super(types, readers); + this.template = struct != null ? GenericRecord.create(struct) : null; + } + + @Override + protected StructLike newStructData(StructLike reuse) { + if (reuse != null) { + return reuse; + } else { + return template.copy(); + } + } + + @Override + protected Object getField(StructLike intermediate, int pos) { + return intermediate.get(pos, Object.class); + } + + @Override + protected StructLike buildStruct(StructLike struct) { + return struct; + } + + @Override + protected void set(StructLike struct, int pos, Object value) { + struct.set(pos, value); + } + } + + private static class LogicalTypeAnnotationParquetValueReaderVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + private final PrimitiveType primitive; + + LogicalTypeAnnotationParquetValueReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + this.desc = desc; + this.expected = expected; + this.primitive = primitive; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return Optional.of(new ParquetValueReaders.UUIDReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT64: + return Optional.of( + new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT32: + return Optional.of( + new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + if (intLogicalType.getBitWidth() == 64) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) + ? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) + : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { + return Optional.of(new ParquetValueReaders.BytesReader(desc)); + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java new file mode 100644 index 000000000000..2acb184c4d3f --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import java.util.List; +import java.util.Optional; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.parquet.ParquetValueWriter; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; + +/** + * A Writer that consumes Iceberg's internal in-memory object model. + * + *

Iceberg's internal in-memory object model produces the types defined in {@link + * Type.TypeID#javaClass()}. + */ +public class InternalWriter extends BaseParquetWriter { + private static final InternalWriter INSTANCE = new InternalWriter(); + + private InternalWriter() {} + + public static ParquetValueWriter buildWriter(MessageType type) { + return INSTANCE.createWriter(type); + } + + @Override + protected StructWriter createStructWriter(List> writers) { + return new ParquetStructWriter(writers); + } + + @Override + protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> + logicalTypeWriterVisitor(ColumnDescriptor desc) { + return new LogicalTypeWriterVisitor(desc); + } + + @Override + protected ParquetValueWriters.PrimitiveWriter fixedWriter(ColumnDescriptor desc) { + // accepts ByteBuffer and internally writes as binary. + return ParquetValueWriters.byteBuffers(desc); + } + + private static class ParquetStructWriter extends StructWriter { + private ParquetStructWriter(List> writers) { + super(writers); + } + + @Override + protected Object get(StructLike struct, int index) { + return struct.get(index, Object.class); + } + } + + private static class LogicalTypeWriterVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor< + ParquetValueWriters.PrimitiveWriter> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of( + ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of( + ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return Optional.of(ParquetValueWriters.uuids(desc)); + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index b055a139fa59..517d50887940 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -27,9 +27,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.api.Binary; @@ -401,6 +403,17 @@ public ByteBuffer read(ByteBuffer reuse) { } } + public static class UUIDReader extends PrimitiveReader { + public UUIDReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public UUID read(UUID reuse) { + return UUIDUtil.convert(column.nextBinary().toByteBuffer()); + } + } + public static class ByteArrayReader extends ParquetValueReaders.PrimitiveReader { public ByteArrayReader(ColumnDescriptor desc) { super(desc); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 70fde738f645..857dc7ad19c2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -38,6 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.io.api.Binary; @@ -87,6 +89,10 @@ public static PrimitiveWriter strings(ColumnDescriptor desc) { return new StringWriter(desc); } + public static PrimitiveWriter uuids(ColumnDescriptor desc) { + return new UUIDWriter(desc); + } + public static PrimitiveWriter decimalAsInteger( ColumnDescriptor desc, int precision, int scale) { return new IntegerDecimalWriter(desc, precision, scale); @@ -345,6 +351,17 @@ public void write(int repetitionLevel, CharSequence value) { } } + private static class UUIDWriter extends PrimitiveWriter { + private UUIDWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, UUID value) { + column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(UUIDUtil.convert(value))); + } + } + static class OptionWriter implements ParquetValueWriter { private final int definitionLevel; private final ParquetValueWriter writer; diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java new file mode 100644 index 000000000000..3b21aca611de --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestInternalWriter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.UUID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestInternalWriter { + @TempDir private Path temp; + + @Test + public void testDataWriter() throws IOException { + Schema schema = + new Schema( + required(100, "b", Types.BooleanType.get()), + optional(101, "i", Types.IntegerType.get()), + required(102, "l", Types.LongType.get()), + optional(103, "f", Types.FloatType.get()), + required(104, "d", Types.DoubleType.get()), + optional(105, "date", Types.DateType.get()), + required(106, "time", Types.TimeType.get()), + required(107, "ts", Types.TimestampType.withoutZone()), + required(108, "ts_tz", Types.TimestampType.withZone()), + required(109, "s", Types.StringType.get()), + required(110, "uuid", Types.UUIDType.get()), + required(111, "fixed", Types.FixedType.ofLength(7)), + optional(112, "bytes", Types.BinaryType.get()), + required(113, "dec_38_10", Types.DecimalType.of(38, 10))); + + // Consuming the data as per Type.java + GenericRecord record = GenericRecord.create(schema); + record.set(0, true); + record.set(1, 42); + record.set(2, 42L); + record.set(3, 3.14f); + record.set(4, 3.141592653589793); + record.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); + record.set(6, Literal.of("10:10:10").to(Types.TimeType.get()).value()); + record.set( + 7, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); + record.set( + 8, + Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()).value()); + record.set(9, "string"); + record.set(10, UUID.randomUUID()); + record.set(11, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); + record.set(12, ByteBuffer.wrap(new byte[] {1, 2, 3})); + record.set( + 13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value()); + + StructProjection structProjection = StructProjection.create(schema, schema); + StructProjection row = structProjection.wrap(record); + + OutputFile file = Files.localOutput(createTempFile(temp)); + + DataWriter dataWriter = + Parquet.writeData(file) + .schema(schema) + .createWriterFunc(InternalWriter::buildWriter) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + + try { + dataWriter.write(row); + } finally { + dataWriter.close(); + } + + DataFile dataFile = dataWriter.toDataFile(); + + assertThat(dataFile.format()).as("Format should be Parquet").isEqualTo(FileFormat.PARQUET); + assertThat(dataFile.content()).as("Should be data file").isEqualTo(FileContent.DATA); + assertThat(dataFile.partition().size()).as("Partition should be empty").isEqualTo(0); + assertThat(dataFile.keyMetadata()).as("Key metadata should be null").isNull(); + + List writtenRecords; + try (CloseableIterable reader = + Parquet.read(file.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> InternalReader.buildReader(schema, fileSchema)) + .build()) { + writtenRecords = Lists.newArrayList(reader); + } + assertThat(writtenRecords).hasSize(1); + assertThat(writtenRecords.get(0)).isEqualTo(record); + } +}