diff --git a/engine/pom.xml b/engine/pom.xml
index 59e16a5d8..b778043b6 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -76,6 +76,21 @@
       <artifactId>zeebe-db</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.camunda</groupId>
+      <artifactId>zeebe-scheduler</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.camunda</groupId>
+      <artifactId>zeebe-stream-platform</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.api.grpc</groupId>
      <artifactId>proto-google-common-protos</artifactId>
@@ -116,6 +131,11 @@
       <artifactId>grpc-stub</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.junit.jupiter</groupId>
@@ -186,10 +206,10 @@
              <id>analyze-duplicate</id>
-              <configuration>
-                <failOnWarning>false</failOnWarning>
+              <configuration>
+                <ignoredDependencies>
+                  <ignoredDependency>org.slf4j:slf4j-simple</ignoredDependency>
+                </ignoredDependencies>
diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java
index 71fc9baca..d2221a905 100644
--- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java
+++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java
@@ -63,9 +63,10 @@ public static ZeebeTestEngine create(final int port) {
final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter().join());
final CommandSender commandSender = new CommandSender(commandWriter);
final GatewayRequestStore gatewayRequestStore = new GatewayRequestStore();
+ final InMemoryJobStreamer jobStreamer = new InMemoryJobStreamer(commandWriter);
final GrpcToLogStreamGateway gateway =
new GrpcToLogStreamGateway(
- commandWriter, partitionId, partitionCount, port, gatewayRequestStore);
+ commandWriter, partitionId, partitionCount, port, gatewayRequestStore, jobStreamer);
final Server grpcServer = ServerBuilder.forPort(port).addService(gateway).build();
final GrpcResponseWriter grpcResponseWriter =
@@ -75,7 +76,13 @@ public static ZeebeTestEngine create(final int port) {
final StreamProcessor streamProcessor =
createStreamProcessor(
- logStream, zeebeDb, scheduler, grpcResponseWriter, partitionCount, commandSender);
+ logStream,
+ zeebeDb,
+ scheduler,
+ grpcResponseWriter,
+ partitionCount,
+ commandSender,
+ jobStreamer);
final EngineStateMonitor engineStateMonitor =
new EngineStateMonitor(logStorage, streamProcessor);
@@ -144,7 +151,8 @@ private static StreamProcessor createStreamProcessor(
final ActorSchedulingService scheduler,
final GrpcResponseWriter grpcResponseWriter,
final int partitionCount,
- final CommandSender commandSender) {
+ final CommandSender commandSender,
+ final JobStreamer jobStreamer) {
return StreamProcessor.builder()
.logStream(logStream)
.zeebeDb(database)
@@ -160,7 +168,7 @@ private static StreamProcessor createStreamProcessor(
new SubscriptionCommandSender(context.getPartitionId(), commandSender),
commandSender,
FeatureFlags.createDefault(),
- JobStreamer.noop()),
+ jobStreamer),
new EngineConfiguration())))
.actorSchedulingService(scheduler)
.build();
diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java
index ea52ca6ca..d948ed3bd 100644
--- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java
+++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java
@@ -341,30 +341,29 @@ private GeneratedMessageV3 createJobBatchResponse() {
return ActivateJobsResponse.newBuilder()
.addAllJobs(
jobsWithKeys.entrySet().stream()
- .map(
- (entry) -> {
- final JobRecord job = entry.getValue();
- return ActivatedJob.newBuilder()
- .setKey(entry.getKey())
- .setType(job.getType())
- .setRetries(job.getRetries())
- .setWorker(job.getWorker())
- .setDeadline(job.getDeadline())
- .setProcessDefinitionKey(job.getProcessDefinitionKey())
- .setBpmnProcessId(job.getBpmnProcessId())
- .setProcessDefinitionVersion(job.getProcessDefinitionVersion())
- .setProcessInstanceKey(job.getProcessInstanceKey())
- .setElementId(job.getElementId())
- .setElementInstanceKey(job.getElementInstanceKey())
- .setCustomHeaders(
- MsgPackConverter.convertToJson(job.getCustomHeadersBuffer()))
- .setVariables(MsgPackConverter.convertToJson(job.getVariablesBuffer()))
- .build();
- })
+ .map((entry) -> mapToActivatedJob(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()))
.build();
}
+ static ActivatedJob mapToActivatedJob(final long key, final JobRecord job) {
+ return ActivatedJob.newBuilder()
+ .setKey(key)
+ .setType(job.getType())
+ .setRetries(job.getRetries())
+ .setWorker(job.getWorker())
+ .setDeadline(job.getDeadline())
+ .setProcessDefinitionKey(job.getProcessDefinitionKey())
+ .setBpmnProcessId(job.getBpmnProcessId())
+ .setProcessDefinitionVersion(job.getProcessDefinitionVersion())
+ .setProcessInstanceKey(job.getProcessInstanceKey())
+ .setElementId(job.getElementId())
+ .setElementInstanceKey(job.getElementInstanceKey())
+ .setCustomHeaders(MsgPackConverter.convertToJson(job.getCustomHeadersBuffer()))
+ .setVariables(MsgPackConverter.convertToJson(job.getVariablesBuffer()))
+ .build();
+ }
+
private GeneratedMessageV3 createCompleteJobResponse() {
return CompleteJobResponse.newBuilder().build();
}
diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java
index 4389a024e..f15c758d5 100644
--- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java
+++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java
@@ -7,10 +7,14 @@
*/
package io.camunda.zeebe.process.test.engine;
+import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;
+
+import com.google.common.util.concurrent.MoreExecutors;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
+import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BrokerInfo;
@@ -43,6 +47,7 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ResolveIncidentResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesResponse;
+import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.StreamActivatedJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.TopologyRequest;
@@ -51,7 +56,10 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobTimeoutRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobTimeoutResponse;
+import io.camunda.zeebe.msgpack.value.StringValue;
import io.camunda.zeebe.msgpack.value.ValueArray;
+import io.camunda.zeebe.process.test.engine.InMemoryJobStreamer.JobConsumer;
+import io.camunda.zeebe.process.test.engine.InMemoryJobStreamer.PushStatus;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord;
@@ -73,6 +81,7 @@
import io.camunda.zeebe.protocol.impl.record.value.resource.ResourceDeletionRecord;
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
+import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DecisionEvaluationIntent;
@@ -91,7 +100,13 @@
import io.camunda.zeebe.protocol.record.value.VariableDocumentUpdateSemantic;
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
+import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import org.agrona.DirectBuffer;
class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase {
@@ -100,18 +115,21 @@ class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase {
private final int partitionCount;
private final int port;
private final GatewayRequestStore gatewayRequestStore;
+ private final InMemoryJobStreamer jobStreamer;
public GrpcToLogStreamGateway(
final CommandWriter writer,
final int partitionId,
final int partitionCount,
final int port,
- final GatewayRequestStore gatewayRequestStore) {
+ final GatewayRequestStore gatewayRequestStore,
+ final InMemoryJobStreamer jobStreamer) {
this.writer = writer;
this.partitionId = partitionId;
this.partitionCount = partitionCount;
this.port = port;
this.gatewayRequestStore = gatewayRequestStore;
+ this.jobStreamer = jobStreamer;
}
@Override
@@ -137,6 +155,26 @@ public void activateJobs(
writer.writeCommandWithoutKey(jobBatchRecord, recordMetadata);
}
+ @Override
+  public void streamActivatedJobs(
+      final StreamActivatedJobsRequest request,
+      final StreamObserver<ActivatedJob> responseObserver) {
+    final var jobActivationProperties = new JobActivationPropertiesImpl();
+    final var worker = wrapString(request.getWorker());
+    final var jobType = wrapString(request.getType());
+    final var serverObserver = (ServerCallStreamObserver<ActivatedJob>) responseObserver;
+    final var consumer = new GrpcJobConsumer(jobType, serverObserver);
+ jobActivationProperties
+ .setWorker(worker, 0, worker.capacity())
+ .setTimeout(request.getTimeout())
+ .setFetchVariables(request.getFetchVariableList().stream().map(StringValue::new).toList())
+ .setTenantIds(request.getTenantIdsList());
+
+ jobStreamer.addStream(jobType, jobActivationProperties, consumer);
+ serverObserver.setOnCloseHandler(consumer::close);
+ serverObserver.setOnCancelHandler(consumer::close);
+ }
+
@Override
public void cancelProcessInstance(
final CancelProcessInstanceRequest request,
@@ -642,4 +680,46 @@ private DecisionEvaluationRecord createDecisionEvaluationRecord(
public String getAddress() {
return "0.0.0.0:" + port;
}
+
+ private final class GrpcJobConsumer implements JobConsumer, AutoCloseable {
+ private final DirectBuffer jobType;
+    private final ServerCallStreamObserver<ActivatedJob> observer;
+ private final Executor executor;
+
+    private GrpcJobConsumer(
+        final DirectBuffer jobType, final ServerCallStreamObserver<ActivatedJob> observer) {
+ this.jobType = jobType;
+ this.observer = observer;
+ executor = MoreExecutors.newSequentialExecutor(ForkJoinPool.commonPool());
+ }
+
+ @Override
+    public CompletionStage<PushStatus> consumeJob(
+        final io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob job) {
+ return CompletableFuture.supplyAsync(() -> forwardJob(job), executor);
+ }
+
+ @Override
+ public void close() {
+ executor.execute(() -> jobStreamer.removeStream(jobType, this));
+ }
+
+ private PushStatus forwardJob(
+ final io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob job) {
+ if (!observer.isReady()) {
+ return PushStatus.BLOCKED;
+ }
+
+ try {
+ final var activatedJob =
+ GrpcResponseMapper.mapToActivatedJob(job.jobKey(), job.jobRecord());
+ observer.onNext(activatedJob);
+ return PushStatus.PUSHED;
+ } catch (final Exception e) {
+ observer.onError(e);
+ close();
+ throw e;
+ }
+ }
+ }
}
diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java
new file mode 100644
index 000000000..1d84f168e
--- /dev/null
+++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
+ * one or more contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright ownership.
+ * Licensed under the Zeebe Community License 1.1. You may not use this file
+ * except in compliance with the Zeebe Community License 1.1.
+ */
+
+package io.camunda.zeebe.process.test.engine;
+
+import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
+import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
+import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
+import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
+import io.camunda.zeebe.protocol.record.RecordType;
+import io.camunda.zeebe.protocol.record.ValueType;
+import io.camunda.zeebe.protocol.record.intent.JobIntent;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Predicate;
+import org.agrona.DirectBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class InMemoryJobStreamer implements JobStreamer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryJobStreamer.class);
+
+  private final ConcurrentMap<DirectBuffer, InMemoryJobStream> streams = new ConcurrentHashMap<>();
+ private final CommandWriter yieldWriter;
+
+ InMemoryJobStreamer(final CommandWriter yieldWriter) {
+ this.yieldWriter = yieldWriter;
+ }
+
+ @Override
+  public Optional<JobStream> streamFor(
+      final DirectBuffer jobType, final Predicate<JobActivationProperties> filter) {
+ return Optional.ofNullable(streams.get(jobType))
+ .flatMap(s -> filter.test(s.properties()) ? Optional.of(s) : Optional.empty());
+ }
+
+ void addStream(
+ final DirectBuffer jobType,
+ final JobActivationProperties properties,
+ final JobConsumer consumer) {
+ streams.compute(
+ jobType,
+ (ignored, s) -> {
+ final var stream =
+ s == null ? new InMemoryJobStream(properties, new CopyOnWriteArraySet<>()) : s;
+ stream.consumers.add(consumer);
+ return stream;
+ });
+ }
+
+ void removeStream(final DirectBuffer jobType, final JobConsumer consumer) {
+ streams.compute(
+ jobType,
+ (ignored, stream) -> {
+ if (stream == null) {
+ return null;
+ }
+
+ stream.consumers.remove(consumer);
+ if (stream.consumers.isEmpty()) {
+ return null;
+ }
+
+ return stream;
+ });
+ }
+
+ private void yieldJob(final ActivatedJob job) {
+ final var metadata =
+ new RecordMetadata()
+ .intent(JobIntent.YIELD)
+ .recordType(RecordType.COMMAND)
+ .valueType(ValueType.JOB);
+ yieldWriter.writeCommandWithKey(job.jobKey(), job.jobRecord(), metadata);
+ }
+
+ interface JobConsumer {
+    CompletionStage<PushStatus> consumeJob(final ActivatedJob job);
+ }
+
+ enum PushStatus {
+ PUSHED,
+ BLOCKED;
+ }
+
+ private final class InMemoryJobStream implements JobStream {
+ private final JobActivationProperties properties;
+    private final Set<JobConsumer> consumers;
+
+    InMemoryJobStream(final JobActivationProperties properties, final Set<JobConsumer> consumers) {
+ this.properties = properties;
+ this.consumers = consumers;
+ }
+
+ @Override
+ public JobActivationProperties properties() {
+ return properties;
+ }
+
+ @Override
+ public void push(final ActivatedJob payload) {
+ final var shuffled = new LinkedList<>(consumers);
+ Collections.shuffle(shuffled);
+ push(shuffled, payload);
+ }
+
+    private void push(final Queue<JobConsumer> consumers, final ActivatedJob job) {
+ final var consumer = consumers.poll();
+ if (consumer == null) {
+ LOGGER.debug("Failed to push job to clients, exhausted all known clients");
+ yieldJob(job);
+ return;
+ }
+
+ try {
+ consumer
+ .consumeJob(job)
+ .whenCompleteAsync(
+ (status, error) -> {
+ if (error != null) {
+ onPushError(consumers, job, error);
+ return;
+ }
+
+ if (status == PushStatus.BLOCKED) {
+ LOGGER.trace(
+ "Underlying stream or client is blocked, retrying with next consumer");
+ CompletableFuture.runAsync(() -> push(consumers, job));
+ }
+ });
+ } catch (final Exception e) {
+ onPushError(consumers, job, e);
+ }
+ }
+
+    private void onPushError(
+        final Queue<JobConsumer> consumers, final ActivatedJob job, final Throwable error) {
+ LOGGER.debug("Failed to push job to client, retrying with next consumer", error);
+ CompletableFuture.runAsync(() -> push(consumers, job));
+ }
+ }
+}
diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java
index 12e68bb68..580f67866 100644
--- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java
+++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java
@@ -10,6 +10,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.ZeebeFuture;
import io.camunda.zeebe.client.api.command.ClientException;
@@ -30,6 +31,7 @@
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
+import io.camunda.zeebe.process.test.filters.JobRecordStreamFilter;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
@@ -43,9 +45,12 @@
import io.camunda.zeebe.protocol.record.value.TimerRecordValue;
import io.camunda.zeebe.util.VersionUtil;
import java.time.Duration;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -1191,4 +1196,125 @@ void shouldBroadcastSignalWithVariables() {
// then
assertThat(response.getKey()).isPositive();
}
+
+ @Test
+ void shouldStreamJobs() {
+ // given
+    final var jobs = new CopyOnWriteArrayList<ActivatedJob>();
+ final var deployment = deploySingleTaskProcess();
+
+ // when
+ final var stream =
+ zeebeClient
+ .newStreamJobsCommand()
+ .jobType("jobType")
+ .consumer(jobs::add)
+ .fetchVariables(List.of("test"))
+ .workerName("worker")
+ .timeout(Duration.ofMinutes(1))
+ .send();
+
+ // then - since streams cannot receive jobs created before they are registered, and registration
+ // is asynchronous, we just create some jobs until we receive at least one
+ try {
+ Awaitility.await("until we've received some jobs")
+ .untilAsserted(
+ () -> {
+ createSingleTaskInstance(Map.of("test", 1));
+ assertThat(jobs).isNotEmpty();
+ });
+
+ assertThat(jobs)
+ .allSatisfy(
+ job -> {
+ assertThat(job.getBpmnProcessId()).isEqualTo("simpleProcess");
+ assertThat(job.getProcessDefinitionKey())
+ .isEqualTo(deployment.getProcesses().get(0).getProcessDefinitionKey());
+ assertThat(job.getRetries()).isEqualTo(3);
+ assertThat(job.getType()).isEqualTo("jobType");
+ assertThat(job.getWorker()).isEqualTo("worker");
+ });
+ } finally {
+ stream.cancel(true);
+ }
+ }
+
+ @Test
+ void shouldYieldJobIfBlocked() {
+ // given
+ final var deployment = deploySingleTaskProcess();
+ final var latch = new CountDownLatch(1);
+ final var recordStream = RecordStream.of(zeebeEngine.getRecordStreamSource());
+ // we create a large variable to trigger back pressure on the client side, otherwise it would
+ // take tens of thousands of them to reach the hardcoded 32KB threshold
+ final var variables = Map.of("foo", "x".repeat(1024 * 1024));
+
+ // when
+ final var stream =
+ zeebeClient
+ .newStreamJobsCommand()
+ .jobType("jobType")
+ .consumer(job -> Uninterruptibles.awaitUninterruptibly(latch))
+ .fetchVariables(List.of("foo"))
+ .workerName("worker")
+ .timeout(Duration.ofMinutes(1))
+ .send();
+
+ // then - since streams cannot receive jobs created before they are registered, and registration
+ // is asynchronous, we just create some jobs until at least one job is yielded
+    final Map<Long, Record<JobRecordValue>> yieldedJobs = new HashMap<>();
+ try {
+ Awaitility.await("until some jobs are yielded")
+ .pollInSameThread()
+ .pollInterval(Duration.ofMillis(50))
+ .untilAsserted(
+ () -> {
+ createSingleTaskInstance(variables);
+ new JobRecordStreamFilter(recordStream.jobRecords())
+ .withIntent(JobIntent.YIELDED).stream()
+ .forEach(job -> yieldedJobs.put(job.getKey(), job));
+ assertThat(yieldedJobs).isNotEmpty();
+ });
+ latch.countDown();
+
+ // since we're not exactly tracking which jobs are yielded, we can only do a coarse validation
+ // that the right job was yielded
+ assertThat(yieldedJobs)
+ .allSatisfy(
+ (key, job) -> {
+ assertThat(job.getIntent()).isEqualTo(JobIntent.YIELDED);
+ assertThat(job.getValue().getBpmnProcessId()).isEqualTo("simpleProcess");
+ assertThat(job.getValue().getProcessDefinitionKey())
+ .isEqualTo(deployment.getProcesses().get(0).getProcessDefinitionKey());
+ assertThat(job.getValue().getRetries()).isEqualTo(3);
+ assertThat(job.getValue().getType()).isEqualTo("jobType");
+ });
+ } finally {
+ stream.cancel(true);
+ }
+ }
+
+  private ProcessInstanceEvent createSingleTaskInstance(final Map<String, Object> variables) {
+ return zeebeClient
+ .newCreateInstanceCommand()
+ .bpmnProcessId("simpleProcess")
+ .latestVersion()
+ .variables(variables)
+ .send()
+ .join();
+ }
+
+ private DeploymentEvent deploySingleTaskProcess() {
+ return zeebeClient
+ .newDeployResourceCommand()
+ .addProcessModel(
+ Bpmn.createExecutableProcess("simpleProcess")
+ .startEvent()
+ .serviceTask("task", (task) -> task.zeebeJobType("jobType"))
+ .endEvent()
+ .done(),
+ "simpleProcess.bpmn")
+ .send()
+ .join();
+ }
}
diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java
index 2bd639573..7f53cdc06 100644
--- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java
+++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java
@@ -22,7 +22,7 @@
class GrpcToLogStreamGatewayTest {
-  static final List<String> UNSUPPORTED_METHODS = List.of("streamActivatedJobs");
+  static final List<String> UNSUPPORTED_METHODS = List.of();
   static final List<String> IGNORED_METHODS =
List.of(
diff --git a/pom.xml b/pom.xml
index d73c61338..58fac44d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -236,6 +236,18 @@
        <version>${dependency.zeebe.version}</version>
      </dependency>
+      <dependency>
+        <groupId>io.camunda</groupId>
+        <artifactId>zeebe-scheduler</artifactId>
+        <version>${dependency.zeebe.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.camunda</groupId>
+        <artifactId>zeebe-stream-platform</artifactId>
+        <version>${dependency.zeebe.version}</version>
+      </dependency>
+
      <dependency>
        <groupId>io.camunda</groupId>
        <artifactId>zeebe-util</artifactId>