Skip to content

Commit

Permalink
Review amendments
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed May 21, 2024
1 parent e8e0858 commit f488266
Show file tree
Hide file tree
Showing 35 changed files with 244 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import io.lenses.streamreactor.common.config.source.ConfigSource;
import lombok.val;

@Getter
public class KcqlSettings implements ConfigSettings<List<Kcql>> {

private final String KCQL_DOC =
private static final String KCQL_DOC =
"Contains the Kafka Connect Query Language describing data mappings from the source to the target system.";

@Getter
private final String kcqlSettingsKey;

public KcqlSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
package io.lenses.streamreactor.common.util;

import lombok.experimental.UtilityClass;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@UtilityClass
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Utility class for List splitting.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ListSplitter {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,47 @@
*/
package io.lenses.streamreactor.common.util;

import lombok.experimental.UtilityClass;

import java.util.Map;

@UtilityClass
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Utility class for map operations.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class MapUtils {

/**
* Casts a map to a specified key and value type.
*
* @param map the map to cast
* @param targetKeyType the class of the key type
* @param targetValueType the class of the value type
* @param <K> the target key type
* @param <V> the target value type
* @return the casted map
* @throws IllegalArgumentException if the map contains keys or values of incorrect types
*/
@SuppressWarnings("unchecked")
public static <K, V> Map<K, V> castMap(Map<?, ?> map, Class<K> targetKeyType, Class<V> targetValueType) {
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (!(isInstance(entry.getKey(), targetKeyType)) || !(isInstance(entry.getValue(), targetValueType))) {
if (!isAssignable(entry.getKey(), targetKeyType) || !isAssignable(entry.getValue(), targetValueType)) {
throw new IllegalArgumentException("Map contains invalid key or value type");
}
}
return (Map<K, V>) map;
}

private static <T> boolean isInstance(Object obj, Class<T> type) {
return obj == null || type.isInstance(obj);
/**
* Checks if an object is assignable to a specified type, allowing for null values.
*
* @param obj the object to check
* @param type the target type
* @param <T> the target type
* @return true if the object is null or assignable to the type, false otherwise
*/
private static <T> boolean isAssignable(Object obj, Class<T> type) {
return obj == null || type.isAssignableFrom(obj.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import java.util.stream.Stream;

import io.lenses.streamreactor.common.config.base.KcqlSettings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.val;
import lombok.experimental.UtilityClass;

/**
* Utility class for splitting tasks based on KCQL statements.
*/
@UtilityClass
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TasksSplitter {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,68 @@ void testCastMap_NonIntegerKeyOrValue_ThrowsException() {
});
}

@Test
void testCastMap_WithNullValues_ShouldHandleGracefully() {
Map<Object, Object> rawMap = new HashMap<>();
rawMap.put("key1", "value1");
rawMap.put("key2", null); // Null value

Map<String, String> typedMap = MapUtils.castMap(rawMap, String.class, String.class);

assertEquals("value1", typedMap.get("key1"));
assertNull(typedMap.get("key2"));
}

@Test
void testCastMap_SuperclassTypeCompatibility_ShouldPass() {
Map<Object, Object> rawMap = new HashMap<>();
rawMap.put("key1", "value1");
rawMap.put("key2", "value2");

// Cast to Map<CharSequence, CharSequence> since String implements CharSequence
Map<CharSequence, CharSequence> typedMap = MapUtils.castMap(rawMap, CharSequence.class, CharSequence.class);

assertEquals("value1", typedMap.get("key1"));
assertEquals("value2", typedMap.get("key2"));
}

@Test
void testCastMap_InterfaceTypeCompatibility_ShouldPass() {
Map<Object, Object> rawMap = new HashMap<>();
rawMap.put("key1", "value1");
rawMap.put("key2", "value2");

// Cast to Map<Object, Object> since String is an Object
Map<Object, Object> typedMap = MapUtils.castMap(rawMap, Object.class, Object.class);

assertEquals("value1", typedMap.get("key1"));
assertEquals("value2", typedMap.get("key2"));
}

@Test
void testCastMap_StringToObject_ShouldPass() {
Map<Object, Object> rawMap = new HashMap<>();
rawMap.put("key1", "value1");
rawMap.put("key2", "value2");

// Cast to Map<Object, Object> since String is an Object
Map<Object, Object> typedMap = MapUtils.castMap(rawMap, Object.class, Object.class);

assertEquals("value1", typedMap.get("key1"));
assertEquals("value2", typedMap.get("key2"));
}

@Test
void testCastMap_StringToCharSequence_ShouldPass() {
Map<Object, Object> rawMap = new HashMap<>();
rawMap.put("key1", "value1");
rawMap.put("key2", "value2");

// Cast to Map<CharSequence, CharSequence> since String implements CharSequence
Map<CharSequence, CharSequence> typedMap = MapUtils.castMap(rawMap, CharSequence.class, CharSequence.class);

assertEquals("value1", typedMap.get("key1"));
assertEquals("value2", typedMap.get("key2"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.io.IOException;
import java.util.Optional;
import java.util.function.Supplier;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.experimental.UtilityClass;
import lombok.val;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -32,7 +35,7 @@
/**
* Utility class for configuring generic GCP service clients using a {@link GCPConnectionConfig}.
*/
@UtilityClass
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class GCPServiceBuilderConfigurer {

/**
Expand Down Expand Up @@ -97,7 +100,7 @@ private static RetrySettings createRetrySettings(RetryConfig httpRetryConfig) {
.build();
}

private Supplier<ConfigException> createConfigException(String message) {
private static Supplier<ConfigException> createConfigException(String message) {
return () -> new ConfigException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.io.ByteStreams;
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import lombok.experimental.UtilityClass;

@UtilityClass
import com.google.common.io.ByteStreams;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TestFileUtil {

static String streamToString(InputStream inputStream) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public class GCPPubSubSourceTask extends SourceTask {

private SourceRecordConverter converter;

public GCPPubSubSourceTask() {
}

@Override
public String version() {
return jarManifest.getVersion();
Expand Down Expand Up @@ -101,8 +98,9 @@ public void stop() {
@Override
public void commitRecord(SourceRecord sourceRecord, RecordMetadata metadata) {
val sourcePartition =
SourcePartition.fromMap(MapUtils.castMap(sourceRecord.sourcePartition(), String.class, String.class));
val sourceOffset = SourceOffset.fromMap(MapUtils.castMap(sourceRecord.sourceOffset(), String.class, String.class));
PubSubSourcePartition.fromMap(MapUtils.castMap(sourceRecord.sourcePartition(), String.class, String.class));
val sourceOffset =
PubSubSourceOffset.fromMap(MapUtils.castMap(sourceRecord.sourceOffset(), String.class, String.class));
pubSubSubscriberManager.commitRecord(sourcePartition, sourceOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.lenses.streamreactor.connect.gcp.common.auth.mode.AuthMode;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.MappingConfig;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
public class PubSubKcqlConverter extends KcqlConverter<PubSubSubscription> {

public static final int DEFAULT_BATCH_SIZE = 1000;
// 1 hour
public static final long DEFAULT_CACHE_TTL_MILLIS = 3600L * 1000L;
public static final int DEFAULT_CACHE_MAX = 10000;

// 1 hour
public static final String KCQL_PROP_KEY_BATCH_SIZE = "batch.size";
public static final String KCQL_PROP_KEY_CACHE_TTL = "cache.ttl";
public static final String KCQL_PROP_KEY_QUEUE_MAX = "queue.max";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package io.lenses.streamreactor.connect.gcp.pubsub.source.mapping;

import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.HeaderMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalAndMessageAttributesHeaderMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalHeaderMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.CompatibilityKeyMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.KeyMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.MessageIdKeyMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.CompatibilityValueMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.MessageValueMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.ValueMapping;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.HeaderMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalAndMessageAttributesHeaderMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.headers.MinimalHeaderMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.CompatibilityKeyMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.KeyMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.key.MessageIdKeyMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.CompatibilityValueMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.MessageValueMapper;
import io.lenses.streamreactor.connect.gcp.pubsub.source.mapping.value.ValueMapper;
import lombok.AllArgsConstructor;
import lombok.Getter;

Expand All @@ -38,11 +38,11 @@ public class MappingConfig {

public static final String OUTPUT_MODE_DEFAULT = "DEFAULT";
public static final String OUTPUT_MODE_COMPATIBILITY = "COMPATIBILITY";
private KeyMapping keyMapper;
private KeyMapper keyMapper;

private ValueMapping valueMapper;
private ValueMapper valueMapper;

private HeaderMapping headerMapper;
private HeaderMapper headerMapper;

public static MappingConfig fromOutputMode(String outputMode) {
switch (outputMode.toUpperCase()) {
Expand All @@ -56,16 +56,16 @@ public static MappingConfig fromOutputMode(String outputMode) {

public static final MappingConfig DEFAULT_MAPPING_CONFIG =
new MappingConfig(
new MessageIdKeyMapping(),
new MessageValueMapping(),
new MinimalAndMessageAttributesHeaderMapping()
new MessageIdKeyMapper(),
new MessageValueMapper(),
new MinimalAndMessageAttributesHeaderMapper()
);

public static final MappingConfig COMPATIBILITY_MAPPING_CONFIG =
new MappingConfig(
new CompatibilityKeyMapping(),
new CompatibilityValueMapping(),
new MinimalHeaderMapping()
new CompatibilityKeyMapper(),
new CompatibilityValueMapper(),
new MinimalHeaderMapper()
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ protected SourceRecord convert(final PubSubMessageData source) throws ConfigExce
}

private Object getValue(final PubSubMessageData source) {
return mappingConfig.getValueMapper().getValue(source);
return mappingConfig.getValueMapper().mapValue(source);
}

private Schema getValueSchema() {
return mappingConfig.getValueMapper().getSchema();
}

private Object getKey(final PubSubMessageData source) {
return mappingConfig.getKeyMapper().getKey(source);
return mappingConfig.getKeyMapper().mapKey(source);
}

private Schema getKeySchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
* HeaderMapping is an interface for mapping headers in the PubSubMessageData to Kafka Connect headers.
* Implementations of this interface should define how this mapping is done.
*/
public interface HeaderMapping {
public interface HeaderMapper {

Map<String, String> getHeaders(final PubSubMessageData source);
Map<String, String> mapHeaders(final PubSubMessageData source);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
* PubSubMessageData to Kafka Connect headers.
* It extends the functionality of MinimalHeaderMapping by adding message attributes to the headers.
*/
public class MinimalAndMessageAttributesHeaderMapping implements HeaderMapping {
public class MinimalAndMessageAttributesHeaderMapper implements HeaderMapper {

private final MinimalHeaderMapping minimalHeaderMapping = new MinimalHeaderMapping();
private final MinimalHeaderMapper minimalHeaderMapping = new MinimalHeaderMapper();

@Override
public Map<String, String> getHeaders(final PubSubMessageData source) {
val miniMap = minimalHeaderMapping.getHeaders(source);
public Map<String, String> mapHeaders(final PubSubMessageData source) {
val miniMap = minimalHeaderMapping.mapHeaders(source);
val headMap = source.getMessage().getAttributesMap();
return ImmutableMap.<String, String>builder()
.putAll(miniMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
* MinimalHeaderMapping is responsible for mapping minimal headers from PubSubMessageData to Kafka Connect headers.
* The minimal headers include only the most essential information from the PubSubMessageData.
*/
public class MinimalHeaderMapping implements HeaderMapping {
public class MinimalHeaderMapper implements HeaderMapper {

@Override
public Map<String, String> getHeaders(final PubSubMessageData source) {
public Map<String, String> mapHeaders(final PubSubMessageData source) {
return Map.of(
"PublishTimestamp", String.valueOf(source.getMessage().getPublishTime().getSeconds())
);
Expand Down
Loading

0 comments on commit f488266

Please sign in to comment.