From c61bfc0d1c862f7ff1b44f77bdd1ccbc33a89582 Mon Sep 17 00:00:00 2001 From: Gunnar von der Beck Date: Thu, 19 Oct 2023 13:07:39 +0200 Subject: [PATCH] tests: further test stability --- .../redis/ExporterMaxTimeToLiveTest.java | 6 +++- .../zeebe/redis/RedisUnavailabilityTest.java | 30 ++++++++++++------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java b/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java index 213fd5b..55df3cb 100644 --- a/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java +++ b/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java @@ -12,6 +12,9 @@ import org.junit.jupiter.api.Test; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import java.time.Duration; import static org.assertj.core.api.Assertions.assertThat; @@ -65,6 +68,7 @@ public void shouldConsiderMaxTimeToLive() throws Exception { // then assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(deploymentLen); Thread.sleep(5000); - assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(0); + Awaitility.await().pollInSameThread().pollInterval(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(0)); } } diff --git a/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java b/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java index 39434e4..35d3348 100644 --- a/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java +++ b/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java @@ -13,6 +13,9 @@ import org.junit.jupiter.api.Test; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import java.time.Duration; import static org.assertj.core.api.Assertions.assertThat; @@ -64,19 +67,24 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception { Thread.sleep(5000); redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1", XGroupCreateArgs.Builder.mkstream()); - var messages = redisConnection.sync() - .xreadgroup(Consumer.from("application_1", "consumer_1"), - XReadArgs.Builder.block(6000), - XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT")); - - long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get()) - .filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\"")) - .filter(json -> json.contains("\"recordType\":\"EVENT\"")) - .filter(json -> json.contains("\"intent\":\"CREATED\"")) - .count(); // then - assertThat(createdCount).isEqualTo(2); + Awaitility.await().pollInSameThread().pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(20)).untilAsserted(() -> { + + var messages = redisConnection.sync() + .xreadgroup(Consumer.from("application_1", "consumer_1"), + XReadArgs.Builder.block(6000), + XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT")); + + long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get()) + .filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\"")) + .filter(json -> json.contains("\"recordType\":\"EVENT\"")) + .filter(json -> json.contains("\"intent\":\"CREATED\"")) + .count(); + + assertThat(createdCount).isEqualTo(2); + }); } }