Skip to content

Commit

Permalink
Support sinking arbitrary Redis commands (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredpetersen authored Nov 29, 2020
1 parent 921e36c commit 4bae458
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 18 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.4] - 2020-11-29
### Added
- Added support for sinking arbitrary Redis commands, primarily for use with Redis modules

### Fixed
- Fixed linter configuration

## [1.0.3] - 2020-11-21
### Added
- Added support for Redis EXPIRE commands
Expand Down
1 change: 1 addition & 0 deletions checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<property name="allowNonPrintableEscapes" value="true"/>
</module>
<module name="AvoidStarImport"/>
<module name="UnusedImports"/>
<module name="OneTopLevelClass"/>
<module name="NoLineWrap">
<property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT"/>
Expand Down
47 changes: 47 additions & 0 deletions docs/connectors/SINK.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The following commands are supported at this time:
- [PEXPIRE](https://redis.io/commands/pexpire)
- [SADD](https://redis.io/commands/sadd)
- [GEOADD](https://redis.io/commands/geoadd)
- Arbitrary -- useful for Redis modules

Support for additional write-based commands will be added in the future.

Expand Down Expand Up @@ -376,6 +377,52 @@ Keys are ignored.
}
```

#### Arbitrary
##### Avro
```json
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisArbitraryCommand",
"type": "record",
"fields": [
{
"name": "command",
"type": "string"
},
{
"name": "arguments",
"type": {
"type": "array",
"items": "string"
}
}
]
}
```

##### Connect JSON
```json
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand",
"type": "struct",
"fields": [
{
"field": "command",
"type": "string",
"optional": false
},
{
"field": "arguments",
"type": "array",
"items": {
"type": "string"
},
"optional": false
}
]
}
```

## Configuration
### Connector Properties
| Name | Type | Default | Importance | Description |
Expand Down
17 changes: 14 additions & 3 deletions docs/demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,23 @@ minikube start --cpus 2 --memory 10g
```

### Docker
Now that we have Kubernetes set up locally, we'll need a Docker image that contains Kafka Connect Redis.
Now that we have Kubernetes set up locally, we'll need some Docker images.

Navigate to `demo/docker/` in this repository and run the following commands **in a separate terminal** to download the plugin and build the image for minikube:
Open a new terminal we can use to build images for minikube. Run the following command to connect the terminal to minikube:
```bash
curl -O https://oss.sonatype.org/service/local/repositories/releases/content/io/github/jaredpetersen/kafka-connect-redis/1.0.3/kafka-connect-redis-1.0.3.jar
eval $(minikube docker-env)
```

We'll use this terminal for the rest of this section.

Let's start by building Redis. Navigate to `demo/docker/redis` and run the following commands:
```bash
docker build -t jaredpetersen/redis:latest .
```

Next, we'll need to build a docker image for Kafka Connect Redis. Navigate to `demo/docker/kafka-connect-redis` and run the following commands:
```bash
curl -O https://oss.sonatype.org/service/local/repositories/releases/content/io/github/jaredpetersen/kafka-connect-redis/1.0.4/kafka-connect-redis-1.0.4.jar
docker build -t jaredpetersen/kafka-connect-redis:latest .
```

