Skip to content

Commit

Permalink
feat: new config parameter for Redis ioThreadPoolSize
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Sep 28, 2023
1 parent d726f06 commit a705e29
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClient)
...
```

*Additional recommendations*

When creating the Redis Client it is recommended to set an appropriate IO-ThreadPool-Size:

```java
var redisClient = RedisClient.create(
ClientResources.builder().ioThreadPoolSize(25).build(),
redisAddress);
```

## Install

### Docker
Expand Down Expand Up @@ -212,6 +222,9 @@ zeebe:
# Redis stream automatic cleanup of acknowledged messages. Default is false.
deleteAfterAcknowledge: false

# Redis Client IO-Thread-Pool-Size. Default is 25.
ioThreadPoolSize: 25

# record serialization format: [protobuf|json]
format: "protobuf"
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class ExporterConfiguration {

private boolean deleteAfterAcknowledge = false;

private int ioThreadPoolSize = 25;

public long getCleanupCycleInSeconds() {
return getEnv("CLEANUP_CYCLE_IN_SECONDS").map(Long::parseLong).orElse(cleanupCycleInSeconds);
}
Expand All @@ -42,6 +44,10 @@ public boolean isDeleteAfterAcknowledge() {
return getEnv("DELETE_AFTER_ACKNOWLEDGE").map(Boolean::parseBoolean).orElse(deleteAfterAcknowledge);
}

public int getIoThreadPoolSize() {
return getEnv("IO_THREAD_POOL_SIZE").map(Integer::parseInt).orElse(ioThreadPoolSize);
}

public String getFormat() {
return getEnv("FORMAT").orElse(format);
}
Expand Down Expand Up @@ -81,6 +87,7 @@ public String toString() {
", minTimeToLiveInSeconds=" + getMinTimeToLiveInSeconds() +
", maxTimeToLiveInSeconds=" + getMaxTimeToLiveInSeconds() +
", deleteAfterAcknowledge=" + isDeleteAfterAcknowledge() +
", ioThreadPoolSize=" + getIoThreadPoolSize() +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.camunda.zeebe.protocol.record.Record;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.resource.ClientResources;
import io.zeebe.exporter.proto.RecordTransformer;
import io.zeebe.exporter.proto.Schema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -87,7 +88,9 @@ public void open(Controller controller) {
throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration.");
}

redisClient = RedisClient.create(config.getRemoteAddress().get());
redisClient = RedisClient.create(
ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(),
config.getRemoteAddress().get());
redisConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect();

logger.info("Successfully connected Redis exporter to {}", config.getRemoteAddress().get());
Expand Down

0 comments on commit a705e29

Please sign in to comment.