Skip to content

Commit

Permalink
feat: upgrade to Zeebe Version 8.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Oct 12, 2023
1 parent 6f798c3 commit 4f3c810
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 35 deletions.
45 changes: 29 additions & 16 deletions connector-java/src/test/java/io/zeebe/redis/ExporterRecordTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.intent.*;
import io.lettuce.core.RedisClient;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.redis.connect.java.ZeebeRedis;
Expand Down Expand Up @@ -35,6 +36,9 @@ public class ExporterRecordTest {
.message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))
.boundaryEvent("timer", b -> b.timerWithDuration("PT1M"))
.endEvent()
.moveToNode("receive-task")
.serviceTask("errorTask", s -> s.zeebeJobType("error"))
.endEvent()
.done();

private static final BpmnModelInstance MESSAGE_PROCESS =
Expand Down Expand Up @@ -142,18 +146,23 @@ public void shouldExportRecords() {
client.newActivateJobsCommand().jobType("test").maxJobsToActivate(1).send().join();
jobsResponse.getJobs().forEach(job -> client.newCompleteCommand(job.getKey()).send().join());

final var errorResponse =
client.newActivateJobsCommand().jobType("error").maxJobsToActivate(1).send().join();
errorResponse.getJobs().forEach(job -> client.newFailCommand(job.getKey()).retries(0)
.errorMessage("Error").send().join());

// then
await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
assertThat(deploymentRecords)
.hasSizeGreaterThanOrEqualTo(3)
.hasSizeGreaterThanOrEqualTo(2)
.allSatisfy(
r ->
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.DEPLOYMENT))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATE", "CREATED", "FULLY_DISTRIBUTED");
.contains(DeploymentIntent.CREATE.name(), DeploymentIntent.CREATED.name());

assertThat(incidentRecords)
.hasSizeGreaterThanOrEqualTo(1)
Expand All @@ -162,7 +171,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.INCIDENT))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED");
.contains(IncidentIntent.CREATED.name());

assertThat(jobBatchRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -171,7 +180,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.JOB_BATCH))
.extracting(r -> r.getMetadata().getIntent())
.contains("ACTIVATE", "ACTIVATED");
.contains(JobBatchIntent.ACTIVATE.name(), JobBatchIntent.ACTIVATED.name());

assertThat(jobRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -180,7 +189,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.JOB))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED", "COMPLETED");
.contains(JobIntent.CREATED.name(), JobIntent.COMPLETED.name());

assertThat(messageRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -189,7 +198,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.MESSAGE))
.extracting(r -> r.getMetadata().getIntent())
.contains("PUBLISH", "PUBLISHED");
.contains(MessageIntent.PUBLISH.name(), MessageIntent.PUBLISHED.name());

assertThat(messageStartEventSubscriptionRecords)
.hasSizeGreaterThanOrEqualTo(1)
Expand All @@ -199,7 +208,7 @@ public void shouldExportRecords() {
.isEqualTo(
Schema.RecordMetadata.ValueType.MESSAGE_START_EVENT_SUBSCRIPTION))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED");
.contains(MessageStartEventSubscriptionIntent.CREATED.name());

assertThat(messageSubscriptionRecords)
.hasSizeGreaterThanOrEqualTo(3)
Expand All @@ -208,7 +217,8 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.MESSAGE_SUBSCRIPTION))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED", "CORRELATING", "CORRELATED");
.contains(MessageSubscriptionIntent.CREATED.name(), MessageSubscriptionIntent.CORRELATING.name(),
MessageSubscriptionIntent.CORRELATED.name());

assertThat(processEventRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -217,7 +227,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.PROCESS_EVENT))
.extracting(r -> r.getMetadata().getIntent())
.contains("TRIGGERING", "TRIGGERED");
.contains(ProcessEventIntent.TRIGGERING.name(), ProcessEventIntent.TRIGGERED.name());

assertThat(processInstanceCreationRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -226,7 +236,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.PROCESS_INSTANCE_CREATION))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATE", "CREATED");
.contains(ProcessInstanceCreationIntent.CREATE.name(), ProcessInstanceCreationIntent.CREATED.name());

