Skip to content

Commit

Permalink
From review, moving pattern config to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 7, 2025
1 parent d082cb8 commit 4e8ef85
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.input.InputFormat;

public class SourceCommonConfig extends CommonConfig {
Expand Down Expand Up @@ -68,8 +68,8 @@ public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
}

public ObjectDistributionStrategies getObjectDistributionStrategy() {
return ObjectDistributionStrategies.forName(sourceConfigFragment.getObjectDistributionStrategy());
public ObjectDistributionStrategy getObjectDistributionStrategy() {
return ObjectDistributionStrategy.forName(sourceConfigFragment.getObjectDistributionStrategy());
}

public int getMaxPollRecords() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

package io.aiven.kafka.connect.common.config;

import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies.OBJECT_HASH;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies.PARTITION_IN_FILENAME;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies.PARTITION_IN_FILEPATH;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.OBJECT_HASH;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;
import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.PARTITION_IN_FILEPATH;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -126,7 +126,7 @@ public void ensureValid(final String name, final Object value) {
final String objectDistributionStrategy = (String) value;
if (StringUtils.isNotBlank(objectDistributionStrategy)) {
// This will throw an Exception if not a valid value.
ObjectDistributionStrategies.forName(objectDistributionStrategy);
ObjectDistributionStrategy.forName(objectDistributionStrategy);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.kafka.common.config.ConfigException;

public enum ObjectDistributionStrategies {
public enum ObjectDistributionStrategy {

OBJECT_HASH("object_hash"), PARTITION_IN_FILENAME("partition_in_filename"), PARTITION_IN_FILEPATH(
"partition_in_filepath");
Expand All @@ -32,18 +32,18 @@ public String value() {
return name;
}

ObjectDistributionStrategies(final String name) {
ObjectDistributionStrategy(final String name) {
this.name = name;
}

public static ObjectDistributionStrategies forName(final String name) {
public static ObjectDistributionStrategy forName(final String name) {
Objects.requireNonNull(name, "name cannot be null");
for (final ObjectDistributionStrategies objectDistributionStrategies : ObjectDistributionStrategies.values()) {
if (objectDistributionStrategies.name.equalsIgnoreCase(name)) {
return objectDistributionStrategies;
for (final ObjectDistributionStrategy objectDistributionStrategy : ObjectDistributionStrategy.values()) {
if (objectDistributionStrategy.name.equalsIgnoreCase(name)) {
return objectDistributionStrategy;
}
}
throw new ConfigException(String.format("Unknown object.distribution.strategy type: %s, allowed values %s ",
name, Arrays.toString(ObjectDistributionStrategies.values())));
name, Arrays.toString(ObjectDistributionStrategy.values())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.input;
public class FileExtractionPatterns {
public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String START_OFFSET_PATTERN = "{{start_offset}}";
public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

// Use a named group to return the partition in a complex string to always get the correct information for the
// partition number.
public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
public static final String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{" + PATTERN_TOPIC_KEY + "}}/partition={{"
+ PATTERN_PARTITION_KEY + "}}/";
public static final String ANY_FILENAME_PATTERN = ".*$";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.NUMBER_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.PARTITION_NAMED_GROUP_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.PARTITION_PATTERN;
import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.START_OFFSET_PATTERN;
import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.TIMESTAMP_PATTERN;
import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.TOPIC_NAMED_GROUP_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.TOPIC_PATTERN;

import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;

public final class FilePatternUtils {

private FilePatternUtils() {
// hidden
}
public static Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
expectedSourceNameFormat));
}
// Build REGEX Matcher
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
try {
return Pattern.compile(regexString);
} catch (IllegalArgumentException iae) {
throw new ConfigException(
String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
iae);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,16 @@

import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;

/**
* An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or
* files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
* An {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
* into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
* <p>
* The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
* overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
* workers than tasks, and they will be assigned the remaining tasks as work completes.
*/
public interface ObjectDistributionStrategy {

String NUMBER_REGEX_PATTERN = "(?:\\d+)";
// Use a named group to return the partition in a complex string to always get the correct information for the
// partition number.
String PATTERN_PARTITION_KEY = "partition";
String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
String PATTERN_TOPIC_KEY = "topic";
String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";
String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
String START_OFFSET_PATTERN = "{{start_offset}}";
String TIMESTAMP_PATTERN = "{{timestamp}}";
String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";
String ANY_FILENAME_PATTERN = ".*$";

public interface DistributionStrategy {
/**
* Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
* assigned deterministically to a single taskId.
Expand All @@ -65,10 +46,10 @@ public interface ObjectDistributionStrategy {
*
* @param maxTasks
* The maximum number of tasks created for the Connector
* @param expectedFormat
* The expected format, of files, path, table names or other ways to partition the tasks.
* @param filePattern
* The expected filePattern of files, path, table names or other ways to partition the tasks.
*/
void reconfigureDistributionStrategy(int maxTasks, String expectedFormat);
void reconfigureDistributionStrategy(int maxTasks, Pattern filePattern);

/**
* Check if the task is responsible for this set of files by checking if the given task matches the partition id.
Expand Down Expand Up @@ -98,39 +79,19 @@ default boolean taskMatchesPartition(final int taskId, final int partitionId) {
* @return true if the task supplied should handle the supplied partition
*/
default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {

return taskMatchesPartition(taskId, partitionId % maxTasks);
}

default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) {
return partitionId < maxTasks
? taskMatchesPartition(taskId, partitionId)
: taskMatchesModOfPartitionAndMaxTask(taskId, maxTasks, partitionId);

}

Pattern getFilePattern();

/**
* Based on the format of the file name or prefix, Pattern is created for each of the strategies.
* return the configured file pattern
*
* @return Pattern file pattern
*/
default Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
expectedSourceNameFormat));
}
// Build REGEX Matcher
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
try {
return Pattern.compile(regexString);
} catch (IllegalArgumentException iae) {
throw new ConfigException(
String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
iae);
}
}
Pattern getFilePattern();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
import org.slf4j.LoggerFactory;

/**
* {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of
* the object's filename, which is uniformly distributed and deterministic across workers.
* {@link HashDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of the
* object's filename, which is uniformly distributed and deterministic across workers.
* <p>
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
public final class HashDistributionStrategy implements DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
private int maxTasks;
private Pattern filePattern;
public HashObjectDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
configureDistributionStrategy(maxTasks, expectedSourceNameFormat);
public HashDistributionStrategy(final int maxTasks, final Pattern filePattern) {
configureDistributionStrategy(maxTasks, filePattern);
}

@Override
Expand All @@ -49,7 +49,7 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated
}

@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) {
public void reconfigureDistributionStrategy(final int maxTasks, final Pattern filePattern) {
this.maxTasks = maxTasks;
}

Expand All @@ -62,8 +62,8 @@ public void setMaxTasks(final int maxTasks) {
this.maxTasks = maxTasks;
}

private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
private void configureDistributionStrategy(final int maxTasks, final Pattern filePattern) {
this.maxTasks = maxTasks;
this.filePattern = configurePattern(expectedSourceNameFormat);
this.filePattern = filePattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.aiven.kafka.connect.common.source.task;

import static io.aiven.kafka.connect.common.source.input.FileExtractionPatterns.PATTERN_PARTITION_KEY;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -30,10 +32,8 @@
* {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed
* within a single task.
*/
public final class PartitionInFilenameDistributionStrategy implements ObjectDistributionStrategy {
public final class PartitionInFilenameDistributionStrategy implements DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(PartitionInFilenameDistributionStrategy.class);

public static final String PARTITION = "partition";
private Pattern filePattern;
private int maxTasks;

Expand All @@ -42,9 +42,9 @@ public Pattern getFilePattern() {
return filePattern;
}

public PartitionInFilenameDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
public PartitionInFilenameDistributionStrategy(final int maxTasks, final Pattern filePattern) {
this.maxTasks = maxTasks;
this.filePattern = configurePattern(expectedSourceNameFormat);
this.filePattern = filePattern;
}

/**
Expand All @@ -61,7 +61,7 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
}
final Matcher match = filePattern.matcher(sourceNameToBeEvaluated);
if (match.find()) {
return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(match.group(PARTITION)));
return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(match.group(PATTERN_PARTITION_KEY)));
}
LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
return false;
Expand All @@ -73,12 +73,12 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
*
* @param maxTasks
* maximum number of configured tasks for this connector
* @param expectedSourceNameFormat
* what the format of the source should appear like so to configure the task distribution.
* @param filePattern
* what the pattern of the source should appear like so to configure the task distribution.
*/
@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
public void reconfigureDistributionStrategy(final int maxTasks, final Pattern filePattern) {
this.maxTasks = maxTasks;
this.filePattern = configurePattern(expectedSourceNameFormat);
this.filePattern = filePattern;
}
}
Loading

0 comments on commit 4e8ef85

Please sign in to comment.