Commit 665dbb8
detach create and merge node logic to services
sfc-gh-bzabek committed Nov 29, 2024
1 parent 7ff97e7 commit 665dbb8
Showing 7 changed files with 221 additions and 187 deletions.
IcebergColumnTree.java

@@ -1,8 +1,5 @@
 package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
 
-import com.google.common.base.Preconditions;
-import org.apache.kafka.connect.data.Field;
-
 /** Class with object types compatible with Snowflake Iceberg table */
 class IcebergColumnTree {
 
@@ -12,30 +9,12 @@ String getColumnName() {
     return rootNode.name;
   }
 
-  IcebergColumnTree(IcebergColumnSchema columnSchema) {
-    // rootNodes name serve as a name of the column, hence it is uppercase
-    String columnName = columnSchema.getColumnName().toUpperCase();
-    this.rootNode = new IcebergFieldNode(columnName, columnSchema.getSchema());
-  }
-
-  IcebergColumnTree(IcebergColumnJsonValuePair pair) {
-    // rootNodes name serve as a name of the column, hence it is uppercase
-    String columnName = pair.getColumnName().toUpperCase();
-    this.rootNode = new IcebergFieldNode(columnName, pair.getJsonNode());
-  }
-
-  IcebergColumnTree(Field field) {
-    String columnName = field.name().toUpperCase();
-    this.rootNode = new IcebergFieldNode(columnName, field.schema());
+  public IcebergFieldNode getRootNode() {
+    return rootNode;
   }
 
-  /** Add fields from other tree. Do not override nor modify any already existing nodes. */
-  IcebergColumnTree merge(IcebergColumnTree modifiedTree) {
-    Preconditions.checkArgument(
-        this.getColumnName().equals(modifiedTree.getColumnName()),
-        "Error merging column schemas. Tried to merge schemas for two different columns");
-    this.rootNode.merge(modifiedTree.rootNode);
-    return this;
+  IcebergColumnTree(IcebergFieldNode rootNode) {
+    this.rootNode = rootNode;
   }
 
   /** Returns data type of the column */
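With parsing and merging detached to services, IcebergColumnTree reduces to a holder for its root node. A minimal sketch of the remaining API (hypothetical, not part of this commit; it assumes same-package access, and the LONG type string is only illustrative):

// Hypothetical sketch: after this change a tree is built from an explicit
// root node; all parsing now lives in IcebergColumnTreeFactory instead.
import java.util.LinkedHashMap;

class IcebergColumnTreeSketch {
  public static void main(String[] args) {
    // A leaf root node with no children; IcebergFieldNode takes
    // (name, snowflakeType, children), as seen in the factory below.
    IcebergFieldNode root = new IcebergFieldNode("MY_COLUMN", "LONG", new LinkedHashMap<>());
    IcebergColumnTree tree = new IcebergColumnTree(root);
    System.out.println(tree.getColumnName()); // prints MY_COLUMN
  }
}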
IcebergColumnTreeFactory.java (new file)

@@ -0,0 +1,157 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public class IcebergColumnTreeFactory {
  private final IcebergColumnTypeMapper mapper;

  public IcebergColumnTreeFactory() {
    this.mapper = new IcebergColumnTypeMapper();
  }

  IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) {
    IcebergFieldNode rootNode =
        createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema());
    return new IcebergColumnTree(rootNode);
  }

  IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) {
    IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode());
    return new IcebergColumnTree(rootNode);
  }

  IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
    IcebergFieldNode rootNode =
        createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
    return new IcebergColumnTree(rootNode);
  }

  // -- parse tree from Iceberg schema logic --
  private IcebergFieldNode createNode(String name, Type apacheIcebergSchema) {
    String snowflakeType = mapper.mapToColumnTypeFromIcebergSchema(apacheIcebergSchema);
    return new IcebergFieldNode(name, snowflakeType, produceChildren(apacheIcebergSchema));
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
    // primitives must not have children
    if (apacheIcebergSchema.isPrimitiveType()) {
      return new LinkedHashMap<>();
    }
    Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
    return nestedField.fields().stream()
        .collect(
            Collectors.toMap(
                Types.NestedField::name,
                field -> createNode(field.name(), field.type()),
                // duplicate field names cannot occur in a schema, so this merge function only fails fast
                (v1, v2) -> {
                  throw new IllegalArgumentException("Two same keys: " + v1);
                },
                LinkedHashMap::new));
  }

  // -- parse tree from kafka record payload logic --
  private IcebergFieldNode createNode(String name, JsonNode jsonNode) {
    String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode);
    return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode));
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildren(JsonNode recordNode) {
    if (recordNode.isNull()) {
      return new LinkedHashMap<>();
    }
    if (recordNode.isArray()) {
      ArrayNode arrayNode = (ArrayNode) recordNode;
      return produceChildrenFromArray(arrayNode);
    }
    if (recordNode.isObject()) {
      ObjectNode objectNode = (ObjectNode) recordNode;
      return produceChildrenFromObject(objectNode);
    }
    return new LinkedHashMap<>();
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromArray(ArrayNode arrayNode) {
    JsonNode arrayElement = arrayNode.get(0);
    // VARCHAR is set for an empty array: [] -> ARRAY(VARCHAR)
    if (arrayElement == null) {
      LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
      child.put(
          "element", new IcebergFieldNode("element", "VARCHAR(16777216)", new LinkedHashMap<>()));
      return child;
    }
    LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
    child.put("element", createNode("element", arrayElement));
    return child;
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromObject(ObjectNode objectNode) {
    return objectNode.properties().stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey,
                stringJsonNodeEntry ->
                    createNode(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue()),
                (v1, v2) -> {
                  throw new IllegalArgumentException("Two same keys: " + v1);
                },
                LinkedHashMap::new));
  }

  // -- parse tree from kafka record schema logic --
  private IcebergFieldNode createNode(String name, Schema schema) {
    String snowflakeType =
        mapper.mapToColumnTypeFromKafkaSchema(schema.schema().type(), schema.schema().name());
    return new IcebergFieldNode(name, snowflakeType, produceChildren(schema.schema()));
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Schema connectSchema) {
    if (connectSchema.type() == Schema.Type.STRUCT) {
      return produceChildrenFromStruct(connectSchema);
    }
    if (connectSchema.type() == Schema.Type.MAP) {
      return produceChildrenFromMap(connectSchema);
    }
    if (connectSchema.type() == Schema.Type.ARRAY) {
      return produceChildrenForArray(connectSchema);
    } else { // isPrimitive == true
      return new LinkedHashMap<>();
    }
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildrenForArray(
      Schema connectSchemaForArray) {
    LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
    child.put("element", createNode("element", connectSchemaForArray.valueSchema()));
    return child;
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromStruct(Schema connectSchema) {
    return connectSchema.fields().stream()
        .collect(
            Collectors.toMap(
                Field::name,
                f -> createNode(f.name(), f.schema()),
                (v1, v2) -> {
                  throw new IllegalArgumentException("Two same keys: " + v1);
                },
                LinkedHashMap::new));
  }

  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromMap(Schema connectSchema) {
    LinkedHashMap<String, IcebergFieldNode> keyValue = new LinkedHashMap<>();
    // these names are not used when creating a query
    keyValue.put("key", createNode("key", connectSchema.keySchema()));
    keyValue.put("value", createNode("value", connectSchema.valueSchema()));
    return keyValue;
  }
}
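A usage sketch for the factory (hypothetical, not part of this commit; it assumes same-package access, and the column name and schema are made up):

// Hypothetical usage sketch: builds a column tree from a Kafka Connect field.
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

class IcebergColumnTreeFactorySketch {
  public static void main(String[] args) {
    // A struct column holding a primitive and an array subfield.
    Schema structSchema =
        SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
            .build();
    Field kafkaConnectField = new Field("metadata", 0, structSchema);

    IcebergColumnTree tree = new IcebergColumnTreeFactory().fromConnectSchema(kafkaConnectField);
    // Root node names are uppercased by the factory, so this prints METADATA.
    System.out.println(tree.getColumnName());
  }
}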
IcebergColumnTreeMerger.java (new file)

@@ -0,0 +1,39 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.google.common.base.Preconditions;
import com.snowflake.kafka.connector.internal.KCLogger;

public class IcebergColumnTreeMerger {

  private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeMerger.class.getName());

  /**
   * Designed for unstructured data types: enhances an already existing unstructured column with
   * new subfields.
   */
  void merge(IcebergColumnTree currentTree, IcebergColumnTree treeWithNewType) {
    validate(currentTree, treeWithNewType);
    LOGGER.debug("Attempting to apply changes for column: " + currentTree.getColumnName());

    merge(currentTree.getRootNode(), treeWithNewType.getRootNode());
  }

  /** Adds new children to a node; existing nodes are never modified. */
  private void merge(IcebergFieldNode currentNode, IcebergFieldNode nodeToMerge) {
    nodeToMerge.children.forEach(
        (key, node) -> {
          IcebergFieldNode currentNodesChild = currentNode.children.get(key);
          if (currentNodesChild == null) {
            currentNode.children.put(key, node);
          } else {
            merge(currentNodesChild, node);
          }
        });
  }

  private void validate(IcebergColumnTree currentTree, IcebergColumnTree treeWithNewType) {
    Preconditions.checkArgument(
        currentTree.getColumnName().equals(treeWithNewType.getColumnName()),
        "Error merging column schemas. Tried to merge schemas for two different columns");
  }
}
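A sketch of the merger in action (hypothetical, not part of this commit; the trees are assembled by hand with illustrative Snowflake type strings, and reading the children field directly assumes same-package visibility):

// Hypothetical usage sketch: the incoming tree carries a new subfield "b",
// which the merger grafts onto the current tree without touching "a".
import java.util.LinkedHashMap;

class IcebergColumnTreeMergerSketch {
  public static void main(String[] args) {
    LinkedHashMap<String, IcebergFieldNode> currentChildren = new LinkedHashMap<>();
    currentChildren.put("a", new IcebergFieldNode("a", "LONG", new LinkedHashMap<>()));
    IcebergColumnTree current =
        new IcebergColumnTree(new IcebergFieldNode("RECORD", "OBJECT", currentChildren));

    LinkedHashMap<String, IcebergFieldNode> incomingChildren = new LinkedHashMap<>();
    incomingChildren.put(
        "b", new IcebergFieldNode("b", "VARCHAR(16777216)", new LinkedHashMap<>()));
    IcebergColumnTree incoming =
        new IcebergColumnTree(new IcebergFieldNode("RECORD", "OBJECT", incomingChildren));

    // Both trees name the same column, so validate() passes and "b" is added.
    new IcebergColumnTreeMerger().merge(current, incoming);
    System.out.println(current.getRootNode().children.keySet()); // prints [a, b]
  }
}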