Skip to content

Commit

Permalink
Parquet: Refactor BaseParquetWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Jan 2, 2025
1 parent e3f50e5 commit 701e933
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,13 @@
*/
package org.apache.iceberg.data.parquet;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
Expand All @@ -50,6 +41,12 @@ protected ParquetValueWriter<T> createWriter(MessageType type) {
protected abstract ParquetValueWriters.StructWriter<T> createStructWriter(
List<ParquetValueWriter<?>> writers);

protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
ParquetValueWriters.PrimitiveWriter<?>>
logicalTypeWriterVisitor(ColumnDescriptor desc);

protected abstract ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc);

private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
private final MessageType type;

Expand Down Expand Up @@ -120,15 +117,15 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
if (logicalType != null) {
Optional<ParquetValueWriters.PrimitiveWriter<?>> writer =
logicalType.accept(new LogicalTypeWriterVisitor(desc));
logicalType.accept(logicalTypeWriterVisitor(desc));
if (writer.isPresent()) {
return writer.get();
}
}

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
return new FixedWriter(desc);
return fixedWriter(desc);
case BINARY:
return ParquetValueWriters.byteBuffers(desc);
case BOOLEAN:
Expand All @@ -146,158 +143,4 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
}
}
}

private static class LogicalTypeWriterVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
ParquetValueWriters.PrimitiveWriter<?>> {
private final ColumnDescriptor desc;

private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
this.desc = desc;
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return Optional.of(
ParquetValueWriters.decimalAsInteger(
desc, decimalType.getPrecision(), decimalType.getScale()));
case INT64:
return Optional.of(
ParquetValueWriters.decimalAsLong(
desc, decimalType.getPrecision(), decimalType.getScale()));
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return Optional.of(
ParquetValueWriters.decimalAsFixed(
desc, decimalType.getPrecision(), decimalType.getScale()));
}
return Optional.empty();
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
return Optional.of(new DateWriter(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
return Optional.of(new TimeWriter(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
Preconditions.checkArgument(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported",
timestampType.getUnit());
if (timestampType.isAdjustedToUTC()) {
return Optional.of(new TimestamptzWriter(desc));
} else {
return Optional.of(new TimestampWriter(desc));
}
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
Preconditions.checkArgument(
intType.isSigned() || intType.getBitWidth() < 64,
"Cannot read uint64: not a supported Java type");
if (intType.getBitWidth() < 64) {
return Optional.of(ParquetValueWriters.ints(desc));
} else {
return Optional.of(ParquetValueWriters.longs(desc));
}
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
private DateWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalDate value) {
column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value));
}
}

private static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
private TimeWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalTime value) {
column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
}
}

private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
private TimestampWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalDateTime value) {
column.writeLong(
repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
}
}

private static class TimestamptzWriter
extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
private TimestamptzWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, OffsetDateTime value) {
column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value));
}
}

private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
private FixedWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, byte[] value) {
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
}
}
}
Loading

0 comments on commit 701e933

Please sign in to comment.