From 63d6b36d637208a829af29c5af3b96a048041275 Mon Sep 17 00:00:00 2001 From: Gunnar von der Beck Date: Fri, 24 Nov 2023 13:30:44 +0100 Subject: [PATCH] feat: auto reconnect in case Redis is not available at startup --- .../io/zeebe/redis/exporter/RedisCleaner.java | 4 + .../zeebe/redis/exporter/RedisExporter.java | 65 ++++++++-- .../redis/ExporterMaxTimeToLiveTest.java | 2 +- .../redis/ExporterMinTimeToLiveTest.java | 2 +- .../io/zeebe/redis/RedisLateStartupTest.java | 118 ++++++++++++++++++ .../zeebe/redis/RedisUnavailabilityTest.java | 10 +- .../testcontainers/ZeebeTestContainer.java | 19 +-- 7 files changed, 200 insertions(+), 20 deletions(-) create mode 100644 exporter/src/test/java/io/zeebe/redis/RedisLateStartupTest.java diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java b/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java index 74482d4..b309a1f 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java @@ -37,6 +37,10 @@ public RedisCleaner(StatefulRedisConnection redisConnection, boolean trimScheduleDelay = Duration.ofSeconds(config.getCleanupCycleInSeconds()); } + public void setRedisConnection(StatefulRedisConnection redisConnection) { + this.redisConnection = redisConnection; + } + public void considerStream(String stream) { streams.put(stream, Boolean.TRUE); } diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java b/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java index 6326dbc..a01e0dc 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java @@ -13,6 +13,8 @@ import org.slf4j.Logger; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -38,12 +40,19 @@ public class RedisExporter implements Exporter { private RedisSender redisSender; + private Controller controller; + // The ExecutorService allows to schedule a regular task independent of the actual load // which controller.scheduleCancellableTask sadly didn't do. private ScheduledExecutorService senderThread = Executors.newSingleThreadScheduledExecutor(); private ScheduledExecutorService cleanerThread = Executors.newSingleThreadScheduledExecutor(); + // Startup handling in case of Redis connection failure + private ScheduledExecutorService startupThread; + private boolean fullyLoggedStartupException = false; + private List reconnectIntervals = new ArrayList<>(List.of(2,3,3,4,4,4,5)); + @Override public void configure(Context context) { logger = context.getLogger(); @@ -80,21 +89,59 @@ public void open(Controller controller) { if (config.getRemoteAddress().isEmpty()) { throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration."); } + this.controller = controller; redisClient = RedisClient.create( ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(), config.getRemoteAddress().get()); - senderConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect(); - cleanupConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect(); - logger.info("Successfully connected Redis exporter to {}", config.getRemoteAddress().get()); + connectToRedis(); + } - redisSender = new RedisSender(config, controller, senderConnection, logger); - senderThread.schedule(this::sendBatches, config.getBatchCycleMillis(), TimeUnit.MILLISECONDS); + private void connectToRedis() { + boolean failure = false; + // try to connect + try { + senderConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect(); + cleanupConnection = useProtoBuf ? redisClient.connect(new ProtobufCodec()) : redisClient.connect(); + logger.info("Successfully connected Redis exporter to {}", config.getRemoteAddress().get()); + } catch (RedisConnectionException ex) { + if (!fullyLoggedStartupException) { + logger.error("Failure connecting Redis exporter to " + config.getRemoteAddress().get(), ex); + fullyLoggedStartupException = true; + } else { + logger.warn("Failure connecting Redis exporter to {}: {}", config.getRemoteAddress().get(), ex.getMessage()); + } + failure = true; + } + + // upon successful connection initialize the sender + if (redisSender == null && senderConnection != null) { + redisSender = new RedisSender(config, controller, senderConnection, logger); + senderThread.schedule(this::sendBatches, config.getBatchCycleMillis(), TimeUnit.MILLISECONDS); + } + + // always initialize the cleaner + if (redisCleaner == null) { + redisCleaner = new RedisCleaner(cleanupConnection, useProtoBuf, config, logger); + if (config.getCleanupCycleInSeconds() > 0 && + (config.isDeleteAfterAcknowledge() || config.getMaxTimeToLiveInSeconds() > 0)) { + cleanerThread.schedule(this::trimStreamValues, config.getCleanupCycleInSeconds(), TimeUnit.SECONDS); + } + // upon late successful connection propagate it to cleaner + } else if (cleanupConnection != null) { + redisCleaner.setRedisConnection(cleanupConnection); + } - redisCleaner = new RedisCleaner(cleanupConnection, useProtoBuf, config, logger); - if (config.getCleanupCycleInSeconds() > 0 && - (config.isDeleteAfterAcknowledge() || config.getMaxTimeToLiveInSeconds() > 0)) { - cleanerThread.schedule(this::trimStreamValues, config.getCleanupCycleInSeconds(), TimeUnit.SECONDS); + // if initial connection has failed, try again later + if (failure) { + if (startupThread == null) { + startupThread = Executors.newSingleThreadScheduledExecutor(); + } + int delay = reconnectIntervals.size() > 1 ? reconnectIntervals.remove(0) : reconnectIntervals.get(0); + startupThread.schedule(this::connectToRedis, delay, TimeUnit.SECONDS); + } else if (startupThread != null ) { + startupThread.shutdown(); + startupThread = null; } } diff --git a/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java b/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java index efac242..160691a 100644 --- a/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java +++ b/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java @@ -35,7 +35,7 @@ public class ExporterMaxTimeToLiveTest { @Container public ZeebeTestContainer zeebeContainer = ZeebeTestContainer - .withCleanupCycleInSeconds(2).withMaxTTLInSeconds(6); + .withCleanupCycleInSeconds(2).andUseMaxTTLInSeconds(6); private RedisClient redisClient; private StatefulRedisConnection redisConnection; diff --git a/exporter/src/test/java/io/zeebe/redis/ExporterMinTimeToLiveTest.java b/exporter/src/test/java/io/zeebe/redis/ExporterMinTimeToLiveTest.java index 6c3bb61..550ceb4 100644 --- a/exporter/src/test/java/io/zeebe/redis/ExporterMinTimeToLiveTest.java +++ b/exporter/src/test/java/io/zeebe/redis/ExporterMinTimeToLiveTest.java @@ -34,7 +34,7 @@ public class ExporterMinTimeToLiveTest { @Container public ZeebeTestContainer zeebeContainer = ZeebeTestContainer - .withCleanupCycleInSeconds(3).doDeleteAfterAcknowledge(true).withMinTTLInSeconds(10); + .withCleanupCycleInSeconds(3).doDeleteAfterAcknowledge(true).andUseMinTTLInSeconds(10); private RedisClient redisClient; private StatefulRedisConnection redisConnection; diff --git a/exporter/src/test/java/io/zeebe/redis/RedisLateStartupTest.java b/exporter/src/test/java/io/zeebe/redis/RedisLateStartupTest.java new file mode 100644 index 0000000..2672beb --- /dev/null +++ b/exporter/src/test/java/io/zeebe/redis/RedisLateStartupTest.java @@ -0,0 +1,118 @@ +package io.zeebe.redis; + +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.lettuce.core.*; +import io.lettuce.core.api.StatefulRedisConnection; +import io.zeebe.redis.testcontainers.OnFailureExtension; +import io.zeebe.redis.testcontainers.ZeebeTestContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.output.WaitingConsumer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +@Testcontainers +@ExtendWith(OnFailureExtension.class) +public class RedisLateStartupTest { + + private static final BpmnModelInstance WORKFLOW = + Bpmn.createExecutableProcess("process") + .startEvent("start") + .sequenceFlowId("to-task") + .serviceTask("task", s -> s.zeebeJobType("test")) + .sequenceFlowId("to-end") + .endEvent("end") + .done(); + @Container + public ZeebeTestContainer zeebeContainer = ZeebeTestContainer.withJsonFormat() + .andUseCleanupCycleInSeconds(2).doDeleteAfterAcknowledge(true); + + private RedisClient redisClient; + private StatefulRedisConnection redisConnection; + + @BeforeEach + public void init() { + redisClient = RedisClient.create(zeebeContainer.getRedisAddress()); + redisConnection = redisClient.connect(); + redisConnection.sync().xtrim("zeebe:DEPLOYMENT", 0); + } + + @AfterEach + public void cleanUp() { + redisConnection.sync().xtrim("zeebe:DEPLOYMENT", 0); + redisConnection.close(); + redisClient.shutdown(); + } + + @Test + public void worksCorrectIfRedisIsUnavailableAtStartup() throws Exception { + // given + redisConnection.close(); + redisClient.shutdown(); + zeebeContainer.stop(); + zeebeContainer.restartWithoutRedis(); + WaitingConsumer consumer = new WaitingConsumer(); + zeebeContainer.followOutput(consumer); + consumer.waitUntil(frame -> + frame.getUtf8String().contains("Broker is ready"), 30, TimeUnit.SECONDS); + zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); + zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process2.bpmn").send().join(); + consumer.waitUntil(frame -> + frame.getUtf8String().contains("Failure connecting Redis exporter"), 20, TimeUnit.SECONDS); + + // when + zeebeContainer.getRedisContainer().start(); + redisClient = RedisClient.create(zeebeContainer.getRedisAddress()); + redisConnection = redisClient.connect(); + consumer.waitUntil(frame -> + frame.getUtf8String().contains("Successfully connected Redis exporter"), 20, TimeUnit.SECONDS); + + Thread.sleep(1000); + zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process3.bpmn").send().join(); + + redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), + "application_1", XGroupCreateArgs.Builder.mkstream()); + + // then + AtomicReference xlen = new AtomicReference<>(); + Awaitility.await().pollInSameThread().pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + + var messages = redisConnection.sync() + .xreadgroup(Consumer.from("application_1", "consumer_1"), + XReadArgs.Builder.block(1000), + XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT")); + + long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get()) + .filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\"")) + .filter(json -> json.contains("\"recordType\":\"EVENT\"")) + .filter(json -> json.contains("\"intent\":\"CREATED\"")) + .count(); + + // assert that all messages have been received + assertThat(createdCount).isEqualTo(3); + + // acknowledge all messages so that cleanup can work + xlen.set(redisConnection.sync().xlen("zeebe:DEPLOYMENT")); + for (StreamMessage message : messages) { + redisConnection.sync().xack("zeebe:DEPLOYMENT", "application_1", message.getId()); + }; + }); + + // assert that cleanup still works and removed all messages except the last ones + var delay = Duration.ofSeconds(3); + await().atMost(Duration.ofSeconds(7)).pollDelay(delay).pollInterval(Duration.ofSeconds(1)).pollInSameThread() + .untilAsserted(() -> assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isLessThan(xlen.get())); + } +} diff --git a/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java b/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java index 171d10c..0784253 100644 --- a/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java +++ b/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java @@ -13,11 +13,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.output.WaitingConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.shaded.org.awaitility.Awaitility; import java.time.Duration; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -59,7 +61,10 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception { redisConnection.close(); redisClient.shutdown(); zeebeContainer.getRedisContainer().stop(); - Thread.sleep(2000); + WaitingConsumer consumer = new WaitingConsumer(); + zeebeContainer.followOutput(consumer); + consumer.waitUntil(frame -> + frame.getUtf8String().contains("Connection refused: redis"), 10, TimeUnit.SECONDS); zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process2.bpmn").send().join(); Thread.sleep(1000); @@ -68,7 +73,8 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception { zeebeContainer.getRedisContainer().start(); redisClient = RedisClient.create(zeebeContainer.getRedisAddress()); redisConnection = redisClient.connect(); - Thread.sleep(5000); + consumer.waitUntil(frame -> + frame.getUtf8String().contains("Reconnected to redis"), 20, TimeUnit.SECONDS); redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1", XGroupCreateArgs.Builder.mkstream()); diff --git a/exporter/src/test/java/io/zeebe/redis/testcontainers/ZeebeTestContainer.java b/exporter/src/test/java/io/zeebe/redis/testcontainers/ZeebeTestContainer.java index 742bb4b..10a5bdd 100644 --- a/exporter/src/test/java/io/zeebe/redis/testcontainers/ZeebeTestContainer.java +++ b/exporter/src/test/java/io/zeebe/redis/testcontainers/ZeebeTestContainer.java @@ -2,11 +2,8 @@ import io.camunda.zeebe.client.ZeebeClient; import io.zeebe.containers.ZeebeContainer; -import org.slf4j.LoggerFactory; -import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; -import java.util.logging.Level; import java.util.logging.Logger; public class ZeebeTestContainer extends ZeebeContainer { @@ -34,20 +31,24 @@ public static ZeebeTestContainer withJsonFormat() { return container; } - public ZeebeTestContainer withMaxTTLInSeconds(long maxTimeToLiveInSeconds) { + public ZeebeTestContainer andUseMaxTTLInSeconds(long maxTimeToLiveInSeconds) { withEnv("ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS", Long.toString(maxTimeToLiveInSeconds)); return this; } - public ZeebeTestContainer withMinTTLInSeconds(long minTimeToLiveInSeconds) { + public ZeebeTestContainer andUseMinTTLInSeconds(long minTimeToLiveInSeconds) { withEnv("ZEEBE_REDIS_MIN_TIME_TO_LIVE_IN_SECONDS", Long.toString(minTimeToLiveInSeconds)); return this; } public static ZeebeTestContainer withCleanupCycleInSeconds(long cleanupCycleInSeconds) { ZeebeTestContainer container = withDefaultConfig(); - container.withEnv("ZEEBE_REDIS_CLEANUP_CYCLE_IN_SECONDS", Long.toString(cleanupCycleInSeconds)); - return container; + return container.andUseCleanupCycleInSeconds(cleanupCycleInSeconds); + } + + public ZeebeTestContainer andUseCleanupCycleInSeconds(long cleanupCycleInSeconds) { + withEnv("ZEEBE_REDIS_CLEANUP_CYCLE_IN_SECONDS", Long.toString(cleanupCycleInSeconds)); + return this; } public ZeebeTestContainer doDeleteAfterAcknowledge(boolean deleteAfterAcknowledge) { withEnv("ZEEBE_REDIS_DELETE_AFTER_ACKNOWLEDGE", Boolean.toString(deleteAfterAcknowledge)); @@ -74,6 +75,10 @@ public void start() { super.start(); } + public void restartWithoutRedis() { + super.doStart(); + } + @Override public void stop() { if (zeebeClient != null) {