Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support using process instance migration from the zeebe client #1012

Merged
8 commits merged into from
Jan 2, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FormMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MatchedDecisionRule;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata;
Expand Down Expand Up @@ -100,6 +102,8 @@ class GrpcResponseMapper {
Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse),
Map.entry(UpdateJobTimeoutRequest.class, this::createJobUpdateTimeOutResponse),
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse),
Map.entry(
MigrateProcessInstanceRequest.class, this::createMigrateProcessInstanceResponse),
Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse));

GeneratedMessageV3 map(
Expand Down Expand Up @@ -294,6 +298,10 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() {
return ModifyProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createMigrateProcessInstanceResponse() {
return MigrateProcessInstanceResponse.getDefaultInstance();
}

private GeneratedMessageV3 createBroadcastSignalResponse() {
final SignalRecord signal = new SignalRecord();
signal.wrap(valueBufferView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.EvaluateDecisionResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition;
Expand Down Expand Up @@ -61,6 +63,8 @@
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationStartInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationMappingInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationActivateInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationTerminateInstruction;
Expand All @@ -79,6 +83,7 @@
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceMigrationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ResourceDeletionIntent;
import io.camunda.zeebe.protocol.record.intent.SignalIntent;
Expand Down Expand Up @@ -474,6 +479,35 @@ public void modifyProcessInstance(
writer.writeCommandWithKey(request.getProcessInstanceKey(), record, recordMetadata);
}

@Override
public void migrateProcessInstance(
final MigrateProcessInstanceRequest request,
final StreamObserver<MigrateProcessInstanceResponse> responseObserver) {
final var requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

final var command =
new ProcessInstanceMigrationRecord()
.setProcessInstanceKey(request.getProcessInstanceKey())
.setTargetProcessDefinitionKey(
request.getMigrationPlan().getTargetProcessDefinitionKey());

request.getMigrationPlan().getMappingInstructionsList().stream()
.map(
instruction ->
new ProcessInstanceMigrationMappingInstruction()
.setSourceElementId(instruction.getSourceElementId())
.setTargetElementId(instruction.getTargetElementId()))
.forEach(command::addMappingInstruction);

writer.writeCommandWithoutKey(
command,
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_MIGRATION)
.intent(ProcessInstanceMigrationIntent.MIGRATE));
}

@Override
public void updateJobTimeout(
final UpdateJobTimeoutRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,65 @@ void shouldModifyProcessInstance() throws InterruptedException, TimeoutException
.isNotEmpty();
}

@Test
void shouldMigrateProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deployment =
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("sourceProcess")
.startEvent()
.serviceTask("A", b -> b.zeebeJobType("a"))
.endEvent()
.done(),
"sourceProcess.bpmn")
.addProcessModel(
Bpmn.createExecutableProcess("targetProcess")
.startEvent()
.serviceTask("B", b -> b.zeebeJobTypeExpression("b"))
.endEvent()
.done(),
"targetProcess.bpmn")
.send()
.join();

final long processInstanceKey =
zeebeClient
.newCreateInstanceCommand()
.bpmnProcessId("sourceProcess")
.latestVersion()
.send()
.join()
.getProcessInstanceKey();

zeebeEngine.waitForIdleState(Duration.ofSeconds(1));

final long targetProcessDefinitionKey =
deployment.getProcesses().stream()
.filter(p -> p.getBpmnProcessId().equals("targetProcess"))
.findFirst()
.orElseThrow()
.getProcessDefinitionKey();
zeebeClient
.newMigrateProcessInstanceCommand(processInstanceKey)
.migrationPlan(targetProcessDefinitionKey)
.addMappingInstruction("A", "B")
.send()
.join();

assertThat(
StreamSupport.stream(
RecordStream.of(zeebeEngine.getRecordStreamSource())
.processInstanceRecords()
.spliterator(),
false)
.filter(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_MIGRATED)
.filter(r -> r.getValue().getElementId().equals("B"))
.findFirst())
.isNotEmpty();
}

@Test
void shouldUpdateVariablesOnProcessInstance() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayImplBase;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -21,6 +22,19 @@

class GrpcToLogStreamGatewayTest {

static final List<String> UNSUPPORTED_METHODS = List.of("streamActivatedJobs");

static final List<String> IGNORED_METHODS =
List.of(
"bindService",
"equals",
"getClass",
"hashCode",
"notify",
"notifyAll",
"toString",
"wait");

@ParameterizedTest(name = "{0}")
@MethodSource("provideMethods")
void testImplementsGatewayEndpoint(final String methodName) {
Expand All @@ -30,13 +44,21 @@ void testImplementsGatewayEndpoint(final String methodName) {
.findAny();

assertThat(optionalMethod)
.describedAs("Expected method %s to be implemented", methodName)
.describedAs(
"""
Expected method %s to be implemented. \
When this test fails, it's likely a new RPC that ZPT should support. \
Please check whether it should be supported by ZPT. \
If it should be suported, add a test case to EngineClientTest.java""",
methodName)
.isPresent();
}

private static Stream<Arguments> provideMethods() {
return Arrays.stream(GatewayImplBase.class.getDeclaredMethods())
.filter(method -> !method.getName().equals("bindService"))
.map(method -> Arguments.of(method.getName()));
static Stream<Arguments> provideMethods() {
return Arrays.stream(GatewayImplBase.class.getMethods())
.map(Method::getName)
.filter(name -> !IGNORED_METHODS.contains(name))
.filter(name -> !UNSUPPORTED_METHODS.contains(name))
.map(Arguments::of);
}
}
Loading