Updates to reuse constants, add javadocs and fix one issue.
Signed-off-by: Aindriu Lavelle <[email protected]>
aindriu-aiven committed Jan 18, 2025
1 parent f4a470b commit 5cfecf7
Showing 8 changed files with 59 additions and 22 deletions.
FilePatternUtils.java
@@ -25,8 +25,17 @@
import io.aiven.kafka.connect.common.source.task.Context;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* FilePatternUtils allows the construction of a regex pattern to extract the
* {@link io.aiven.kafka.connect.common.source.task.Context Context} from an Object Key. The target topic can also be
* overridden by supplying a 'targetTopic' to the constructor; otherwise it is extracted from the Object Key.
*
*/
public final class FilePatternUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(FilePatternUtils.class);
public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String PATTERN_START_OFFSET_KEY = "startOffset"; // no underscore allowed as it breaks the regex.
@@ -41,16 +50,32 @@ public final class FilePatternUtils {
public static final String START_OFFSET_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_START_OFFSET_KEY + ">\\d+)";
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
public static final String START_OFFSET = "Start offset";

final Pattern pattern;
final Optional<String> targetTopic;
private final boolean startOffsetConfigured;
private final boolean partitionConfigured;

/**
* Creates an instance of FilePatternUtils. This constructor configures the Pattern that is used to
* extract Context from Object 'K'. The topic that is set in the Context can be overridden by supplying a
* targetTopic in the constructor; to retrieve the topic from Object 'K' using the pattern, set targetTopic to null.
*
* @param pattern
*            the expected source name format from which the extraction regex is built
* @param targetTopic
*            the topic to set on every extracted Context, or null to take the topic from Object 'K'
*/
public FilePatternUtils(final String pattern, final String targetTopic) {
this.pattern = configurePattern(pattern);
this.targetTopic = Optional.ofNullable(targetTopic);
startOffsetConfigured = pattern.contains(START_OFFSET_PATTERN);
partitionConfigured = pattern.contains(PARTITION_PATTERN);
}

/**
* Sets a regex Pattern based on the initial configuration; its named groups are used to extract information
* from the toString() of Object K, which is passed in for Context extraction.
*
* @param expectedSourceNameFormat
* This is a string in the expected compatible format which will allow object name or keys to have unique
@@ -62,6 +87,7 @@ private Pattern configurePattern(final String expectedSourceNameFormat) {
throw new ConfigException(
        "Source name format is missing; please configure the expected source to include the partition pattern.");
}

// Build REGEX Matcher
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN,
START_OFFSET_NAMED_GROUP_REGEX_PATTERN);
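As an illustration of what configurePattern produces (not part of the diff; it assumes the placeholder constants are "{{start_offset}}", "{{partition}}" and "{{topic}}", which is the template the integration tests below use, and that the partition group mirrors the offset group):

    // Hypothetical walk-through of the rewriting, using the same commons-lang3 call:
    String template = "{{topic}}-{{partition}}-{{start_offset}}";
    String regex = StringUtils.replace(template, "{{start_offset}}", "(?<startOffset>\\d+)");
    regex = StringUtils.replace(regex, "{{partition}}", "(?<partition>\\d+)");
    regex = StringUtils.replace(regex, "{{topic}}", "(?<topic>[a-zA-Z0-9\\-_.]+)");
    // regex is now: (?<topic>[a-zA-Z0-9\-_.]+)-(?<partition>\d+)-(?<startOffset>\d+)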
@@ -81,9 +107,9 @@ public <K extends Comparable<K>> Optional<Context<K>> process(final K sourceName
final Optional<Matcher> matcher = fileMatches(sourceName.toString());
if (matcher.isPresent()) {
final Context<K> ctx = new Context<>(sourceName);
-                getTopic(matcher.get()).ifPresent(ctx::setTopic);
-                getPartitionId(matcher.get()).ifPresent(ctx::setPartition);
-                getOffset(matcher.get()).ifPresent(ctx::setOffset);
+                getTopic(matcher.get(), sourceName.toString()).ifPresent(ctx::setTopic);
+                getPartitionId(matcher.get(), sourceName.toString()).ifPresent(ctx::setPartition);
+                getOffset(matcher.get(), sourceName.toString()).ifPresent(ctx::setOffset);
return Optional.of(ctx);
}
return Optional.empty();
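A minimal usage sketch (illustrative only; it assumes the "{{...}}" placeholders used by the integration tests below, that the pattern must match the whole key, and a hypothetical object key):

    // Extract topic, partition and start offset from an object key:
    final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}", null);
    final Optional<Context<String>> ctx = utils.process("my-topic-3-000000000000001");
    // If present, ctx carries topic "my-topic", partition 3 and start offset 1,
    // each pulled from the corresponding named group.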
@@ -94,7 +120,7 @@ private Optional<Matcher> fileMatches(final String sourceName) {
return matchPattern(sourceName);
}

-    private Optional<String> getTopic(final Matcher matcher) {
+    private Optional<String> getTopic(final Matcher matcher, final String sourceName) {
if (targetTopic.isPresent()) {
return targetTopic;
}
@@ -104,28 +130,35 @@ private Optional<String> getTopic(final Matcher matcher) {
} catch (IllegalArgumentException ex) {
// It is possible that when checking for the group it does not match and throws an
// IllegalArgumentException
LOGGER.error("Unable to extract Topic from {} and 'topics' not configured.", sourceName);
return Optional.empty();
}

}

-    private Optional<Integer> getPartitionId(final Matcher matcher) {
+    private Optional<Integer> getPartitionId(final Matcher matcher, final String sourceName) {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (IllegalArgumentException e) {
// It is possible that when checking for the group it does not match and throws an
// IllegalArgumentException; NumberFormatException is also covered by this catch.
if (partitionConfigured) {
LOGGER.warn("Unable to extract Partition id from {}.", sourceName);
}
return Optional.empty();
}

}

-    private Optional<Integer> getOffset(final Matcher matcher) {
+    private Optional<Integer> getOffset(final Matcher matcher, final String sourceName) {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_START_OFFSET_KEY)));
} catch (IllegalArgumentException e) {
// It is possible that when checking for the group it does not match and throws an
// IllegalArgumentException; NumberFormatException is also covered by this catch.
if (startOffsetConfigured) {
LOGGER.warn("Unable to extract start offset from {}.", sourceName);
}
return Optional.empty();
}

Context.java
@@ -25,7 +25,7 @@
* @param <K>
* is the type/class of the key unique to the object the context is being created about
*/
-public class Context<K> {
+public class Context<K extends Comparable<K>> {

private String topic;
private Integer partition;
DistributionStrategy.java
@@ -23,10 +23,9 @@
* A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
* into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
* <p>
- * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
- * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
- * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
- * workers than tasks, and they will be assigned the remaining tasks as work completes.
+ * The number of objects in cloud storage can be very high. Selecting a distribution strategy allows the connector to
+ * know how to distribute the load across Connector tasks, and in some cases an appropriate strategy can also
+ * maintain a level of ordering between messages.
*/
public final class DistributionStrategy {
private int maxTasks;
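A simplified sketch of the idea only (DistributionStrategy's implementation is elided in this diff; the hashing scheme below is hypothetical):

    // Objects whose key hashes to the same value always land on the same task,
    // so their relative order is preserved within that task.
    static int taskFor(final String objectKey, final int maxTasks) {
        return Math.floorMod(objectKey.hashCode(), maxTasks);
    }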
(integration test in io.aiven.kafka.connect.s3.source)
@@ -16,6 +16,8 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS;
import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID;
import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
@@ -107,7 +109,7 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
config.put(TARGET_TOPICS, topics);
config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put("tasks.max", String.valueOf(maxTasks));
+        config.put(MAX_TASKS, String.valueOf(maxTasks));
return config;
}
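The CommonConfig constants replace bare string literals throughout these files; judging by the literals they replace (and by the constants removed from SourceRecordIteratorTest at the end of this diff), they are presumably defined as:

    // Presumed definitions in CommonConfig, inferred from the replaced literals:
    public static final String MAX_TASKS = "tasks.max";
    public static final String TASK_ID = "task.id";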

@@ -126,8 +128,8 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {

configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
configData.put("task.id", String.valueOf(taskId));
configData.put("tasks.max", String.valueOf(maxTasks));
+        configData.put(TASK_ID, String.valueOf(taskId));
+        configData.put(MAX_TASKS, String.valueOf(maxTasks));
final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
final String testData2 = "Hello, Kafka Connect S3 Source! object 2";

@@ -180,8 +182,8 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter");
configData.put(AVRO_VALUE_SERIALIZER, "io.confluent.kafka.serializers.KafkaAvroSerializer");
configData.put(FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
configData.put("task.id", String.valueOf(taskId));
configData.put("tasks.max", String.valueOf(maxTasks));
+        configData.put(TASK_ID, String.valueOf(taskId));
+        configData.put(MAX_TASKS, String.valueOf(maxTasks));

// Define Avro schema
final String schemaJson = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestRecord\",\n"
(integration test in io.aiven.kafka.connect.s3.source)
@@ -16,6 +16,7 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS;
import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG;
import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
@@ -362,7 +363,7 @@ private Map<String, String> getConfig(final String connectorName, final String t
config.put(TARGET_TOPICS, topics);
config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put("tasks.max", String.valueOf(maxTasks));
+        config.put(MAX_TASKS, String.valueOf(maxTasks));
config.put(DISTRIBUTION_TYPE, taskDistributionConfig.value());
config.put(FILE_NAME_TEMPLATE_CONFIG,
"{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}");
AivenKafkaConnectS3SourceConnector.java
@@ -16,6 +16,8 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -62,7 +64,7 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {
final var taskProps = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
final var props = new HashMap<>(configProperties); // NOPMD
props.put("task.id", String.valueOf(i));
+            props.put(TASK_ID, String.valueOf(i));
taskProps.add(props);
}
return taskProps;
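For example, with maxTasks set to 3 this loop returns three copies of the connector configuration that differ only in their task.id value (0, 1 and 2).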
(test class in io.aiven.kafka.connect.s3.source)
@@ -16,6 +16,8 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.config.CommonConfig.MAX_TASKS;
import static io.aiven.kafka.connect.common.config.CommonConfig.TASK_ID;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS;
@@ -173,8 +175,8 @@ private void setBasicProperties() {
properties.putIfAbsent("name", "test_source_connector");
properties.putIfAbsent("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
properties.putIfAbsent("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
properties.putIfAbsent("tasks.max", "1");
properties.put("task.id", "1");
+        properties.putIfAbsent(MAX_TASKS, "1");
+        properties.put(TASK_ID, "1");
properties.putIfAbsent("connector.class", AivenKafkaConnectS3SourceConnector.class.getName());
properties.putIfAbsent(TARGET_TOPIC_PARTITIONS, "0,1");
properties.putIfAbsent(TARGET_TOPICS, "testtopic");
SourceRecordIteratorTest.java
@@ -73,8 +73,6 @@
@SuppressWarnings("PMD.ExcessiveImports")
final class SourceRecordIteratorTest {

-    public static final String TASK_ID = "task.id";
-    public static final String MAX_TASKS = "tasks.max";
private S3SourceConfig mockConfig;
private OffsetManager mockOffsetManager;
private Transformer mockTransformer;
