Parquet: Add readers and writers for the internal object model #11904

Open · wants to merge 4 commits into main from parquet_internal_writer

Conversation

ajantha-bhat
Member

Split into 3 commits:

a) Refactor BaseParquetWriter to keep only the common functionality required by the internal and generic writers.
b) Refactor BaseParquetReaders to keep only the common functionality required by the internal and generic readers.
c) Add an internal writer and reader that consume and produce the Iceberg in-memory data model.
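For context, a rough usage sketch of the new pair. The factory names InternalWriter.buildWriter and InternalReader.buildReader are assumptions mirroring GenericParquetWriter.buildWriter and GenericParquetReaders.buildReader; the PR defines the actual entry points:

  // Hypothetical usage; assumes factory methods mirroring the generic classes.
  FileAppender<StructLike> appender =
      Parquet.write(outputFile)              // outputFile: an org.apache.iceberg.io.OutputFile
          .schema(schema)                    // schema: the Iceberg org.apache.iceberg.Schema
          .createWriterFunc(messageType -> InternalWriter.buildWriter(messageType))
          .build();

  CloseableIterable<StructLike> records =
      Parquet.read(inputFile)
          .project(schema)
          .createReaderFunc(messageType -> InternalReader.buildReader(schema, messageType))
          .build();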

@ajantha-bhat marked this pull request as draft on January 3, 2025 16:27
@ajantha-bhat requested a review from rdblue on January 3, 2025 16:30
@ajantha-bhat reopened this on Jan 4, 2025
@ajantha-bhat force-pushed the parquet_internal_writer branch from 772f5c2 to 233a00b on January 6, 2025 11:33
- code: "java.method.abstractMethodAdded"
new: "method org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader<?>\
\ org.apache.iceberg.data.parquet.BaseParquetReaders<T>::fixedReader(org.apache.parquet.column.ColumnDescriptor)"
justification: "{Refactor Parquet reader and writer}"
Contributor

Why are there curly braces in the justification text?

@rdblue changed the title from "Parquet: Internal writer and reader" to "Parquet: Add readers and writers for the internal object model" on Jan 7, 2025

@Override
public UUID read(UUID reuse) {
return UUIDUtil.convert(column.nextBinary().toByteBuffer());
Contributor

This looks fine to me.

}

private static class LogicalTypeWriterVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
Contributor

I think it would be better to import LogicalTypeAnnotation.LogicalTypeAnnotationVisitor and ParquetValueWriters.PrimitiveWriter to fix the formatting here. I know it matches what was copied, but that had been auto-formatted when the project moved to Google style.
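For illustration, with those imports the declaration would shrink to something like:

  import org.apache.iceberg.parquet.ParquetValueWriters.PrimitiveWriter;
  import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;

  private static class LogicalTypeWriterVisitor
      implements LogicalTypeAnnotationVisitor<PrimitiveWriter<?>> {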

return ParquetValueWriters.byteBuffers(desc);
}

private static class ParquetStructWriter extends StructWriter<StructLike> {
Contributor

I think this should be StructLikeWriter. This is in the parquet package, so there isn't much value in adding "Parquet" to the name.

@Override
protected ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc) {
// accepts ByteBuffer and internally writes as binary.
return ParquetValueWriters.byteBuffers(desc);
Contributor

Make sure this writer checks the length of the incoming bytes.

Member Author

OK. The existing GenericParquetWriter code also lacks this length check when writing byte[]; I will add that check.


@Override
protected ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc) {
// accepts ByteBuffer and internally writes as binary.
Contributor

I don't think this comment is very helpful. Probably remove it.

return new ParquetValueReaders.UnboxedReader<>(desc);
}

private static class ParquetStructReader extends StructReader<StructLike, StructLike> {
Contributor

Here also, there's not much value in using Parquet in the class name. Since this will produce GenericRecord instances, how about RecordReader?

Contributor

When checking that name (RecordReader) for consistency, I noticed that there's already a RecordReader in GenericParquetReaders. You can reuse that class.

Member Author (@ajantha-bhat, Jan 9, 2025)

We cannot reuse the class from GenericParquetReaders because it is based on the Record interface; we need a class based on the StructLike interface.

I will rename it to StructLikeReader, matching the StructLikeWriter in the InternalWriter class.

@Override
protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) {
// normal handling as int96
return new ParquetValueReaders.UnboxedReader<>(desc);
Contributor

This isn't correct. The unboxed reader will return a Binary for int96 columns. Instead, this needs to use the same logic as the Spark reader (which also uses the internal representation):

  private static class TimestampInt96Reader extends UnboxedReader<Long> {
    TimestampInt96Reader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public Long read(Long ignored) {
      return readLong();
    }

    @Override
    public long readLong() {
      final ByteBuffer byteBuffer =
          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
      return ParquetUtil.extractTimestampInt96(byteBuffer);
    }
  }

You can move that class into the parquet package to share it.

@@ -359,10 +250,10 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

-    if (primitive.getOriginalType() != null) {
+    if (primitive.getLogicalTypeAnnotation() != null) {
Contributor

I agree with this change, but please point these kinds of changes out for reviewers.

The old version worked because all of the supported logical type annotations had an equivalent ConvertedType (which is what OriginalType is called in Parquet format and the logical type docs).

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
Contributor

This and the following 2 methods are the only changes between the implementations of this class, so a lot of code is duplicated. In addition, this already introduces abstract factory methods for some readers -- including timestamps. I think it would be much cleaner to reuse this and call factory methods instead:

  protected abstract PrimitiveReader<?> dateReader(ColumnDescriptor desc);
  protected abstract PrimitiveReader<?> timeReader(ColumnDescriptor desc, ChronoUnit unit);
  protected abstract PrimitiveReader<?> timestampReader(ColumnDescriptor desc, ChronoUnit unit, boolean isAdjustedToUTC);
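A sketch of how the shared visitor could then dispatch to those hooks (the visit signatures follow Parquet's LogicalTypeAnnotationVisitor; the factory calls are the proposal above, not existing API):

  @Override
  public Optional<ParquetValueReader<?>> visit(
      LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
    return Optional.of(dateReader(desc));
  }

  @Override
  public Optional<ParquetValueReader<?>> visit(
      LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
    // Parquet's TimeUnit constants (MILLIS/MICROS/NANOS) share names with ChronoUnit
    return Optional.of(
        timestampReader(
            desc,
            ChronoUnit.valueOf(timestampLogicalType.getUnit().name()),
            timestampLogicalType.isAdjustedToUTC()));
  }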

@@ -76,6 +64,16 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>>
Contributor

I don't think it makes sense to have the subclasses provide this visitor.

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
Contributor

I agree with moving the date/time reader classes here.

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
Contributor

This isn't correct. The unit of the incoming timestamp value still needs to be handled, even if the in-memory representation of the value is the same (a long).

Contributor

Looks like the Spark implementations for this should work well, just like the int96 cases.
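For reference, the Spark-style handling converts millisecond timestamps to the internal microsecond representation on read. A sketch modeled on Spark's TimestampMillisReader (assumed here for illustration, not part of this diff):

  private static class TimestampMillisToMicrosReader extends UnboxedReader<Long> {
    TimestampMillisToMicrosReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public Long read(Long ignored) {
      return readLong();
    }

    @Override
    public long readLong() {
      // timestamp(millis) annotates int64; scale to micros for the internal model
      return 1000L * column.nextLong();
    }
  }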

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
Contributor

This isn't correct. Like timestamp, this needs to handle the unit of the incoming value. In addition, millisecond values must annotate an int32 according to the Parquet logical type docs. When the unit is a millisecond value, this needs to call readInt and multiply by 1000.

It looks like Spark currently gets the underlying Parquet type for milliseconds wrong (which makes sense because this is never used in Spark). We can go ahead and fix this now and share the reader between Internal and Spark.

  private static class TimeMillisReader extends UnboxedReader<Long> {
    TimeMillisReader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public Long read(Long ignored) {
      return readLong();
    }

    @Override
    public long readLong() {
      return 1000L * column.nextInteger();
    }
  }


@Override
protected ParquetValueWriters.PrimitiveWriter<?> fixedWriter(ColumnDescriptor desc) {
// accepts byte[] and internally writes as binary.
Contributor

Nit: unhelpful comment.

protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
ParquetValueWriters.PrimitiveWriter<?>>
logicalTypeWriterVisitor(ColumnDescriptor desc) {
return new LogicalTypeWriterVisitor(desc);
Contributor

Here, I would also prefer not to have subclasses provide the visitor.

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestInternalWriter {
Contributor

As with the Avro tests, I think this should extend DataTest. It is probably easier to do the Avro work first and then reuse it here.

\ org.apache.iceberg.data.parquet.BaseParquetReaders<T>::logicalTypeReaderVisitor(org.apache.parquet.column.ColumnDescriptor,\
\ org.apache.iceberg.types.Type.PrimitiveType, org.apache.parquet.schema.PrimitiveType)"
justification: "{Refactor Parquet reader and writer}"
- code: "java.method.abstractMethodAdded"
Contributor

This PR should not introduce revapi failures. Instead, the new methods should have default implementations that match the previous behavior (returning the generic representations).

Member Author

The new methods are abstract, and abstract methods cannot have default implementations. So I think we have to handle the revapi failures.

Member Author

Oh, I think what you mean is: don't add them as abstract methods, add them as methods with default implementations. Got it. I will update it today.
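A sketch of the non-breaking shape for one of the flagged methods (the body is illustrative; FixedReader stands in for whatever reader the generic path used before):

  // Default implementation preserving the previous generic behavior, so
  // existing subclasses keep compiling and revapi stays clean.
  protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) {
    return new FixedReader(desc);
  }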

@ajantha-bhat force-pushed the parquet_internal_writer branch from c977d2a to 8a33e15 on January 11, 2025 00:08
@github-actions bot removed the API label on Jan 11, 2025
@ajantha-bhat force-pushed the parquet_internal_writer branch from 8a33e15 to dae6c77 on January 11, 2025 08:02
@github-actions bot added the API label on Jan 11, 2025
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
}

public static List<Object> generateList(
Random random, Types.ListType list, Supplier<Object> elementResult) {
Member Author

Addressing the nit from #11919

import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class TestInternalData extends DataTest {
Member Author

Kept this class in the data module instead of parquet because the parquet module doesn't have DataTest. Having it in the parquet module would mean a lot of code duplication.

}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
-        return Optional.of(new TimeWriter(desc));
+        Preconditions.checkArgument(
Member Author

Added this check because, for timestamp, there was already a check to process only MICROS.
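Roughly, the added check looks like this (a sketch; the message wording is an assumption):

  Preconditions.checkArgument(
      LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
      "Cannot write time in %s, only MICROS is supported", timeType.getUnit());
  return Optional.of(new TimeWriter(desc));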

-      } else {
-        return Optional.of(new TimestampWriter(desc));
-      }
+      return timestampWriter(desc, timestampType.isAdjustedToUTC());
Member Author

Should I pass a TimeUnit to avoid modifying the signature in the future when other units are supported? Same question for timeType.

}
}

private static class FixedWriter extends PrimitiveWriter<byte[]> {
Member Author

I just moved this class from BaseParquetWriter; it was missing length validation, so I added it.
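A sketch of the moved class with the added check (the field name and message text are assumptions):

  private static class FixedWriter extends PrimitiveWriter<byte[]> {
    private final int length;

    private FixedWriter(ColumnDescriptor desc) {
      super(desc);
      this.length = desc.getPrimitiveType().getTypeLength();
    }

    @Override
    public void write(int repetitionLevel, byte[] value) {
      // reject arrays that don't match the declared fixed length
      Preconditions.checkArgument(
          value.length == length,
          "Cannot write byte array of length %s as fixed[%s]", value.length, length);
      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
    }
  }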

@@ -373,41 +371,6 @@ public Decimal read(Decimal ignored) {
}
}

private static class TimestampMillisReader extends UnboxedReader<Long> {
Member Author

Removed only for the latest version of Spark, on the assumption that this code will go away when we deprecate the older versions. Spark 3.3 is deprecated. Should I handle Spark 3.4 as well?

@ajantha-bhat marked this pull request as ready for review on January 11, 2025 08:15
Member Author (@ajantha-bhat)

@rdblue: Thanks for the review. I have addressed the comments. Please take a look at it again.

@ajantha-bhat force-pushed the parquet_internal_writer branch from dae6c77 to 3eaf3bc on January 11, 2025 13:28