diff --git a/README.md b/README.md index ef9e161..c4a902d 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,16 @@ final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClient) ... ``` +*Additional recommendations* + +When creating the Redis Client it is recommended to set an appropriate IO-ThreadPool-Size: + +```java +var redisClient = RedisClient.create( + ClientResources.builder().ioThreadPoolSize(25).build(), + redisAddress); +``` + ## Install ### Docker @@ -212,6 +222,9 @@ zeebe: # Redis stream automatic cleanup of acknowledged messages. Default is false. deleteAfterAcknowledge: false + # Redis Client IO-Thread-Pool-Size. Default is 25. + ioThreadPoolSize: 25 + # record serialization format: [protobuf|json] format: "protobuf" ``` diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java b/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java index 2421546..a40216c 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java @@ -26,6 +26,8 @@ public class ExporterConfiguration { private boolean deleteAfterAcknowledge = false; + private int ioThreadPoolSize = 25; + public long getCleanupCycleInSeconds() { return getEnv("CLEANUP_CYCLE_IN_SECONDS").map(Long::parseLong).orElse(cleanupCycleInSeconds); } @@ -42,6 +44,10 @@ public boolean isDeleteAfterAcknowledge() { return getEnv("DELETE_AFTER_ACKNOWLEDGE").map(Boolean::parseBoolean).orElse(deleteAfterAcknowledge); } + public int getIoThreadPoolSize() { + return getEnv("IO_THREAD_POOL_SIZE").map(Integer::parseInt).orElse(ioThreadPoolSize); + } + public String getFormat() { return getEnv("FORMAT").orElse(format); } @@ -81,6 +87,7 @@ public String toString() { ", minTimeToLiveInSeconds=" + getMinTimeToLiveInSeconds() + ", maxTimeToLiveInSeconds=" + getMaxTimeToLiveInSeconds() + ", deleteAfterAcknowledge=" + isDeleteAfterAcknowledge() + + ", ioThreadPoolSize=" + getIoThreadPoolSize() + ']'; } } diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java b/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java index 18c1c44..bd24372 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java @@ -6,6 +6,7 @@ import io.camunda.zeebe.protocol.record.Record; import io.lettuce.core.*; import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.resource.ClientResources; import io.zeebe.exporter.proto.RecordTransformer; import io.zeebe.exporter.proto.Schema; import org.slf4j.Logger; @@ -87,7 +88,9 @@ public void open(Controller controller) { throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration."); } - redisClient = RedisClient.create(config.getRemoteAddress().get()); + redisClient = RedisClient.create( + ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(), + config.getRemoteAddress().get()); redisConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect(); logger.info("Successfully connected Redis exporter to {}", config.getRemoteAddress().get());