Skip to content

Commit

Permalink
feat: auto reconnect in case Redis is not available at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Nov 24, 2023
1 parent 009e294 commit 63d6b36
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public RedisCleaner(StatefulRedisConnection<String, ?> redisConnection, boolean
trimScheduleDelay = Duration.ofSeconds(config.getCleanupCycleInSeconds());
}

public void setRedisConnection(StatefulRedisConnection<String, ?> redisConnection) {
this.redisConnection = redisConnection;
}

public void considerStream(String stream) {
streams.put(stream, Boolean.TRUE);
}
Expand Down
65 changes: 56 additions & 9 deletions exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,12 +40,19 @@ public class RedisExporter implements Exporter {

private RedisSender redisSender;

private Controller controller;

// The ExecutorService allows to schedule a regular task independent of the actual load
// which controller.scheduleCancellableTask sadly didn't do.
private ScheduledExecutorService senderThread = Executors.newSingleThreadScheduledExecutor();

private ScheduledExecutorService cleanerThread = Executors.newSingleThreadScheduledExecutor();

// Startup handling in case of Redis connection failure
private ScheduledExecutorService startupThread;
private boolean fullyLoggedStartupException = false;
private List<Integer> reconnectIntervals = new ArrayList<>(List.of(2,3,3,4,4,4,5));

@Override
public void configure(Context context) {
logger = context.getLogger();
Expand Down Expand Up @@ -80,21 +89,59 @@ public void open(Controller controller) {
if (config.getRemoteAddress().isEmpty()) {
throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration.");
}
this.controller = controller;

redisClient = RedisClient.create(
ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(),
config.getRemoteAddress().get());
senderConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect();
cleanupConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect();
logger.info("Successfully connected Redis exporter to {}", config.getRemoteAddress().get());
connectToRedis();
}

redisSender = new RedisSender(config, controller, senderConnection, logger);
senderThread.schedule(this::sendBatches, config.getBatchCycleMillis(), TimeUnit.MILLISECONDS);
private void connectToRedis() {
boolean failure = false;
// try to connect
try {
senderConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect();
cleanupConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect();
logger.info("Successfully connected Redis exporter to {}", config.getRemoteAddress().get());
} catch (RedisConnectionException ex) {
if (!fullyLoggedStartupException) {
logger.error("Failure connecting Redis exporter to " + config.getRemoteAddress().get(), ex);
fullyLoggedStartupException = true;
} else {
logger.warn("Failure connecting Redis exporter to {}: {}", config.getRemoteAddress().get(), ex.getMessage());
}
failure = true;
}

// upon successful connection initialize the sender
if (redisSender == null && senderConnection != null) {
redisSender = new RedisSender(config, controller, senderConnection, logger);
senderThread.schedule(this::sendBatches, config.getBatchCycleMillis(), TimeUnit.MILLISECONDS);
}

// always initialize the cleaner
if (redisCleaner == null) {
redisCleaner = new RedisCleaner(cleanupConnection, useProtoBuf, config, logger);
if (config.getCleanupCycleInSeconds() > 0 &&
(config.isDeleteAfterAcknowledge() || config.getMaxTimeToLiveInSeconds() > 0)) {
cleanerThread.schedule(this::trimStreamValues, config.getCleanupCycleInSeconds(), TimeUnit.SECONDS);
}
// upon late successful connection propagate it to cleaner
} else if (cleanupConnection != null) {
redisCleaner.setRedisConnection(cleanupConnection);
}

redisCleaner = new RedisCleaner(cleanupConnection, useProtoBuf, config, logger);
if (config.getCleanupCycleInSeconds() > 0 &&
(config.isDeleteAfterAcknowledge() || config.getMaxTimeToLiveInSeconds() > 0)) {
cleanerThread.schedule(this::trimStreamValues, config.getCleanupCycleInSeconds(), TimeUnit.SECONDS);
// if initial connection has failed, try again later
if (failure) {
if (startupThread == null) {
startupThread = Executors.newSingleThreadScheduledExecutor();
}
int delay = reconnectIntervals.size() > 1 ? reconnectIntervals.remove(0) : reconnectIntervals.get(0);
startupThread.schedule(this::connectToRedis, delay, TimeUnit.SECONDS);
} else if (startupThread != null ) {
startupThread.shutdown();
startupThread = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ExporterMaxTimeToLiveTest {

@Container
public ZeebeTestContainer zeebeContainer = ZeebeTestContainer
.withCleanupCycleInSeconds(2).withMaxTTLInSeconds(6);
.withCleanupCycleInSeconds(2).andUseMaxTTLInSeconds(6);

private RedisClient redisClient;
private StatefulRedisConnection<String, byte[]> redisConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ExporterMinTimeToLiveTest {

@Container
public ZeebeTestContainer zeebeContainer = ZeebeTestContainer
.withCleanupCycleInSeconds(3).doDeleteAfterAcknowledge(true).withMinTTLInSeconds(10);
.withCleanupCycleInSeconds(3).doDeleteAfterAcknowledge(true).andUseMinTTLInSeconds(10);

private RedisClient redisClient;
private StatefulRedisConnection<String, byte[]> redisConnection;
Expand Down
118 changes: 118 additions & 0 deletions exporter/src/test/java/io/zeebe/redis/RedisLateStartupTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.zeebe.redis;

import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.zeebe.redis.testcontainers.OnFailureExtension;
import io.zeebe.redis.testcontainers.ZeebeTestContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

@Testcontainers
@ExtendWith(OnFailureExtension.class)
public class RedisLateStartupTest {

private static final BpmnModelInstance WORKFLOW =
Bpmn.createExecutableProcess("process")
.startEvent("start")
.sequenceFlowId("to-task")
.serviceTask("task", s -> s.zeebeJobType("test"))
.sequenceFlowId("to-end")
.endEvent("end")
.done();
@Container
public ZeebeTestContainer zeebeContainer = ZeebeTestContainer.withJsonFormat()
.andUseCleanupCycleInSeconds(2).doDeleteAfterAcknowledge(true);

private RedisClient redisClient;
private StatefulRedisConnection<String, String> redisConnection;

@BeforeEach
public void init() {
redisClient = RedisClient.create(zeebeContainer.getRedisAddress());
redisConnection = redisClient.connect();
redisConnection.sync().xtrim("zeebe:DEPLOYMENT", 0);
}

@AfterEach
public void cleanUp() {
redisConnection.sync().xtrim("zeebe:DEPLOYMENT", 0);
redisConnection.close();
redisClient.shutdown();
}

@Test
public void worksCorrectIfRedisIsUnavailableAtStartup() throws Exception {
// given
redisConnection.close();
redisClient.shutdown();
zeebeContainer.stop();
zeebeContainer.restartWithoutRedis();
WaitingConsumer consumer = new WaitingConsumer();
zeebeContainer.followOutput(consumer);
consumer.waitUntil(frame ->
frame.getUtf8String().contains("Broker is ready"), 30, TimeUnit.SECONDS);
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process2.bpmn").send().join();
consumer.waitUntil(frame ->
frame.getUtf8String().contains("Failure connecting Redis exporter"), 20, TimeUnit.SECONDS);

// when
zeebeContainer.getRedisContainer().start();
redisClient = RedisClient.create(zeebeContainer.getRedisAddress());
redisConnection = redisClient.connect();
consumer.waitUntil(frame ->
frame.getUtf8String().contains("Successfully connected Redis exporter"), 20, TimeUnit.SECONDS);

Thread.sleep(1000);
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process3.bpmn").send().join();

redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_1", XGroupCreateArgs.Builder.mkstream());

// then
AtomicReference<Long> xlen = new AtomicReference<>();
Awaitility.await().pollInSameThread().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10)).untilAsserted(() -> {

var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_1", "consumer_1"),
XReadArgs.Builder.block(1000),
XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT"));

long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get())
.filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\""))
.filter(json -> json.contains("\"recordType\":\"EVENT\""))
.filter(json -> json.contains("\"intent\":\"CREATED\""))
.count();

// assert that all messages have been received
assertThat(createdCount).isEqualTo(3);

// acknowledge all messages so that cleanup can work
xlen.set(redisConnection.sync().xlen("zeebe:DEPLOYMENT"));
for (StreamMessage<String, String> message : messages) {
redisConnection.sync().xack("zeebe:DEPLOYMENT", "application_1", message.getId());
};
});

// assert that cleanup still works and removed all messages except the last ones
var delay = Duration.ofSeconds(3);
await().atMost(Duration.ofSeconds(7)).pollDelay(delay).pollInterval(Duration.ofSeconds(1)).pollInSameThread()
.untilAsserted(() -> assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isLessThan(xlen.get()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -59,7 +61,10 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception {
redisConnection.close();
redisClient.shutdown();
zeebeContainer.getRedisContainer().stop();
Thread.sleep(2000);
WaitingConsumer consumer = new WaitingConsumer();
zeebeContainer.followOutput(consumer);
consumer.waitUntil(frame ->
frame.getUtf8String().contains("Connection refused: redis"), 10, TimeUnit.SECONDS);
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process2.bpmn").send().join();
Thread.sleep(1000);
Expand All @@ -68,7 +73,8 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception {
zeebeContainer.getRedisContainer().start();
redisClient = RedisClient.create(zeebeContainer.getRedisAddress());
redisConnection = redisClient.connect();
Thread.sleep(5000);
consumer.waitUntil(frame ->
frame.getUtf8String().contains("Reconnected to redis"), 20, TimeUnit.SECONDS);
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_1", XGroupCreateArgs.Builder.mkstream());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@

import io.camunda.zeebe.client.ZeebeClient;
import io.zeebe.containers.ZeebeContainer;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;

import java.util.logging.Level;
import java.util.logging.Logger;

public class ZeebeTestContainer extends ZeebeContainer {
Expand Down Expand Up @@ -34,20 +31,24 @@ public static ZeebeTestContainer withJsonFormat() {
return container;
}

public ZeebeTestContainer withMaxTTLInSeconds(long maxTimeToLiveInSeconds) {
public ZeebeTestContainer andUseMaxTTLInSeconds(long maxTimeToLiveInSeconds) {
withEnv("ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS", Long.toString(maxTimeToLiveInSeconds));
return this;
}

public ZeebeTestContainer withMinTTLInSeconds(long minTimeToLiveInSeconds) {
public ZeebeTestContainer andUseMinTTLInSeconds(long minTimeToLiveInSeconds) {
withEnv("ZEEBE_REDIS_MIN_TIME_TO_LIVE_IN_SECONDS", Long.toString(minTimeToLiveInSeconds));
return this;
}

public static ZeebeTestContainer withCleanupCycleInSeconds(long cleanupCycleInSeconds) {
ZeebeTestContainer container = withDefaultConfig();
container.withEnv("ZEEBE_REDIS_CLEANUP_CYCLE_IN_SECONDS", Long.toString(cleanupCycleInSeconds));
return container;
return container.andUseCleanupCycleInSeconds(cleanupCycleInSeconds);
}

public ZeebeTestContainer andUseCleanupCycleInSeconds(long cleanupCycleInSeconds) {
withEnv("ZEEBE_REDIS_CLEANUP_CYCLE_IN_SECONDS", Long.toString(cleanupCycleInSeconds));
return this;
}
public ZeebeTestContainer doDeleteAfterAcknowledge(boolean deleteAfterAcknowledge) {
withEnv("ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE", Boolean.toString(deleteAfterAcknowledge));
Expand All @@ -74,6 +75,10 @@ public void start() {
super.start();
}

public void restartWithoutRedis() {
super.doStart();
}

@Override
public void stop() {
if (zeebeClient != null) {
Expand Down

0 comments on commit 63d6b36

Please sign in to comment.