Expand Down
17 changes: 16 additions & 1 deletion docs/demo/SINK.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ curl --request POST \
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"tasks.max": "1",
"topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd",
"topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd,redis.commands.arbitrary",
"redis.uri": "redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster",
"redis.cluster.enabled": true
}
Expand Down Expand Up @@ -112,6 +112,17 @@ kafka-avro-console-producer \
>{"key":"Sicily","values":[{"longitude":13.361389,"latitude":13.361389,"member":"Palermo"},{"longitude":15.087269,"latitude":37.502669,"member":"Catania"}]}
```

```bash
kafka-avro-console-producer \
--broker-list kafka-broker-0.kafka-broker:9092 \
--property schema.registry.url='http://kafka-schema-registry:8081' \
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisArbitraryCommand","type":"record","fields":[{"name":"command","type":"string"},{"name":"arguments","type":{"type":"array","items":"string"}}]}' \
--topic redis.commands.arbitrary
>{"command":"TS.CREATE","arguments":["temperature:3:11", "RETENTION", "60", "LABELS", "sensor_id", "2", "area_id", "32"]}
>{"command":"TS.ADD","arguments":["temperature:3:11", "1548149181", "30"]}
>{"command":"TS.ADD","arguments":["temperature:3:11", "1548149191", "42"]}
```

### Connect JSON
Create an interactive ephemeral query pod:
```bash
Expand All @@ -134,6 +145,9 @@ kafka-console-producer \
>{"payload":{"key":"{user.1}.interests","values":["reading"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"values","type":"array","items":{"type":"string"},"optional":false}]}}
>{"payload":{"key":"{user.2}.interests","values":["sailing","woodworking","programming"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"values","type":"array","items":{"type":"string"},"optional":false}]}}
>{"payload":{"key":"Sicily","values":[{"longitude":13.361389,"latitude":13.361389,"member":"Palermo"},{"longitude":15.087269,"latitude":37.502669,"member":"Catania"}]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisGeoaddCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"values","type":"array","items":{"type":"struct","fields":[{"field":"longitude","type":"double","optional":false},{"field":"latitude","type":"double","optional":false},{"field":"member","type":"string","optional":false}]},"optional":false}]}}
>{"payload":{"command":"TS.CREATE","arguments":["temperature:3:11", "RETENTION", "60", "LABELS", "sensor_id", "2", "area_id", "32"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand","type":"struct","fields":[{"field":"command","type":"string","optional":false},{"field":"arguments","type":"array","items":{"type":"string"},"optional":false}]}}
>{"payload":{"command":"TS.ADD","arguments":["temperature:3:11", "1548149181", "30"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand","type":"struct","fields":[{"field":"command","type":"string","optional":false},{"field":"arguments","type":"array","items":{"type":"string"},"optional":false}]}}
>{"payload":{"command":"TS.ADD","arguments":["temperature:3:11", "1548149191", "42"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand","type":"struct","fields":[{"field":"command","type":"string","optional":false},{"field":"arguments","type":"array","items":{"type":"string"},"optional":false}]}}
```

## Validate
Expand All @@ -160,4 +174,5 @@ PTTL product.waffles
SMEMBERS {user.1}.interests
SMEMBERS {user.2}.interests
GEOPOS Sicily Catania
TS.RANGE temperature:3:11 1548149180 1548149210 AGGREGATION avg 5
```
File renamed without changes.
9 changes: 9 additions & 0 deletions docs/demo/docker/redis/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM redislabs/redistimeseries:1.4.6 as redistimeseries
FROM redis:6

ENV LIBRARY_PATH /usr/lib/redis/modules

COPY --from=redistimeseries ${LIBRARY_PATH}/redistimeseries.so ${LIBRARY_PATH}/redistimeseries.so

ENTRYPOINT ["redis-server"]
CMD ["/usr/local/etc/redis/redis.conf", "--loadmodule", "/usr/lib/redis/modules/redistimeseries.so"]
5 changes: 2 additions & 3 deletions docs/demo/kubernetes/redis/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ spec:
spec:
containers:
- name: redis
image: redis:6
args:
- /usr/local/etc/redis/redis.conf
image: jaredpetersen/redis:latest
imagePullPolicy: Never
ports:
- containerPort: 6379
name: client
Expand Down
14 changes: 4 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>io.github.jaredpetersen</groupId>
<artifactId>kafka-connect-redis</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
<packaging>jar</packaging>

<name>kafka-connect-redis</name>
Expand Down Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.0.0.RELEASE</version>
<version>6.0.1.RELEASE</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -164,7 +164,8 @@
<configuration>
<configLocation>checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<failOnViolation>true</failOnViolation>
<violationSeverity>warning</violationSeverity>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<suppressionsLocation>checkstyle-test-suppressions.xml</suppressionsLocation>
</configuration>
Expand All @@ -179,13 +180,6 @@
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<violationSeverity>warning</violationSeverity>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.github.jaredpetersen.kafkaconnectredis.sink.writer;

import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.output.CommandOutput;
import java.nio.ByteBuffer;

/**
* Void output of a Redis command used to "fire and forget".
* <p />
* Temporary stopgap until https://github.com/lettuce-io/lettuce-core/issues/1529 is released.
*
* @param <K> Key type.
* @param <V> Value type.
*/
class LettuceVoidOutput<K, V> extends CommandOutput<K, V, Void> {
public LettuceVoidOutput(RedisCodec<K, V> codec) {
super(codec, null);
}

@Override
public void set(ByteBuffer bytes) {
}

@Override
public void set(long integer) {
}

@Override
public void set(double number) {
}

@Override
public void set(boolean value) {
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.jaredpetersen.kafkaconnectredis.sink.writer;

import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisArbitraryCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisExpireCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisExpireatCommand;
Expand Down Expand Up @@ -51,6 +52,9 @@ public Mono<RedisCommand> convert(SinkRecord sinkRecord) {
case "io.github.jaredpetersen.kafkaconnectredis.RedisGeoaddCommand":
redisCommandMono = convertGeoadd(recordValue);
break;
case "io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand":
redisCommandMono = convertArbitrary(recordValue);
break;
default:
redisCommandMono = Mono.error(new ConnectException("unsupported command schema " + recordValueSchemaName));
}
Expand Down Expand Up @@ -162,4 +166,17 @@ private Mono<RedisCommand> convertGeoadd(Struct value) {
.build();
}));
}

private Mono<RedisCommand> convertArbitrary(Struct value) {
return Mono.fromCallable(() -> {
final RedisArbitraryCommand.Payload payload = RedisArbitraryCommand.Payload.builder()
.command(value.getString("command"))
.arguments(value.getArray("arguments"))
.build();

return RedisArbitraryCommand.builder()
.payload(payload)
.build();
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.jaredpetersen.kafkaconnectredis.sink.writer;

import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisArbitraryCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisExpireCommand;
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisExpireatCommand;
Expand All @@ -10,6 +11,11 @@
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.ProtocolKeyword;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,6 +82,9 @@ public Mono<Void> write(RedisCommand redisCommand) {
case GEOADD:
response = geoadd((RedisGeoaddCommand) redisCommand);
break;
case ARBITRARY:
response = arbitrary((RedisArbitraryCommand) redisCommand);
break;
default:
response = Mono.error(new ConnectException("redis command " + redisCommand + " is not supported"));
}
Expand Down Expand Up @@ -182,4 +191,28 @@ private Mono<Void> geoadd(RedisGeoaddCommand geoaddCommand) {

return geoaddResult.then();
}

private Mono<Void> arbitrary(RedisArbitraryCommand arbitraryCommand) {
// Set up arbitrary command
final ProtocolKeyword protocolKeyword = new ProtocolKeyword() {
public final byte[] bytes = name().getBytes(StandardCharsets.US_ASCII);

@Override
public byte[] getBytes() {
return this.bytes;
}

@Override
public String name() {
return arbitraryCommand.getPayload().getCommand();
}
};
final CommandOutput<String, String, Void> commandOutput = new LettuceVoidOutput<>(StringCodec.UTF8);
final CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8)
.addValues(arbitraryCommand.getPayload().getArguments());

return (this.clusterEnabled)
? this.redisClusterCommands.dispatch(protocolKeyword, commandOutput, commandArgs).then()
: this.redisStandaloneCommands.dispatch(protocolKeyword, commandOutput, commandArgs).then();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.jaredpetersen.kafkaconnectredis.sink.writer.record;

import java.util.List;
import lombok.Builder;
import lombok.Value;

@Value
@Builder(builderClassName = "Builder")
public class RedisArbitraryCommand implements RedisCommand {
Command command = Command.ARBITRARY;
Payload payload;

@Value
@lombok.Builder(builderClassName = "Builder")
public static class Payload {
String command;
List<String> arguments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public interface RedisCommand {
enum Command {
ARBITRARY,
SET,
EXPIRE,
EXPIREAT,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.github.jaredpetersen.kafkaconnectredis.source.config;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
Expand Down
Loading

0 comments on commit 4bae458

Please sign in to comment.