diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 8d8182963..16d941b1c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -184,4 +184,8 @@ private TableProperties() {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String ORC_DO_NOT_WRITE_FIELD_IDS = "write.orc.no-field-ids.enabled";
+
+  public static final boolean ORC_DO_NOT_WRITE_FIELD_IDS_DEFAULT = false;
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index db14c1f1c..8a2d2c81d 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -506,7 +506,7 @@ static TypeDescription removeIds(TypeDescription type) {
     return OrcSchemaVisitor.visit(type, new RemoveIds());
   }
 
-  static boolean hasIds(TypeDescription orcSchema) {
+  public static boolean hasIds(TypeDescription orcSchema) {
     return OrcSchemaVisitor.visit(orcSchema, new HasIds());
   }
 
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 34f71fcc2..f2f72bded 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -43,6 +43,9 @@
 import org.apache.orc.Writer;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
+import static org.apache.iceberg.TableProperties.ORC_DO_NOT_WRITE_FIELD_IDS;
+import static org.apache.iceberg.TableProperties.ORC_DO_NOT_WRITE_FIELD_IDS_DEFAULT;
+
 /**
  * Create a file appender for ORC.
  */
@@ -66,13 +69,19 @@ class OrcFileAppender<D> implements FileAppender<D> {
     this.metricsConfig = metricsConfig;
 
     TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
+    TypeDescription orcSchemaWithoutIds = ORCSchemaUtil.removeIds(orcSchema);
+
     this.batch = orcSchema.createRowBatch(this.batchSize);
 
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true);
     if (file instanceof HadoopOutputFile) {
       options.fileSystem(((HadoopOutputFile) file).getFileSystem());
     }
-    options.setSchema(orcSchema);
+    if (conf.getBoolean(ORC_DO_NOT_WRITE_FIELD_IDS, ORC_DO_NOT_WRITE_FIELD_IDS_DEFAULT)) {
+      options.setSchema(orcSchemaWithoutIds);
+    } else {
+      options.setSchema(orcSchema);
+    }
     this.writer = newOrcWriter(file, options, metadata);
     this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcWriteWithoutIds.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcWriteWithoutIds.java
new file mode 100644
index 000000000..1088cd0c3
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcWriteWithoutIds.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.TableProperties.ORC_DO_NOT_WRITE_FIELD_IDS;
+import static org.apache.iceberg.spark.data.TestHelpers.assertEquals;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestSparkOrcWriteWithoutIds {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Test
+  public void testWriteWithoutIds() throws IOException {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    Iterable<InternalRow> rows = RandomData.generateSpark(SCHEMA, 1, 0L);
+    try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
+        .config(ORC_DO_NOT_WRITE_FIELD_IDS, String.valueOf(true))
+        .createWriterFunc(SparkOrcWriter::new)
+        .schema(SCHEMA)
+        .build()) {
+      writer.addAll(rows);
+    }
+
+    // read back using ORC native file reader and test file schema
+    try (Reader fileReader = OrcFile.createReader(new Path(testFile.getPath()),
+        new OrcFile.ReaderOptions(new Configuration()))) {
+      Assert.assertFalse(ORCSchemaUtil.hasIds(fileReader.getSchema()));
+    }
+
+    // read back using Spark ORC reader and test overall readability
+    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(testFile))
+        .project(SCHEMA)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(SCHEMA, readOrcSchema))
+        .build()) {
+      final Iterator<InternalRow> actualRows = reader.iterator();
+      final Iterator<InternalRow> expectedRows = rows.iterator();
+      while (expectedRows.hasNext()) {
+        Assert.assertTrue("Should have expected number of rows", actualRows.hasNext());
+        assertEquals(SCHEMA, expectedRows.next(), actualRows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
+    }
+  }
+}
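
For context on using the new flag outside this test: OrcFileAppender reads "write.orc.no-field-ids.enabled" from its writer Configuration, and the test above sets it directly on the ORC WriteBuilder via config(). The sketch below shows how the same flag could instead be set as a table property through the standard UpdateProperties API. The catalog, warehouse path, and table name are hypothetical, and whether a table-level property actually reaches the appender's Configuration depends on the engine's write path, which this patch does not change.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class EnableOrcNoFieldIds {
  public static void main(String[] args) {
    // Hypothetical Hadoop catalog and table; any catalog implementation exposes the same API.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.parse("db.events"));

    // Set the property added in TableProperties above. If the write path copies table
    // properties into the ORC writer Configuration, OrcFileAppender will then use the
    // schema stripped by ORCSchemaUtil.removeIds(...) instead of the one with field IDs.
    table.updateProperties()
        .set(TableProperties.ORC_DO_NOT_WRITE_FIELD_IDS, "true")
        .commit();
  }
}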