From f048bf6ca6bbfd7756e4131494fada6ad98b4624 Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 13:51:31 +0100 Subject: [PATCH 1/8] refactor: remove duplicate getName Simplifies the map and filter by placing map before filter (cherry picked from commit 6e20e518700861ca821676b3c50338c2326d9c8f) --- .../process/test/engine/GrpcToLogStreamGatewayTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index 2ca139db..ce396153 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -34,9 +34,10 @@ void testImplementsGatewayEndpoint(final String methodName) { .isPresent(); } - private static Stream provideMethods() { + static Stream provideMethods() { return Arrays.stream(GatewayImplBase.class.getDeclaredMethods()) - .filter(method -> !method.getName().equals("bindService")) - .map(method -> Arguments.of(method.getName())); + .map(Method::getName) + .filter(name -> !name.equals("bindService")) + .map(Arguments::of); } } From 814dbf3925d31e39badc018773095f1ada3ff3da Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 13:54:06 +0100 Subject: [PATCH 2/8] refactor: simplify adding other ignored methods Currently, only the bindService method needs to be ignored as it is explicitly declared by GatewayImplBase. But the other methods from the interfaces BindableService and AsyncService are not retrieved by getDeclaredMethods. For this, we'll need to use getMethods. However, that method will also pass undeclared methods like equals which we'll need to explicitly ignore. This we can easily achieve using a list of strings. (cherry picked from commit b75727036c14ec8b97871100b0a8cdd1768f07e7) --- .../process/test/engine/GrpcToLogStreamGatewayTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index ce396153..5fb82608 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -13,6 +13,7 @@ import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayImplBase; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -21,6 +22,8 @@ class GrpcToLogStreamGatewayTest { + static final List IGNORED_METHODS = List.of("bindService"); + @ParameterizedTest(name = "{0}") @MethodSource("provideMethods") void testImplementsGatewayEndpoint(final String methodName) { @@ -37,7 +40,7 @@ void testImplementsGatewayEndpoint(final String methodName) { static Stream provideMethods() { return Arrays.stream(GatewayImplBase.class.getDeclaredMethods()) .map(Method::getName) - .filter(name -> !name.equals("bindService")) + .filter(name -> !IGNORED_METHODS.contains(name)) .map(Arguments::of); } } From 2777fe2728da36d2d4b25b5a77bd22d4520d1e0b Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 13:59:18 +0100 Subject: [PATCH 3/8] test: ensure all gateway rpcs supported This test was not testing what was intended. It wants to ensure that all the RPCs of the gateway are supported by the GrpcToLogStreamGateway. It does this by verifying that the GrpcToLogStreamGateway declares an implementation for each of the gateway's methods. However, the getDeclaredMethods doesn't return those. Possibly these were moved to AsyncService at some point. Note that this is generated code and that this may have been changed by changes upstream in the grpc library. Instead, we must use getMethods to retrieve all methods available on this class (also those defined through default implementations on its interfaces). When we do that, we must also ignore a bunch of undeclared methods, like equals and hashCode. What we're left with is a failing test that fails for two missing methods: - streamActivatedJobs - migrateProcessInstance (cherry picked from commit 00b726b5d6be8621d6c48b3eb683bc39bced722d) --- .../test/engine/GrpcToLogStreamGatewayTest.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index 5fb82608..c30beb97 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -22,7 +22,16 @@ class GrpcToLogStreamGatewayTest { - static final List IGNORED_METHODS = List.of("bindService"); + static final List IGNORED_METHODS = + List.of( + "bindService", + "equals", + "getClass", + "hashCode", + "notify", + "notifyAll", + "toString", + "wait"); @ParameterizedTest(name = "{0}") @MethodSource("provideMethods") @@ -38,7 +47,7 @@ void testImplementsGatewayEndpoint(final String methodName) { } static Stream provideMethods() { - return Arrays.stream(GatewayImplBase.class.getDeclaredMethods()) + return Arrays.stream(GatewayImplBase.class.getMethods()) .map(Method::getName) .filter(name -> !IGNORED_METHODS.contains(name)) .map(Arguments::of); From 1c9a9efd0caf6e2d6f286ade25a48f94f2da8b14 Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 14:16:30 +0100 Subject: [PATCH 4/8] fix: support migrate process instance rpc Fixes failing test case: - GrpcToLogStreamGatewayTest.testImplementsGatewayEndpoint#migrateProcessInstance (cherry picked from commit cb3a81aeb23c8da90c8eea9bbec65259f7d03d18) --- .../test/engine/GrpcToLogStreamGateway.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index e54dcbb3..4389a024 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -32,6 +32,8 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.EvaluateDecisionResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition; @@ -61,6 +63,8 @@ import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationStartInstruction; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationMappingInstruction; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationActivateInstruction; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationTerminateInstruction; @@ -79,6 +83,7 @@ import io.camunda.zeebe.protocol.record.intent.MessageIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceMigrationIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent; import io.camunda.zeebe.protocol.record.intent.ResourceDeletionIntent; import io.camunda.zeebe.protocol.record.intent.SignalIntent; @@ -474,6 +479,35 @@ public void modifyProcessInstance( writer.writeCommandWithKey(request.getProcessInstanceKey(), record, recordMetadata); } + @Override + public void migrateProcessInstance( + final MigrateProcessInstanceRequest request, + final StreamObserver responseObserver) { + final var requestId = + gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); + + final var command = + new ProcessInstanceMigrationRecord() + .setProcessInstanceKey(request.getProcessInstanceKey()) + .setTargetProcessDefinitionKey( + request.getMigrationPlan().getTargetProcessDefinitionKey()); + + request.getMigrationPlan().getMappingInstructionsList().stream() + .map( + instruction -> + new ProcessInstanceMigrationMappingInstruction() + .setSourceElementId(instruction.getSourceElementId()) + .setTargetElementId(instruction.getTargetElementId())) + .forEach(command::addMappingInstruction); + + writer.writeCommandWithoutKey( + command, + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE_MIGRATION) + .intent(ProcessInstanceMigrationIntent.MIGRATE)); + } + @Override public void updateJobTimeout( final UpdateJobTimeoutRequest request, From a1efe27eef81d7db2b84150685fc750187531694 Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 14:29:29 +0100 Subject: [PATCH 5/8] fix: ignore unsupported streamActivatedJobs RPC It's not yet possible to support this method as it functions differently from the other RPCS. Typically, the RPCs are implemented by appending a command to the log stream, but for streaming RPCs it works differently. Instead, the gateway opens a stream through which the broker can stream events. As I understand the implementation in Zeebe, this works without appending a command. In order to support this in ZPT, some new mechanism must be added that allows streaming events to the calling client. (cherry picked from commit 46c70fa55f56b69871790bf6c24fee9829a91835) --- .../zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index c30beb97..2727c012 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -22,6 +22,8 @@ class GrpcToLogStreamGatewayTest { + static final List UNSUPPORTED_METHODS = List.of("streamActivatedJobs"); + static final List IGNORED_METHODS = List.of( "bindService", @@ -50,6 +52,7 @@ static Stream provideMethods() { return Arrays.stream(GatewayImplBase.class.getMethods()) .map(Method::getName) .filter(name -> !IGNORED_METHODS.contains(name)) + .filter(name -> !UNSUPPORTED_METHODS.contains(name)) .map(Arguments::of); } } From 267db0fdf0ca5a20ffb0d68bcb798b3498f26d95 Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 14:40:49 +0100 Subject: [PATCH 6/8] test: support migrating process instances Adds a simple example test case that shows we can migrate a process instance through the zeebe client in ZPT. (cherry picked from commit c6e082bc33e2ed2a73855021f0f0cd9cdbc1b019) --- .../process/test/engine/EngineClientTest.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java index b7075b28..12e68bb6 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java @@ -412,6 +412,65 @@ void shouldModifyProcessInstance() throws InterruptedException, TimeoutException .isNotEmpty(); } + @Test + void shouldMigrateProcessInstance() throws InterruptedException, TimeoutException { + // given + final DeploymentEvent deployment = + zeebeClient + .newDeployResourceCommand() + .addProcessModel( + Bpmn.createExecutableProcess("sourceProcess") + .startEvent() + .serviceTask("A", b -> b.zeebeJobType("a")) + .endEvent() + .done(), + "sourceProcess.bpmn") + .addProcessModel( + Bpmn.createExecutableProcess("targetProcess") + .startEvent() + .serviceTask("B", b -> b.zeebeJobTypeExpression("b")) + .endEvent() + .done(), + "targetProcess.bpmn") + .send() + .join(); + + final long processInstanceKey = + zeebeClient + .newCreateInstanceCommand() + .bpmnProcessId("sourceProcess") + .latestVersion() + .send() + .join() + .getProcessInstanceKey(); + + zeebeEngine.waitForIdleState(Duration.ofSeconds(1)); + + final long targetProcessDefinitionKey = + deployment.getProcesses().stream() + .filter(p -> p.getBpmnProcessId().equals("targetProcess")) + .findFirst() + .orElseThrow() + .getProcessDefinitionKey(); + zeebeClient + .newMigrateProcessInstanceCommand(processInstanceKey) + .migrationPlan(targetProcessDefinitionKey) + .addMappingInstruction("A", "B") + .send() + .join(); + + assertThat( + StreamSupport.stream( + RecordStream.of(zeebeEngine.getRecordStreamSource()) + .processInstanceRecords() + .spliterator(), + false) + .filter(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_MIGRATED) + .filter(r -> r.getValue().getElementId().equals("B")) + .findFirst()) + .isNotEmpty(); + } + @Test void shouldUpdateVariablesOnProcessInstance() { // given From 9d13cd57c8d588d8fe2950007090059d49f93f2c Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 14:49:27 +0100 Subject: [PATCH 7/8] fix: support instance migration response (cherry picked from commit 0c24ced2f752b9eac6ab0c28c13c3002c1648308) --- .../zeebe/process/test/engine/GrpcResponseMapper.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java index 90ee63c3..ea52ca6c 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java @@ -40,6 +40,8 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FormMetadata; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MatchedDecisionRule; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata; @@ -100,6 +102,8 @@ class GrpcResponseMapper { Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse), Map.entry(UpdateJobTimeoutRequest.class, this::createJobUpdateTimeOutResponse), Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse), + Map.entry( + MigrateProcessInstanceRequest.class, this::createMigrateProcessInstanceResponse), Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse)); GeneratedMessageV3 map( @@ -294,6 +298,10 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() { return ModifyProcessInstanceResponse.newBuilder().build(); } + private GeneratedMessageV3 createMigrateProcessInstanceResponse() { + return MigrateProcessInstanceResponse.getDefaultInstance(); + } + private GeneratedMessageV3 createBroadcastSignalResponse() { final SignalRecord signal = new SignalRecord(); signal.wrap(valueBufferView); From d76f3812686c8000ae69b5561441fb0b16f4d154 Mon Sep 17 00:00:00 2001 From: Nico Korthout Date: Tue, 2 Jan 2024 14:55:08 +0100 Subject: [PATCH 8/8] docs: describe what to do when test fails This test is quite important, because it ensure we don't forget supporting RPCs in ZPT. To help clarify this, I've added instructions in case the test fails. (cherry picked from commit 46a2e5c97970bf71cd0cd998264f4a04e2ba871f) --- .../process/test/engine/GrpcToLogStreamGatewayTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index 2727c012..2bd63957 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -44,7 +44,13 @@ void testImplementsGatewayEndpoint(final String methodName) { .findAny(); assertThat(optionalMethod) - .describedAs("Expected method %s to be implemented", methodName) + .describedAs( + """ + Expected method %s to be implemented. \ + When this test fails, it's likely a new RPC that ZPT should support. \ + Please check whether it should be supported by ZPT. \ + If it should be suported, add a test case to EngineClientTest.java""", + methodName) .isPresent(); }