diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java index f2fe356404..55db05a610 100644 --- a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java @@ -26,12 +26,12 @@ import io.lenses.streamreactor.common.config.source.ConfigSource; import lombok.val; +@Getter public class KcqlSettings implements ConfigSettings> { - private final String KCQL_DOC = + private static final String KCQL_DOC = "Contains the Kafka Connect Query Language describing data mappings from the source to the target system."; - @Getter private final String kcqlSettingsKey; public KcqlSettings( diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java index 4ab60296d3..d367f64de5 100644 --- a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java @@ -15,13 +15,17 @@ */ package io.lenses.streamreactor.common.util; -import lombok.experimental.UtilityClass; - import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; -@UtilityClass +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * Utility class for List splitting. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class ListSplitter { /** diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java index 39e5b023dd..9ba400be7f 100644 --- a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java @@ -15,24 +15,47 @@ */ package io.lenses.streamreactor.common.util; -import lombok.experimental.UtilityClass; - import java.util.Map; -@UtilityClass +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * Utility class for map operations. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class MapUtils { + /** + * Casts a map to a specified key and value type. + * + * @param map the map to cast + * @param targetKeyType the class of the key type + * @param targetValueType the class of the value type + * @param the target key type + * @param the target value type + * @return the casted map + * @throws IllegalArgumentException if the map contains keys or values of incorrect types + */ @SuppressWarnings("unchecked") public static Map castMap(Map map, Class targetKeyType, Class targetValueType) { for (Map.Entry entry : map.entrySet()) { - if (!(isInstance(entry.getKey(), targetKeyType)) || !(isInstance(entry.getValue(), targetValueType))) { + if (!isAssignable(entry.getKey(), targetKeyType) || !isAssignable(entry.getValue(), targetValueType)) { throw new IllegalArgumentException("Map contains invalid key or value type"); } } return (Map) map; } - private static boolean isInstance(Object obj, Class type) { - return obj == null || type.isInstance(obj); + /** + * Checks if an object is assignable to a specified type, allowing for null values. + * + * @param obj the object to check + * @param type the target type + * @param the target type + * @return true if the object is null or assignable to the type, false otherwise + */ + private static boolean isAssignable(Object obj, Class type) { + return obj == null || type.isAssignableFrom(obj.getClass()); } } diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java index 578ce0ff88..3571df3e96 100644 --- a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java @@ -24,13 +24,14 @@ import java.util.stream.Stream; import io.lenses.streamreactor.common.config.base.KcqlSettings; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import lombok.val; -import lombok.experimental.UtilityClass; /** * Utility class for splitting tasks based on KCQL statements. */ -@UtilityClass +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class TasksSplitter { /** diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java index 38cf55db49..0389a0508a 100644 --- a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java @@ -70,4 +70,68 @@ void testCastMap_NonIntegerKeyOrValue_ThrowsException() { }); } + @Test + void testCastMap_WithNullValues_ShouldHandleGracefully() { + Map rawMap = new HashMap<>(); + rawMap.put("key1", "value1"); + rawMap.put("key2", null); // Null value + + Map typedMap = MapUtils.castMap(rawMap, String.class, String.class); + + assertEquals("value1", typedMap.get("key1")); + assertNull(typedMap.get("key2")); + } + + @Test + void testCastMap_SuperclassTypeCompatibility_ShouldPass() { + Map rawMap = new HashMap<>(); + rawMap.put("key1", "value1"); + rawMap.put("key2", "value2"); + + // Cast to Map since String implements CharSequence + Map typedMap = MapUtils.castMap(rawMap, CharSequence.class, CharSequence.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_InterfaceTypeCompatibility_ShouldPass() { + Map rawMap = new HashMap<>(); + rawMap.put("key1", "value1"); + rawMap.put("key2", "value2"); + + // Cast to Map since String is an Object + Map typedMap = MapUtils.castMap(rawMap, Object.class, Object.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_StringToObject_ShouldPass() { + Map rawMap = new HashMap<>(); + rawMap.put("key1", "value1"); + rawMap.put("key2", "value2"); + + // Cast to Map since String is an Object + Map typedMap = MapUtils.castMap(rawMap, Object.class, Object.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_StringToCharSequence_ShouldPass() { + Map rawMap = new HashMap<>(); + rawMap.put("key1", "value1"); + rawMap.put("key2", "value2"); + + // Cast to Map since String implements CharSequence + Map typedMap = MapUtils.castMap(rawMap, CharSequence.class, CharSequence.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + } diff --git a/java-connectors/kafka-connect-gcp-common/src/main/java/io/lenses/streamreactor/connect/gcp/common/auth/GCPServiceBuilderConfigurer.java b/java-connectors/kafka-connect-gcp-common/src/main/java/io/lenses/streamreactor/connect/gcp/common/auth/GCPServiceBuilderConfigurer.java index e4732e7f1b..90ff88c449 100644 --- a/java-connectors/kafka-connect-gcp-common/src/main/java/io/lenses/streamreactor/connect/gcp/common/auth/GCPServiceBuilderConfigurer.java +++ b/java-connectors/kafka-connect-gcp-common/src/main/java/io/lenses/streamreactor/connect/gcp/common/auth/GCPServiceBuilderConfigurer.java @@ -24,6 +24,9 @@ import java.io.IOException; import java.util.Optional; import java.util.function.Supplier; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import lombok.experimental.UtilityClass; import lombok.val; import org.apache.kafka.common.config.ConfigException; @@ -32,7 +35,7 @@ /** * Utility class for configuring generic GCP service clients using a {@link GCPConnectionConfig}. */ -@UtilityClass +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class GCPServiceBuilderConfigurer { /** @@ -97,7 +100,7 @@ private static RetrySettings createRetrySettings(RetryConfig httpRetryConfig) { .build(); } - private Supplier createConfigException(String message) { + private static Supplier createConfigException(String message) { return () -> new ConfigException(message); } } diff --git a/java-connectors/kafka-connect-gcp-common/src/test/java/io/lenses/streamreactor/connect/gcp/common/auth/mode/TestFileUtil.java b/java-connectors/kafka-connect-gcp-common/src/test/java/io/lenses/streamreactor/connect/gcp/common/auth/mode/TestFileUtil.java index 1272445eaf..a68e141071 100644 --- a/java-connectors/kafka-connect-gcp-common/src/test/java/io/lenses/streamreactor/connect/gcp/common/auth/mode/TestFileUtil.java +++ b/java-connectors/kafka-connect-gcp-common/src/test/java/io/lenses/streamreactor/connect/gcp/common/auth/mode/TestFileUtil.java @@ -17,14 +17,17 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.io.ByteStreams; import java.io.File; import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; -import lombok.experimental.UtilityClass; -@UtilityClass +import com.google.common.io.ByteStreams; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class TestFileUtil { static String streamToString(InputStream inputStream) throws Exception { diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/GCPPubSubSourceTask.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/GCPPubSubSourceTask.java index e08c25eebd..c1c9a8869b 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/GCPPubSubSourceTask.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/GCPPubSubSourceTask.java @@ -51,9 +51,6 @@ public class GCPPubSubSourceTask extends SourceTask { private SourceRecordConverter converter; - public GCPPubSubSourceTask() { - } - @Override public String version() { return jarManifest.getVersion(); @@ -101,8 +98,9 @@ public void stop() { @Override public void commitRecord(SourceRecord sourceRecord, RecordMetadata metadata) { val sourcePartition = - SourcePartition.fromMap(MapUtils.castMap(sourceRecord.sourcePartition(), String.class, String.class)); - val sourceOffset = SourceOffset.fromMap(MapUtils.castMap(sourceRecord.sourceOffset(), String.class, String.class)); + PubSubSourcePartition.fromMap(MapUtils.castMap(sourceRecord.sourcePartition(), String.class, String.class)); + val sourceOffset = + PubSubSourceOffset.fromMap(MapUtils.castMap(sourceRecord.sourceOffset(), String.class, String.class)); pubSubSubscriberManager.commitRecord(sourcePartition, sourceOffset); } } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/config/PubSubConfig.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/config/PubSubConfig.java index 81e8b60849..f20f4a1f88 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/config/PubSubConfig.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/config/PubSubConfig.java @@ -20,7 +20,6 @@ import io.lenses.streamreactor.connect.gcp.common.auth.mode.AuthMode; import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.MappingConfig; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Getter; /** diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubKcqlConverter.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubKcqlConverter.java index 47d4b3ac2f..408a2d31d2 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubKcqlConverter.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubKcqlConverter.java @@ -30,10 +30,10 @@ public class PubSubKcqlConverter extends KcqlConverter { public static final int DEFAULT_BATCH_SIZE = 1000; + // 1 hour public static final long DEFAULT_CACHE_TTL_MILLIS = 3600L * 1000L; public static final int DEFAULT_CACHE_MAX = 10000; - // 1 hour public static final String KCQL_PROP_KEY_BATCH_SIZE = "batch.size"; public static final String KCQL_PROP_KEY_CACHE_TTL = "cache.ttl"; public static final String KCQL_PROP_KEY_QUEUE_MAX = "queue.max"; diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/MappingConfig.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/MappingConfig.java index ddffffa865..4a7f131661 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/MappingConfig.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/MappingConfig.java @@ -15,15 +15,15 @@ */ package io.lenses.streamreactor.connect.gcp.pubsub.source.mapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.HeaderMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalAndMessageAttributesHeaderMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalHeaderMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.CompatibilityKeyMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.KeyMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.MessageIdKeyMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.CompatibilityValueMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.MessageValueMapping; -import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.ValueMapping; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.HeaderMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalAndMessageAttributesHeaderMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalHeaderMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.CompatibilityKeyMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.KeyMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.MessageIdKeyMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.CompatibilityValueMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.MessageValueMapper; +import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.ValueMapper; import lombok.AllArgsConstructor; import lombok.Getter; @@ -38,11 +38,11 @@ public class MappingConfig { public static final String OUTPUT_MODE_DEFAULT = "DEFAULT"; public static final String OUTPUT_MODE_COMPATIBILITY = "COMPATIBILITY"; - private KeyMapping keyMapper; + private KeyMapper keyMapper; - private ValueMapping valueMapper; + private ValueMapper valueMapper; - private HeaderMapping headerMapper; + private HeaderMapper headerMapper; public static MappingConfig fromOutputMode(String outputMode) { switch (outputMode.toUpperCase()) { @@ -56,16 +56,16 @@ public static MappingConfig fromOutputMode(String outputMode) { public static final MappingConfig DEFAULT_MAPPING_CONFIG = new MappingConfig( - new MessageIdKeyMapping(), - new MessageValueMapping(), - new MinimalAndMessageAttributesHeaderMapping() + new MessageIdKeyMapper(), + new MessageValueMapper(), + new MinimalAndMessageAttributesHeaderMapper() ); public static final MappingConfig COMPATIBILITY_MAPPING_CONFIG = new MappingConfig( - new CompatibilityKeyMapping(), - new CompatibilityValueMapping(), - new MinimalHeaderMapping() + new CompatibilityKeyMapper(), + new CompatibilityValueMapper(), + new MinimalHeaderMapper() ); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java index 976c892bc4..1fe86b71b2 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java @@ -46,7 +46,7 @@ protected SourceRecord convert(final PubSubMessageData source) throws ConfigExce } private Object getValue(final PubSubMessageData source) { - return mappingConfig.getValueMapper().getValue(source); + return mappingConfig.getValueMapper().mapValue(source); } private Schema getValueSchema() { @@ -54,7 +54,7 @@ private Schema getValueSchema() { } private Object getKey(final PubSubMessageData source) { - return mappingConfig.getKeyMapper().getKey(source); + return mappingConfig.getKeyMapper().mapKey(source); } private Schema getKeySchema() { diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/HeaderMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/HeaderMapper.java similarity index 90% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/HeaderMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/HeaderMapper.java index 511d85df71..5c9617b3cd 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/HeaderMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/HeaderMapper.java @@ -23,8 +23,8 @@ * HeaderMapping is an interface for mapping headers in the PubSubMessageData to Kafka Connect headers. * Implementations of this interface should define how this mapping is done. */ -public interface HeaderMapping { +public interface HeaderMapper { - Map getHeaders(final PubSubMessageData source); + Map mapHeaders(final PubSubMessageData source); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java similarity index 81% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java index db93f4bac4..499acc1677 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapper.java @@ -26,13 +26,13 @@ * PubSubMessageData to Kafka Connect headers. * It extends the functionality of MinimalHeaderMapping by adding message attributes to the headers. */ -public class MinimalAndMessageAttributesHeaderMapping implements HeaderMapping { +public class MinimalAndMessageAttributesHeaderMapper implements HeaderMapper { - private final MinimalHeaderMapping minimalHeaderMapping = new MinimalHeaderMapping(); + private final MinimalHeaderMapper minimalHeaderMapping = new MinimalHeaderMapper(); @Override - public Map getHeaders(final PubSubMessageData source) { - val miniMap = minimalHeaderMapping.getHeaders(source); + public Map mapHeaders(final PubSubMessageData source) { + val miniMap = minimalHeaderMapping.mapHeaders(source); val headMap = source.getMessage().getAttributesMap(); return ImmutableMap.builder() .putAll(miniMap) diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapper.java similarity index 90% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapper.java index f2115f2272..a9018714d8 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapper.java @@ -23,10 +23,10 @@ * MinimalHeaderMapping is responsible for mapping minimal headers from PubSubMessageData to Kafka Connect headers. * The minimal headers include only the most essential information from the PubSubMessageData. */ -public class MinimalHeaderMapping implements HeaderMapping { +public class MinimalHeaderMapper implements HeaderMapper { @Override - public Map getHeaders(final PubSubMessageData source) { + public Map mapHeaders(final PubSubMessageData source) { return Map.of( "PublishTimestamp", String.valueOf(source.getMessage().getPublishTime().getSeconds()) ); diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/CompatibilityKeyMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/CompatibilityKeyMapper.java similarity index 95% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/CompatibilityKeyMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/CompatibilityKeyMapper.java index 3469031866..572b9d79ca 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/CompatibilityKeyMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/CompatibilityKeyMapper.java @@ -25,7 +25,7 @@ * The key is composed of the project id, topic id, subscription id and message id. The aim is to maintain compatibility * with another connector in the market. */ -public class CompatibilityKeyMapping implements KeyMapping { +public class CompatibilityKeyMapper implements KeyMapper { private static final String STRUCT_KEY_PROJECT_ID = "ProjectId"; private static final String STRUCT_KEY_TOPIC_ID = "TopicId"; @@ -42,7 +42,7 @@ public class CompatibilityKeyMapping implements KeyMapping { .build(); @Override - public Object getKey(final PubSubMessageData source) { + public Object mapKey(final PubSubMessageData source) { return new Struct(SCHEMA) .put(STRUCT_KEY_PROJECT_ID, source.getSourcePartition().getProjectId()) .put(STRUCT_KEY_TOPIC_ID, source.getSourcePartition().getTopicId()) diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/KeyMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/KeyMapper.java similarity index 92% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/KeyMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/KeyMapper.java index 3f4deb0f45..1a3d8d7896 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/KeyMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/KeyMapper.java @@ -21,9 +21,9 @@ /** * KeyMapping is responsible for mapping the key from PubSubMessageData to Kafka Connect key. */ -public interface KeyMapping { +public interface KeyMapper { - Object getKey(PubSubMessageData source); + Object mapKey(PubSubMessageData source); Schema getSchema(); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/MessageIdKeyMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/MessageIdKeyMapper.java similarity index 90% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/MessageIdKeyMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/MessageIdKeyMapper.java index 2a9a2ebd93..5f3f1ab075 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/MessageIdKeyMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/key/MessageIdKeyMapper.java @@ -21,10 +21,10 @@ /** * MessageIdKeyMapping is responsible for mapping the key from PubSubMessageData to Kafka Connect key. */ -public class MessageIdKeyMapping implements KeyMapping { +public class MessageIdKeyMapper implements KeyMapper { @Override - public Object getKey(final PubSubMessageData source) { + public Object mapKey(final PubSubMessageData source) { return source.getSourceOffset().getMessageId(); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/CompatibilityValueMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/CompatibilityValueMapper.java similarity index 94% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/CompatibilityValueMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/CompatibilityValueMapper.java index 5fcae6449b..3c0d94464d 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/CompatibilityValueMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/CompatibilityValueMapper.java @@ -30,7 +30,7 @@ * CompatibilityValueMapping is responsible for mapping the value from PubSubMessageData to Kafka Connect value. This is * for a mode that aims to enable compatibility with another connector in the market. */ -public class CompatibilityValueMapping implements ValueMapping { +public class CompatibilityValueMapper implements ValueMapper { private static final String STRUCT_KEY_MESSAGE_DATA = "MessageData"; private static final String STRUCT_KEY_ATTRIBUTE_MAP = "AttributeMap"; @@ -44,7 +44,7 @@ public class CompatibilityValueMapping implements ValueMapping { private final Gson gson = new Gson(); @Override - public Object getValue(final PubSubMessageData source) { + public Object mapValue(final PubSubMessageData source) { return new Struct(SCHEMA) .put(STRUCT_KEY_MESSAGE_DATA, new String(source.getMessage().getData().toByteArray())) diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapper.java similarity index 90% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapper.java index a51e710d23..72db18512a 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapper.java @@ -21,10 +21,10 @@ /** * MessageValueMapping is responsible for mapping the value from PubSubMessageData to Kafka Connect value. */ -public class MessageValueMapping implements ValueMapping { +public class MessageValueMapper implements ValueMapper { @Override - public Object getValue(final PubSubMessageData source) { + public Object mapValue(final PubSubMessageData source) { return source.getMessage().getData().toByteArray(); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/ValueMapping.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/ValueMapper.java similarity index 91% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/ValueMapping.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/ValueMapper.java index 7c23827717..9b91dc988b 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/ValueMapping.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/ValueMapper.java @@ -21,9 +21,9 @@ /** * ValueMapping is responsible for mapping the value from PubSubMessageData to Kafka Connect value. */ -public interface ValueMapping { +public interface ValueMapper { - Object getValue(final PubSubMessageData source); + Object mapValue(final PubSubMessageData source); Schema getSchema(); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/LooselyBoundedQueue.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/LooselyBoundedQueue.java index 9b855798e1..b6e7dfbc6a 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/LooselyBoundedQueue.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/LooselyBoundedQueue.java @@ -15,104 +15,67 @@ */ package io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import lombok.val; +import lombok.extern.slf4j.Slf4j; /** - * A LooselyBoundedQueue is a wrapper around a standard {@link Queue} implementation - * that introduces a maximum size limit. However, unlike a strictly bounded queue, - * it does not fail the {@link #add(Object)} or {@link #addAll(Collection)} operations - * when this limit is exceeded. Instead, it logs a debug message when the queue size - * exceeds the specified maximum size. + * LooselyBoundedQueue is an extension of {@link ConcurrentLinkedQueue} that introduces a maximum size limit. + * Unlike a strictly bounded queue, it does not fail the {@link #add(Object)} or {@link #addAll(Collection)} operations + * when this limit is exceeded. Instead, it logs a debug message when the queue size exceeds the specified maximum size. * - *

This class is designed to be used in multi-threaded environments where it is - * more important to avoid the failure of add operations than to strictly enforce - * a queue size limit. In such scenarios, failing the connector due to queue overflow - * is undesirable as it can lead to disruptions in the data processing pipeline. - * Hence, this loosely bounded approach allows for some flexibility by permitting - * the queue size to exceed the maximum limit while still providing an indication - * (through logging) that the limit has been breached.

+ *

This class is designed to be used in multi-threaded environments where it is more important to avoid the failure + * of add operations than to strictly enforce a queue size limit. In such scenarios, failing the connector due to queue + * overflow is undesirable as it can lead to disruptions in the data processing pipeline. Hence, this loosely bounded + * approach allows for some flexibility by permitting the queue size to exceed the maximum limit while still providing + * an indication (through logging) that the limit has been breached.

* * @param the type of elements held in this queue */ @Slf4j -public class LooselyBoundedQueue implements Queue { +public class LooselyBoundedQueue extends ConcurrentLinkedQueue { - private final Queue queue; private final int maxSize; /** - * Constructs a new LooselyBoundedQueue with the specified backing queue and maximum size. + * Constructs a new LooselyBoundedQueue with the specified maximum size. * - * @param queue the backing queue * @param maxSize the maximum size of the queue */ - public LooselyBoundedQueue(Queue queue, int maxSize) { - this.queue = queue; + public LooselyBoundedQueue(int maxSize) { this.maxSize = maxSize; } - @Override - public int size() { - return queue.size(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - @Override - public boolean contains(Object o) { - return queue.contains(o); - } - - @Override - public Iterator iterator() { - return queue.iterator(); - } - - @Override - public Object[] toArray() { - return queue.toArray(); - } - - @Override - public T[] toArray(T[] a) { - return queue.toArray(a); + /** + * Checks if there is space available in the queue for the specified number of elements. + * + * @param count the number of elements proposed to be added to the queue + * @return true if there is spare capacity for these elements, false otherwise + */ + public boolean hasSpareCapacity(int count) { + val newSize = getNewSize(count); + return (newSize <= maxSize); } @Override public boolean add(X x) { logIfNoSpareCapacity(1); - return queue.add(x); + return super.add(x); } @Override public boolean addAll(Collection c) { logIfNoSpareCapacity(c.size()); - return queue.addAll(c); - } - - @Override - public boolean remove(Object o) { - return queue.remove(o); - } - - @Override - public boolean containsAll(Collection c) { - return queue.containsAll(c); - } - - public boolean hasSpareCapacity(int count) { - val newSize = getNewSize(count); - return (newSize <= maxSize); + return super.addAll(c); } + /** + * Logs a debug message if adding the specified number of elements would exceed the queue's maximum size. + * + * @param numberOfElements the number of elements to be added + */ private void logIfNoSpareCapacity(int numberOfElements) { final var newSize = getNewSize(numberOfElements); if (newSize > maxSize) { @@ -120,48 +83,14 @@ private void logIfNoSpareCapacity(int numberOfElements) { } } + /** + * Calculates the new size of the queue if the specified number of elements were added. + * + * @param numberOfElements the number of elements to be added + * @return the new size of the queue + */ private int getNewSize(int numberOfElements) { - return queue.size() + numberOfElements; - } - - @Override - public boolean removeAll(Collection c) { - return queue.removeAll(c); - } - - @Override - public boolean retainAll(Collection c) { - return queue.retainAll(c); - } - - @Override - public void clear() { - queue.clear(); - } - - @Override - public boolean offer(X x) { - return queue.offer(x); - } - - @Override - public X remove() { - return queue.remove(); - } - - @Override - public X poll() { - return queue.poll(); - } - - @Override - public X element() { - return queue.element(); - } - - @Override - public X peek() { - return queue.peek(); + return super.size() + numberOfElements; } } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubMessageData.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubMessageData.java index a077d8fc73..90629ed1a4 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubMessageData.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubMessageData.java @@ -28,9 +28,9 @@ @Getter public class PubSubMessageData { - private SourcePartition sourcePartition; + private PubSubSourcePartition sourcePartition; - private SourceOffset sourceOffset; + private PubSubSourceOffset sourceOffset; private PubsubMessage message; diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourceOffset.java similarity index 86% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourceOffset.java index d28662cca4..8173a4f0d8 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourceOffset.java @@ -28,13 +28,13 @@ @AllArgsConstructor @Getter @ToString -public class SourceOffset { +public class PubSubSourceOffset { private static final String KEY_MESSAGE_ID = "message.id"; private String messageId; - public static SourceOffset fromMap(Map sourceLocation) { - return new SourceOffset(sourceLocation.get(KEY_MESSAGE_ID)); + public static PubSubSourceOffset fromMap(Map sourceLocation) { + return new PubSubSourceOffset(sourceLocation.get(KEY_MESSAGE_ID)); } public Map toMap() { diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourcePartition.java similarity index 91% rename from java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java rename to java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourcePartition.java index ef3e1337f6..7885c1c287 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourcePartition.java @@ -28,7 +28,7 @@ @Getter @AllArgsConstructor @ToString -public class SourcePartition { +public class PubSubSourcePartition { private static final String KEY_PROJECT_ID = "project.id"; private static final String KEY_TOPIC_ID = "topic.id"; @@ -38,8 +38,8 @@ public class SourcePartition { private String topicId; private String subscriptionId; - public static SourcePartition fromMap(Map sourceLocation) { - return new SourcePartition( + public static PubSubSourcePartition fromMap(Map sourceLocation) { + return new PubSubSourcePartition( sourceLocation.get(KEY_PROJECT_ID), sourceLocation.get(KEY_TOPIC_ID), sourceLocation.get(KEY_SUBSCRIPTION_ID) diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java index 382ac5bfa3..e9ab31eedd 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -42,7 +41,7 @@ public class PubSubSubscriber { private final LooselyBoundedQueue messageQueue; - private final SourcePartition sourcePartition; + private final PubSubSourcePartition sourcePartition; private final Cache ackCache; @@ -60,7 +59,7 @@ public PubSubSubscriber( log.info("Starting PubSubSubscriber for subscription {}", subscription.getSubscriptionId()); targetTopicName = subscription.getTargetKafkaTopic(); batchSize = subscription.getBatchSize(); - messageQueue = new LooselyBoundedQueue<>(new ConcurrentLinkedQueue<>(), subscription.getQueueMaxEntries()); + messageQueue = new LooselyBoundedQueue<>(subscription.getQueueMaxEntries()); ackCache = Caffeine .newBuilder() @@ -71,7 +70,7 @@ public PubSubSubscriber( gcpSubscriber = pubSubService.createSubscriber(subscription.getSubscriptionId(), receiver); sourcePartition = - new SourcePartition( + new PubSubSourcePartition( projectId, subscription.getSourceTopicId(), subscription.getSubscriptionId() @@ -103,7 +102,7 @@ public List getMessages() { .filter(Objects::nonNull) .map(psm -> new PubSubMessageData( sourcePartition, - new SourceOffset(psm.getMessageId()), + new PubSubSourceOffset(psm.getMessageId()), psm, targetTopicName )) diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java index b385d3bc26..e0abdaeee1 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java @@ -63,8 +63,8 @@ public List poll() { } public void commitRecord( - SourcePartition sourcePartition, - SourceOffset sourceOffset + PubSubSourcePartition sourcePartition, + PubSubSourceOffset sourceOffset ) { log.trace("Committing record for partition {} with offset {}", sourcePartition, sourceOffset); subscribers diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubConfigSettingsTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubConfigSettingsTest.java index a0f11d9850..54bc67080e 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubConfigSettingsTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/configdef/PubSubConfigSettingsTest.java @@ -44,7 +44,8 @@ void configDefShouldContainKcqlSettings() { val kcqlConfigSetting = PUB_SUB_CONFIG_SETTINGS.getConfigDef().configKeys().get("connect.pubsub.kcql"); assertEquals(ConfigDef.Type.STRING, kcqlConfigSetting.type); assertEquals(ConfigDef.Importance.HIGH, kcqlConfigSetting.importance); - assertEquals("Contains the Kafka Connect Query Language describing the flow from the source to the target system.", + assertEquals( + "Contains the Kafka Connect Query Language describing data mappings from the source to the target system.", kcqlConfigSetting.documentation); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverterTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverterTest.java index 1eee97cc7c..adb4d53f6b 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverterTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverterTest.java @@ -29,8 +29,8 @@ import com.google.pubsub.v1.PubsubMessage; import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubMessageData; -import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.SourceOffset; -import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.SourcePartition; +import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubSourceOffset; +import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubSourcePartition; import lombok.val; class SourceRecordConverterTest { @@ -61,8 +61,8 @@ void shouldConvertDataForCompatibilityMode() throws UnsupportedEncodingException private static PubSubMessageData setUpDataForCompatibilityMode() throws UnsupportedEncodingException { return new PubSubMessageData( - new SourcePartition("projectId1", "topicId1", "subscriptionId1"), - new SourceOffset("messageId1"), + new PubSubSourcePartition("projectId1", "topicId1", "subscriptionId1"), + new PubSubSourceOffset("messageId1"), PubsubMessage.newBuilder() .setMessageId("messageId") .setData( diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMappingTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java similarity index 91% rename from java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMappingTest.java rename to java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java index ffbb2db371..67a13b1403 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMappingTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalAndMessageAttributesHeaderMapperTest.java @@ -34,7 +34,7 @@ import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubMessageData; @ExtendWith(MockitoExtension.class) -class MinimalAndMessageAttributesHeaderMappingTest { +class MinimalAndMessageAttributesHeaderMapperTest { private static final String PUBLISH_TIME = "1955-11-12T10:04:00Z"; private static final Instant PUBLISH_TIME_INSTANT = Instant.parse(PUBLISH_TIME); @@ -47,11 +47,11 @@ class MinimalAndMessageAttributesHeaderMappingTest { @Mock private PubsubMessage pubsubMessage; - private MinimalAndMessageAttributesHeaderMapping minimalAndMessageAttributesHeaderMapping; + private MinimalAndMessageAttributesHeaderMapper minimalAndMessageAttributesHeaderMapping; @BeforeEach void setup() { - minimalAndMessageAttributesHeaderMapping = new MinimalAndMessageAttributesHeaderMapping(); + minimalAndMessageAttributesHeaderMapping = new MinimalAndMessageAttributesHeaderMapper(); } @Test @@ -61,7 +61,7 @@ void testGetHeaders() { when(pubsubMessage.getAttributesMap()).thenReturn(HEADERS_MAP); when(pubSubMessageData.getMessage()).thenReturn(pubsubMessage); - Map result = minimalAndMessageAttributesHeaderMapping.getHeaders(pubSubMessageData); + Map result = minimalAndMessageAttributesHeaderMapping.mapHeaders(pubSubMessageData); assertEquals( ImmutableMap.builder() diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMappingTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapperTest.java similarity index 90% rename from java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMappingTest.java rename to java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapperTest.java index 1d246620e8..e79fa50f28 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMappingTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/headers/MinimalHeaderMapperTest.java @@ -34,7 +34,7 @@ import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubMessageData; @ExtendWith(MockitoExtension.class) -class MinimalHeaderMappingTest { +class MinimalHeaderMapperTest { private static final String PUBLISH_TIME = "1955-11-12T10:04:00Z"; private static final Instant PUBLISH_TIME_INSTANT = Instant.parse(PUBLISH_TIME); @@ -45,11 +45,11 @@ class MinimalHeaderMappingTest { @Mock private PubsubMessage pubsubMessage; - private MinimalHeaderMapping minimalHeaderMapping; + private MinimalHeaderMapper minimalHeaderMapping; @BeforeEach void setup() { - minimalHeaderMapping = new MinimalHeaderMapping(); + minimalHeaderMapping = new MinimalHeaderMapper(); } @Test @@ -58,7 +58,7 @@ void testGetHeaders() { .getEpochSecond()).build()); when(pubSubMessageData.getMessage()).thenReturn(pubsubMessage); - Map result = minimalHeaderMapping.getHeaders(pubSubMessageData); + Map result = minimalHeaderMapping.mapHeaders(pubSubMessageData); assertEquals(ImmutableMap.builder().put("PublishTimestamp", String.valueOf(PUBLISH_TIME_INSTANT.getEpochSecond())) .build(), result); diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMappingTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapperTest.java similarity index 84% rename from java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMappingTest.java rename to java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapperTest.java index 5c98289e68..de6b2d35a4 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMappingTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/value/MessageValueMapperTest.java @@ -25,19 +25,19 @@ import com.google.pubsub.v1.PubsubMessage; import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubMessageData; -import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.SourceOffset; -import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.SourcePartition; +import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubSourceOffset; +import io.lenses.streamreactor.connect.gcp.pubsub.source.subscriber.PubSubSourcePartition; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class MessageValueMappingTest { +class MessageValueMapperTest { @Mock - SourcePartition sourcePartition; + PubSubSourcePartition sourcePartition; @Mock - SourceOffset sourceOffset; + PubSubSourceOffset sourceOffset; @Test void testGetValue() { @@ -54,15 +54,15 @@ void testGetValue() { "notRelevantToThisTest" ); - MessageValueMapping messageValueMapping = new MessageValueMapping(); - byte[] result = (byte[]) messageValueMapping.getValue(pubSubMessageData); + MessageValueMapper messageValueMapping = new MessageValueMapper(); + byte[] result = (byte[]) messageValueMapping.mapValue(pubSubMessageData); assertArrayEquals(testMessageData.getBytes(), result); } @Test void testGetSchema() { - MessageValueMapping messageValueMapping = new MessageValueMapping(); + MessageValueMapper messageValueMapping = new MessageValueMapper(); Schema result = messageValueMapping.getSchema(); assertEquals(Schema.BYTES_SCHEMA, result); diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffsetTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourceOffsetTest.java similarity index 85% rename from java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffsetTest.java rename to java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourceOffsetTest.java index eb435283d7..96870023de 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffsetTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourceOffsetTest.java @@ -21,19 +21,19 @@ import org.junit.jupiter.api.Test; -class SourceOffsetTest { +class PubSubSourceOffsetTest { @Test void testFromMap() { Map sourceLocation = Map.of("message.id", "messageId1"); - SourceOffset sourceOffset = SourceOffset.fromMap(sourceLocation); + PubSubSourceOffset sourceOffset = PubSubSourceOffset.fromMap(sourceLocation); assertEquals("messageId1", sourceOffset.getMessageId()); } @Test void testToMap() { - SourceOffset sourceOffset = new SourceOffset("messageId1"); + PubSubSourceOffset sourceOffset = new PubSubSourceOffset("messageId1"); Map result = sourceOffset.toMap(); assertEquals(Map.of("message.id", "messageId1"), result); diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartitionTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourcePartitionTest.java similarity index 85% rename from java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartitionTest.java rename to java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourcePartitionTest.java index b035861dfc..6717c1bc6f 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartitionTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSourcePartitionTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test; -class SourcePartitionTest { +class PubSubSourcePartitionTest { @Test void testFromMap() { @@ -31,7 +31,7 @@ void testFromMap() { "topic.id", "topicId1", "subscription.id", "subscriptionId1" ); - SourcePartition sourcePartition = SourcePartition.fromMap(sourceLocation); + PubSubSourcePartition sourcePartition = PubSubSourcePartition.fromMap(sourceLocation); assertEquals("projectId1", sourcePartition.getProjectId()); assertEquals("topicId1", sourcePartition.getTopicId()); @@ -40,7 +40,7 @@ void testFromMap() { @Test void testToMap() { - SourcePartition sourcePartition = new SourcePartition("projectId1", "topicId1", "subscriptionId1"); + PubSubSourcePartition sourcePartition = new PubSubSourcePartition("projectId1", "topicId1", "subscriptionId1"); Map result = sourcePartition.toMap(); assertEquals(Map.of( diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManagerTest.java b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManagerTest.java index 427887a417..f967faf068 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManagerTest.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/test/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManagerTest.java @@ -135,12 +135,12 @@ private PubSubSubscription mockSubscriberCreator(PubSubSubscriber pubSubSubscrib return subscription; } - private SourcePartition sourcePartition(String id) { - return new SourcePartition(PROJECT_ID, TOPIC_ID, SUBSCRIPTION_ID_PREFIX + id); + private PubSubSourcePartition sourcePartition(String id) { + return new PubSubSourcePartition(PROJECT_ID, TOPIC_ID, SUBSCRIPTION_ID_PREFIX + id); } - private SourceOffset sourceOffset(String id) { - return new SourceOffset(MESSAGE_ID_PREFIX + id); + private PubSubSourceOffset sourceOffset(String id) { + return new PubSubSourceOffset(MESSAGE_ID_PREFIX + id); } }