From f918bd0011a35a472a4f6357ad6c9351422319ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Mon, 2 Dec 2024 10:29:13 +0100 Subject: [PATCH] apply changes suggested in a review --- .../SnowflakeConnectionServiceV1.java | 20 ++++++++------ .../SchemaEvolutionService.java | 1 + .../iceberg/IcebergColumnTreeTypeBuilder.java | 4 +-- .../IcebergSchemaEvolutionService.java | 20 +++++++++----- .../iceberg/IcebergTableSchemaResolver.java | 10 +++---- .../iceberg/ParseIcebergColumnTreeTest.java | 13 +++++----- .../IcebergIngestionSchemaEvolutionIT.java | 26 +++++++++---------- .../streaming/iceberg/TestJsons.java | 7 +---- 8 files changed, 53 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 4afa0d315..51171e928 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeConnectionV1; import net.snowflake.client.jdbc.SnowflakeDriver; import net.snowflake.client.jdbc.cloud.storage.StageInfo; @@ -515,15 +516,18 @@ public void alterColumnsDataTypeIcebergTable( private String generateAlterSetDataTypeQuery(Map columnsToModify) { StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg "); setDataTypeQuery.append("table identifier(?) 
alter column "); - for (Map.Entry column : columnsToModify.entrySet()) { - String columnName = column.getKey(); - String dataType = column.getValue().getColumnType(); - setDataTypeQuery.append(columnName).append(" set data type ").append(dataType).append(", "); - } - // remove last comma and whitespace - setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1); - setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1); + String columnsPart = + columnsToModify.entrySet().stream() + .map( + column -> { + String columnName = column.getKey(); + String dataType = column.getValue().getColumnType(); + return columnName + " set data type " + dataType; + }) + .collect(Collectors.joining(", ")); + + setDataTypeQuery.append(columnsPart); return setDataTypeQuery.toString(); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java index ce4222234..8be59e787 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java @@ -13,6 +13,7 @@ public interface SchemaEvolutionService { * @param targetItems target items for schema evolution such as table name, columns to drop * nullability, and columns to add * @param record the sink record that contains the schema and actual data + * @param existingSchema schema stored in a channel */ void evolveSchemaIfNeeded( SchemaEvolutionTargetItems targetItems, diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java index a83ab737e..bc25cc72d 100644 --- 
a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java @@ -2,6 +2,8 @@ public class IcebergColumnTreeTypeBuilder { + private static final String ROOT_NODE_TYPE = "ROOT_NODE"; + /** Returns data type of the column */ String buildType(IcebergColumnTree columnTree) { StringBuilder sb = new StringBuilder(); @@ -53,6 +55,4 @@ private void removeLastSeparator(StringBuilder sb) { sb.deleteCharAt(sb.length() - 1); sb.deleteCharAt(sb.length() - 1); } - - private static final String ROOT_NODE_TYPE = "ROOT_NODE"; } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java index 6aa020f3c..99aa4fe1d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java @@ -6,9 +6,7 @@ import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnInfos; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionService; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionTargetItems; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import net.snowflake.ingest.streaming.internal.ColumnProperties; import org.apache.kafka.connect.sink.SinkRecord; @@ -150,12 +148,20 @@ private void mergeChangesIntoExistingColumns( List alreadyExistingColumns, List modifiedColumns) { alreadyExistingColumns.forEach( existingColumn -> { - IcebergColumnTree mewVersion = + 
List modifiedColumnMatchingExisting = modifiedColumns.stream() .filter(c -> c.getColumnName().equals(existingColumn.getColumnName())) - .collect(Collectors.toList()) - .get(0); - mergeTreeService.merge(existingColumn, mewVersion); + .collect(Collectors.toList()); + if (modifiedColumnMatchingExisting.size() != 1) { + LOGGER.warn( + "Skipping schema evolution of a column {}. Incorrect number of new versions of the" + + " column: {}", + existingColumn.getColumnName(), + modifiedColumnMatchingExisting.stream() + .map(IcebergColumnTree::getColumnName) + .collect(Collectors.toList())); + } else { + mergeTreeService.merge(existingColumn, modifiedColumnMatchingExisting.get(0)); } }); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java index 6998151f8..cf4df17e8 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java @@ -47,12 +47,12 @@ public List resolveIcebergSchemaFromRecord( } if (hasSchema(record)) { LOGGER.debug( - "Schema found. Evolve columns basing on a record schema, column: " + columnsToEvolve); + "Schema found. Evolve columns basing on a record's schema, column: {}", columnsToEvolve); return getTableSchemaFromRecordSchema(record, columnsToEvolve); } else { LOGGER.debug( - "Schema NOT found. 
Evolve columns basing on a record's payload, columns: {}", + columnsToEvolve); return getTableSchemaFromJson(record, columnsToEvolve); } } @@ -77,7 +77,7 @@ private static String getIcebergSchema(ColumnProperties columnProperties) { return (String) field.get(columnProperties); } catch (IllegalAccessException | NoSuchFieldException e) { throw new IllegalStateException( - "Couldn't set iceberg by accessing private field: " + "isIceberg", e); + "Couldn't set iceberg by accessing private field: isIceberg", e); } } @@ -102,7 +102,7 @@ private List getTableSchemaFromJson( * Given a SinkRecord, get the schema information from it * * @param record the sink record that contains the schema and actual data - * @return list of column represantation in a form of tree + * @return list of column representation in a form of tree */ private List getTableSchemaFromRecordSchema( SinkRecord record, Set columnsToEvolve) { diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java index e2d765be7..328210378 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java @@ -93,7 +93,7 @@ static Stream icebergSchemas() { @MethodSource("parseFromJsonArguments") void parseFromJsonRecordSchema(String jsonString, String expectedType) { // given - SinkRecord record = createKafkaRecord(jsonString, false); + SinkRecord record = createKafkaRecord(jsonString); JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); IcebergColumnJsonValuePair columnValuePair = IcebergColumnJsonValuePair.from(recordNode.fields().next()); @@ -161,15 +161,15 @@ void mergeTwoTreesTest(String 
plainIcebergSchema, String recordJson, String expe IcebergColumnTree alreadyExistingTree = treeFactory.fromIcebergSchema(apacheSchema); // tree parsed from a record - SinkRecord record = createKafkaRecord(recordJson, false); + SinkRecord record = createKafkaRecord(recordJson); JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); IcebergColumnJsonValuePair columnValuePair = IcebergColumnJsonValuePair.from(recordNode.fields().next()); IcebergColumnTree modifiedTree = treeFactory.fromJson(columnValuePair); - // then + // when mergeTreeService.merge(alreadyExistingTree, modifiedTree); - + // then String expected = expectedResult.replaceAll("/ +/g", " "); Assertions.assertEquals(expected, typeBuilder.buildType(alreadyExistingTree)); Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName()); @@ -220,11 +220,10 @@ static Stream mergeTestArguments() { "ARRAY(OBJECT(primitive BOOLEAN, new_field LONG))")); } - protected SinkRecord createKafkaRecord(String jsonString, boolean withSchema) { + protected SinkRecord createKafkaRecord(String jsonString) { int offset = 0; JsonConverter converter = new JsonConverter(); - converter.configure( - Collections.singletonMap("schemas.enable", Boolean.toString(withSchema)), false); + converter.configure(Collections.singletonMap("schemas.enable", Boolean.toString(false)), false); SchemaAndValue inputValue = converter.toConnectData("TOPIC_NAME", jsonString.getBytes(StandardCharsets.UTF_8)); Headers headers = new ConnectHeaders(); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 4dde98941..4f7b45b1d 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ 
b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -24,6 +24,13 @@ public class IcebergIngestionSchemaEvolutionIT extends IcebergIngestionIT { + private static final String RECORD_METADATA_TYPE = + "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key" + + " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0)," + + " CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0)," + + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216)," + + " VARCHAR(16777216)))"; + @Override protected Boolean isSchemaEvolutionEnabled() { return true; @@ -116,7 +123,7 @@ private static Stream prepareData() { /** Verify a scenario when structure is enriched with another field. */ @Test - @Disabled + // @Disabled public void alterStructure_noSchema() throws Exception { // k1, k2 String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }"; @@ -194,7 +201,7 @@ private void assertRecordsInTable() { } @Test - @Disabled + // @Disabled public void testComplexRecordEvolution_withSchema() throws Exception { insertWithRetry(complexJsonWithSchemaExample, 0, true); waitForOffset(1); @@ -233,7 +240,7 @@ public void testComplexRecordEvolution_withSchema() throws Exception { } @Test - @Disabled + // @Disabled public void testComplexRecordEvolution() throws Exception { insertWithRetry(complexJsonPayloadExample, 0, false); waitForOffset(1); @@ -272,7 +279,7 @@ public void testComplexRecordEvolution() throws Exception { /** Test just for a scenario when we see a record for the first time. 
*/ @ParameterizedTest @MethodSource("schemasAndPayloads_brandNewColumns") - @Disabled + // @Disabled public void addBrandNewColumns_withSchema( String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception { // when @@ -303,7 +310,7 @@ private static Stream schemasAndPayloads_brandNewColumns() { @ParameterizedTest @MethodSource("primitiveEvolutionDataSource") - @Disabled + // @Disabled public void testEvolutionOfPrimitives_withSchema( String singleBooleanField, String booleanAndInt, @@ -392,7 +399,7 @@ private static Stream primitiveEvolutionDataSource() { @ParameterizedTest @MethodSource("testEvolutionOfComplexTypes_dataSource") - @Disabled + // @Disabled public void testEvolutionOfComplexTypes_withSchema( String objectVarchar, String objectWithNestedObject, @@ -476,11 +483,4 @@ private static Stream testEvolutionOfComplexTypes_dataSource() { twoObjectsExtendedWithMapAndArrayPayload(), false)); } - - private static final String RECORD_METADATA_TYPE = - "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key" - + " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0)," - + " CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0)," - + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216)," - + " VARCHAR(16777216)))"; } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java index 5bfa9c637..449e1187e 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java @@ -6,7 +6,7 @@ */ class TestJsons { - public static String nestedObjectWithSchema() { + static String nestedObjectWithSchema() { return "{" + " \"schema\": {" + " \"type\": \"struct\"," @@ -501,9 +501,4 @@ static String twoObjectsExtendedWithMapAndArrayPayload() { private static final String 
SCHEMA_BEGINNING = "{ \"schema\": { \"type\": \"struct\", \"fields\": ["; private static final String SCHEMA_END = "]},"; - - private static final String OBJECT_SCHEMA_BEGINNING = - "{\"field\": \"object\", \"type\": \"struct\", \"fields\": ["; - - private static final String OBJECT_SCHEMA_END = "]}"; }