diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java index 778f7778..12b36040 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java @@ -25,8 +25,17 @@ import io.aiven.kafka.connect.common.source.task.Context; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * FilePatternUtils allows the construction of a regex pattern to extract the + * {@link io.aiven.kafka.connect.common.source.task.Context Context} from an Object Key. The target topic can also be + * overridden using the constructor by supplying a 'targetTopic'; otherwise it will be extracted from the Object Key. + * + */ public final class FilePatternUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(FilePatternUtils.class); public static final String PATTERN_PARTITION_KEY = "partition"; public static final String PATTERN_TOPIC_KEY = "topic"; public static final String PATTERN_START_OFFSET_KEY = "startOffset"; // no undercore allowed as it breaks the regex.
@@ -41,16 +50,32 @@ public final class FilePatternUtils { public static final String START_OFFSET_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_START_OFFSET_KEY + ">\\d+)"; public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)"; public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)"; + public static final String START_OFFSET = "Start offset"; final Pattern pattern; final Optional targetTopic; + private final boolean startOffsetConfigured; + private final boolean partitionConfigured; + /** + * Creates an instance of FilePatternUtils. This constructor configures the Pattern that is used to + * extract Context from Object 'K'. The topic that is set in the Context can be overridden by setting the + * targetTopic in the constructor; to retrieve the topic from Object 'K' using the pattern, set targetTopic to null. + * + * @param pattern the expected source name format from which the regex Pattern is built + * + * @param targetTopic the topic to set in the Context, or null to extract the topic from the Object Key + */ public FilePatternUtils(final String pattern, final String targetTopic) { this.pattern = configurePattern(pattern); this.targetTopic = Optional.ofNullable(targetTopic); + startOffsetConfigured = pattern.contains(START_OFFSET_PATTERN); + partitionConfigured = pattern.contains(PARTITION_PATTERN); } /** + * Builds a Regex Pattern from the initial configuration so that named groups can be used to extract information + * from the toString() of Object K which is passed in for Context extraction.
* * @param expectedSourceNameFormat * This is a string in the expected compatible format which will allow object name or keys to have unique @@ -62,6 +87,7 @@ private Pattern configurePattern(final String expectedSourceNameFormat) { throw new ConfigException( "Source name format is missing please configure the expected source to include the partition pattern."); } + // Build REGEX Matcher String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, START_OFFSET_NAMED_GROUP_REGEX_PATTERN); @@ -81,9 +107,9 @@ public > Optional> process(final K sourceName final Optional matcher = fileMatches(sourceName.toString()); if (matcher.isPresent()) { final Context ctx = new Context<>(sourceName); - getTopic(matcher.get()).ifPresent(ctx::setTopic); - getPartitionId(matcher.get()).ifPresent(ctx::setPartition); - getOffset(matcher.get()).ifPresent(ctx::setOffset); + getTopic(matcher.get(), sourceName.toString()).ifPresent(ctx::setTopic); + getPartitionId(matcher.get(), sourceName.toString()).ifPresent(ctx::setPartition); + getOffset(matcher.get(), sourceName.toString()).ifPresent(ctx::setOffset); return Optional.of(ctx); } return Optional.empty(); @@ -94,7 +120,7 @@ private Optional fileMatches(final String sourceName) { return matchPattern(sourceName); } - private Optional getTopic(final Matcher matcher) { + private Optional getTopic(final Matcher matcher, final String sourceName) { if (targetTopic.isPresent()) { return targetTopic; } @@ -104,28 +130,35 @@ private Optional getTopic(final Matcher matcher) { } catch (IllegalArgumentException ex) { // It is possible that when checking for the group it does not match and returns an // illegalArgumentException + LOGGER.error("Unable to extract Topic from {} and 'topics' not configured.", sourceName); return Optional.empty(); } } - private Optional getPartitionId(final Matcher matcher) { + private Optional getPartitionId(final Matcher matcher, final String sourceName) { try { return 
Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY))); } catch (IllegalArgumentException e) { // It is possible that when checking for the group it does not match and returns an // illegalStateException, Number format exception is also covered by this in this case. + if (partitionConfigured) { + LOGGER.warn("Unable to extract Partition id from {}.", sourceName); + } return Optional.empty(); } } - private Optional getOffset(final Matcher matcher) { + private Optional getOffset(final Matcher matcher, final String sourceName) { try { return Optional.of(Integer.parseInt(matcher.group(PATTERN_START_OFFSET_KEY))); } catch (IllegalArgumentException e) { // It is possible that when checking for the group it does not match and returns an // illegalStateException, Number format exception is also covered by this in this case. + if (startOffsetConfigured) { + LOGGER.warn("Unable to extract start offset from {}.", sourceName); + } return Optional.empty(); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java index a75ba82a..265ade6d 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java @@ -25,7 +25,7 @@ * @param * is is the type/class of the key unique to the object the context is being created about */ -public class Context { +public class Context> { private String topic; private Integer partition; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java index f0653b8f..8644889c 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java @@ -23,10 +23,9 @@ * 
An {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files) * into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers. *

- * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the - * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together - * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer - * workers than tasks, and they will be assigned the remaining tasks as work completes. + * The number of objects in cloud storage can be very high. Selecting a distribution strategy tells the connector + * how to distribute the load across Connector tasks, and in some cases an appropriate strategy can also + * maintain a level of ordering between messages. */ public final class DistributionStrategy { private int maxTasks; diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index 140b1910..39a6f7f2 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -16,6 +16,8 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS; +import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; @@ -107,7 +109,7 @@ private Map getConfig(final String topics, final int maxTasks) { config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter"); - config.put("tasks.max", String.valueOf(maxTasks)); + config.put(MAX_TASKS, String.valueOf(maxTasks)); return config; } @@ -126,8 +128,8 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); - configData.put("task.id", String.valueOf(taskId)); - configData.put("tasks.max", String.valueOf(maxTasks)); + configData.put(TASK_ID, String.valueOf(taskId)); + configData.put(MAX_TASKS, String.valueOf(maxTasks)); final String testData1 = "Hello, Kafka Connect S3 Source! object 1"; final String testData2 = "Hello, Kafka Connect S3 Source! object 2"; @@ -180,8 +182,8 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); configData.put(AVRO_VALUE_SERIALIZER, "io.confluent.kafka.serializers.KafkaAvroSerializer"); configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); - configData.put("task.id", String.valueOf(taskId)); - configData.put("tasks.max", String.valueOf(maxTasks)); + configData.put(TASK_ID, String.valueOf(taskId)); + configData.put(MAX_TASKS, String.valueOf(maxTasks)); // Define Avro schema final String schemaJson = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestRecord\",\n" diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 474372a6..387a6105 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -16,6 +16,7 @@ package 
io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; @@ -362,7 +363,7 @@ private Map getConfig(final String connectorName, final String t config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter"); - config.put("tasks.max", String.valueOf(maxTasks)); + config.put(MAX_TASKS, String.valueOf(maxTasks)); config.put(DISTRIBUTION_TYPE, taskDistributionConfig.value()); config.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}"); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/AivenKafkaConnectS3SourceConnector.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/AivenKafkaConnectS3SourceConnector.java index ca0d10a1..18d0f0ad 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/AivenKafkaConnectS3SourceConnector.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/AivenKafkaConnectS3SourceConnector.java @@ -16,6 +16,8 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -62,7 +64,7 @@ public List> taskConfigs(final int maxTasks) { final var taskProps = new ArrayList>(); for (int i = 0; i < maxTasks; i++) { final var props = new HashMap<>(configProperties); // NOPMD - props.put("task.id", String.valueOf(i)); + props.put(TASK_ID, String.valueOf(i)); 
taskProps.add(props); } return taskProps; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index c915376c..e7b958ab 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -16,6 +16,8 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS; +import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; @@ -173,8 +175,8 @@ private void setBasicProperties() { properties.putIfAbsent("name", "test_source_connector"); properties.putIfAbsent("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); properties.putIfAbsent("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); - properties.putIfAbsent("tasks.max", "1"); - properties.put("task.id", "1"); + properties.putIfAbsent(MAX_TASKS, "1"); + properties.put(TASK_ID, "1"); properties.putIfAbsent("connector.class", AivenKafkaConnectS3SourceConnector.class.getName()); properties.putIfAbsent(TARGET_TOPIC_PARTITIONS, "0,1"); properties.putIfAbsent(TARGET_TOPICS, "testtopic"); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 4d716c1b..0ea01233 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ 
b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -73,8 +73,6 @@ @SuppressWarnings("PMD.ExcessiveImports") final class SourceRecordIteratorTest { - public static final String TASK_ID = "task.id"; - public static final String MAX_TASKS = "tasks.max"; private S3SourceConfig mockConfig; private OffsetManager mockOffsetManager; private Transformer mockTransformer;