assertThat(processInstanceRecords)
.hasSizeGreaterThanOrEqualTo(3)
Expand All @@ -235,7 +245,9 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.PROCESS_INSTANCE))
.extracting(r -> r.getMetadata().getIntent())
.contains("ACTIVATE_ELEMENT", "ELEMENT_ACTIVATING", "ELEMENT_ACTIVATED");
.contains(ProcessInstanceIntent.ACTIVATE_ELEMENT.name(),
ProcessInstanceIntent.ELEMENT_ACTIVATING.name(),
ProcessInstanceIntent.ELEMENT_ACTIVATED.name());

assertThat(processMessageSubscriptionRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -245,7 +257,8 @@ public void shouldExportRecords() {
.isEqualTo(
Schema.RecordMetadata.ValueType.PROCESS_MESSAGE_SUBSCRIPTION))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATING", "CORRELATED");
.contains(ProcessMessageSubscriptionIntent.CREATED.name(),
ProcessMessageSubscriptionIntent.CORRELATED.name());

assertThat(processRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -254,7 +267,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.PROCESS))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED");
.contains(ProcessIntent.CREATED.name());

assertThat(timerRecords)
.hasSizeGreaterThanOrEqualTo(1)
Expand All @@ -263,7 +276,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.TIMER))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED");
.contains(TimerIntent.CREATED.name());

assertThat(variableDocumentRecords)
.hasSizeGreaterThanOrEqualTo(1)
Expand All @@ -272,7 +285,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.VARIABLE_DOCUMENT))
.extracting(r -> r.getMetadata().getIntent())
.contains("UPDATE", "UPDATED");
.contains(VariableDocumentIntent.UPDATE.name(), VariableDocumentIntent.UPDATED.name());

assertThat(variableRecords)
.hasSizeGreaterThanOrEqualTo(2)
Expand All @@ -281,7 +294,7 @@ public void shouldExportRecords() {
assertThat(r.getMetadata().getValueType())
.isEqualTo(Schema.RecordMetadata.ValueType.VARIABLE))
.extracting(r -> r.getMetadata().getIntent())
.contains("CREATED");
.contains(VariableIntent.CREATED.name());
});
}
}
8 changes: 4 additions & 4 deletions connector-java/src/test/java/io/zeebe/redis/ExporterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public void shouldListenToDeploymentAndProcessEvents() {
client.newDeployResourceCommand().addProcessModel(PROCESS, "process.bpmn").send().join();

// then
Awaitility.await("await until the deployment is fully distributed")
Awaitility.await("await until the deployment is created")
.untilAsserted(
() ->
assertThat(deploymentRecords)
.extracting(r -> r.getMetadata().getIntent())
.contains(DeploymentIntent.FULLY_DISTRIBUTED.name()));
.contains(DeploymentIntent.CREATED.name()));

