From 2dce5162bf744d23be5bb32a01b4e0544041c657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Tue, 3 Dec 2024 10:35:38 +0100 Subject: [PATCH] SNOW-1842220 Add comments to columns created by schema evolution --- .../kafka/connector/internal/DescribeTableRow.java | 13 +++++++++++++ .../internal/SnowflakeConnectionServiceV1.java | 3 ++- .../schemaevolution/iceberg/IcebergColumnTree.java | 11 +++++++++++ .../iceberg/IcebergColumnTreeFactory.java | 2 +- .../iceberg/IcebergSchemaEvolutionService.java | 3 ++- .../iceberg/IcebergIngestionSchemaEvolutionIT.java | 11 +++++++++++ .../connector/streaming/iceberg/TestJsons.java | 12 ++++++++++++ 7 files changed, 52 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java b/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java index 5a385819e..40db5623e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java @@ -7,9 +7,18 @@ public class DescribeTableRow { private final String column; private final String type; + private final String comment; + + public DescribeTableRow(String column, String type, String comment) { + this.column = column; + this.type = type; + this.comment = comment; + } + public DescribeTableRow(String column, String type) { this.column = column; this.type = type; + this.comment = null; } public String getColumn() { @@ -20,6 +29,10 @@ public String getType() { return type; } + public String getComment() { + return comment; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 51171e928..3cb5bd7c2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -1179,7 +1179,8 @@ public Optional> describeTable(String tableName) { while (result.next()) { String columnName = result.getString("name"); String type = result.getString("type"); - rows.add(new DescribeTableRow(columnName, type)); + String comment = result.getString("comment"); + rows.add(new DescribeTableRow(columnName, type, comment)); } return Optional.of(rows); } catch (Exception e) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java index 03eb2d36b..d6e9a3c50 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java @@ -4,6 +4,7 @@ class IcebergColumnTree { private final IcebergFieldNode rootNode; + private final String comment; String getColumnName() { return rootNode.name; @@ -13,7 +14,17 @@ IcebergFieldNode getRootNode() { return rootNode; } + String getComment() { + return comment; + } + + public IcebergColumnTree(IcebergFieldNode rootNode, String comment) { + this.rootNode = rootNode; + this.comment = comment; + } + IcebergColumnTree(IcebergFieldNode rootNode) { this.rootNode = rootNode; + this.comment = null; } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java index 4cd2a72bc..17e3d4cc8 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java @@ -44,7 +44,7 @@ IcebergColumnTree fromConnectSchema(Field kafkaConnectField) { + kafkaConnectField.name()); IcebergFieldNode rootNode = createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema()); - return new IcebergColumnTree(rootNode); + return new IcebergColumnTree(rootNode, kafkaConnectField.schema().doc()); } // -- parse tree from Iceberg schema logic -- diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java index 99aa4fe1d..d2a59dfba 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java @@ -170,7 +170,8 @@ private Map toColumnInfos(List columnTre .map( columnTree -> Maps.immutableEntry( - columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree)))) + columnTree.getColumnName(), + new ColumnInfos(typeBuilder.buildType(columnTree), columnTree.getComment()))) .collect( Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue)); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 338bc4130..5305b6545 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -483,4 +483,15 @@ private static Stream testEvolutionOfComplexTypes_dataSource() { twoObjectsExtendedWithMapAndArrayPayload(), false)); } + + @Test + // @Disabled + void shouldAppendedCommentTest() throws Exception { + // when + insertWithRetry(schemaAndPayloadWithComment(), 0, true); + waitForOffset(1); + // then + List columns = describeTable(tableName); + assertEquals("Test comment", columns.get(1).getComment()); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java index 449e1187e..dd2a539ea 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java @@ -476,6 +476,15 @@ static String twoObjectsExtendedWithMapAndArrayPayload() { + " }"; } + static String schemaAndPayloadWithComment() { + return SCHEMA_BEGINNING + + COMMENTED_SCHEMA + + SCHEMA_END + + "\"payload\": {" + + STRING_PAYLOAD + + "}}"; + } + static String BOOL_SCHEMA = " { \"field\" : \"test_boolean\", \"type\" : \"boolean\"} "; static String INT64_SCHEMA = "{ \"field\" : \"test_int64\", \"type\" : \"int64\" }"; @@ -489,6 +498,9 @@ static String twoObjectsExtendedWithMapAndArrayPayload() { static String STRING_SCHEMA = "{ \"field\" : \"test_string\", \"type\" : \"string\" }"; + static String COMMENTED_SCHEMA = + "{ \"field\" : \"test_string\", \"type\" : \"string\", \"doc\": \"Test comment\" }"; + static final String BOOL_PAYLOAD = "\"test_boolean\" : true "; static final String INT64_PAYLOAD = "\"test_int64\" : 2137324241343241 "; static final String INT32_PAYLOAD = "\"test_int32\" : 2137 ";