Skip to content

Commit

Permalink
merge: #1020
Browse files Browse the repository at this point in the history
1020: Add support for StreamActivatedJobs RPC r=koevskinikola a=npepinpe

## Description

This PR adds support for the `StreamActivatedJobs` RPC with `zeebe-process-test`, including basic back pressure implementation.

<!-- Cut-off marker
_All lines under and including the cut-off marker will be removed from the merge commit message_

## Definition of Ready

Please check the items that apply, before requesting a review.

You can find more details about these items in our wiki page about [Pull Requests and Code Reviews](https://github.com/camunda/zeebe/wiki/Pull-Requests-and-Code-Reviews).

* [ ] I've reviewed my own code
* [ ] I've written a clear changelist description
* [ ] I've narrowly scoped my changes
* [ ] I've separated structural from behavioural changes
-->

## Definition of Done

<!-- Please check the items that apply, before merging or (if possible) before requesting a review. -->

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [ ] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to backport the fix

Testing:
* [ ] There are unit/integration tests that verify all acceptance criterias of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually

Documentation:
* [ ] Javadoc has been written
* [ ] The documentation is updated


Co-authored-by: Nicolas Pepin-Perreault <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe authored Jan 5, 2024
2 parents 67c23d8 + 97a0195 commit 41fbefb
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 30 deletions.
28 changes: 24 additions & 4 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@
<artifactId>zeebe-db</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-scheduler</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-stream-platform</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
Expand Down Expand Up @@ -116,6 +131,11 @@
<artifactId>grpc-stub</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- Test Scope -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down Expand Up @@ -186,10 +206,10 @@
<goal>analyze-duplicate</goal>
</goals>
<configuration>
<!-- TODO remove this after version of
plugin is released which includes
https://github.com/apache/maven-dependency-plugin/pull/194 -->
<failOnWarning>false</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- runtime binding for tests -->
<dependency>org.slf4j:slf4j-simple</dependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ public static ZeebeTestEngine create(final int port) {
final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter().join());
final CommandSender commandSender = new CommandSender(commandWriter);
final GatewayRequestStore gatewayRequestStore = new GatewayRequestStore();
final InMemoryJobStreamer jobStreamer = new InMemoryJobStreamer(commandWriter);
final GrpcToLogStreamGateway gateway =
new GrpcToLogStreamGateway(
commandWriter, partitionId, partitionCount, port, gatewayRequestStore);
commandWriter, partitionId, partitionCount, port, gatewayRequestStore, jobStreamer);
final Server grpcServer = ServerBuilder.forPort(port).addService(gateway).build();

final GrpcResponseWriter grpcResponseWriter =
Expand All @@ -75,7 +76,13 @@ public static ZeebeTestEngine create(final int port) {

final StreamProcessor streamProcessor =
createStreamProcessor(
logStream, zeebeDb, scheduler, grpcResponseWriter, partitionCount, commandSender);
logStream,
zeebeDb,
scheduler,
grpcResponseWriter,
partitionCount,
commandSender,
jobStreamer);

