Skip to content

Commit

Permalink
Parquet: Add internal writer and reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Jan 3, 2025
1 parent cdf3fac commit 8c79e8e
Show file tree
Hide file tree
Showing 6 changed files with 520 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
if (primitive.getLogicalTypeAnnotation() != null) {
return primitive
.getLogicalTypeAnnotation()
.accept(logicalTypeReaderVisitor(desc, expected, primitive))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.parquet;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
 * A {@link ParquetValueReader} factory that materializes Parquet data into Iceberg's internal
 * in-memory object model: struct values are returned as {@link StructLike} (backed by
 * {@link GenericRecord}), and primitive columns are decoded by the readers selected in
 * {@link LogicalTypeAnnotationParquetValueReaderVisitor}.
 */
public class InternalReader extends BaseParquetReaders<StructLike> {

  // The reader factory is stateless, so a single shared instance is used for all callers.
  private static final InternalReader INSTANCE = new InternalReader();

  private InternalReader() {}

  /**
   * Builds a value reader for the given schemas.
   *
   * @param expectedSchema the Iceberg schema the caller expects to read
   * @param fileSchema the Parquet schema of the data file
   * @return a reader producing {@link StructLike} rows
   */
  public static ParquetValueReader<StructLike> buildReader(
      Schema expectedSchema, MessageType fileSchema) {
    return INSTANCE.createReader(expectedSchema, fileSchema);
  }

  /**
   * Builds a value reader with constant values substituted for some fields.
   *
   * @param idToConstant constants keyed by field id — presumably identity-partition values
   *     supplied by the caller rather than read from the file; verify against
   *     {@code BaseParquetReaders.createReader}
   */
  public static ParquetValueReader<StructLike> buildReader(
      Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
    return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
  }

  /** Creates the struct reader used for both the root record and nested structs. */
  @Override
  protected ParquetValueReader<StructLike> createStructReader(
      List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
    return new ParquetStructReader(types, fieldReaders, structType);
  }

  /** Supplies the visitor that maps Parquet logical type annotations to primitive readers. */
  @Override
  protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>>
      logicalTypeReaderVisitor(
          ColumnDescriptor desc,
          org.apache.iceberg.types.Type.PrimitiveType expected,
          PrimitiveType primitive) {
    return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive);
  }

  // FIXED_LEN_BYTE_ARRAY columns are surfaced as ByteBuffer values.
  @Override
  protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
    return new ParquetValueReaders.BytesReader(desc);
  }

  @Override
  protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
    // normal handling as int96
    return new ParquetValueReaders.UnboxedReader<>(desc);
  }

  /**
   * Struct reader that assembles field values into {@link GenericRecord} instances, copying a
   * pre-built template so per-row construction avoids re-deriving the struct layout.
   */
  private static class ParquetStructReader extends StructReader<StructLike, StructLike> {
    private final GenericRecord template;

    ParquetStructReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
      super(types, readers);
      // NOTE(review): template is null when struct is null; newStructData would then NPE on a
      // null reuse — confirm callers always pass a non-null struct type.
      this.template = struct != null ? GenericRecord.create(struct) : null;
    }

    @Override
    protected StructLike newStructData(StructLike reuse) {
      if (reuse != null) {
        return reuse;
      } else {
        return template.copy();
      }
    }

    @Override
    protected Object getField(StructLike intermediate, int pos) {
      return intermediate.get(pos, Object.class);
    }

    // The intermediate struct is the final row value; no conversion step is needed.
    @Override
    protected StructLike buildStruct(StructLike struct) {
      return struct;
    }

    @Override
    protected void set(StructLike struct, int pos, Object value) {
      struct.set(pos, value);
    }
  }

  /**
   * Maps each Parquet {@link LogicalTypeAnnotation} to the reader that decodes it into the
   * internal representation. Unvisited annotation types fall back to the visitor's defaults.
   */
  private static class LogicalTypeAnnotationParquetValueReaderVisitor
      implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {

    private final ColumnDescriptor desc;
    private final org.apache.iceberg.types.Type.PrimitiveType expected;
    private final PrimitiveType primitive;

    LogicalTypeAnnotationParquetValueReaderVisitor(
        ColumnDescriptor desc,
        org.apache.iceberg.types.Type.PrimitiveType expected,
        PrimitiveType primitive) {
      this.desc = desc;
      this.expected = expected;
      this.primitive = primitive;
    }

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
      return Optional.of(new ParquetValueReaders.StringReader(desc));
    }

    // Enums are read as plain strings.
    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
      return Optional.of(new ParquetValueReaders.StringReader(desc));
    }

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
      return Optional.of(new ParquetValueReaders.UUIDReader(desc));
    }

    /**
     * Chooses a decimal reader by the column's physical type; scale comes from the annotation.
     */
    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
      switch (primitive.getPrimitiveTypeName()) {
        case BINARY:
        case FIXED_LEN_BYTE_ARRAY:
          return Optional.of(
              new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale()));
        case INT64:
          return Optional.of(
              new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale()));
        case INT32:
          return Optional.of(
              new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale()));
        default:
          throw new UnsupportedOperationException(
              "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
      }
    }

    // Dates, times, and timestamps keep their raw numeric encoding in the internal model.
    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
      return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
    }

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
      return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
    }

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
      return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
    }

    /**
     * Integers: 64-bit columns are read unboxed; a 32-bit physical value is widened to long only
     * when the expected Iceberg type is LONG.
     */
    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
      if (intLogicalType.getBitWidth() == 64) {
        return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
      }
      return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG)
          ? Optional.of(new ParquetValueReaders.IntAsLongReader(desc))
          : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
    }

    // JSON is read as a string; BSON as raw bytes.
    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
      return Optional.of(new ParquetValueReaders.StringReader(desc));
    }

    @Override
    public Optional<ParquetValueReader<?>> visit(
        LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
      return Optional.of(new ParquetValueReaders.BytesReader(desc));
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.parquet;

import java.util.List;
import java.util.Optional;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;

/**
 * A Writer that consumes Iceberg's internal in-memory object model.
 *
 * <p>Iceberg's internal in-memory object model produces the types defined in {@link
 * Type.TypeID#javaClass()}. Struct values are read positionally through the {@link StructLike}
 * interface; primitive columns are encoded by the writers selected in
 * {@link LogicalTypeWriterVisitor}.
 */
public class InternalWriter extends BaseParquetWriter<StructLike> {
  // The writer factory is stateless, so a single shared instance serves all callers.
  private static final InternalWriter INSTANCE = new InternalWriter();

  private InternalWriter() {}

  /**
   * Builds a value writer for the given Parquet file schema.
   *
   * @param type the Parquet schema of the file being written
   * @return a writer consuming {@link StructLike} rows
   */
  public static ParquetValueWriter<StructLike> buildWriter(MessageType type) {
    return INSTANCE.createWriter(type);
  }

  /** Creates the struct writer used for both the root record and nested structs. */
  @Override
  protected StructWriter<StructLike> createStructWriter(List<ParquetValueWriter<?>> writers) {
    return new ParquetStructWriter(writers);
  }

  /** Supplies the visitor that maps Parquet logical type annotations to primitive writers. */
  @Override
  protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
          ParquetValueWriters.PrimitiveWriter<?>>
      logicalTypeWriterVisitor(ColumnDescriptor desc) {
    return new LogicalTypeWriterVisitor(desc);
  }

  @Override
  protected ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc) {
    // accepts ByteBuffer and internally writes as binary.
    return ParquetValueWriters.byteBuffers(desc);
  }

  /** Struct writer that reads field values positionally from a {@link StructLike}. */
  private static class ParquetStructWriter extends StructWriter<StructLike> {
    private ParquetStructWriter(List<ParquetValueWriter<?>> writers) {
      super(writers);
    }

    @Override
    protected Object get(StructLike struct, int index) {
      return struct.get(index, Object.class);
    }
  }

  /**
   * Maps each Parquet {@link LogicalTypeAnnotation} to the writer that encodes the internal
   * representation. Unvisited annotation types fall back to the visitor's defaults.
   */
  private static class LogicalTypeWriterVisitor
      implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
          ParquetValueWriters.PrimitiveWriter<?>> {
    private final ColumnDescriptor desc;

    private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
      this.desc = desc;
    }

    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
      return Optional.of(ParquetValueWriters.strings(desc));
    }

    // Enums are written as plain strings.
    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
      return Optional.of(ParquetValueWriters.strings(desc));
    }

    /**
     * Chooses a decimal writer by the column's physical type; precision and scale come from the
     * annotation. Other physical types return empty and fall through to the caller's default
     * handling.
     */
    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
      switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
        case INT32:
          return Optional.of(
              ParquetValueWriters.decimalAsInteger(
                  desc, decimalType.getPrecision(), decimalType.getScale()));
        case INT64:
          return Optional.of(
              ParquetValueWriters.decimalAsLong(
                  desc, decimalType.getPrecision(), decimalType.getScale()));
        case BINARY:
        case FIXED_LEN_BYTE_ARRAY:
          return Optional.of(
              ParquetValueWriters.decimalAsFixed(
                  desc, decimalType.getPrecision(), decimalType.getScale()));
      }
      return Optional.empty();
    }

    /**
     * Integers: 64-bit annotations get a long writer, narrower widths an int writer. Unsigned
     * 64-bit values are rejected because no Java primitive can represent them.
     */
    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
      // This is a writer, so the failure message says "write" (the original said "read",
      // copied from the reader-side visitor).
      Preconditions.checkArgument(
          intType.isSigned() || intType.getBitWidth() < 64,
          "Cannot write uint64: not a supported Java type");
      if (intType.getBitWidth() < 64) {
        return Optional.of(ParquetValueWriters.ints(desc));
      } else {
        return Optional.of(ParquetValueWriters.longs(desc));
      }
    }

    // JSON is written as a string; BSON as raw bytes.
    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
      return Optional.of(ParquetValueWriters.strings(desc));
    }

    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
      return Optional.of(ParquetValueWriters.byteBuffers(desc));
    }

    @Override
    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
        LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
      return Optional.of(ParquetValueWriters.uuids(desc));
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
Expand Down Expand Up @@ -401,6 +403,17 @@ public ByteBuffer read(ByteBuffer reuse) {
}
}

/** Reads a 16-byte Parquet binary value as a {@link UUID}. */
public static class UUIDReader extends PrimitiveReader<UUID> {
    public UUIDReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public UUID read(UUID ignored) {
      // UUIDs are immutable, so the reuse argument is ignored; a fresh value is
      // decoded from the column's next binary payload on every call.
      ByteBuffer buffer = column.nextBinary().toByteBuffer();
      return UUIDUtil.convert(buffer);
    }
  }

public static class ByteArrayReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
public ByteArrayReader(ColumnDescriptor desc) {
super(desc);
Expand Down
Loading

0 comments on commit 8c79e8e

Please sign in to comment.