diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java index c95331e2a..0c5b92bd4 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -40,15 +41,17 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.iceberg.ClientPool; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.io.FileIO; @@ -75,8 +78,6 @@ public class HiveMetadataPreservingTableOperations extends HiveTableOperations { private final String database; private final String tableName; private 
final String fullName; - // Redefine avro.schema.literal to avoid dependency on hive-serde - public static final String AVRO_SCHEMA_LITERAL_KEY = "avro.schema.literal"; private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table") @@ -296,7 +297,13 @@ static boolean fixMismatchedSchema(Table table) { return false; } Schema schema = new Schema.Parser().parse(avroSchemaLiteral); - List hiveCols = HiveSchemaUtil.convert(AvroSchemaUtil.toIceberg(schema)); + List hiveCols; + try { + hiveCols = getColsFromAvroSchema(schema); + } catch (SerDeException e) { + LOG.error("Failed to get columns from avro schema when checking schema", e); + return false; + } boolean schemaMismatched; if (table.getSd().getCols().size() != hiveCols.size()) { @@ -333,10 +340,29 @@ static boolean fixMismatchedSchema(Table table) { return schemaMismatched; } + private static List getColsFromAvroSchema(Schema schema) throws SerDeException { + AvroObjectInspectorGenerator avroOI = new AvroObjectInspectorGenerator(schema); + List columnNames = avroOI.getColumnNames(); + List columnTypes = avroOI.getColumnTypes(); + if (columnNames.size() != columnTypes.size()) { + throw new IllegalStateException(); + } + + return IntStream.range(0, columnNames.size()) + .mapToObj(i -> new FieldSchema(columnNames.get(i), columnTypes.get(i).getTypeName(), "")) + .collect(Collectors.toList()); + } + private static String getAvroSchemaLiteral(Table table) { - String schemaStr = table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY); + String schemaStr = + table.getParameters().get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); if (Strings.isNullOrEmpty(schemaStr)) { - schemaStr = table.getSd().getSerdeInfo().getParameters().get(AVRO_SCHEMA_LITERAL_KEY); + schemaStr = + table + .getSd() + .getSerdeInfo() + .getParameters() + .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); } return schemaStr; } diff --git 
a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java index 262289d01..cfcc271d3 100644 --- a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java +++ b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java @@ -19,6 +19,7 @@ package org.apache.iceberg.hivelink.core; import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -45,33 +46,16 @@ public void testFixMismatchedSchema() { + ",{\"name\":\"Nested\", \"type\":{\"name\":\"Nested\",\"type\":\"record\",\"fields\":[{\"name\":\"Field1\"," + "\"type\":\"string\"}, {\"name\":\"Field2\",\"type\":\"string\"}]}}]}"; - long currentTimeMillis = System.currentTimeMillis(); - StorageDescriptor storageDescriptor = new StorageDescriptor(); - FieldSchema field1 = new FieldSchema("name", "string", null); - FieldSchema field2 = new FieldSchema("id", "int", null); - FieldSchema field3 = new FieldSchema("nested", "struct", null); - // Set cols with incorrect nested type - storageDescriptor.setCols( - ImmutableList.of( - field1, - field2, - new FieldSchema("nested", "struct", ""))); - storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); - Map parameters = ImmutableMap.of("avro.schema.literal", testSchemaLiteral); + FieldSchema field1 = new FieldSchema("name", "string", ""); + FieldSchema field2 = new FieldSchema("id", "int", ""); + FieldSchema field3 = new FieldSchema("nested", "struct", ""); Table tbl = - new Table( - "tableName", - "dbName", - System.getProperty("user.name"), - (int) currentTimeMillis / 1000, - (int) currentTimeMillis / 1000, - Integer.MAX_VALUE, - storageDescriptor, - 
Collections.emptyList(), - parameters, - null, - null, - TableType.EXTERNAL_TABLE.toString()); + createTestTable( + ImmutableList.of( + field1, + field2, + new FieldSchema("nested", "struct", "")), + testSchemaLiteral); Assert.assertTrue(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl)); Assert.assertEquals(3, tbl.getSd().getColsSize()); @@ -79,12 +63,12 @@ public void testFixMismatchedSchema() { Assert.assertEquals(field2, tbl.getSd().getCols().get(1)); Assert.assertEquals(field3, tbl.getSd().getCols().get(2)); Assert.assertTrue( - storageDescriptor + tbl.getSd() .getSerdeInfo() .getParameters() .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS)); Assert.assertTrue( - storageDescriptor + tbl.getSd() .getSerdeInfo() .getParameters() .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS_TYPES)); @@ -93,4 +77,43 @@ public void testFixMismatchedSchema() { tbl.setParameters(ImmutableMap.of("avro.schema.literal", testSchemaLiteralWithUppercase)); Assert.assertFalse(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl)); } + + @Test + public void testFixMismatchedSchemaWithSingleUnionType() { + // Schema literal with 2 fields (name, testunion) + String testSchemaLiteral = + "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\"," + + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"testunion\"," + + "\"type\":[{\"type\":\"string\"}]}]}"; + + FieldSchema field1 = new FieldSchema("name", "string", ""); + FieldSchema field2 = new FieldSchema("testunion", "uniontype", ""); + Table tbl = createTestTable(ImmutableList.of(field1, field2), testSchemaLiteral); + + // Make sure that uniontype info is not lost and detected as a mismatch + Assert.assertFalse(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl)); + } + + Table createTestTable(List fields, String schemaLiteral) { + long currentTimeMillis = System.currentTimeMillis(); + StorageDescriptor storageDescriptor = new 
StorageDescriptor(); + storageDescriptor.setCols(fields); + storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); + Map parameters = ImmutableMap.of("avro.schema.literal", schemaLiteral); + Table tbl = + new Table( + "tableName", + "dbName", + System.getProperty("user.name"), + (int) currentTimeMillis / 1000, + (int) currentTimeMillis / 1000, + Integer.MAX_VALUE, + storageDescriptor, + Collections.emptyList(), + parameters, + null, + null, + TableType.EXTERNAL_TABLE.toString()); + return tbl; + } }