diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java index 7d4f37591..03eb2d36b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java @@ -9,17 +9,11 @@ String getColumnName() { return rootNode.name; } - public IcebergFieldNode getRootNode() { + IcebergFieldNode getRootNode() { return rootNode; } IcebergColumnTree(IcebergFieldNode rootNode) { this.rootNode = rootNode; } - - /** Returns data type of the column */ - String buildType() { - StringBuilder sb = new StringBuilder(); - return rootNode.buildQuery(sb, "ROOT_NODE").toString(); - } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java index 78b61639c..ef77dd248 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.snowflake.kafka.connector.internal.KCLogger; import java.util.LinkedHashMap; import java.util.Map; import java.util.stream.Collectors; @@ -12,6 +13,9 @@ import org.apache.kafka.connect.data.Schema; public class IcebergColumnTreeFactory { + + private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeFactory.class.getName()); + private final IcebergColumnTypeMapper mapper; public IcebergColumnTreeFactory() { @@ -19,17 +23,20 @@ public IcebergColumnTreeFactory() { } IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) { + LOGGER.debug("Attempting to resolve schema from schema stored in a channel"); IcebergFieldNode rootNode = createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema()); return new IcebergColumnTree(rootNode); } IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) { + LOGGER.debug("Attempting to resolve schema from records payload"); IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode()); return new IcebergColumnTree(rootNode); } IcebergColumnTree fromConnectSchema(Field kafkaConnectField) { + LOGGER.debug("Attempting to resolve schema from schema attached to a record"); IcebergFieldNode rootNode = createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema()); return new IcebergColumnTree(rootNode); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java new file mode 100644 index 000000000..a19a4f446 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java @@ -0,0 +1,58 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +public class IcebergColumnTreeTypeBuilder { + + /** Returns data type of the column */ + String buildType(IcebergColumnTree columnTree) { + StringBuilder sb = new StringBuilder(); + IcebergFieldNode rootNode = columnTree.getRootNode(); + return buildType(sb, rootNode, ROOT_NODE_TYPE).toString(); + } + + /** + * Generate Snow SQL type for the column. + * + * @param sb StringBuilder + * @param parentType Snowflake Iceberg table compatible type. ROOT_NODE_TYPE is a special case, + * here we never generate column name for it. + * @return field name + data type + */ + private StringBuilder buildType(StringBuilder sb, IcebergFieldNode fieldNode, String parentType) { + if (parentType.equals("ARRAY") + || parentType.equals("MAP") + || parentType.equals(ROOT_NODE_TYPE)) { + sb.append(fieldNode.snowflakeIcebergType); + } else { + appendNameAndType(sb, fieldNode); + } + if (!fieldNode.children.isEmpty()) { + sb.append("("); + appendChildren(sb, fieldNode); + sb.append(")"); + } + return sb; + } + + private void appendNameAndType(StringBuilder sb, IcebergFieldNode fieldNode) { + sb.append(fieldNode.name); + sb.append(" "); + sb.append(fieldNode.snowflakeIcebergType); + } + + private void appendChildren(StringBuilder sb, IcebergFieldNode parentNode) { + String parentType = parentNode.snowflakeIcebergType; + parentNode.children.forEach( + (name, childNode) -> { + buildType(sb, childNode, parentType); + sb.append(", "); + }); + removeLastSeparator(sb); + } + + private void removeLastSeparator(StringBuilder sb) { + sb.deleteCharAt(sb.length() - 1); + sb.deleteCharAt(sb.length() - 1); + } + + private static final String ROOT_NODE_TYPE = "ROOT_NODE"; +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java index e2d41b631..d93525078 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java @@ -4,9 +4,6 @@ class IcebergFieldNode { - // todo consider refactoring into some more classes - private final IcebergColumnTypeMapper mapper = IcebergColumnTypeMapper.INSTANCE; - final String name; final String snowflakeIcebergType; @@ -19,44 +16,4 @@ public IcebergFieldNode( this.snowflakeIcebergType = snowflakeIcebergType; this.children = children; } - - /** - * @param sb StringBuilder - * @param parentType Snowflake Iceberg table compatible type. If a root node is a parent then - * "ROOT_NODE" is passed, because we always generate root nodes column name. - * @return field name + data type - */ - StringBuilder buildQuery(StringBuilder sb, String parentType) { - if (parentType.equals("ARRAY") || parentType.equals("MAP") || parentType.equals("ROOT_NODE")) { - sb.append(snowflakeIcebergType); - } else { - appendNameAndType(sb); - } - if (!children.isEmpty()) { - sb.append("("); - appendChildren(sb, this.snowflakeIcebergType); - sb.append(")"); - } - return sb; - } - - private void appendNameAndType(StringBuilder sb) { - sb.append(name); - sb.append(" "); - sb.append(snowflakeIcebergType); - } - - private void appendChildren(StringBuilder sb, String parentType) { - children.forEach( - (name, node) -> { - node.buildQuery(sb, parentType); - sb.append(", "); - }); - removeLastSeparator(sb); - } - - private void removeLastSeparator(StringBuilder sb) { - sb.deleteCharAt(sb.length() - 1); - sb.deleteCharAt(sb.length() - 1); - } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java index bace2efde..6aa020f3c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java @@ -22,11 +22,13 @@ public class IcebergSchemaEvolutionService implements SchemaEvolutionService { private final SnowflakeConnectionService conn; private final IcebergTableSchemaResolver icebergTableSchemaResolver; private final IcebergColumnTreeMerger mergeTreeService; + private final IcebergColumnTreeTypeBuilder typeBuilder; public IcebergSchemaEvolutionService(SnowflakeConnectionService conn) { this.conn = conn; this.icebergTableSchemaResolver = new IcebergTableSchemaResolver(); this.mergeTreeService = new IcebergColumnTreeMerger(); + this.typeBuilder = new IcebergColumnTreeTypeBuilder(); } /** @@ -162,7 +164,7 @@ private Map toColumnInfos(List columnTre .map( columnTree -> Maps.immutableEntry( - columnTree.getColumnName(), new ColumnInfos(columnTree.buildType()))) + columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree)))) .collect( Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue)); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java index 2f0b3074e..645c51338 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java @@ -24,6 +24,7 @@ public class ParseIcebergColumnTreeTest { private final IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory(); private final IcebergColumnTreeMerger mergeTreeService = new IcebergColumnTreeMerger(); + private final IcebergColumnTreeTypeBuilder typeBuilder = new IcebergColumnTreeTypeBuilder(); @ParameterizedTest @MethodSource("icebergSchemas") @@ -34,7 +35,7 @@ void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedType IcebergColumnSchema apacheSchema = new IcebergColumnSchema(type, "TEST_COLUMN_NAME"); IcebergColumnTree tree = treeFactory.fromIcebergSchema(apacheSchema); // then - Assertions.assertEquals(expectedType, tree.buildType()); + Assertions.assertEquals(expectedType, typeBuilder.buildType(tree)); Assertions.assertEquals("TEST_COLUMN_NAME", tree.getColumnName()); } @@ -99,7 +100,7 @@ void parseFromJsonRecordSchema(String jsonString, String expectedType) { // when IcebergColumnTree tree = treeFactory.fromJson(columnValuePair); // then - Assertions.assertEquals(expectedType, tree.buildType()); + Assertions.assertEquals(expectedType, typeBuilder.buildType(tree)); Assertions.assertEquals("TESTCOLUMNNAME", tree.getColumnName()); } @@ -171,7 +172,7 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expe mergeTreeService.merge(alreadyExistingTree, modifiedTree); String expected = expectedResult.replaceAll("/ +/g", " "); - Assertions.assertEquals(expected, alreadyExistingTree.buildType()); + Assertions.assertEquals(expected, typeBuilder.buildType(alreadyExistingTree)); Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName()); }