final EngineStateMonitor engineStateMonitor =
new EngineStateMonitor(logStorage, streamProcessor);
Expand Down Expand Up @@ -144,7 +151,8 @@ private static StreamProcessor createStreamProcessor(
final ActorSchedulingService scheduler,
final GrpcResponseWriter grpcResponseWriter,
final int partitionCount,
final CommandSender commandSender) {
final CommandSender commandSender,
final JobStreamer jobStreamer) {
return StreamProcessor.builder()
.logStream(logStream)
.zeebeDb(database)
Expand All @@ -160,7 +168,7 @@ private static StreamProcessor createStreamProcessor(
new SubscriptionCommandSender(context.getPartitionId(), commandSender),
commandSender,
FeatureFlags.createDefault(),
JobStreamer.noop()),
jobStreamer),
new EngineConfiguration())))
.actorSchedulingService(scheduler)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,30 +341,29 @@ private GeneratedMessageV3 createJobBatchResponse() {
return ActivateJobsResponse.newBuilder()
.addAllJobs(
jobsWithKeys.entrySet().stream()
.map(
(entry) -> {
final JobRecord job = entry.getValue();
return ActivatedJob.newBuilder()
.setKey(entry.getKey())
.setType(job.getType())
.setRetries(job.getRetries())
.setWorker(job.getWorker())
.setDeadline(job.getDeadline())
.setProcessDefinitionKey(job.getProcessDefinitionKey())
.setBpmnProcessId(job.getBpmnProcessId())
.setProcessDefinitionVersion(job.getProcessDefinitionVersion())
.setProcessInstanceKey(job.getProcessInstanceKey())
.setElementId(job.getElementId())
.setElementInstanceKey(job.getElementInstanceKey())
.setCustomHeaders(
MsgPackConverter.convertToJson(job.getCustomHeadersBuffer()))
.setVariables(MsgPackConverter.convertToJson(job.getVariablesBuffer()))
.build();
})
.map((entry) -> mapToActivatedJob(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()))
.build();
}

static ActivatedJob mapToActivatedJob(final long key, final JobRecord job) {
return ActivatedJob.newBuilder()
.setKey(key)
.setType(job.getType())
.setRetries(job.getRetries())
.setWorker(job.getWorker())
.setDeadline(job.getDeadline())
.setProcessDefinitionKey(job.getProcessDefinitionKey())
.setBpmnProcessId(job.getBpmnProcessId())
.setProcessDefinitionVersion(job.getProcessDefinitionVersion())
.setProcessInstanceKey(job.getProcessInstanceKey())
.setElementId(job.getElementId())
.setElementInstanceKey(job.getElementInstanceKey())
.setCustomHeaders(MsgPackConverter.convertToJson(job.getCustomHeadersBuffer()))
.setVariables(MsgPackConverter.convertToJson(job.getVariablesBuffer()))
.build();
}

private GeneratedMessageV3 createCompleteJobResponse() {
return CompleteJobResponse.newBuilder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
*/
package io.camunda.zeebe.process.test.engine;

import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;

import com.google.common.util.concurrent.MoreExecutors;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BrokerInfo;
Expand Down Expand Up @@ -43,6 +47,7 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ResolveIncidentResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.StreamActivatedJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.TopologyRequest;
Expand All @@ -51,7 +56,10 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobTimeoutRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobTimeoutResponse;
import io.camunda.zeebe.msgpack.value.StringValue;
import io.camunda.zeebe.msgpack.value.ValueArray;
import io.camunda.zeebe.process.test.engine.InMemoryJobStreamer.JobConsumer;
import io.camunda.zeebe.process.test.engine.InMemoryJobStreamer.PushStatus;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord;
Expand All @@ -73,6 +81,7 @@
import io.camunda.zeebe.protocol.impl.record.value.resource.ResourceDeletionRecord;
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DecisionEvaluationIntent;
Expand All @@ -91,7 +100,13 @@
import io.camunda.zeebe.protocol.record.value.VariableDocumentUpdateSemantic;
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import org.agrona.DirectBuffer;

class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase {

Expand All @@ -100,18 +115,21 @@ class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase {
private final int partitionCount;
private final int port;
private final GatewayRequestStore gatewayRequestStore;
private final InMemoryJobStreamer jobStreamer;

public GrpcToLogStreamGateway(
final CommandWriter writer,
final int partitionId,
final int partitionCount,
final int port,
final GatewayRequestStore gatewayRequestStore) {
final GatewayRequestStore gatewayRequestStore,
final InMemoryJobStreamer jobStreamer) {
this.writer = writer;
this.partitionId = partitionId;
this.partitionCount = partitionCount;
this.port = port;
this.gatewayRequestStore = gatewayRequestStore;
this.jobStreamer = jobStreamer;
}

@Override
Expand All @@ -137,6 +155,26 @@ public void activateJobs(
writer.writeCommandWithoutKey(jobBatchRecord, recordMetadata);
}

@Override
public void streamActivatedJobs(
final StreamActivatedJobsRequest request,
final StreamObserver<ActivatedJob> responseObserver) {
final var jobActivationProperties = new JobActivationPropertiesImpl();
final var worker = wrapString(request.getWorker());
final var jobType = wrapString(request.getType());
final var serverObserver = (ServerCallStreamObserver<ActivatedJob>) responseObserver;
final var consumer = new GrpcJobConsumer(jobType, serverObserver);
jobActivationProperties
.setWorker(worker, 0, worker.capacity())
.setTimeout(request.getTimeout())
.setFetchVariables(request.getFetchVariableList().stream().map(StringValue::new).toList())
.setTenantIds(request.getTenantIdsList());

jobStreamer.addStream(jobType, jobActivationProperties, consumer);
serverObserver.setOnCloseHandler(consumer::close);
serverObserver.setOnCancelHandler(consumer::close);
}

@Override
public void cancelProcessInstance(
final CancelProcessInstanceRequest request,
Expand Down Expand Up @@ -642,4 +680,46 @@ private DecisionEvaluationRecord createDecisionEvaluationRecord(
public String getAddress() {
return "0.0.0.0:" + port;
}

private final class GrpcJobConsumer implements JobConsumer, AutoCloseable {
private final DirectBuffer jobType;
private final ServerCallStreamObserver<ActivatedJob> observer;
private final Executor executor;

private GrpcJobConsumer(
final DirectBuffer jobType, final ServerCallStreamObserver<ActivatedJob> observer) {
this.jobType = jobType;
this.observer = observer;
executor = MoreExecutors.newSequentialExecutor(ForkJoinPool.commonPool());
}

@Override
public CompletionStage<PushStatus> consumeJob(
final io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob job) {
return CompletableFuture.supplyAsync(() -> forwardJob(job), executor);
}

@Override
public void close() {
executor.execute(() -> jobStreamer.removeStream(jobType, this));
}

private PushStatus forwardJob(
final io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob job) {
if (!observer.isReady()) {
return PushStatus.BLOCKED;
}

try {
final var activatedJob =
GrpcResponseMapper.mapToActivatedJob(job.jobKey(), job.jobRecord());
observer.onNext(activatedJob);
return PushStatus.PUSHED;
} catch (final Exception e) {
observer.onError(e);
close();
throw e;
}
}
}
}
Loading

0 comments on commit 41fbefb

Please sign in to comment.