Parquet: Add readers and writers for the internal object model #11904
base: main
New file: TestInternalData.java (org.apache.iceberg.data.parquet, data module)
@@ -0,0 +1,94 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.data.parquet;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Files;
import org.apache.iceberg.InternalTestHelpers;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RandomInternalData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class TestInternalData extends DataTest {

[Review comment] Kept this class in the data module instead of parquet because the parquet module doesn't have …

  @Override
  protected void writeAndValidate(Schema schema) throws IOException {
    writeAndValidate(schema, schema);
  }

  @Override
  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
    List<StructLike> expected = RandomInternalData.generate(writeSchema, 100, 42L);

    File testFile = File.createTempFile("junit", null, temp.toFile());
    assertThat(testFile.delete()).isTrue();

    OutputFile outputFile = Files.localOutput(testFile);

    try (DataWriter<StructLike> dataWriter =
        Parquet.writeData(outputFile)
            .schema(writeSchema)
            .createWriterFunc(InternalWriter::buildWriter)
            .overwrite()
            .withSpec(PartitionSpec.unpartitioned())
            .build()) {
      for (StructLike record : expected) {
        dataWriter.write(record);
      }
    }

    List<StructLike> rows;
    try (CloseableIterable<StructLike> reader =
        Parquet.read(Files.localInput(testFile))
            .project(expectedSchema)
            .createReaderFunc(fileSchema -> InternalReader.buildReader(expectedSchema, fileSchema))
            .build()) {
      rows = Lists.newArrayList(reader);
    }

    for (int i = 0; i < expected.size(); i += 1) {
      InternalTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i));
    }

    // test reuseContainers
    try (CloseableIterable<StructLike> reader =
        Parquet.read(Files.localInput(testFile))
            .project(expectedSchema)
            .reuseContainers()
            .createReaderFunc(fileSchema -> InternalReader.buildReader(expectedSchema, fileSchema))
            .build()) {
      int index = 0;
      for (StructLike actualRecord : reader) {
        InternalTestHelpers.assertEquals(
            expectedSchema.asStruct(), expected.get(index), actualRecord);
        index += 1;
      }
    }
  }
}
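
For orientation, the sketch below (not part of this PR) shows how the new internal-model writer and reader pair might be used outside the test harness. The builder calls mirror TestInternalData above; the Row class, schema, and output path are hypothetical stand-ins invented for this example.

package org.apache.iceberg.data.parquet;

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class InternalParquetExample {
  // Hypothetical array-backed StructLike; any StructLike implementation works.
  static class Row implements StructLike {
    private final Object[] values;

    Row(Object... values) {
      this.values = values;
    }

    @Override
    public int size() {
      return values.length;
    }

    @Override
    public <T> T get(int pos, Class<T> javaClass) {
      return javaClass.cast(values[pos]);
    }

    @Override
    public <T> void set(int pos, T value) {
      values[pos] = value;
    }
  }

  public static void main(String[] args) throws IOException {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));

    File file = new File("/tmp/internal-example.parquet");

    // Write StructLike rows with the internal object model writer.
    try (DataWriter<StructLike> writer =
        Parquet.writeData(Files.localOutput(file))
            .schema(schema)
            .createWriterFunc(InternalWriter::buildWriter)
            .overwrite()
            .withSpec(PartitionSpec.unpartitioned())
            .build()) {
      writer.write(new Row(1L, "a"));
      writer.write(new Row(2L, "b"));
    }

    // Read the rows back as StructLike with the internal object model reader.
    try (CloseableIterable<StructLike> reader =
        Parquet.read(Files.localInput(file))
            .project(schema)
            .createReaderFunc(fileSchema -> InternalReader.buildReader(schema, fileSchema))
            .build()) {
      for (StructLike row : reader) {
        System.out.println(row.get(0, Long.class) + ": " + row.get(1, CharSequence.class));
      }
    }
  }
}

As the test exercises, rows stay plain StructLike values end to end; nothing is converted through the generic Record model.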
Modified file: BaseParquetReaders.java (org.apache.iceberg.data.parquet)
@@ -18,19 +18,9 @@
 */
package org.apache.iceberg.data.parquet;

-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -46,6 +36,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -76,6 +67,31 @@ protected ParquetValueReader<T> createReader(
  protected abstract ParquetValueReader<T> createStructReader(
      List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

+  protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
+    return null;
+  }
+
+  protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
+    return null;
+  }
+
+  protected Optional<ParquetValueReader<?>> dateReader(ColumnDescriptor desc) {
+    return Optional.empty();
+  }
+
+  protected Optional<ParquetValueReader<?>> timeReader(ColumnDescriptor desc, TimeUnit unit) {
+    return Optional.empty();
+  }
+
+  protected Optional<ParquetValueReader<?>> timestampReader(
+      ColumnDescriptor desc, TimeUnit unit, boolean isAdjustedToUTC) {
+    return Optional.empty();
+  }
+
+  protected Optional<ParquetValueReader<?>> uuidReader(ColumnDescriptor desc) {
+    return Optional.empty();
+  }
+
+  protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
+    return value;
+  }
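
To make the new hooks concrete, here is a sketch of how a subclass override might look. It is illustrative only: MillisToMicrosReader is a hypothetical name, and the sketch assumes an object model that represents timestamps as epoch-micros longs (UnboxedReader passes the raw long through; NANOS handling is omitted).

// Sketch only: a hypothetical reader that widens INT64 millis values to the
// epoch-micros longs assumed for the object model.
private static class MillisToMicrosReader
    extends ParquetValueReaders.PrimitiveReader<Long> {
  MillisToMicrosReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public Long read(Long reuse) {
    return column.nextLong() * 1000L;
  }
}

@Override
protected Optional<ParquetValueReader<?>> timestampReader(
    ColumnDescriptor desc, TimeUnit unit, boolean isAdjustedToUTC) {
  if (unit == TimeUnit.MILLIS) {
    return Optional.of(new MillisToMicrosReader(desc));
  }

  // MICROS already match the assumed representation, so the long is read
  // as-is; a full implementation would also cover TimeUnit.NANOS.
  return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}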
@@ -164,37 +180,22 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
-      return Optional.of(new DateReader(desc));
+      return dateReader(desc);
    }

[Review comment] This and the following 2 methods are the only changes between the implementations of this class, so a lot of code is duplicated. In addition, this already introduces abstract factory methods for some readers -- including timestamps. I think it would be much cleaner to reuse this and call factory methods instead:

  protected abstract PrimitiveReader<?> dateReader(ColumnDescriptor desc);

  protected abstract PrimitiveReader<?> timeReader(ColumnDescriptor desc, ChronoUnit unit);

  protected abstract PrimitiveReader<?> timestampReader(
      ColumnDescriptor desc, ChronoUnit unit, boolean isAdjustedToUTC);

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
-      if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
-        return Optional.of(new TimeReader(desc));
-      } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
-        return Optional.of(new TimeMillisReader(desc));
-      }
-
-      return Optional.empty();
+      return timeReader(desc, timeLogicalType.getUnit());
    }

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
-      if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
-        Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
-        return tsMicrosType.shouldAdjustToUTC()
-            ? Optional.of(new TimestamptzReader(desc))
-            : Optional.of(new TimestampReader(desc));
-      } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
-        Types.TimestampType tsMillisType = (Types.TimestampType) expected;
-        return tsMillisType.shouldAdjustToUTC()
-            ? Optional.of(new TimestamptzMillisReader(desc))
-            : Optional.of(new TimestampMillisReader(desc));
-      }
-
-      return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+      return timestampReader(
+          desc,
+          timestampLogicalType.getUnit(),
+          ((Types.TimestampType) expected).shouldAdjustToUTC());
    }

    @Override
@@ -219,6 +220,12 @@ public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
      return Optional.of(new ParquetValueReaders.BytesReader(desc));
    }
+
+    @Override
+    public Optional<ParquetValueReader<?>> visit(
+        LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+      return uuidReader(desc);
+    }
  }

  private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -359,7 +366,7 @@ public ParquetValueReader<?> primitive(

      ColumnDescriptor desc = type.getColumnDescription(currentPath());

-      if (primitive.getOriginalType() != null) {
+      if (primitive.getLogicalTypeAnnotation() != null) {

[Review comment] I agree with this change, but please point these kinds of changes out for reviewers. The old version worked because all of the supported logical type annotations had an equivalent OriginalType.

        return primitive
            .getLogicalTypeAnnotation()
            .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
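
As a standalone illustration (not from this diff) of why the broader check matters: UUID is one of the logical type annotations with no OriginalType equivalent, so the old getOriginalType() null check would have bypassed the visitor for it. The snippet below uses the parquet-java schema builder directly.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

// A UUID column carries a logical type annotation but no ConvertedType.
PrimitiveType uuidCol =
    Types.required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
        .length(16)
        .as(LogicalTypeAnnotation.uuidType())
        .named("uuid_col");

// uuidCol.getOriginalType()          -> null (old check skips the column)
// uuidCol.getLogicalTypeAnnotation() -> uuid (new check dispatches to uuidReader)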
@@ -371,7 +378,7 @@ public ParquetValueReader<?> primitive(

      switch (primitive.getPrimitiveTypeName()) {
        case FIXED_LEN_BYTE_ARRAY:
-          return new FixedReader(desc);
+          return fixedReader(desc);
        case BINARY:
          if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
            return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +404,7 @@ public ParquetValueReader<?> primitive(
        case INT96:
          // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
          // compatibility we try to read INT96 as timestamps.
-          return new TimestampInt96Reader(desc);
+          return int96Reader(desc);
        default:
          throw new UnsupportedOperationException("Unsupported type: " + primitive);
      }
@@ -407,124 +414,4 @@ MessageType type() {
      return type;
    }
  }
-
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
-    private DateReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDate read(LocalDate reuse) {
-      return EPOCH_DAY.plusDays(column.nextInteger());
-    }
-  }
-
-  private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
-    private TimestampReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
-      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
-    }
-  }
-
-  private static class TimestampMillisReader
-      extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
-    private TimestampMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
-      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
-    }
-  }
-
-  private static class TimestampInt96Reader
-      extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
-    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
-
-    private TimestampInt96Reader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      final ByteBuffer byteBuffer =
-          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      final long timeOfDayNanos = byteBuffer.getLong();
-      final int julianDay = byteBuffer.getInt();
-
-      return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
-          .plusNanos(timeOfDayNanos)
-          .atOffset(ZoneOffset.UTC);
-    }
-  }
-
-  private static class TimestamptzReader
-      extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
-    private TimestamptzReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
-    }
-  }
-
-  private static class TimestamptzMillisReader
-      extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
-    private TimestamptzMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
-    }
-  }
-
-  private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
-    private TimeMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalTime read(LocalTime reuse) {
-      return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
-    }
-  }
-
-  private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
-    private TimeReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalTime read(LocalTime reuse) {
-      return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
-    }
-  }
-
-  private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
-    private FixedReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public byte[] read(byte[] reuse) {
-      if (reuse != null) {
-        column.nextBinary().toByteBuffer().duplicate().get(reuse);
-        return reuse;
-      } else {
-        return column.nextBinary().getBytes();
-      }
-    }
-  }
}

[Review comment] Addressing the nit from #11919.