// when
client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join();
Expand Down Expand Up @@ -117,13 +117,13 @@ public void shouldDeliverDifferentMessagesPerConsumerId() {
client.newDeployResourceCommand().addProcessModel(PROCESS, "process4.bpmn").send().join();

// then
Awaitility.await("await until all deployments are fully distributed")
Awaitility.await("await until all deployments are created")
.untilAsserted(() -> {
var allRecords = new ArrayList<>(records1);
allRecords.addAll(records2);
assertThat(allRecords)
.extracting(r -> r.getMetadata().getIntent())
.filteredOn(i -> i.equals(DeploymentIntent.FULLY_DISTRIBUTED.name()))
.filteredOn(i -> i.equals(DeploymentIntent.CREATED.name()))
.hasSize(4);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
public class ZeebeRedisClientTest {
public class RedisUnavailabilityTest {

private static final BpmnModelInstance PROCESS =
Bpmn.createExecutableProcess("process")
Expand Down Expand Up @@ -76,12 +76,12 @@ public void shouldReconnectIfRedisIsTempUnavailable() throws Exception{
Thread.sleep(2000);

// then
Awaitility.await("await until the deployment is fully distributed")
Awaitility.await("await until the deployment is created")
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> assertThat(deploymentRecords)
.extracting(r -> r.getMetadata().getIntent())
.contains(DeploymentIntent.FULLY_DISTRIBUTED.name()));
.contains(DeploymentIntent.CREATED.name()));

var redisConnection = redisClient.connect(new ProtobufCodec());
Awaitility.await("await until all messages of deployment stream have been deleted")
Expand Down
5 changes: 4 additions & 1 deletion exporter/src/test/java/io/zeebe/redis/ExporterJsonTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.lettuce.core.Range;
import io.lettuce.core.RedisClient;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.zeebe.redis.exporter.ExporterConfiguration;
import io.zeebe.redis.testcontainers.ZeebeTestContainer;
Expand Down Expand Up @@ -50,9 +52,10 @@ public void cleanUp() {


@Test
public void shouldExportEventsAsJson() {
public void shouldExportEventsAsJson() throws Exception {
// given
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();
Thread.sleep(1000);

// when
final var message = redisConnection.sync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public void cleanUp() {
public void shouldConsiderMaxTimeToLive() throws Exception {
// given
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();
Thread.sleep(1000);
final var message = redisConnection.sync()
.xrange("zeebe:DEPLOYMENT", Range.create("-", "+")).get(0);
assertThat(message).isNotNull();
Thread.sleep(1000);
var deploymentLen = redisConnection.sync().xlen("zeebe:DEPLOYMENT");
assertThat(deploymentLen).isGreaterThan(0);

Expand All @@ -64,7 +64,7 @@ public void shouldConsiderMaxTimeToLive() throws Exception {

// then
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(deploymentLen);
Thread.sleep(7000);
Thread.sleep(4000);
assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(0);
}
}
4 changes: 3 additions & 1 deletion exporter/src/test/java/io/zeebe/redis/ExporterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void cleanUp() {
public void shouldExportEventsAsProtobuf() throws Exception {
// given
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();
Thread.sleep(1000);

// when
final var message = redisConnection.sync()
Expand All @@ -71,7 +72,8 @@ final var record = Schema.Record.parseFrom(messageValue);
public void shouldSupportConsumerGroups() throws Exception {
// given
zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join();
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1");
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1",
XGroupCreateArgs.Builder.mkstream());
Thread.sleep(1000);

// when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.zeebe.redis.exporter.ProtobufCodec;
import io.zeebe.redis.testcontainers.ZeebeTestContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -18,7 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
public class ZeebeRedisExporterTest {
public class RedisUnavailabilityTest {

private static final BpmnModelInstance WORKFLOW =
Bpmn.createExecutableProcess("process")
Expand All @@ -29,15 +28,15 @@ public class ZeebeRedisExporterTest {
.endEvent("end")
.done();
@Container
public ZeebeTestContainer zeebeContainer = ZeebeTestContainer.withDefaultConfig();
public ZeebeTestContainer zeebeContainer = ZeebeTestContainer.withJsonFormat();

private RedisClient redisClient;
private StatefulRedisConnection<String, byte[]> redisConnection;
private StatefulRedisConnection<String, String> redisConnection;

@BeforeEach
public void init() {
redisClient = RedisClient.create(zeebeContainer.getRedisAddress());
redisConnection = redisClient.connect(new ProtobufCodec());
redisConnection = redisClient.connect();
}

@AfterEach
Expand All @@ -61,7 +60,7 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception {
// when
zeebeContainer.getRedisContainer().start();
redisClient = RedisClient.create(zeebeContainer.getRedisAddress());
redisConnection = redisClient.connect(new ProtobufCodec());
redisConnection = redisClient.connect();
Thread.sleep(5000);
redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"),
"application_1", XGroupCreateArgs.Builder.mkstream());
Expand All @@ -70,8 +69,14 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception {
XReadArgs.Builder.block(6000),
XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT"));

long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get())
.filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\""))
.filter(json -> json.contains("\"recordType\":\"EVENT\""))
.filter(json -> json.contains("\"intent\":\"CREATED\""))
.count();

// then
assertThat(messages.size()).isEqualTo(6);
assertThat(createdCount).isEqualTo(2);

}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</parent>

<properties>
<version.zeebe>8.2.16</version.zeebe>
<version.zeebe>8.3.0</version.zeebe>
<version.exporter.protobuf>1.4.0</version.exporter.protobuf>
<version.lettuce>6.2.6.RELEASE</version.lettuce>
<version.log4j>2.20.0</version.log4j>
Expand Down

0 comments on commit 4f3c810

Please sign in to comment.