diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java index 90ee63c3..ea52ca6c 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java @@ -40,6 +40,8 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FormMetadata; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MatchedDecisionRule; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata; @@ -100,6 +102,8 @@ class GrpcResponseMapper { Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse), Map.entry(UpdateJobTimeoutRequest.class, this::createJobUpdateTimeOutResponse), Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse), + Map.entry( + MigrateProcessInstanceRequest.class, this::createMigrateProcessInstanceResponse), Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse)); GeneratedMessageV3 map( @@ -294,6 +298,10 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() { return ModifyProcessInstanceResponse.newBuilder().build(); } + private GeneratedMessageV3 createMigrateProcessInstanceResponse() { + return MigrateProcessInstanceResponse.getDefaultInstance(); + } + private GeneratedMessageV3 createBroadcastSignalResponse() { final SignalRecord signal = new SignalRecord(); signal.wrap(valueBufferView); diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index e54dcbb3..4389a024 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -32,6 +32,8 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.EvaluateDecisionResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition; @@ -61,6 +63,8 @@ import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationStartInstruction; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationMappingInstruction; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationActivateInstruction; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationTerminateInstruction; @@ -79,6 +83,7 @@ import io.camunda.zeebe.protocol.record.intent.MessageIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceMigrationIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent; import io.camunda.zeebe.protocol.record.intent.ResourceDeletionIntent; import io.camunda.zeebe.protocol.record.intent.SignalIntent; @@ -474,6 +479,35 @@ public void modifyProcessInstance( writer.writeCommandWithKey(request.getProcessInstanceKey(), record, recordMetadata); } + @Override + public void migrateProcessInstance( + final MigrateProcessInstanceRequest request, + final StreamObserver responseObserver) { + final var requestId = + gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); + + final var command = + new ProcessInstanceMigrationRecord() + .setProcessInstanceKey(request.getProcessInstanceKey()) + .setTargetProcessDefinitionKey( + request.getMigrationPlan().getTargetProcessDefinitionKey()); + + request.getMigrationPlan().getMappingInstructionsList().stream() + .map( + instruction -> + new ProcessInstanceMigrationMappingInstruction() + .setSourceElementId(instruction.getSourceElementId()) + .setTargetElementId(instruction.getTargetElementId())) + .forEach(command::addMappingInstruction); + + writer.writeCommandWithoutKey( + command, + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE_MIGRATION) + .intent(ProcessInstanceMigrationIntent.MIGRATE)); + } + @Override public void updateJobTimeout( final UpdateJobTimeoutRequest request, diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java index b7075b28..12e68bb6 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java @@ -412,6 +412,65 @@ void shouldModifyProcessInstance() throws InterruptedException, TimeoutException .isNotEmpty(); } + @Test + void shouldMigrateProcessInstance() throws InterruptedException, TimeoutException { + // given + final DeploymentEvent deployment = + zeebeClient + .newDeployResourceCommand() + .addProcessModel( + Bpmn.createExecutableProcess("sourceProcess") + .startEvent() + .serviceTask("A", b -> b.zeebeJobType("a")) + .endEvent() + .done(), + "sourceProcess.bpmn") + .addProcessModel( + Bpmn.createExecutableProcess("targetProcess") + .startEvent() + .serviceTask("B", b -> b.zeebeJobTypeExpression("b")) + .endEvent() + .done(), + "targetProcess.bpmn") + .send() + .join(); + + final long processInstanceKey = + zeebeClient + .newCreateInstanceCommand() + .bpmnProcessId("sourceProcess") + .latestVersion() + .send() + .join() + .getProcessInstanceKey(); + + zeebeEngine.waitForIdleState(Duration.ofSeconds(1)); + + final long targetProcessDefinitionKey = + deployment.getProcesses().stream() + .filter(p -> p.getBpmnProcessId().equals("targetProcess")) + .findFirst() + .orElseThrow() + .getProcessDefinitionKey(); + zeebeClient + .newMigrateProcessInstanceCommand(processInstanceKey) + .migrationPlan(targetProcessDefinitionKey) + .addMappingInstruction("A", "B") + .send() + .join(); + + assertThat( + StreamSupport.stream( + RecordStream.of(zeebeEngine.getRecordStreamSource()) + .processInstanceRecords() + .spliterator(), + false) + .filter(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_MIGRATED) + .filter(r -> r.getValue().getElementId().equals("B")) + .findFirst()) + .isNotEmpty(); + } + @Test void shouldUpdateVariablesOnProcessInstance() { // given diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index 2ca139db..2bd63957 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -13,6 +13,7 @@ import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayImplBase; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -21,6 +22,19 @@ class GrpcToLogStreamGatewayTest { + static final List UNSUPPORTED_METHODS = List.of("streamActivatedJobs"); + + static final List IGNORED_METHODS = + List.of( + "bindService", + "equals", + "getClass", + "hashCode", + "notify", + "notifyAll", + "toString", + "wait"); + @ParameterizedTest(name = "{0}") @MethodSource("provideMethods") void testImplementsGatewayEndpoint(final String methodName) { @@ -30,13 +44,21 @@ void testImplementsGatewayEndpoint(final String methodName) { .findAny(); assertThat(optionalMethod) - .describedAs("Expected method %s to be implemented", methodName) + .describedAs( + """ + Expected method %s to be implemented. \ + When this test fails, it's likely a new RPC that ZPT should support. \ + Please check whether it should be supported by ZPT. \ + If it should be suported, add a test case to EngineClientTest.java""", + methodName) .isPresent(); } - private static Stream provideMethods() { - return Arrays.stream(GatewayImplBase.class.getDeclaredMethods()) - .filter(method -> !method.getName().equals("bindService")) - .map(method -> Arguments.of(method.getName())); + static Stream provideMethods() { + return Arrays.stream(GatewayImplBase.class.getMethods()) + .map(Method::getName) + .filter(name -> !IGNORED_METHODS.contains(name)) + .filter(name -> !UNSUPPORTED_METHODS.contains(name)) + .map(Arguments::of); } }