Skip to content

Commit

Permalink
merge: #1013
Browse files Browse the repository at this point in the history
1013: [Backport release-8.4.0] Support using process instance migration from the zeebe client r=github-actions[bot] a=backport-action

# Description
Backport of #1012 to `release-8.4.0`.

relates to #972

Co-authored-by: Nico Korthout <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout authored Jan 3, 2024
2 parents fe33845 + d76f381 commit 4894ff7
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FormMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MatchedDecisionRule;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata;
Expand Down Expand Up @@ -100,6 +102,8 @@ class GrpcResponseMapper {
Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse),
Map.entry(UpdateJobTimeoutRequest.class, this::createJobUpdateTimeOutResponse),
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse),
Map.entry(
MigrateProcessInstanceRequest.class, this::createMigrateProcessInstanceResponse),
Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse));

GeneratedMessageV3 map(
Expand Down Expand Up @@ -294,6 +298,10 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() {
return ModifyProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createMigrateProcessInstanceResponse() {
return MigrateProcessInstanceResponse.getDefaultInstance();
}

private GeneratedMessageV3 createBroadcastSignalResponse() {
final SignalRecord signal = new SignalRecord();
signal.wrap(valueBufferView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.EvaluateDecisionResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition;
Expand Down Expand Up @@ -61,6 +63,8 @@
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationStartInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationMappingInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationActivateInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationTerminateInstruction;
Expand All @@ -79,6 +83,7 @@
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceMigrationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ResourceDeletionIntent;
import io.camunda.zeebe.protocol.record.intent.SignalIntent;
Expand Down Expand Up @@ -474,6 +479,35 @@ public void modifyProcessInstance(
writer.writeCommandWithKey(request.getProcessInstanceKey(), record, recordMetadata);
}

@Override
public void migrateProcessInstance(
final MigrateProcessInstanceRequest request,
final StreamObserver<MigrateProcessInstanceResponse> responseObserver) {
final var requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

final var command =
new ProcessInstanceMigrationRecord()
.setProcessInstanceKey(request.getProcessInstanceKey())
.setTargetProcessDefinitionKey(
request.getMigrationPlan().getTargetProcessDefinitionKey());

request.getMigrationPlan().getMappingInstructionsList().stream()
.map(
instruction ->
new ProcessInstanceMigrationMappingInstruction()
.setSourceElementId(instruction.getSourceElementId())
.setTargetElementId(instruction.getTargetElementId()))
.forEach(command::addMappingInstruction);

writer.writeCommandWithoutKey(
command,
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_MIGRATION)
.intent(ProcessInstanceMigrationIntent.MIGRATE));
}

@Override
public void updateJobTimeout(
final UpdateJobTimeoutRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,65 @@ void shouldModifyProcessInstance() throws InterruptedException, TimeoutException
.isNotEmpty();
}

@Test
void shouldMigrateProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deployment =
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("sourceProcess")
.startEvent()
.serviceTask("A", b -> b.zeebeJobType("a"))
.endEvent()
.done(),
"sourceProcess.bpmn")
.addProcessModel(
Bpmn.createExecutableProcess("targetProcess")
.startEvent()
.serviceTask("B", b -> b.zeebeJobTypeExpression("b"))
.endEvent()
.done(),
"targetProcess.bpmn")
.send()
.join();

final long processInstanceKey =
zeebeClient
.newCreateInstanceCommand()
.bpmnProcessId("sourceProcess")
.latestVersion()
.send()
.join()
.getProcessInstanceKey();

zeebeEngine.waitForIdleState(Duration.ofSeconds(1));

final long targetProcessDefinitionKey =
deployment.getProcesses().stream()
.filter(p -> p.getBpmnProcessId().equals("targetProcess"))
.findFirst()
.orElseThrow()
.getProcessDefinitionKey();
zeebeClient
.newMigrateProcessInstanceCommand(processInstanceKey)
.migrationPlan(targetProcessDefinitionKey)
.addMappingInstruction("A", "B")
.send()
.join();

assertThat(
StreamSupport.stream(
RecordStream.of(zeebeEngine.getRecordStreamSource())
.processInstanceRecords()
.spliterator(),
false)
.filter(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_MIGRATED)
.filter(r -> r.getValue().getElementId().equals("B"))
.findFirst())
.isNotEmpty();
}

@Test
void shouldUpdateVariablesOnProcessInstance() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayImplBase;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -21,6 +22,19 @@

class GrpcToLogStreamGatewayTest {

static final List<String> UNSUPPORTED_METHODS = List.of("streamActivatedJobs");

static final List<String> IGNORED_METHODS =
List.of(
"bindService",
"equals",
"getClass",
"hashCode",
"notify",
"notifyAll",
"toString",
"wait");

@ParameterizedTest(name = "{0}")
@MethodSource("provideMethods")
void testImplementsGatewayEndpoint(final String methodName) {
Expand All @@ -30,13 +44,21 @@ void testImplementsGatewayEndpoint(final String methodName) {
.findAny();

assertThat(optionalMethod)
.describedAs("Expected method %s to be implemented", methodName)
.describedAs(
"""
Expected method %s to be implemented. \
When this test fails, it's likely a new RPC that ZPT should support. \
Please check whether it should be supported by ZPT. \
If it should be suported, add a test case to EngineClientTest.java""",
methodName)
.isPresent();
}

private static Stream<Arguments> provideMethods() {
return Arrays.stream(GatewayImplBase.class.getDeclaredMethods())
.filter(method -> !method.getName().equals("bindService"))
.map(method -> Arguments.of(method.getName()));
static Stream<Arguments> provideMethods() {
return Arrays.stream(GatewayImplBase.class.getMethods())
.map(Method::getName)
.filter(name -> !IGNORED_METHODS.contains(name))
.filter(name -> !UNSUPPORTED_METHODS.contains(name))
.map(Arguments::of);
}
}

0 comments on commit 4894ff7

Please sign in to comment.