Skip to content

Commit

Permalink
tests: further test stability
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Oct 19, 2023
1 parent 50a52a4 commit c61bfc0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -65,6 +68,7 @@ public void shouldConsiderMaxTimeToLive() throws Exception {
// then
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(deploymentLen);
Thread.sleep(5000);
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(0);
Awaitility.await().pollInSameThread().pollInterval(Duration.ofMillis(500)).atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(0));
}
}
30 changes: 19 additions & 11 deletions exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -64,19 +67,24 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception {
Thread.sleep(5000);
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_1", XGroupCreateArgs.Builder.mkstream());
var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_1", "consumer_1"),
XReadArgs.Builder.block(6000),
XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT"));

long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get())
.filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\""))
.filter(json -> json.contains("\"recordType\":\"EVENT\""))
.filter(json -> json.contains("\"intent\":\"CREATED\""))
.count();

// then
assertThat(createdCount).isEqualTo(2);
Awaitility.await().pollInSameThread().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(20)).untilAsserted(() -> {

var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_1", "consumer_1"),
XReadArgs.Builder.block(6000),
XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT"));

long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get())
.filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\""))
.filter(json -> json.contains("\"recordType\":\"EVENT\""))
.filter(json -> json.contains("\"intent\":\"CREATED\""))
.count();

assertThat(createdCount).isEqualTo(2);
});

}
}

0 comments on commit c61bfc0

Please sign in to comment.