From b93333f4240bad2c21498622f67eec7c6d0142d4 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 26 Nov 2024 23:43:07 +0530 Subject: [PATCH] fix: Added support to read iceberg tables partitioned by date (#6430) --- .../deephaven/iceberg/base/IcebergUtils.java | 6 ----- .../IcebergKeyValuePartitionedLayout.java | 26 +++++++++++++++---- .../iceberg/junit5/SqliteCatalogBase.java | 9 +------ 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java index 925c7a3a923..a0607d671db 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -3,19 +3,14 @@ // package io.deephaven.iceberg.base; -import io.deephaven.base.Pair; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.iceberg.util.IcebergReadInstructions; import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -26,7 +21,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; -import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.jetbrains.annotations.NotNull; diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index f362139133f..86e63f7dbb9 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -13,6 +13,7 @@ import io.deephaven.util.type.TypeUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.iceberg.*; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.jetbrains.annotations.NotNull; import java.net.URI; @@ -54,6 +55,13 @@ public IcebergKeyValuePartitionedLayout( // in the output definition, so we can ignore duplicates. final MutableInt icebergIndex = new MutableInt(0); final Map availablePartitioningColumns = partitionSpec.fields().stream() + .peek(partitionField -> { + // TODO (deephaven-core#6438): Add support to handle non-identity transforms + if (!partitionField.transform().isIdentity()) { + throw new TableDataException("Partition field " + partitionField.name() + " has a " + + "non-identity transform: " + partitionField.transform() + ", which is not supported"); + } + }) .map(PartitionField::name) .map(name -> instructions.columnRenames().getOrDefault(name, name)) .collect(Collectors.toMap( @@ -89,11 +97,19 @@ IcebergTableLocationKey keyFromDataFile( final PartitionData partitionData = (PartitionData) dataFile.partition(); for (final ColumnData colData : outputPartitioningColumns) { final String colName = colData.name; - final Object colValue = partitionData.get(colData.index); - if (colValue != null && !colData.type.isAssignableFrom(colValue.getClass())) { - throw new TableDataException("Partitioning column " + colName - + " has type " + colValue.getClass().getName() - + " but expected " + colData.type.getName()); + final Object colValue; + final Object valueFromPartitionData = partitionData.get(colData.index); + if (valueFromPartitionData != null) { + // TODO (deephaven-core#6438): Assuming identity transform here + colValue = IdentityPartitionConverters.convertConstant( + partitionData.getType(colData.index), valueFromPartitionData); + if (!colData.type.isAssignableFrom(colValue.getClass())) { + throw new TableDataException("Partitioning column " + colName + + " has type " + colValue.getClass().getName() + + " but expected " + colData.type.getName()); + } + } else { + colValue = null; } partitions.put(colName, (Comparable) colValue); } diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 068e77b31c3..11962bf71ec 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -754,14 +754,7 @@ void testPartitionedAppendWithAllPartitioningTypes() { "DoublePC = (double) 4.0", "LocalDatePC = LocalDate.parse(`2023-10-01`)") .moveColumns(7, "data"); - - // TODO (deephaven-core#6419) Dropping the local data column since it is not supported on the read side. - // Remove this when the issue is fixed. - final TableDefinition tableDefinitionWithoutLocalDate = fromIceberg.dropColumns("LocalDatePC").getDefinition(); - final Table fromIcebergWithoutLocalDate = tableAdapter.table(IcebergReadInstructions.builder() - .tableDefinition(tableDefinitionWithoutLocalDate) - .build()); - assertTableEquals(expected.dropColumns("LocalDatePC"), fromIcebergWithoutLocalDate); + assertTableEquals(expected, fromIceberg); } @Test