Skip to content

Commit

Permalink
Fix checkstyle, Apply code suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
Ismail Simsek committed Jan 9, 2025
1 parent e21dbe2 commit da505c0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -135,7 +136,8 @@ private String collectRecordDetails(R record) {
"topic %s partition %s offset %s",
sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
} else {
return String.format("topic %s partition %S", record.topic(), record.kafkaPartition());
return String.format(
Locale.ROOT, "topic %s partition %S", record.topic(), record.kafkaPartition());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,22 @@ public static Struct addToStruct(ObjectNode node, Schema schema, Struct struct)
field -> {
JsonNode element = node.get(field.name());
Schema.Type targetType = field.schema().type();
if (targetType == Schema.Type.ARRAY) {
struct.put(
field.name(),
populateArray(
element, field.schema().valueSchema(), field.name(), Lists.newArrayList()));
} else if (targetType == Schema.Type.MAP) {
struct.put(field.name(), populateMap(element, Maps.newHashMap()));
} else {
struct.put(field.name(), extractValue(element, targetType, field.name()));
switch (targetType) {
case ARRAY:
struct.put(
field.name(),
populateArray(
element,
field.schema().valueSchema(),
field.name(),
Lists.newArrayList()));
break;
case MAP:
struct.put(field.name(), populateMap(element, Maps.newHashMap()));
break;
default:
struct.put(field.name(), extractValue(element, targetType, field.name()));
break;
}
});
return struct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.iceberg.connect.transforms;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -61,13 +63,13 @@ private static RecordAppender getExternalFieldAppender(
if (field == null) {
return new NoOpRecordAppender();
}
String[] parts = field.split(",");
if (parts.length != 2) {
List<String> parts = Splitter.on(',').splitToList(field);
if (parts.size() != 2) {
throw new ConfigException(
String.format("Could not parse %s for %s", field, EXTERNAL_KAFKA_METADATA));
}
String fieldName = fieldNamer.apply(parts[0]);
String fieldValue = parts[1];
String fieldName = fieldNamer.apply(parts.get(0));
String fieldValue = parts.get(1);
return new RecordAppender() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ private static ExtractField<SinkRecord> extractorForKeyField(String field) {
return extractField;
}

@SuppressWarnings("unused")
private String kafkaMetadataForException(SinkRecord record) {
return String.format(
"topic: %s, partition: %s, offset: %s",
Expand Down

0 comments on commit da505c0

Please sign in to comment.