From 4f3c81009bf232f2e790d3f4fe3a479a6dfa775f Mon Sep 17 00:00:00 2001 From: Gunnar von der Beck Date: Thu, 12 Oct 2023 11:02:36 +0200 Subject: [PATCH] feat: upgrade to Zeebe Version 8.3.0 --- .../io/zeebe/redis/ExporterRecordTest.java | 45 ++++++++++++------- .../java/io/zeebe/redis/ExporterTest.java | 8 ++-- ...Test.java => RedisUnavailabilityTest.java} | 6 +-- .../java/io/zeebe/redis/ExporterJsonTest.java | 5 ++- .../redis/ExporterMaxTimeToLiveTest.java | 4 +- .../java/io/zeebe/redis/ExporterTest.java | 4 +- ...Test.java => RedisUnavailabilityTest.java} | 19 +++++--- pom.xml | 2 +- 8 files changed, 58 insertions(+), 35 deletions(-) rename connector-java/src/test/java/io/zeebe/redis/{ZeebeRedisClientTest.java => RedisUnavailabilityTest.java} (94%) rename exporter/src/test/java/io/zeebe/redis/{ZeebeRedisExporterTest.java => RedisUnavailabilityTest.java} (80%) diff --git a/connector-java/src/test/java/io/zeebe/redis/ExporterRecordTest.java b/connector-java/src/test/java/io/zeebe/redis/ExporterRecordTest.java index 8bc6869..26111f7 100644 --- a/connector-java/src/test/java/io/zeebe/redis/ExporterRecordTest.java +++ b/connector-java/src/test/java/io/zeebe/redis/ExporterRecordTest.java @@ -3,6 +3,7 @@ import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.model.bpmn.BpmnModelInstance; +import io.camunda.zeebe.protocol.record.intent.*; import io.lettuce.core.RedisClient; import io.zeebe.exporter.proto.Schema; import io.zeebe.redis.connect.java.ZeebeRedis; @@ -35,6 +36,9 @@ public class ExporterRecordTest { .message(m -> m.name("message").zeebeCorrelationKeyExpression("key")) .boundaryEvent("timer", b -> b.timerWithDuration("PT1M")) .endEvent() + .moveToNode("receive-task") + .serviceTask("errorTask", s -> s.zeebeJobType("error")) + .endEvent() .done(); private static final BpmnModelInstance MESSAGE_PROCESS = @@ -142,18 +146,23 @@ public void shouldExportRecords() { client.newActivateJobsCommand().jobType("test").maxJobsToActivate(1).send().join(); jobsResponse.getJobs().forEach(job -> client.newCompleteCommand(job.getKey()).send().join()); + final var errorResponse = + client.newActivateJobsCommand().jobType("error").maxJobsToActivate(1).send().join(); + errorResponse.getJobs().forEach(job -> client.newFailCommand(job.getKey()).retries(0) + .errorMessage("Error").send().join()); + // then await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(2)) .untilAsserted( () -> { assertThat(deploymentRecords) - .hasSizeGreaterThanOrEqualTo(3) + .hasSizeGreaterThanOrEqualTo(2) .allSatisfy( r -> assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.DEPLOYMENT)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATE", "CREATED", "FULLY_DISTRIBUTED"); + .contains(DeploymentIntent.CREATE.name(), DeploymentIntent.CREATED.name()); assertThat(incidentRecords) .hasSizeGreaterThanOrEqualTo(1) @@ -162,7 +171,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.INCIDENT)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED"); + .contains(IncidentIntent.CREATED.name()); assertThat(jobBatchRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -171,7 +180,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.JOB_BATCH)) .extracting(r -> r.getMetadata().getIntent()) - .contains("ACTIVATE", "ACTIVATED"); + .contains(JobBatchIntent.ACTIVATE.name(), JobBatchIntent.ACTIVATED.name()); assertThat(jobRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -180,7 +189,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.JOB)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED", "COMPLETED"); + .contains(JobIntent.CREATED.name(), JobIntent.COMPLETED.name()); assertThat(messageRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -189,7 +198,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.MESSAGE)) .extracting(r -> r.getMetadata().getIntent()) - .contains("PUBLISH", "PUBLISHED"); + .contains(MessageIntent.PUBLISH.name(), MessageIntent.PUBLISHED.name()); assertThat(messageStartEventSubscriptionRecords) .hasSizeGreaterThanOrEqualTo(1) @@ -199,7 +208,7 @@ public void shouldExportRecords() { .isEqualTo( Schema.RecordMetadata.ValueType.MESSAGE_START_EVENT_SUBSCRIPTION)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED"); + .contains(MessageStartEventSubscriptionIntent.CREATED.name()); assertThat(messageSubscriptionRecords) .hasSizeGreaterThanOrEqualTo(3) @@ -208,7 +217,8 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.MESSAGE_SUBSCRIPTION)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED", "CORRELATING", "CORRELATED"); + .contains(MessageSubscriptionIntent.CREATED.name(), MessageSubscriptionIntent.CORRELATING.name(), + MessageSubscriptionIntent.CORRELATED.name()); assertThat(processEventRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -217,7 +227,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.PROCESS_EVENT)) .extracting(r -> r.getMetadata().getIntent()) - .contains("TRIGGERING", "TRIGGERED"); + .contains(ProcessEventIntent.TRIGGERING.name(), ProcessEventIntent.TRIGGERED.name()); assertThat(processInstanceCreationRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -226,7 +236,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.PROCESS_INSTANCE_CREATION)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATE", "CREATED"); + .contains(ProcessInstanceCreationIntent.CREATE.name(), ProcessInstanceCreationIntent.CREATED.name()); assertThat(processInstanceRecords) .hasSizeGreaterThanOrEqualTo(3) @@ -235,7 +245,9 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.PROCESS_INSTANCE)) .extracting(r -> r.getMetadata().getIntent()) - .contains("ACTIVATE_ELEMENT", "ELEMENT_ACTIVATING", "ELEMENT_ACTIVATED"); + .contains(ProcessInstanceIntent.ACTIVATE_ELEMENT.name(), + ProcessInstanceIntent.ELEMENT_ACTIVATING.name(), + ProcessInstanceIntent.ELEMENT_ACTIVATED.name()); assertThat(processMessageSubscriptionRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -245,7 +257,8 @@ public void shouldExportRecords() { .isEqualTo( Schema.RecordMetadata.ValueType.PROCESS_MESSAGE_SUBSCRIPTION)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATING", "CORRELATED"); + .contains(ProcessMessageSubscriptionIntent.CREATED.name(), + ProcessMessageSubscriptionIntent.CORRELATED.name()); assertThat(processRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -254,7 +267,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.PROCESS)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED"); + .contains(ProcessIntent.CREATED.name()); assertThat(timerRecords) .hasSizeGreaterThanOrEqualTo(1) @@ -263,7 +276,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.TIMER)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED"); + .contains(TimerIntent.CREATED.name()); assertThat(variableDocumentRecords) .hasSizeGreaterThanOrEqualTo(1) @@ -272,7 +285,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.VARIABLE_DOCUMENT)) .extracting(r -> r.getMetadata().getIntent()) - .contains("UPDATE", "UPDATED"); + .contains(VariableDocumentIntent.UPDATE.name(), VariableDocumentIntent.UPDATED.name()); assertThat(variableRecords) .hasSizeGreaterThanOrEqualTo(2) @@ -281,7 +294,7 @@ public void shouldExportRecords() { assertThat(r.getMetadata().getValueType()) .isEqualTo(Schema.RecordMetadata.ValueType.VARIABLE)) .extracting(r -> r.getMetadata().getIntent()) - .contains("CREATED"); + .contains(VariableIntent.CREATED.name()); }); } } diff --git a/connector-java/src/test/java/io/zeebe/redis/ExporterTest.java b/connector-java/src/test/java/io/zeebe/redis/ExporterTest.java index c6c6933..faa483e 100644 --- a/connector-java/src/test/java/io/zeebe/redis/ExporterTest.java +++ b/connector-java/src/test/java/io/zeebe/redis/ExporterTest.java @@ -73,12 +73,12 @@ public void shouldListenToDeploymentAndProcessEvents() { client.newDeployResourceCommand().addProcessModel(PROCESS, "process.bpmn").send().join(); // then - Awaitility.await("await until the deployment is fully distributed") + Awaitility.await("await until the deployment is created") .untilAsserted( () -> assertThat(deploymentRecords) .extracting(r -> r.getMetadata().getIntent()) - .contains(DeploymentIntent.FULLY_DISTRIBUTED.name())); + .contains(DeploymentIntent.CREATED.name())); // when client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join(); @@ -117,13 +117,13 @@ public void shouldDeliverDifferentMessagesPerConsumerId() { client.newDeployResourceCommand().addProcessModel(PROCESS, "process4.bpmn").send().join(); // then - Awaitility.await("await until all deployments are fully distributed") + Awaitility.await("await until all deployments are created") .untilAsserted(() -> { var allRecords = new ArrayList<>(records1); allRecords.addAll(records2); assertThat(allRecords) .extracting(r -> r.getMetadata().getIntent()) - .filteredOn(i -> i.equals(DeploymentIntent.FULLY_DISTRIBUTED.name())) + .filteredOn(i -> i.equals(DeploymentIntent.CREATED.name())) .hasSize(4); }); diff --git a/connector-java/src/test/java/io/zeebe/redis/ZeebeRedisClientTest.java b/connector-java/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java similarity index 94% rename from connector-java/src/test/java/io/zeebe/redis/ZeebeRedisClientTest.java rename to connector-java/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java index 2140df3..3481983 100644 --- a/connector-java/src/test/java/io/zeebe/redis/ZeebeRedisClientTest.java +++ b/connector-java/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java @@ -23,7 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Testcontainers -public class ZeebeRedisClientTest { +public class RedisUnavailabilityTest { private static final BpmnModelInstance PROCESS = Bpmn.createExecutableProcess("process") @@ -76,12 +76,12 @@ public void shouldReconnectIfRedisIsTempUnavailable() throws Exception{ Thread.sleep(2000); // then - Awaitility.await("await until the deployment is fully distributed") + Awaitility.await("await until the deployment is created") .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(2)) .untilAsserted(() -> assertThat(deploymentRecords) .extracting(r -> r.getMetadata().getIntent()) - .contains(DeploymentIntent.FULLY_DISTRIBUTED.name())); + .contains(DeploymentIntent.CREATED.name())); var redisConnection = redisClient.connect(new ProtobufCodec()); Awaitility.await("await until all messages of deployment stream have been deleted") diff --git a/exporter/src/test/java/io/zeebe/redis/ExporterJsonTest.java b/exporter/src/test/java/io/zeebe/redis/ExporterJsonTest.java index 04f65da..3ca6da2 100644 --- a/exporter/src/test/java/io/zeebe/redis/ExporterJsonTest.java +++ b/exporter/src/test/java/io/zeebe/redis/ExporterJsonTest.java @@ -4,6 +4,8 @@ import io.camunda.zeebe.model.bpmn.BpmnModelInstance; import io.lettuce.core.Range; import io.lettuce.core.RedisClient; +import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XReadArgs; import io.lettuce.core.api.StatefulRedisConnection; import io.zeebe.redis.exporter.ExporterConfiguration; import io.zeebe.redis.testcontainers.ZeebeTestContainer; @@ -50,9 +52,10 @@ public void cleanUp() { @Test - public void shouldExportEventsAsJson() { + public void shouldExportEventsAsJson() throws Exception { // given zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); + Thread.sleep(1000); // when final var message = redisConnection.sync() diff --git a/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java b/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java index 158de9a..96f5b77 100644 --- a/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java +++ b/exporter/src/test/java/io/zeebe/redis/ExporterMaxTimeToLiveTest.java @@ -52,10 +52,10 @@ public void cleanUp() { public void shouldConsiderMaxTimeToLive() throws Exception { // given zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); + Thread.sleep(1000); final var message = redisConnection.sync() .xrange("zeebe:DEPLOYMENT", Range.create("-", "+")).get(0); assertThat(message).isNotNull(); - Thread.sleep(1000); var deploymentLen = redisConnection.sync().xlen("zeebe:DEPLOYMENT"); assertThat(deploymentLen).isGreaterThan(0); @@ -64,7 +64,7 @@ public void shouldConsiderMaxTimeToLive() throws Exception { // then assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(deploymentLen); - Thread.sleep(7000); + Thread.sleep(4000); assertThat(redisConnection.sync().xlen("zeebe:DEPLOYMENT")).isEqualTo(0); } } diff --git a/exporter/src/test/java/io/zeebe/redis/ExporterTest.java b/exporter/src/test/java/io/zeebe/redis/ExporterTest.java index 9b26a24..bbe7b5c 100644 --- a/exporter/src/test/java/io/zeebe/redis/ExporterTest.java +++ b/exporter/src/test/java/io/zeebe/redis/ExporterTest.java @@ -50,6 +50,7 @@ public void cleanUp() { public void shouldExportEventsAsProtobuf() throws Exception { // given zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); + Thread.sleep(1000); // when final var message = redisConnection.sync() @@ -71,7 +72,8 @@ final var record = Schema.Record.parseFrom(messageValue); public void shouldSupportConsumerGroups() throws Exception { // given zeebeContainer.getClient().newDeployResourceCommand().addProcessModel(WORKFLOW, "process.bpmn").send().join(); - redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1"); + redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1", + XGroupCreateArgs.Builder.mkstream()); Thread.sleep(1000); // when diff --git a/exporter/src/test/java/io/zeebe/redis/ZeebeRedisExporterTest.java b/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java similarity index 80% rename from exporter/src/test/java/io/zeebe/redis/ZeebeRedisExporterTest.java rename to exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java index fc6c2a4..39434e4 100644 --- a/exporter/src/test/java/io/zeebe/redis/ZeebeRedisExporterTest.java +++ b/exporter/src/test/java/io/zeebe/redis/RedisUnavailabilityTest.java @@ -7,7 +7,6 @@ import io.lettuce.core.XGroupCreateArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.api.StatefulRedisConnection; -import io.zeebe.redis.exporter.ProtobufCodec; import io.zeebe.redis.testcontainers.ZeebeTestContainer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -18,7 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Testcontainers -public class ZeebeRedisExporterTest { +public class RedisUnavailabilityTest { private static final BpmnModelInstance WORKFLOW = Bpmn.createExecutableProcess("process") @@ -29,15 +28,15 @@ public class ZeebeRedisExporterTest { .endEvent("end") .done(); @Container - public ZeebeTestContainer zeebeContainer = ZeebeTestContainer.withDefaultConfig(); + public ZeebeTestContainer zeebeContainer = ZeebeTestContainer.withJsonFormat(); private RedisClient redisClient; - private StatefulRedisConnection redisConnection; + private StatefulRedisConnection redisConnection; @BeforeEach public void init() { redisClient = RedisClient.create(zeebeContainer.getRedisAddress()); - redisConnection = redisClient.connect(new ProtobufCodec()); + redisConnection = redisClient.connect(); } @AfterEach @@ -61,7 +60,7 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception { // when zeebeContainer.getRedisContainer().start(); redisClient = RedisClient.create(zeebeContainer.getRedisAddress()); - redisConnection = redisClient.connect(new ProtobufCodec()); + redisConnection = redisClient.connect(); Thread.sleep(5000); redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from("zeebe:DEPLOYMENT", "0-0"), "application_1", XGroupCreateArgs.Builder.mkstream()); @@ -70,8 +69,14 @@ public void worksCorrectIfRedisIsTemporarilyUnavailable() throws Exception { XReadArgs.Builder.block(6000), XReadArgs.StreamOffset.lastConsumed("zeebe:DEPLOYMENT")); + long createdCount = messages.stream().map(m -> m.getBody().values().stream().findFirst().get()) + .filter(json -> json.contains("\"valueType\":\"DEPLOYMENT\"")) + .filter(json -> json.contains("\"recordType\":\"EVENT\"")) + .filter(json -> json.contains("\"intent\":\"CREATED\"")) + .count(); + // then - assertThat(messages.size()).isEqualTo(6); + assertThat(createdCount).isEqualTo(2); } } diff --git a/pom.xml b/pom.xml index 5ce7f88..b5421b1 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ - 8.2.16 + 8.3.0 1.4.0 6.2.6.RELEASE 2.20.0