Revert 7dd301f and add unit test (#146)
* Revert 7dd301f and add unit test
jack-moseley authored Jun 2, 2023
1 parent 7dd301f commit dc5f7e8
Showing 2 changed files with 84 additions and 35 deletions.
@@ -30,6 +30,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -40,15 +41,17 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.io.FileIO;
@@ -75,8 +78,6 @@ public class HiveMetadataPreservingTableOperations extends HiveTableOperations {
   private final String database;
   private final String tableName;
   private final String fullName;
-  // Redefine avro.schema.literal to avoid dependency on hive-serde
-  public static final String AVRO_SCHEMA_LITERAL_KEY = "avro.schema.literal";
 
   private static final DynMethods.UnboundMethod ALTER_TABLE =
       DynMethods.builder("alter_table")
@@ -296,7 +297,13 @@ static boolean fixMismatchedSchema(Table table) {
       return false;
     }
     Schema schema = new Schema.Parser().parse(avroSchemaLiteral);
-    List<FieldSchema> hiveCols = HiveSchemaUtil.convert(AvroSchemaUtil.toIceberg(schema));
+    List<FieldSchema> hiveCols;
+    try {
+      hiveCols = getColsFromAvroSchema(schema);
+    } catch (SerDeException e) {
+      LOG.error("Failed to get columns from avro schema when checking schema", e);
+      return false;
+    }
 
     boolean schemaMismatched;
     if (table.getSd().getCols().size() != hiveCols.size()) {
@@ -333,10 +340,29 @@ static boolean fixMismatchedSchema(Table table) {
     return schemaMismatched;
   }
 
+  private static List<FieldSchema> getColsFromAvroSchema(Schema schema) throws SerDeException {
+    AvroObjectInspectorGenerator avroOI = new AvroObjectInspectorGenerator(schema);
+    List<String> columnNames = avroOI.getColumnNames();
+    List<TypeInfo> columnTypes = avroOI.getColumnTypes();
+    if (columnNames.size() != columnTypes.size()) {
+      throw new IllegalStateException("Column name and column type lists differ in length");
+    }
+
+    return IntStream.range(0, columnNames.size())
+        .mapToObj(i -> new FieldSchema(columnNames.get(i), columnTypes.get(i).getTypeName(), ""))
+        .collect(Collectors.toList());
+  }
+
   private static String getAvroSchemaLiteral(Table table) {
-    String schemaStr = table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY);
+    String schemaStr =
+        table.getParameters().get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
     if (Strings.isNullOrEmpty(schemaStr)) {
-      schemaStr = table.getSd().getSerdeInfo().getParameters().get(AVRO_SCHEMA_LITERAL_KEY);
+      schemaStr =
+          table
+              .getSd()
+              .getSerdeInfo()
+              .getParameters()
+              .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
     }
     return schemaStr;
   }
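
Why the revert matters, in one spot: the restored getColsFromAvroSchema goes through Hive's own AvroObjectInspectorGenerator, which keeps a single-branch Avro union as uniontype<string> instead of collapsing it to a bare string — the behavior the new testFixMismatchedSchemaWithSingleUnionType below pins down. A minimal standalone sketch of that conversion (the demo class is hypothetical and not part of the commit; it assumes avro and hive-serde on the classpath):

import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

// Hypothetical demo, not part of this commit.
public class UnionTypeConversionDemo {
  public static void main(String[] args) throws Exception {
    // Same shape as the schema in testFixMismatchedSchemaWithSingleUnionType:
    // a record whose field type is a union with a single string branch.
    String literal =
        "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\","
            + "\"fields\":[{\"name\":\"testunion\",\"type\":[{\"type\":\"string\"}]}]}";
    Schema schema = new Schema.Parser().parse(literal);
    AvroObjectInspectorGenerator avroOI = new AvroObjectInspectorGenerator(schema);
    List<String> names = avroOI.getColumnNames();
    List<TypeInfo> types = avroOI.getColumnTypes();
    // Per the test's expectation, this prints: testunion: uniontype<string>
    System.out.println(names.get(0) + ": " + types.get(0).getTypeName());
  }
}
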
@@ -19,6 +19,7 @@
 package org.apache.iceberg.hivelink.core;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -45,46 +46,29 @@ public void testFixMismatchedSchema() {
             + ",{\"name\":\"Nested\", \"type\":{\"name\":\"Nested\",\"type\":\"record\",\"fields\":[{\"name\":\"Field1\","
             + "\"type\":\"string\"}, {\"name\":\"Field2\",\"type\":\"string\"}]}}]}";
 
-    long currentTimeMillis = System.currentTimeMillis();
-    StorageDescriptor storageDescriptor = new StorageDescriptor();
-    FieldSchema field1 = new FieldSchema("name", "string", null);
-    FieldSchema field2 = new FieldSchema("id", "int", null);
-    FieldSchema field3 = new FieldSchema("nested", "struct<field1:string,field2:string>", null);
-    // Set cols with incorrect nested type
-    storageDescriptor.setCols(
-        ImmutableList.of(
-            field1,
-            field2,
-            new FieldSchema("nested", "struct<field1:int," + "field2:string>", "")));
-    storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
-    Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", testSchemaLiteral);
+    FieldSchema field1 = new FieldSchema("name", "string", "");
+    FieldSchema field2 = new FieldSchema("id", "int", "");
+    FieldSchema field3 = new FieldSchema("nested", "struct<field1:string,field2:string>", "");
     Table tbl =
-        new Table(
-            "tableName",
-            "dbName",
-            System.getProperty("user.name"),
-            (int) currentTimeMillis / 1000,
-            (int) currentTimeMillis / 1000,
-            Integer.MAX_VALUE,
-            storageDescriptor,
-            Collections.emptyList(),
-            parameters,
-            null,
-            null,
-            TableType.EXTERNAL_TABLE.toString());
+        createTestTable(
+            ImmutableList.of(
+                field1,
+                field2,
+                new FieldSchema("nested", "struct<field1:int," + "field2:string>", "")),
+            testSchemaLiteral);
 
     Assert.assertTrue(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
     Assert.assertEquals(3, tbl.getSd().getColsSize());
     Assert.assertEquals(field1, tbl.getSd().getCols().get(0));
     Assert.assertEquals(field2, tbl.getSd().getCols().get(1));
     Assert.assertEquals(field3, tbl.getSd().getCols().get(2));
     Assert.assertTrue(
-        storageDescriptor
+        tbl.getSd()
             .getSerdeInfo()
             .getParameters()
             .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS));
     Assert.assertTrue(
-        storageDescriptor
+        tbl.getSd()
             .getSerdeInfo()
             .getParameters()
             .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS_TYPES));
@@ -93,4 +77,43 @@ public void testFixMismatchedSchema() {
     tbl.setParameters(ImmutableMap.of("avro.schema.literal", testSchemaLiteralWithUppercase));
     Assert.assertFalse(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
   }
+
+  @Test
+  public void testFixMismatchedSchemaWithSingleUnionType() {
+    // Schema literal with 2 fields (name, testunion)
+    String testSchemaLiteral =
+        "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\","
+            + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"testunion\","
+            + "\"type\":[{\"type\":\"string\"}]}]}";
+
+    FieldSchema field1 = new FieldSchema("name", "string", "");
+    FieldSchema field2 = new FieldSchema("testunion", "uniontype<string>", "");
+    Table tbl = createTestTable(ImmutableList.of(field1, field2), testSchemaLiteral);
+
+    // Make sure the uniontype info is not lost and wrongly detected as a mismatch
+    Assert.assertFalse(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
+  }
+
+  Table createTestTable(List<FieldSchema> fields, String schemaLiteral) {
+    long currentTimeMillis = System.currentTimeMillis();
+    StorageDescriptor storageDescriptor = new StorageDescriptor();
+    storageDescriptor.setCols(fields);
+    storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
+    Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", schemaLiteral);
+    Table tbl =
+        new Table(
+            "tableName",
+            "dbName",
+            System.getProperty("user.name"),
+            (int) (currentTimeMillis / 1000),
+            (int) (currentTimeMillis / 1000),
+            Integer.MAX_VALUE,
+            storageDescriptor,
+            Collections.emptyList(),
+            parameters,
+            null,
+            null,
+            TableType.EXTERNAL_TABLE.toString());
+    return tbl;
+  }
 }

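For contrast, a hedged sketch of the positive path (not part of this commit; the test name and literal are made up for illustration): when a declared Hive type genuinely disagrees with the Avro literal, fixMismatchedSchema is expected to return true and rewrite the columns from the schema, mirroring the assertions in testFixMismatchedSchema above. The sketch assumes the same test class and imports:

  @Test
  public void testFixMismatchedSchemaWithWrongColumnType() {
    // Hypothetical literal: a record with one string field.
    String literal =
        "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\","
            + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
    // Declare the column with the wrong Hive type on purpose.
    Table tbl = createTestTable(ImmutableList.of(new FieldSchema("name", "int", "")), literal);
    // A real mismatch: fixMismatchedSchema reports it and rewrites the cols from the literal.
    Assert.assertTrue(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
    Assert.assertEquals("string", tbl.getSd().getCols().get(0).getType());
  }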