Skip to content

Commit

Permalink
GCP Pub/Sub Source Connector (#1224)
Browse files Browse the repository at this point in the history
* GCP Pub/Sub Source Connector

* Reducing verbose logging

* Changes following code review

* Validate kcql on start

* Bounce messages when queue full.

* Providing unix epoch seconds and date

* Applying formats

* Revert double date

* Update java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java

Co-authored-by: Mati Urban <[email protected]>
Signed-off-by: David Sloan <[email protected]>

* Review amendments

* Use Map.of wherever possible

---------

Signed-off-by: David Sloan <[email protected]>
Co-authored-by: Mati Urban <[email protected]>
  • Loading branch information
davidsloan and GoMati-MU authored May 22, 2024
1 parent b38f3a9 commit 2c565bd
Show file tree
Hide file tree
Showing 81 changed files with 3,493 additions and 94 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ lazy val `query-language` = (project in file("java-connectors/kafka-connect-quer
),
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureTests(baseTestDeps ++ javaCommonTestDeps)
.configureAntlr()

lazy val `java-common` = (project in file("java-connectors/kafka-connect-common"))
.dependsOn(`query-language`)
.settings(
settings ++
Seq(
Expand Down
4 changes: 3 additions & 1 deletion java-connectors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ allprojects {
jUnitVersion = '5.9.1'
mockitoJupiterVersion = '5.10.0'
apacheToConfluentVersionAxis = ["2.8.1": "6.2.2", "3.3.0": "7.3.1"]
caffeineVersion = '3.1.8'

//Other Manifest Info
mainClassName = ''
Expand Down Expand Up @@ -62,7 +63,8 @@ allprojects {

//tests
testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoJupiterVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: mockitoJupiterVersion
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: mockitoJupiterVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: jUnitVersion
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.25.3'

}
Expand Down
3 changes: 3 additions & 0 deletions java-connectors/kafka-connect-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ project(":kafka-connect-common") {
}

dependencies {

implementation project(":kafka-connect-query-language")

//apache kafka
api group: 'org.apache.kafka', name: 'connect-json', version: kafkaVersion
api group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.config.base;

import java.util.List;

import lombok.Getter;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.config.base.model.ConnectorPrefix;
import io.lenses.streamreactor.common.config.source.ConfigSource;
import lombok.val;

@Getter
public class KcqlSettings implements ConfigSettings<List<Kcql>> {

private static final String KCQL_DOC =
"Contains the Kafka Connect Query Language describing data mappings from the source to the target system.";

private final String kcqlSettingsKey;

public KcqlSettings(
ConnectorPrefix connectorPrefix) {
kcqlSettingsKey = connectorPrefix.prefixKey("kcql");
}

@Override
public ConfigDef withSettings(ConfigDef configDef) {
return configDef.define(
kcqlSettingsKey,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
KCQL_DOC
);
}

@Override
public List<Kcql> parseFromConfig(ConfigSource configSource) {
return Kcql.parseMultiple(getKCQLString(configSource));
}

private String getKCQLString(ConfigSource configSource) {
val raw = configSource.getString(kcqlSettingsKey);
return raw.orElseThrow(() -> new ConfigException(String.format("Missing [%s]", kcqlSettingsKey)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.config.base.intf;

import org.apache.kafka.common.config.ConfigException;

import java.util.List;
import java.util.stream.Collectors;

/**
* Picks out the settings required from KCQL
*/
public abstract class Converter<S, T> {

public List<T> convertAll(List<S> source) throws ConfigException {
return source.stream().map(this::convert).collect(Collectors.toList());
}

protected abstract T convert(S source) throws ConfigException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.config.base.intf;

import io.lenses.kcql.Kcql;
import org.apache.kafka.common.config.ConfigException;

import java.util.List;
import java.util.stream.Collectors;

/**
* Picks out the settings required from KCQL
*/
public abstract class KcqlConverter<T> extends Converter<Kcql, T> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.lenses.streamreactor.common.config.source;

import java.util.Map;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;

/**
Expand All @@ -27,6 +29,10 @@
@AllArgsConstructor
public class ConfigWrapperSource implements ConfigSource {

public static ConfigWrapperSource fromConfigDef(ConfigDef configDef, Map<String, String> props) {
return new ConfigWrapperSource(new AbstractConfig(configDef, props));
}

private final AbstractConfig abstractConfig;

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.util;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Utility class for List splitting.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ListSplitter {

/**
* Splits the given list into {@code maxN} sublists of roughly equal size.
* If the list cannot be divided evenly, the remaining elements are distributed
* among the sublists so that the size difference between any two sublists is at most 1.
*
* @param list the list to be split
* @param maxN the number of sublists to create
* @param <T> the type of elements in the list
* @return a list of sublists, where each sublist contains a portion of the original list
* @throws IllegalArgumentException if {@code maxN} is less than or equal to 0
*/
public static <T> List<List<T>> splitList(List<T> list, int maxN) {
if (maxN <= 0) {
throw new IllegalArgumentException("Number of parts must be greater than zero.");
}

int totalSize = list.size();
int partSize = totalSize / maxN;
int remainder = totalSize % maxN;

return IntStream.range(0, maxN)
.mapToObj(i -> {
int start = i * partSize + Math.min(i, remainder);
int end = start + partSize + (i < remainder ? 1 : 0);
return list.subList(start, Math.min(end, totalSize));
})
.filter(sublist -> !sublist.isEmpty())
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.util;

import java.util.Map;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Utility class for map operations.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class MapUtils {

/**
* Casts a map to a specified key and value type.
*
* @param map the map to cast
* @param targetKeyType the class of the key type
* @param targetValueType the class of the value type
* @param <K> the target key type
* @param <V> the target value type
* @return the casted map
* @throws IllegalArgumentException if the map contains keys or values of incorrect types
*/
@SuppressWarnings("unchecked")
public static <K, V> Map<K, V> castMap(Map<?, ?> map, Class<K> targetKeyType, Class<V> targetValueType) {
map.forEach((key, value) -> {
if (!isAssignable(key, targetKeyType) || !isAssignable(value, targetValueType)) {
throw new IllegalArgumentException("Map contains invalid key or value type");
}
});
return (Map<K, V>) map;
}

/**
* Checks if an object is assignable to a specified type, allowing for null values.
*
* @param obj the object to check
* @param type the target type
* @param <T> the target type
* @return true if the object is null or assignable to the type, false otherwise
*/
private static <T> boolean isAssignable(Object obj, Class<T> type) {
return obj == null || type.isAssignableFrom(obj.getClass());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.util;

import static io.lenses.kcql.Kcql.KCQL_MULTI_STATEMENT_SEPARATOR;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.lenses.streamreactor.common.config.base.KcqlSettings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.val;

/**
* Utility class for splitting tasks based on KCQL statements.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TasksSplitter {

/**
* Splits tasks based on the KCQL statements provided in the properties map.
* Each resulting map will contain the original properties and a subset of the KCQL statements.
*
* @param maxTasks the maximum number of tasks to split into
* @param props the original properties map containing KCQL settings
* @param kcqlSettings the KCQL settings object that provides the key for KCQL settings in the properties map
* @return a list of maps, each containing the original properties and a subset of the KCQL statements
*/
public static List<Map<String, String>> splitByKcqlStatements(int maxTasks, Map<String, String> props,
KcqlSettings kcqlSettings) {
val kcqlSettingsKey = kcqlSettings.getKcqlSettingsKey();
val kcqls =
Arrays
.stream(props.get(kcqlSettingsKey).split(KCQL_MULTI_STATEMENT_SEPARATOR))
.collect(Collectors.toList());

return ListSplitter
.splitList(kcqls, maxTasks)
.stream()
.map(kcqlsForTask -> Stream.concat(
props.entrySet().stream(),
Stream.of(Map.entry(kcqlSettingsKey, String.join(";", kcqlsForTask)))
).collect(Collectors.toUnmodifiableMap(
Map.Entry::getKey,
Map.Entry::getValue,
(existing, replacement) -> replacement
)))
.collect(Collectors.toUnmodifiableList());
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- -->
<!-- Copyright 2021 Celonis. -->
<!-- Copyright 2017-2024 Lenses.io Ltd -->
<!-- -->
<!-- Licensed under the Apache License, Version 2.0 (the "License"); -->
<!-- you may not use this file except in compliance with the License. -->
Expand Down
Loading

0 comments on commit 2c565bd

Please sign in to comment.