Source connector parallelization (#17)
jaredpetersen authored Dec 11, 2020
1 parent 4bae458 commit 34c64f9
Showing 17 changed files with 222 additions and 38 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## [1.1.0] - 2020-12-11
+### Added
+- Parallelization for source connector based on channels/patterns
+
+### Removed
+- Default configuration for Kafka topic
+
## [1.0.4] - 2020-11-29
### Added
- Added support for sinking arbitrary Redis commands, primarily for use with Redis modules
3 changes: 3 additions & 0 deletions docs/connectors/SINK.md
@@ -424,6 +424,9 @@ Keys are ignored.
```

## Configuration
+### Parallelization
+The workload can be split between multiple tasks via the configuration property `tasks.max`. The connector creates exactly the configured number of tasks.
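As a rough illustration of the sink behavior described here, the following sketch hands every task the same properties, so the number of task configurations equals `tasks.max`. It uses plain Java collections rather than the Kafka Connect API; the class name `SinkTaskConfigSketch` and the sample property values are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SinkTaskConfigSketch {
  // Returns one copy of the connector properties per task, so the result
  // always contains exactly maxTasks entries.
  static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
    List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
    for (int configIndex = 0; configIndex < maxTasks; ++configIndex) {
      taskConfigs.add(new HashMap<>(connectorProps));
    }
    return taskConfigs;
  }

  public static void main(String[] args) {
    // Hypothetical connector properties, for illustration only
    Map<String, String> props = new HashMap<>();
    props.put("redis.uri", "redis://localhost:6379");
    props.put("tasks.max", "3");

    System.out.println(taskConfigs(props, 3).size());
  }
}
```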

### Connector Properties
| Name | Type | Default | Importance | Description |
| ----------------------- | ------- | ------- | ---------- | ------------------------------------------------------- |
4 changes: 2 additions & 2 deletions docs/connectors/SOURCE.md
@@ -82,13 +82,13 @@ In the case of subscribing to Redis keyspace notifications, it may be useful to
The plugin can be configured to use an alternative partitioning strategy if desired. Set the configuration property `connector.client.config.override.policy` to value `All` on the Kafka Connect worker (the overall Kafka Connect application that runs plugins). This will allow the override of the internal Kafka producer and consumer configurations. To override the partitioner for an individual connector plugin, add the configuration property `producer.override.partitioner.class` to the connector plugin with a value that points to a class implementing the [Partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java) interface, e.g. `org.apache.kafka.clients.producer.internals.DefaultPartitioner`.

## Parallelization
-Splitting the workload between multiple tasks via the configuration property `max.tasks` is not supported at this time. Support for this will be added in the future.
+The workload can be split between multiple tasks via the configuration property `tasks.max`. The connector splits the work based on the number of configured channels/patterns. If `tasks.max` exceeds the number of channels/patterns, the number of channels/patterns is used as the maximum instead.
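A minimal sketch of that rule, assuming the channels are split into contiguous groups of near-equal size. The commit delegates the actual grouping to Kafka's `ConnectorUtils.groupPartitions`; the hand-written `groupChannels` helper and the class name `ChannelGroupingSketch` below are stand-ins for illustration, and the exact distribution they produce is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChannelGroupingSketch {
  // Splits channels into min(channels.size(), maxTasks) contiguous groups
  // whose sizes differ by at most one. Returning an empty list when there is
  // nothing to split is a choice made for this sketch only.
  static List<List<String>> groupChannels(List<String> channels, int maxTasks) {
    int numGroups = Math.min(channels.size(), maxTasks);
    List<List<String>> groups = new ArrayList<>();
    if (numGroups <= 0) {
      return groups;
    }

    int perGroup = channels.size() / numGroups;
    int leftover = channels.size() % numGroups;
    int assigned = 0;

    for (int group = 0; group < numGroups; group++) {
      // The first `leftover` groups take one extra channel
      int size = group < leftover ? perGroup + 1 : perGroup;
      groups.add(new ArrayList<>(channels.subList(assigned, assigned + size)));
      assigned += size;
    }

    return groups;
  }

  public static void main(String[] args) {
    // tasks.max (5) exceeds the channel count (3): prints [[a], [b], [c]]
    System.out.println(groupChannels(Arrays.asList("a", "b", "c"), 5));
    // tasks.max (2) is below the channel count (5): prints [[a, b, c], [d, e]]
    System.out.println(groupChannels(Arrays.asList("a", "b", "c", "d", "e"), 2));
  }
}
```

With three channels and `tasks.max` set to 5, only three groups (and therefore three tasks) result, which matches the capping described in the paragraph.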

## Configuration
### Connector Properties
| Name | Type | Default | Importance | Description |
| --------------------------------- | ------- | -------------- | ---------- | ------------------------------------------------------- |
-| `topic` | string | `redis.events` | High | Topic to write to. |
+| `topic` | string | | High | Topic to write to. |
| `redis.uri` | string | | High | Redis connection information provided via a URI string. |
| `redis.cluster.enabled` | boolean | false | High | Target Redis is running as a cluster. |
| `redis.channels` | string | | High | Redis channels to subscribe to separated by commas. |
2 changes: 1 addition & 1 deletion docs/demo/README.md
@@ -27,7 +27,7 @@ docker build -t jaredpetersen/redis:latest .

Next, we'll need to build a docker image for Kafka Connect Redis. Navigate to `demo/docker/kafka-connect-redis` and run the following commands:
```bash
-curl -O https://oss.sonatype.org/service/local/repositories/releases/content/io/github/jaredpetersen/kafka-connect-redis/1.0.4/kafka-connect-redis-1.0.4.jar
+curl -O https://repo1.maven.org/maven2/io/github/jaredpetersen/kafka-connect-redis/1.1.0/kafka-connect-redis-1.1.0.jar
docker build -t jaredpetersen/kafka-connect-redis:latest .
```

2 changes: 1 addition & 1 deletion docs/demo/SINK.md
@@ -17,7 +17,7 @@ curl --request POST \
"key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
-"tasks.max": "1",
+"tasks.max": "3",
"topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd,redis.commands.arbitrary",
"redis.uri": "redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster",
"redis.cluster.enabled": true
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>io.github.jaredpetersen</groupId>
<artifactId>kafka-connect-redis</artifactId>
-<version>1.0.4</version>
+<version>1.1.0</version>
<packaging>jar</packaging>

<name>kafka-connect-redis</name>
@@ -6,14 +6,16 @@
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

/**
* Entry point for Kafka Connect Redis Sink.
*/
public class RedisSinkConnector extends SinkConnector {
-private Map<String, String> config;
+private RedisSinkConfig config;

@Override
public String version() {
@@ -22,7 +24,12 @@ public String version() {

@Override
public void start(final Map<String, String> props) {
-this.config = props;
+try {
+this.config = new RedisSinkConfig(props);
+}
+catch (ConfigException configException) {
+throw new ConnectException("connector configuration error");
+}
}
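
The hunk above converts a `ConfigException` into a `ConnectException` but, unlike the source connector later in this commit, does not pass the original exception along as the cause. A minimal sketch of the wrap-with-cause variant follows. So that it runs without Kafka on the classpath, it uses hypothetical stand-in exception classes (`ConfigProblem`, `StartupFailure`) and a hypothetical `requireUri` helper; none of these names come from the commit.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigErrorWrappingSketch {
  // Hypothetical stand-in for Kafka's ConfigException
  static class ConfigProblem extends RuntimeException {
    ConfigProblem(String message) {
      super(message);
    }
  }

  // Hypothetical stand-in for Kafka Connect's ConnectException
  static class StartupFailure extends RuntimeException {
    StartupFailure(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Hypothetical validation step: fails when the required property is absent
  static String requireUri(Map<String, String> props) {
    String uri = props.get("redis.uri");
    if (uri == null) {
      throw new ConfigProblem("Missing required configuration \"redis.uri\"");
    }
    return uri;
  }

  static String start(Map<String, String> props) {
    try {
      return requireUri(props);
    }
    catch (ConfigProblem configProblem) {
      // Keeping the cause preserves which property was invalid
      throw new StartupFailure("connector configuration error", configProblem);
    }
  }

  public static void main(String[] args) {
    try {
      start(new HashMap<>());
    }
    catch (StartupFailure failure) {
      System.out.println(failure.getMessage());
      System.out.println(failure.getCause().getMessage());
    }
  }
}
```

The top-level message stays the same, so an assertion on `getMessage()` like the ones in this commit's tests would be unaffected by adding the cause.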

@Override
@@ -35,7 +42,7 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);

for (int configIndex = 0; configIndex < maxTasks; ++configIndex) {
-taskConfigs.add(this.config);
+taskConfigs.add(this.config.originalsStrings());
}

return taskConfigs;
@@ -12,6 +12,7 @@
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import java.util.Collection;
import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -43,8 +44,16 @@ public String version() {
@Override
public void start(final Map<String, String> props) {
// Map the task properties to config object
-final RedisSinkConfig config = new RedisSinkConfig(props);
+final RedisSinkConfig config;
+
+try {
+config = new RedisSinkConfig(props);
+}
+catch (ConfigException configException) {
+throw new ConnectException("task configuration error");
+}

+// Set up the writer
if (config.isRedisClusterEnabled()) {
this.redisClusterClient = RedisClusterClient.create(config.getRedisUri());
this.redisClusterConnection = this.redisClusterClient.connect();
@@ -7,12 +7,11 @@
import org.apache.kafka.common.config.ConfigDef.Type;

public class RedisSinkConfig extends AbstractConfig {
-// TODO Store as password
-private static final String REDIS_URI = "redis.uri";
+public static final String REDIS_URI = "redis.uri";
private static final String REDIS_URI_DOC = "Redis uri.";
private final String redisUri;

-private static final String REDIS_CLUSTER_ENABLED = "redis.cluster.enabled";
+public static final String REDIS_CLUSTER_ENABLED = "redis.cluster.enabled";
private static final String REDIS_CLUSTER_ENABLED_DOC = "Redis cluster mode enabled.";
private static final boolean REDIS_CLUSTER_ENABLED_DEFAULT = false;
private final boolean redisClusterEnabled;
@@ -2,18 +2,22 @@

import io.github.jaredpetersen.kafkaconnectredis.source.config.RedisSourceConfig;
import io.github.jaredpetersen.kafkaconnectredis.util.VersionUtil;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;

/**
* Entry point for Kafka Connect Redis Source.
*/
public class RedisSourceConnector extends SourceConnector {
-private Map<String, String> config;
+private RedisSourceConfig config;

@Override
public String version() {
@@ -22,7 +26,13 @@ public String version() {

@Override
public void start(final Map<String, String> props) {
-this.config = props;
+// Map the connector properties to config object
+try {
+this.config = new RedisSourceConfig(props);
+}
+catch (ConfigException configException) {
+throw new ConnectException("connector configuration error", configException);
+}
}

@Override
@@ -32,8 +42,19 @@ public Class<? extends Task> taskClass() {

@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
-// TODO create a task for each channel/pattern
-return Collections.singletonList(this.config);
+// Partition the configs based on channels
+final List<List<String>> partitionedRedisChannels = ConnectorUtils
+.groupPartitions(this.config.getRedisChannels(), Math.min(this.config.getRedisChannels().size(), maxTasks));
+
+// Create task configs based on the partitions
+return partitionedRedisChannels.stream()
+.map(redisChannels -> {
+final Map<String, String> taskConfig = new HashMap<>(this.config.originalsStrings());
+taskConfig.put(RedisSourceConfig.REDIS_CHANNELS, String.join(",", redisChannels));
+
+return taskConfig;
+})
+.collect(Collectors.toList());
}
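
To make the effect of `taskConfigs` concrete, the sketch below applies the same copy-and-override step to channel groups that are already computed. It uses plain Java maps instead of `RedisSourceConfig`; the class name `SourceTaskConfigSketch` and the sample channels are assumptions for illustration, and the property key is written as a literal rather than `RedisSourceConfig.REDIS_CHANNELS`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SourceTaskConfigSketch {
  // Copies the connector properties once per group and narrows
  // "redis.channels" to the channels assigned to that group.
  static List<Map<String, String>> taskConfigs(
      Map<String, String> connectorProps,
      List<List<String>> channelGroups) {
    return channelGroups.stream()
      .map(group -> {
        Map<String, String> taskConfig = new HashMap<>(connectorProps);
        taskConfig.put("redis.channels", String.join(",", group));
        return taskConfig;
      })
      .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Hypothetical connector properties with three channels
    Map<String, String> props = new HashMap<>();
    props.put("topic", "redis.events");
    props.put("redis.channels", "a,b,c");

    // Two groups, as a two-task split of three channels might look
    List<List<String>> groups = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));

    for (Map<String, String> taskConfig : taskConfigs(props, groups)) {
      System.out.println(taskConfig.get("redis.channels"));
    }
  }
}
```

Every other property is carried over unchanged, so each task still connects to the same Redis instance and writes to the same topic.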

@Override
@@ -15,6 +15,8 @@
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
@@ -44,8 +46,16 @@ public String version() {
@Override
public void start(Map<String, String> props) {
// Map the task properties to config object
-final RedisSourceConfig config = new RedisSourceConfig(props);
+final RedisSourceConfig config;
+
+try {
+config = new RedisSourceConfig(props);
+}
+catch (ConfigException configException) {
+throw new ConnectException("task configuration error", configException);
+}

+// Set up the subscriber for Redis
final RedisSubscriber redisSubscriber;

if (config.isRedisClusterEnabled()) {
@@ -8,33 +8,31 @@
import org.apache.kafka.common.config.ConfigDef.Type;

public class RedisSourceConfig extends AbstractConfig {
-private static final String TOPIC = "topic";
+public static final String TOPIC = "topic";
private static final String TOPIC_DOC = "Topic to write to.";
-private static final String TOPIC_DEFAULT = "redis";
private final String topic;

-private static final String REDIS_URI = "redis.uri";
+public static final String REDIS_URI = "redis.uri";
private static final String REDIS_URI_DOC = "Redis uri.";
private final String redisUri;

-private static final String REDIS_CLUSTER_ENABLED = "redis.cluster.enabled";
+public static final String REDIS_CLUSTER_ENABLED = "redis.cluster.enabled";
private static final String REDIS_CLUSTER_ENABLED_DOC = "Redis cluster mode enabled.";
private static final boolean REDIS_CLUSTER_ENABLED_DEFAULT = false;
private final boolean redisClusterEnabled;

-private static final String REDIS_CHANNELS = "redis.channels";
+public static final String REDIS_CHANNELS = "redis.channels";
private static final String REDIS_CHANNELS_DOC = "Redis channel(s) to subscribe to, comma-separated.";
private final List<String> redisChannels;

-private static final String REDIS_CHANNELS_PATTERN_ENABLED = "redis.channels.pattern.enabled";
+public static final String REDIS_CHANNELS_PATTERN_ENABLED = "redis.channels.pattern.enabled";
private static final String REDIS_CHANNELS_PATTERN_ENABLED_DOC = "Redis channel(s) utilize patterns.";
private final boolean redisChannelPatternEnabled;

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(
TOPIC,
Type.STRING,
-TOPIC_DEFAULT,
Importance.HIGH,
TOPIC_DOC)
.define(
@@ -1,13 +1,16 @@
package io.github.jaredpetersen.kafkaconnectredis.sink;

import io.github.jaredpetersen.kafkaconnectredis.sink.config.RedisSinkConfig;
+import io.github.jaredpetersen.kafkaconnectredis.source.RedisSourceConnector;
import io.github.jaredpetersen.kafkaconnectredis.util.VersionUtil;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;

public class RedisSinkConnectorIT {
@Test
@@ -42,6 +45,17 @@ public void taskConfigsReturnsTaskConfigs() {
assertEquals(connectorConfig, taskConfigs.get(2));
}

+@Test
+public void startThrowsConnectExceptionForInvalidConfig() {
+final RedisSourceConnector sourceConnector = new RedisSourceConnector();
+
+final Map<String, String> connectorConfig = new HashMap<>();
+connectorConfig.put("redis.uri", "redis://localhost:6379");
+
+final ConnectException thrown = assertThrows(ConnectException.class, () -> sourceConnector.start(connectorConfig));
+assertEquals("connector configuration error", thrown.getMessage());
+}
+
@Test
public void stopDoesNothing() {
final RedisSinkConnector sinkConnector = new RedisSinkConnector();
@@ -15,6 +15,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -29,6 +30,7 @@
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;

@Testcontainers
public class RedisSinkTaskIT {
@@ -208,6 +210,17 @@ public void putEmptyRecordsDoesNothingToCluster() {
.verifyComplete();
}

+@Test
+public void startThrowsConnectExceptionForInvalidConfig() {
+final RedisSinkTask sinkTask = new RedisSinkTask();
+
+final Map<String, String> connectorConfig = new HashMap<>();
+connectorConfig.put("redis.cluster.enabled", "false");
+
+final ConnectException thrown = assertThrows(ConnectException.class, () -> sinkTask.start(connectorConfig));
+assertEquals("task configuration error", thrown.getMessage());
+}
+
@Test
public void stopClosesStandalone() {
// Set up task config