Fix Redis cluster topology change handling in source connector (#25)
jaredpetersen authored Jul 23, 2021
1 parent dfc486e commit 69baf71
Showing 16 changed files with 276 additions and 47 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.2] - 2021-07-22
### Changed
- Use capitalization in log messages

### Fixed
- Fixed an issue with logging the number of records the source connector produced
- Fixed how the source connector handles Redis cluster topology changes in order to better support keyspace notifications

## [1.2.1] - 2021-07-17
### Changed
- Modified Confluent archive to follow new standards
2 changes: 2 additions & 0 deletions docs/connectors/SOURCE.md
@@ -1,6 +1,8 @@
# Kafka Connect Redis - Source
Subscribes to Redis channels/patterns (including [keyspace notifications](https://redis.io/topics/notifications)) and writes the received messages to Kafka.

**WARNING** Delivery of keyspace notifications is not reliable for Redis clusters. Keyspace notifications are node-local, so adding new upstream nodes to your Redis cluster may involve a short period during which events on the new node are not picked up, lasting until the connector discovers the node and issues a `SUBSCRIBE` command to it. This is a limitation of keyspace notifications that the Redis organization would like to overcome in the future.
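
Keyspace notifications are also disabled by default, and `notify-keyspace-events` is a per-node setting, so they must be enabled on every node before the connector can receive them. A minimal sketch using Lettuce, assuming a node reachable at `localhost:6379` (the URI and event classes here are illustrative, not part of this connector's configuration):

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class EnableKeyspaceNotifications {
  public static void main(String[] args) {
    RedisClient client = RedisClient.create("redis://localhost:6379");
    try (StatefulRedisConnection<String, String> connection = client.connect()) {
      // K = keyspace events, E = keyevent events, A = all event classes;
      // notify-keyspace-events is node-local, so repeat this for every cluster node
      connection.sync().configSet("notify-keyspace-events", "KEA");
    }
    client.shutdown();
  }
}
```

The same setting can be applied with `redis-cli CONFIG SET` or in `redis.conf`.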

## Record Schema

### Key
57 changes: 56 additions & 1 deletion docs/demo/README.md
@@ -27,7 +27,7 @@ docker build -t jaredpetersen/redis:latest .

Next, we'll need to build a docker image for Kafka Connect Redis. Navigate to `demo/docker/kafka-connect-redis` and run the following commands:
```bash
curl -O https://repo1.maven.org/maven2/io/github/jaredpetersen/kafka-connect-redis/1.2.1/kafka-connect-redis-1.2.1.jar
curl -O https://repo1.maven.org/maven2/io/github/jaredpetersen/kafka-connect-redis/1.2.2/kafka-connect-redis-1.2.2.jar
docker build -t jaredpetersen/kafka-connect-redis:latest .
```

@@ -48,11 +48,66 @@ kubectl -n kcr-demo get pods

Be patient; this can take a few minutes.

### Redis Configuration
Run the following command to configure Redis to run in cluster mode instead of standalone mode:
```bash
kubectl -n kcr-demo run -it --rm redis-client --image redis:6 -- redis-cli --pass IEPfIr0eLF7UsfwrIlzy80yUaBG258j9 --cluster create $(kubectl -n kcr-demo get pods -l app=redis-cluster -o jsonpath='{range.items[*]}{.status.podIP}:6379 {end}') --cluster-yes
```

#### Add New Cluster Node (Optional)
You may find it useful to add a node to the Redis cluster later to simulate how the connector keeps up with topology changes.

To accomplish this, you need to update `kubernetes/redis/statefulset.yaml` to specify the new desired replica count and apply it with:
```bash
kubectl apply -k kubernetes
```

Next, you need to add the new node to the cluster configuration.

Find the IP address of the new node, replacing `###` with the new pod's ordinal:
```bash
kubectl -n kcr-demo get pod redis-cluster-### -o jsonpath='{.status.podIP}'
```

Find the IP address of one of the nodes already in the cluster:
```bash
kubectl -n kcr-demo get pods -l app=redis-cluster -o jsonpath='{.items[0].status.podIP}'
```

Create a Redis client pod so that we can update the cluster configuration:
```bash
kubectl -n kcr-demo run -it --rm redis-client --image redis:6 -- /bin/bash
```

Save those two IP addresses -- and the Redis cluster password while we're at it -- as environment variables:
```bash
NEW_NODE=newnodeipaddress:6379
EXISTING_NODE=existingnodeipaddress:6379
PASSWORD=IEPfIr0eLF7UsfwrIlzy80yUaBG258j9
```

Add the node to the cluster using the IP address information you collected earlier:
```bash
redis-cli --pass $PASSWORD --cluster add-node $NEW_NODE $EXISTING_NODE
```

Connect to the cluster and confirm that there is now an additional entry in the cluster listing:
```bash
redis-cli -c -a $PASSWORD -u "redis://redis-cluster"
redis-cluster:6379> CLUSTER NODES
```

The new upstream node doesn't have any hash slots assigned to it yet. Without slots, it can't store any data. Let's fix that by rebalancing the cluster:
```bash
redis-cli --pass $PASSWORD --cluster rebalance $EXISTING_NODE --cluster-use-empty-masters
```

Then confirm that the new node has been assigned slots:
```bash
redis-cli -c -a $PASSWORD -u "redis://redis-cluster"
redis-cluster:6379> CLUSTER NODES
```

## Usage
[Source Connector](SOURCE.md)

2 changes: 1 addition & 1 deletion docs/demo/SINK.md
@@ -7,7 +7,7 @@ First, expose the Kafka Connect server:
kubectl -n kcr-demo port-forward service/kafka-connect :rest
```

Kubectl will choose an available port for you that you will need to use for the cURLs (`$PORT`).
Kubectl will choose an available port for you that you will need to use for the cURL requests. Set `$PORT` to this port.

### Avro
```bash
60 changes: 29 additions & 31 deletions docs/demo/SOURCE.md
@@ -7,7 +7,7 @@ First, expose the Kafka Connect server:
kubectl -n kcr-demo port-forward service/kafka-connect :rest
```

Kubectl will choose an available port for you that you will need to use for the cURLs (`$PORT`).
Kubectl will choose an available port for you that you will need to use for the cURL requests. Set `$PORT` to this port.

### Avro
```bash
@@ -53,32 +53,7 @@ curl --request POST \
}'
```

## Create Redis Events
Create a Redis client pod:
```bash
kubectl -n kcr-demo run -it --rm redis-client --image redis:6 -- /bin/bash
```

Use redis-cli to connect to the cluster:
```bash
redis-cli -c -u 'redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster'
```

Run commands to create some different events:
```bash
SET {user.1}.username jetpackmelon22 EX 2
SET {user.2}.username anchorgoat74 EX 2
SADD {user.1}.interests reading
EXPIRE {user.1}.interests 2
SADD {user.2}.interests sailing woodworking programming
EXPIRE {user.2}.interests 2
GET {user.1}.username
GET {user.2}.username
SMEMBERS {user.1}.interests
SMEMBERS {user.2}.interests
```

## Validate
## Set up Kafka Topic Listener
### Avro
Create an interactive ephemeral query pod:
```bash
@@ -92,8 +92,8 @@ kafka-avro-console-consumer \
--property schema.registry.url='http://kafka-schema-registry:8081' \
--property print.key=true \
--property key.separator='|' \
--topic redis.events \
--from-beginning
--topic redis.events
```

### Connect JSON
@@ -108,6 +108,30 @@ kafka-console-consumer \
--bootstrap-server kafka-broker-0.kafka-broker:9092 \
--property print.key=true \
--property key.separator='|' \
--topic redis.events \
--from-beginning
--topic redis.events
```

## Create Redis Events
Create a Redis client pod:
```bash
kubectl -n kcr-demo run -it --rm redis-client --image redis:6 -- /bin/bash
```

Use redis-cli to connect to the cluster:
```bash
redis-cli -c -u 'redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster'
```

Run commands to create some different events:
```bash
SET {user.1}.username jetpackmelon22 EX 2
SET {user.2}.username anchorgoat74 EX 2
SADD {user.1}.interests reading
EXPIRE {user.1}.interests 2
SADD {user.2}.interests sailing woodworking programming
EXPIRE {user.2}.interests 2
GET {user.1}.username
GET {user.2}.username
SMEMBERS {user.1}.interests
SMEMBERS {user.2}.interests
```
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>io.github.jaredpetersen</groupId>
<artifactId>kafka-connect-redis</artifactId>
<version>1.2.1</version>
<version>1.2.2</version>
<packaging>jar</packaging>

<name>Kafka Redis Connector (Sink and Source)</name>
@@ -83,8 +83,8 @@ public void put(final Collection<SinkRecord> records) {
return;
}

LOG.info("writing {} record(s) to redis", records.size());
LOG.debug("records: {}", records);
LOG.info("Writing {} record(s) to redis", records.size());
LOG.debug("Records: {}", records);

for (SinkRecord record : records) {
put(record);
@@ -24,7 +24,7 @@ public class RecordConverter {
* @return Redis command.
*/
public RedisCommand convert(SinkRecord sinkRecord) {
LOG.debug("converting record {}", sinkRecord);
LOG.debug("Converting record {}", sinkRecord);

final Struct recordValue = (Struct) sinkRecord.value();
final String recordValueSchemaName = recordValue.schema().name();
@@ -56,7 +56,7 @@ public Writer(RedisClusterCommands<String, String> redisClusterCommands) {
* @param redisCommand Command to apply
*/
public void write(RedisCommand redisCommand) {
LOG.debug("writing {}", redisCommand);
LOG.debug("Writing {}", redisCommand);

switch (redisCommand.getCommand()) {
case SET:
@@ -123,8 +123,8 @@ public List<SourceRecord> poll() {
}
}

if (sourceRecords.size() > 1) {
LOG.info("writing {} record(s) to kafka", sourceRecords.size());
if (sourceRecords.size() >= 1) {
LOG.info("Writing {} record(s) to kafka", sourceRecords.size());
}

return sourceRecords;
@@ -1,13 +1,16 @@
package io.github.jaredpetersen.kafkaconnectredis.source.listener.subscriber;

import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;

/**
* Redis cluster-aware pub/sub subscriber that listens to channels and caches the retrieved messages for later
* retrieval.
*/
@Slf4j
public class RedisClusterChannelSubscriber extends RedisSubscriber {
/**
* Create a cluster-aware subscriber that listens to channels.
@@ -21,6 +21,39 @@ public RedisClusterChannelSubscriber(
) {
super(new ConcurrentLinkedQueue<>());
redisClusterPubSubConnection.addListener(new RedisClusterListener(this.messageQueue));
redisClusterPubSubConnection.sync().upstream().commands().subscribe(channels.toArray(new String[0]));
subscribeChannels(redisClusterPubSubConnection, channels);
}

/**
* Subscribe to the provided channels. Re-issue subscriptions asynchronously when the cluster topology changes.
*
* @param redisClusterPubSubConnection Cluster pub/sub connection used to facilitate the subscription
* @param channels Channels to subscribe and listen to
*/
private void subscribeChannels(
StatefulRedisClusterPubSubConnection<String, String> redisClusterPubSubConnection,
List<String> channels
) {
final String[] channelArray = channels.toArray(new String[0]);

// Perform an initial subscription
redisClusterPubSubConnection.sync()
.upstream()
.commands()
.subscribe(channelArray);

// Set up a listener to the Lettuce event bus so that we can issue subscriptions to nodes
redisClusterPubSubConnection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.doOnNext(event -> {
// Lettuce does its best to determine when the topology changed, but a change can still be missed;
// re-issuing the subscriptions to every upstream node is harmless since duplicate subscriptions are idempotent
LOG.info("Redis cluster topology changed, issuing new subscriptions");

redisClusterPubSubConnection.sync()
.upstream()
.commands()
.subscribe(channelArray);
})
.subscribe();
}
}
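
Note that Lettuce only publishes `ClusterTopologyChangedEvent` when a topology refresh runs and detects a change, so the cluster client behind this connection needs topology refresh enabled. A minimal sketch of that client setup, assuming a `RedisClusterClient` built elsewhere in the connector (the refresh period and option values are illustrative assumptions, not taken from this commit):

```java
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import java.time.Duration;

public class ClusterClientSetup {
  public static RedisClusterClient create(String uri) {
    RedisClusterClient clusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
        .enablePeriodicRefresh(Duration.ofSeconds(30)) // poll the cluster for topology changes
        .enableAllAdaptiveRefreshTriggers()            // also refresh on MOVED/ASK redirects and reconnects
        .build();
    clusterClient.setOptions(ClusterClientOptions.builder()
        .topologyRefreshOptions(refreshOptions)
        .build());
    return clusterClient;
  }
}
```

Without some refresh trigger, no event reaches the event bus and the re-subscription logic above never fires.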
@@ -1,13 +1,16 @@
package io.github.jaredpetersen.kafkaconnectredis.source.listener.subscriber;

import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;

/**
* Redis cluster-aware pub/sub subscriber that listens to patterns and caches the retrieved messages for later
* retrieval.
*/
@Slf4j
public class RedisClusterPatternSubscriber extends RedisSubscriber {
/**
* Create a cluster-aware subscriber that listens to patterns.
@@ -21,6 +21,39 @@ public RedisClusterPatternSubscriber(
) {
super(new ConcurrentLinkedQueue<>());
redisClusterPubSubConnection.addListener(new RedisClusterListener(this.messageQueue));
redisClusterPubSubConnection.sync().upstream().commands().psubscribe(patterns.toArray(new String[0]));
subscribePatterns(redisClusterPubSubConnection, patterns);
}

/**
* Subscribe to the provided patterns. Re-issue subscriptions asynchronously when the cluster topology changes.
*
* @param redisClusterPubSubConnection Cluster pub/sub connection used to facilitate the subscription
* @param patterns Patterns to subscribe and listen to
*/
private void subscribePatterns(
StatefulRedisClusterPubSubConnection<String, String> redisClusterPubSubConnection,
List<String> patterns
) {
final String[] patternArray = patterns.toArray(new String[0]);

// Perform an initial subscription
redisClusterPubSubConnection.sync()
.upstream()
.commands()
.psubscribe(patternArray);

// Set up a listener to the Lettuce event bus so that we can issue subscriptions to nodes
redisClusterPubSubConnection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.doOnNext(event -> {
// Lettuce does its best to determine when the topology changed, but a change can still be missed;
// re-issuing the subscriptions to every upstream node is harmless since duplicate subscriptions are idempotent
LOG.info("Redis cluster topology changed, issuing new subscriptions");

redisClusterPubSubConnection.sync()
.upstream()
.commands()
.psubscribe(patternArray);
})
.subscribe();
}
}
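
For context, a hypothetical usage sketch of this subscriber against keyspace notification patterns, assuming the constructor takes the pub/sub connection and a pattern list as the surrounding code suggests. The URI and pattern are placeholders; `setNodeMessagePropagation(true)` is the Lettuce setting that forwards messages received on any node's connection to this connection's listeners, which is what lets one subscriber see node-local keyspace events:

```java
import io.github.jaredpetersen.kafkaconnectredis.source.listener.subscriber.RedisClusterPatternSubscriber;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import java.util.Arrays;

public class PatternSubscriberSketch {
  public static void main(String[] args) {
    RedisClusterClient clusterClient = RedisClusterClient.create("redis://localhost:6379");
    StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
    // Propagate messages that arrive on any node's connection to this connection's listeners
    connection.setNodeMessagePropagation(true);

    // Listen to all keyspace events in database 0 (pattern is illustrative)
    RedisClusterPatternSubscriber subscriber =
        new RedisClusterPatternSubscriber(connection, Arrays.asList("__keyspace@0__:*"));
  }
}
```

The source task presumably wires the subscriber up along these lines before polling its message queue.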
@@ -32,18 +32,18 @@ public void message(String pattern, String channel, String message) {
}

public void subscribed(String channel) {
LOG.info("subscribed to channel {}", channel);
LOG.info("Subscribed to channel {}", channel);
}

public void psubscribed(String pattern) {
LOG.info("psubscribed to pattern {}", pattern);
LOG.info("Subscribed to pattern {}", pattern);
}

public void unsubscribed(String channel) {
LOG.info("unsubscribed from channel {}", channel);
LOG.info("Unsubscribed from channel {}", channel);
}

public void punsubscribed(String pattern) {
LOG.info("unsubscribed from pattern {}", pattern);
LOG.info("Unsubscribed from pattern {}", pattern);
}
}
@@ -16,7 +16,7 @@ public class VersionUtil {
PROPERTIES.load(VersionUtil.class.getClassLoader().getResourceAsStream("kafka-connect-redis.properties"));
}
catch (IOException exception) {
LOG.error("failed to load properties", exception);
LOG.error("Failed to load properties", exception);
}
}
