Skip to content

Commit

Permalink
tests: towards more stable tests
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Oct 20, 2023
1 parent c61bfc0 commit d1dd056
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class ZeebeTestContainer extends ZeebeContainer {

private RedisContainer redisContainer;

private ZeebeClient zeebeClient;

protected ZeebeTestContainer(RedisContainer redisContainer) {
super(DockerImageName.parse("ghcr.io/camunda-community-hub/zeebe-with-redis-exporter"));
withExposedPorts(26500,9600);
Expand All @@ -32,10 +34,13 @@ public static ZeebeTestContainer withMaxTTLInSeconds(long maxTimeToLiveInSeconds
}

public ZeebeClient getClient() {
return ZeebeClient.newClientBuilder()
.gatewayAddress(getExternalGatewayAddress())
.usePlaintext()
.build();
if (zeebeClient == null) {
zeebeClient = ZeebeClient.newClientBuilder()
.gatewayAddress(getExternalGatewayAddress())
.usePlaintext()
.build();
}
return zeebeClient;
}

@Override
Expand All @@ -50,6 +55,10 @@ public void start() {

@Override
public void stop() {
if (zeebeClient != null) {
zeebeClient.close();
}
zeebeClient = null;
super.stop();
if (redisContainer != null) {
redisContainer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void shouldDeleteAfterAcknowledge() throws Exception {
// given: some consumed and acknowledged messages
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-1.bpmn").send().join();
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-2.bpmn").send().join();
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1");
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_1", XGroupCreateArgs.Builder.mkstream());
Thread.sleep(1000);
var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_1", "consumer_1"),
Expand All @@ -83,7 +84,8 @@ public void shouldNotDeleteWhenConsumedButNotAcknowledged() throws Exception {
// given: some consumed but never acknowledged messages
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-1.bpmn").send().join();
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-2.bpmn").send().join();
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_2");
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_2", XGroupCreateArgs.Builder.mkstream());
Thread.sleep(1000);
var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_2", "consumer_2"),
Expand All @@ -105,8 +107,10 @@ public void shouldNotDeleteWhenNotAcknowledgedByAllGroups() throws Exception {
// given: messages consumed and acknowledged by only one of two consumer groups
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-1.bpmn").send().join();
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-2.bpmn").send().join();
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_3");
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_4");
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_3", XGroupCreateArgs.Builder.mkstream());
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_4", XGroupCreateArgs.Builder.mkstream());
Thread.sleep(1000);
var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_3", "consumer_3"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.zeebe.redis.exporter.ProtobufCodec;
import io.zeebe.redis.testcontainers.ZeebeTestContainer;
Expand All @@ -14,8 +11,12 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

@Testcontainers
public class ExporterMinTimeToLiveTest {
Expand Down Expand Up @@ -55,7 +56,8 @@ public void shouldConsiderMinTtlWhenDeleteAfterAcknowledge() throws Exception {
// given: some consumed and acknowledged messages
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-1.bpmn").send().join();
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process-2.bpmn").send().join();
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1");
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_1", XGroupCreateArgs.Builder.mkstream());
Thread.sleep(1000);
var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_1", "consumer_1"),
Expand All @@ -74,8 +76,9 @@ public void shouldConsiderMinTtlWhenDeleteAfterAcknowledge() throws Exception {
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(xlen);

// but will delete them after min TTL
Thread.sleep(3000);
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isLessThan(xlen);
var delay = Duration.ofSeconds(3);
await().atMost(Duration.ofSeconds(5)).pollDelay(delay).pollInterval(Duration.ofMillis(500)).untilAsserted(() ->
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isLessThan(xlen));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception {
"application_1", XGroupCreateArgs.Builder.mkstream());

// then
Awaitility.await().pollInSameThread().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(20)).untilAsserted(() -> {
Awaitility.await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10)).untilAsserted(() -> {

var messages = redisConnection.sync()
.xreadgroup(Consumer.from("application_1", "consumer_1"),
XReadArgs.Builder.block(6000),
XReadArgs.Builder.block(1000),
XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT"));

long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import io.camunda.zeebe.client.ZeebeClient;
import io.zeebe.containers.ZeebeContainer;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;

public class ZeebeTestContainer extends ZeebeContainer {

private RedisContainer redisContainer;

private ZeebeClient zeebeClient;

protected ZeebeTestContainer(RedisContainer redisContainer) {
super(DockerImageName.parse("ghcr.io/camunda-community-hub/zeebe-with-redis-exporter"));
withExposedPorts(26500,9600);
Expand Down Expand Up @@ -46,10 +49,13 @@ public ZeebeTestContainer doDeleteAfterAcknowledge(boolean deleteAfterAcknowledg
}

public ZeebeClient getClient() {
return ZeebeClient.newClientBuilder()
.gatewayAddress(getExternalGatewayAddress())
.usePlaintext()
.build();
if (zeebeClient == null) {
zeebeClient = ZeebeClient.newClientBuilder()
.gatewayAddress(getExternalGatewayAddress())
.usePlaintext()
.build();
}
return zeebeClient;
}

@Override
Expand All @@ -64,6 +70,10 @@ public void start() {

@Override
public void stop() {
if (zeebeClient != null) {
zeebeClient.close();
}
zeebeClient = null;
super.stop();
if (redisContainer != null) {
redisContainer.stop();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</parent>

<properties>
<version.zeebe>8.3.0</version.zeebe>
<version.zeebe>8.3.1</version.zeebe>
<version.exporter.protobuf>1.4.0</version.exporter.protobuf>
<version.lettuce>6.2.6.RELEASE</version.lettuce>
<version.log4j>2.20.0</version.log4j>
Expand Down

0 comments on commit d1dd056

Please sign in to comment.