From da505c0ce2640d27515d5af18f1574c31eb11b94 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Thu, 9 Jan 2025 12:55:31 +0100 Subject: [PATCH] Fix checkstyle, Apply code suggestions --- .../transforms/JsonToMapTransform.java | 4 ++- .../connect/transforms/JsonToMapUtils.java | 25 ++++++++++++------- .../transforms/KafkaMetadataTransform.java | 10 +++++--- .../transforms/MongoDebeziumTransform.java | 1 + 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapTransform.java b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapTransform.java index f0d30909288b..08a1b4274256 100644 --- a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapTransform.java +++ b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapTransform.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Locale; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.config.ConfigDef; @@ -135,7 +136,8 @@ private String collectRecordDetails(R record) { "topic %s partition %s offset %s", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset()); } else { - return String.format("topic %s partition %S", record.topic(), record.kafkaPartition()); + return String.format( + Locale.ROOT, "topic %s partition %S", record.topic(), record.kafkaPartition()); } } diff --git a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapUtils.java b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapUtils.java index f44f99a398e2..40e040d74ff9 100644 --- a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapUtils.java +++ b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/JsonToMapUtils.java @@ -204,15 +204,22 @@ public static Struct addToStruct(ObjectNode node, Schema schema, Struct struct) field -> { JsonNode element = node.get(field.name()); Schema.Type targetType = field.schema().type(); - if (targetType == Schema.Type.ARRAY) { - struct.put( - field.name(), - populateArray( - element, field.schema().valueSchema(), field.name(), Lists.newArrayList())); - } else if (targetType == Schema.Type.MAP) { - struct.put(field.name(), populateMap(element, Maps.newHashMap())); - } else { - struct.put(field.name(), extractValue(element, targetType, field.name())); + switch (targetType) { + case ARRAY: + struct.put( + field.name(), + populateArray( + element, + field.schema().valueSchema(), + field.name(), + Lists.newArrayList())); + break; + case MAP: + struct.put(field.name(), populateMap(element, Maps.newHashMap())); + break; + default: + struct.put(field.name(), extractValue(element, targetType, field.name())); + break; } }); return struct; diff --git a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/KafkaMetadataTransform.java b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/KafkaMetadataTransform.java index 81e8c6d24eac..b4c99d9791b3 100644 --- a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/KafkaMetadataTransform.java +++ b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/KafkaMetadataTransform.java @@ -18,8 +18,10 @@ */ package org.apache.iceberg.connect.transforms; +import java.util.List; import java.util.Map; import java.util.function.Function; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -61,13 +63,13 @@ private static RecordAppender getExternalFieldAppender( if (field == null) { return new NoOpRecordAppender(); } - String[] parts = field.split(","); - if (parts.length != 2) { + List parts = Splitter.on(',').splitToList(field); + if (parts.size() != 2) { throw new ConfigException( String.format("Could not parse %s for %s", field, EXTERNAL_KAFKA_METADATA)); } - String fieldName = fieldNamer.apply(parts[0]); - String fieldValue = parts[1]; + String fieldName = fieldNamer.apply(parts.get(0)); + String fieldValue = parts.get(1); return new RecordAppender() { @Override diff --git a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/MongoDebeziumTransform.java b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/MongoDebeziumTransform.java index b57261590e00..d08482d45eb2 100644 --- a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/MongoDebeziumTransform.java +++ b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/MongoDebeziumTransform.java @@ -311,6 +311,7 @@ private static ExtractField extractorForKeyField(String field) { return extractField; } + @SuppressWarnings("unused") private String kafkaMetadataForException(SinkRecord record) { return String.format( "topic: %s, partition: %s, offset: %s",