From 1962ea6ea50a79611a3bcc501eb5192f47863a28 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 3 Jan 2024 20:37:11 +0100 Subject: [PATCH 1/4] deps(parent): add missing used undeclared dependencies --- pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pom.xml b/pom.xml index d73c61338..58fac44d0 100644 --- a/pom.xml +++ b/pom.xml @@ -236,6 +236,18 @@ ${dependency.zeebe.version} + + io.camunda + zeebe-scheduler + ${dependency.zeebe.version} + + + + io.camunda + zeebe-stream-platform + ${dependency.zeebe.version} + + io.camunda zeebe-util From 8c15af5912598dd1d338fb0bbd8544cad5980cdb Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 3 Jan 2024 20:37:32 +0100 Subject: [PATCH 2/4] feat(engine): add StreamActivatedJobs RPC implementation --- .../process/test/engine/EngineFactory.java | 16 +- .../test/engine/GrpcResponseMapper.java | 39 +++-- .../test/engine/GrpcToLogStreamGateway.java | 82 ++++++++- .../test/engine/InMemoryJobStreamer.java | 155 ++++++++++++++++++ 4 files changed, 267 insertions(+), 25 deletions(-) create mode 100644 engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java index 71fc9baca..d2221a905 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/EngineFactory.java @@ -63,9 +63,10 @@ public static ZeebeTestEngine create(final int port) { final CommandWriter commandWriter = new CommandWriter(logStream.newLogStreamWriter().join()); final CommandSender commandSender = new CommandSender(commandWriter); final GatewayRequestStore gatewayRequestStore = new GatewayRequestStore(); + final InMemoryJobStreamer jobStreamer = new InMemoryJobStreamer(commandWriter); final GrpcToLogStreamGateway gateway = new GrpcToLogStreamGateway( - commandWriter, partitionId, partitionCount, port, gatewayRequestStore); + commandWriter, partitionId, partitionCount, port, gatewayRequestStore, jobStreamer); final Server grpcServer = ServerBuilder.forPort(port).addService(gateway).build(); final GrpcResponseWriter grpcResponseWriter = @@ -75,7 +76,13 @@ public static ZeebeTestEngine create(final int port) { final StreamProcessor streamProcessor = createStreamProcessor( - logStream, zeebeDb, scheduler, grpcResponseWriter, partitionCount, commandSender); + logStream, + zeebeDb, + scheduler, + grpcResponseWriter, + partitionCount, + commandSender, + jobStreamer); final EngineStateMonitor engineStateMonitor = new EngineStateMonitor(logStorage, streamProcessor); @@ -144,7 +151,8 @@ private static StreamProcessor createStreamProcessor( final ActorSchedulingService scheduler, final GrpcResponseWriter grpcResponseWriter, final int partitionCount, - final CommandSender commandSender) { + final CommandSender commandSender, + final JobStreamer jobStreamer) { return StreamProcessor.builder() .logStream(logStream) .zeebeDb(database) @@ -160,7 +168,7 @@ private static StreamProcessor createStreamProcessor( new SubscriptionCommandSender(context.getPartitionId(), commandSender), commandSender, FeatureFlags.createDefault(), - JobStreamer.noop()), + jobStreamer), new EngineConfiguration()))) .actorSchedulingService(scheduler) .build(); diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java index ea52ca6ca..d948ed3bd 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java @@ -341,30 +341,29 @@ private GeneratedMessageV3 createJobBatchResponse() { return ActivateJobsResponse.newBuilder() .addAllJobs( jobsWithKeys.entrySet().stream() - .map( - (entry) -> { - final JobRecord job = entry.getValue(); - return ActivatedJob.newBuilder() - .setKey(entry.getKey()) - .setType(job.getType()) - .setRetries(job.getRetries()) - .setWorker(job.getWorker()) - .setDeadline(job.getDeadline()) - .setProcessDefinitionKey(job.getProcessDefinitionKey()) - .setBpmnProcessId(job.getBpmnProcessId()) - .setProcessDefinitionVersion(job.getProcessDefinitionVersion()) - .setProcessInstanceKey(job.getProcessInstanceKey()) - .setElementId(job.getElementId()) - .setElementInstanceKey(job.getElementInstanceKey()) - .setCustomHeaders( - MsgPackConverter.convertToJson(job.getCustomHeadersBuffer())) - .setVariables(MsgPackConverter.convertToJson(job.getVariablesBuffer())) - .build(); - }) + .map((entry) -> mapToActivatedJob(entry.getKey(), entry.getValue())) .collect(Collectors.toList())) .build(); } + static ActivatedJob mapToActivatedJob(final long key, final JobRecord job) { + return ActivatedJob.newBuilder() + .setKey(key) + .setType(job.getType()) + .setRetries(job.getRetries()) + .setWorker(job.getWorker()) + .setDeadline(job.getDeadline()) + .setProcessDefinitionKey(job.getProcessDefinitionKey()) + .setBpmnProcessId(job.getBpmnProcessId()) + .setProcessDefinitionVersion(job.getProcessDefinitionVersion()) + .setProcessInstanceKey(job.getProcessInstanceKey()) + .setElementId(job.getElementId()) + .setElementInstanceKey(job.getElementInstanceKey()) + .setCustomHeaders(MsgPackConverter.convertToJson(job.getCustomHeadersBuffer())) + .setVariables(MsgPackConverter.convertToJson(job.getVariablesBuffer())) + .build(); + } + private GeneratedMessageV3 createCompleteJobResponse() { return CompleteJobResponse.newBuilder().build(); } diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index 4389a024e..f15c758d5 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -7,10 +7,14 @@ */ package io.camunda.zeebe.process.test.engine; +import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString; + +import com.google.common.util.concurrent.MoreExecutors; import io.camunda.zeebe.gateway.protocol.GatewayGrpc; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BrokerInfo; @@ -43,6 +47,7 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ResolveIncidentResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.SetVariablesResponse; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.StreamActivatedJobsRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ThrowErrorResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.TopologyRequest; @@ -51,7 +56,10 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobTimeoutRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobTimeoutResponse; +import io.camunda.zeebe.msgpack.value.StringValue; import io.camunda.zeebe.msgpack.value.ValueArray; +import io.camunda.zeebe.process.test.engine.InMemoryJobStreamer.JobConsumer; +import io.camunda.zeebe.process.test.engine.InMemoryJobStreamer.PushStatus; import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter; import io.camunda.zeebe.protocol.impl.record.RecordMetadata; import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord; @@ -73,6 +81,7 @@ import io.camunda.zeebe.protocol.impl.record.value.resource.ResourceDeletionRecord; import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord; import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord; +import io.camunda.zeebe.protocol.impl.stream.job.JobActivationPropertiesImpl; import io.camunda.zeebe.protocol.record.RecordType; import io.camunda.zeebe.protocol.record.ValueType; import io.camunda.zeebe.protocol.record.intent.DecisionEvaluationIntent; @@ -91,7 +100,13 @@ import io.camunda.zeebe.protocol.record.value.VariableDocumentUpdateSemantic; import io.camunda.zeebe.util.VersionUtil; import io.camunda.zeebe.util.buffer.BufferUtil; +import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import org.agrona.DirectBuffer; class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase { @@ -100,18 +115,21 @@ class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase { private final int partitionCount; private final int port; private final GatewayRequestStore gatewayRequestStore; + private final InMemoryJobStreamer jobStreamer; public GrpcToLogStreamGateway( final CommandWriter writer, final int partitionId, final int partitionCount, final int port, - final GatewayRequestStore gatewayRequestStore) { + final GatewayRequestStore gatewayRequestStore, + final InMemoryJobStreamer jobStreamer) { this.writer = writer; this.partitionId = partitionId; this.partitionCount = partitionCount; this.port = port; this.gatewayRequestStore = gatewayRequestStore; + this.jobStreamer = jobStreamer; } @Override @@ -137,6 +155,26 @@ public void activateJobs( writer.writeCommandWithoutKey(jobBatchRecord, recordMetadata); } + @Override + public void streamActivatedJobs( + final StreamActivatedJobsRequest request, + final StreamObserver responseObserver) { + final var jobActivationProperties = new JobActivationPropertiesImpl(); + final var worker = wrapString(request.getWorker()); + final var jobType = wrapString(request.getType()); + final var serverObserver = (ServerCallStreamObserver) responseObserver; + final var consumer = new GrpcJobConsumer(jobType, serverObserver); + jobActivationProperties + .setWorker(worker, 0, worker.capacity()) + .setTimeout(request.getTimeout()) + .setFetchVariables(request.getFetchVariableList().stream().map(StringValue::new).toList()) + .setTenantIds(request.getTenantIdsList()); + + jobStreamer.addStream(jobType, jobActivationProperties, consumer); + serverObserver.setOnCloseHandler(consumer::close); + serverObserver.setOnCancelHandler(consumer::close); + } + @Override public void cancelProcessInstance( final CancelProcessInstanceRequest request, @@ -642,4 +680,46 @@ private DecisionEvaluationRecord createDecisionEvaluationRecord( public String getAddress() { return "0.0.0.0:" + port; } + + private final class GrpcJobConsumer implements JobConsumer, AutoCloseable { + private final DirectBuffer jobType; + private final ServerCallStreamObserver observer; + private final Executor executor; + + private GrpcJobConsumer( + final DirectBuffer jobType, final ServerCallStreamObserver observer) { + this.jobType = jobType; + this.observer = observer; + executor = MoreExecutors.newSequentialExecutor(ForkJoinPool.commonPool()); + } + + @Override + public CompletionStage consumeJob( + final io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob job) { + return CompletableFuture.supplyAsync(() -> forwardJob(job), executor); + } + + @Override + public void close() { + executor.execute(() -> jobStreamer.removeStream(jobType, this)); + } + + private PushStatus forwardJob( + final io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob job) { + if (!observer.isReady()) { + return PushStatus.BLOCKED; + } + + try { + final var activatedJob = + GrpcResponseMapper.mapToActivatedJob(job.jobKey(), job.jobRecord()); + observer.onNext(activatedJob); + return PushStatus.PUSHED; + } catch (final Exception e) { + observer.onError(e); + close(); + throw e; + } + } + } } diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java new file mode 100644 index 000000000..1d84f168e --- /dev/null +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/InMemoryJobStreamer.java @@ -0,0 +1,155 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ + +package io.camunda.zeebe.process.test.engine; + +import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer; +import io.camunda.zeebe.protocol.impl.record.RecordMetadata; +import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob; +import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties; +import io.camunda.zeebe.protocol.record.RecordType; +import io.camunda.zeebe.protocol.record.ValueType; +import io.camunda.zeebe.protocol.record.intent.JobIntent; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Predicate; +import org.agrona.DirectBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class InMemoryJobStreamer implements JobStreamer { + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryJobStreamer.class); + + private final ConcurrentMap streams = new ConcurrentHashMap<>(); + private final CommandWriter yieldWriter; + + InMemoryJobStreamer(final CommandWriter yieldWriter) { + this.yieldWriter = yieldWriter; + } + + @Override + public Optional streamFor( + final DirectBuffer jobType, final Predicate filter) { + return Optional.ofNullable(streams.get(jobType)) + .flatMap(s -> filter.test(s.properties()) ? Optional.of(s) : Optional.empty()); + } + + void addStream( + final DirectBuffer jobType, + final JobActivationProperties properties, + final JobConsumer consumer) { + streams.compute( + jobType, + (ignored, s) -> { + final var stream = + s == null ? new InMemoryJobStream(properties, new CopyOnWriteArraySet<>()) : s; + stream.consumers.add(consumer); + return stream; + }); + } + + void removeStream(final DirectBuffer jobType, final JobConsumer consumer) { + streams.compute( + jobType, + (ignored, stream) -> { + if (stream == null) { + return null; + } + + stream.consumers.remove(consumer); + if (stream.consumers.isEmpty()) { + return null; + } + + return stream; + }); + } + + private void yieldJob(final ActivatedJob job) { + final var metadata = + new RecordMetadata() + .intent(JobIntent.YIELD) + .recordType(RecordType.COMMAND) + .valueType(ValueType.JOB); + yieldWriter.writeCommandWithKey(job.jobKey(), job.jobRecord(), metadata); + } + + interface JobConsumer { + CompletionStage consumeJob(final ActivatedJob job); + } + + enum PushStatus { + PUSHED, + BLOCKED; + } + + private final class InMemoryJobStream implements JobStream { + private final JobActivationProperties properties; + private final Set consumers; + + InMemoryJobStream(final JobActivationProperties properties, final Set consumers) { + this.properties = properties; + this.consumers = consumers; + } + + @Override + public JobActivationProperties properties() { + return properties; + } + + @Override + public void push(final ActivatedJob payload) { + final var shuffled = new LinkedList<>(consumers); + Collections.shuffle(shuffled); + push(shuffled, payload); + } + + private void push(final Queue consumers, final ActivatedJob job) { + final var consumer = consumers.poll(); + if (consumer == null) { + LOGGER.debug("Failed to push job to clients, exhausted all known clients"); + yieldJob(job); + return; + } + + try { + consumer + .consumeJob(job) + .whenCompleteAsync( + (status, error) -> { + if (error != null) { + onPushError(consumers, job, error); + return; + } + + if (status == PushStatus.BLOCKED) { + LOGGER.trace( + "Underlying stream or client is blocked, retrying with next consumer"); + CompletableFuture.runAsync(() -> push(consumers, job)); + } + }); + } catch (final Exception e) { + onPushError(consumers, job, e); + } + } + + private void onPushError( + final Queue consumers, final ActivatedJob job, final Throwable error) { + LOGGER.debug("Failed to push job to client, retrying with next consumer", error); + CompletableFuture.runAsync(() -> push(consumers, job)); + } + } +} From 7942fe6215e7df1363da13f257403595ec1e2d86 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 3 Jan 2024 20:37:47 +0100 Subject: [PATCH 3/4] test(engine): add StreamActivatedJobs tests --- .../process/test/engine/EngineClientTest.java | 126 ++++++++++++++++++ .../engine/GrpcToLogStreamGatewayTest.java | 2 +- 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java index 12e68bb68..580f67866 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java @@ -10,6 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.google.common.util.concurrent.Uninterruptibles; import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.client.api.ZeebeFuture; import io.camunda.zeebe.client.api.command.ClientException; @@ -30,6 +31,7 @@ import io.camunda.zeebe.client.api.response.Topology; import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.filters.JobRecordStreamFilter; import io.camunda.zeebe.process.test.filters.RecordStream; import io.camunda.zeebe.protocol.record.Record; import io.camunda.zeebe.protocol.record.RecordType; @@ -43,9 +45,12 @@ import io.camunda.zeebe.protocol.record.value.TimerRecordValue; import io.camunda.zeebe.util.VersionUtil; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -1191,4 +1196,125 @@ void shouldBroadcastSignalWithVariables() { // then assertThat(response.getKey()).isPositive(); } + + @Test + void shouldStreamJobs() { + // given + final var jobs = new CopyOnWriteArrayList(); + final var deployment = deploySingleTaskProcess(); + + // when + final var stream = + zeebeClient + .newStreamJobsCommand() + .jobType("jobType") + .consumer(jobs::add) + .fetchVariables(List.of("test")) + .workerName("worker") + .timeout(Duration.ofMinutes(1)) + .send(); + + // then - since streams cannot receive jobs created before they are registered, and registration + // is asynchronous, we just create some jobs until we receive at least one + try { + Awaitility.await("until we've received some jobs") + .untilAsserted( + () -> { + createSingleTaskInstance(Map.of("test", 1)); + assertThat(jobs).isNotEmpty(); + }); + + assertThat(jobs) + .allSatisfy( + job -> { + assertThat(job.getBpmnProcessId()).isEqualTo("simpleProcess"); + assertThat(job.getProcessDefinitionKey()) + .isEqualTo(deployment.getProcesses().get(0).getProcessDefinitionKey()); + assertThat(job.getRetries()).isEqualTo(3); + assertThat(job.getType()).isEqualTo("jobType"); + assertThat(job.getWorker()).isEqualTo("worker"); + }); + } finally { + stream.cancel(true); + } + } + + @Test + void shouldYieldJobIfBlocked() { + // given + final var deployment = deploySingleTaskProcess(); + final var latch = new CountDownLatch(1); + final var recordStream = RecordStream.of(zeebeEngine.getRecordStreamSource()); + // we create a large variable to trigger back pressure on the client side, otherwise it would + // take tens of thousands of them to reach the hardcoded 32KB threshold + final var variables = Map.of("foo", "x".repeat(1024 * 1024)); + + // when + final var stream = + zeebeClient + .newStreamJobsCommand() + .jobType("jobType") + .consumer(job -> Uninterruptibles.awaitUninterruptibly(latch)) + .fetchVariables(List.of("foo")) + .workerName("worker") + .timeout(Duration.ofMinutes(1)) + .send(); + + // then - since streams cannot receive jobs created before they are registered, and registration + // is asynchronous, we just create some jobs until at least one job is yielded + final Map> yieldedJobs = new HashMap<>(); + try { + Awaitility.await("until some jobs are yielded") + .pollInSameThread() + .pollInterval(Duration.ofMillis(50)) + .untilAsserted( + () -> { + createSingleTaskInstance(variables); + new JobRecordStreamFilter(recordStream.jobRecords()) + .withIntent(JobIntent.YIELDED).stream() + .forEach(job -> yieldedJobs.put(job.getKey(), job)); + assertThat(yieldedJobs).isNotEmpty(); + }); + latch.countDown(); + + // since we're not exactly tracking which jobs are yielded, we can only do a coarse validation + // that the right job was yielded + assertThat(yieldedJobs) + .allSatisfy( + (key, job) -> { + assertThat(job.getIntent()).isEqualTo(JobIntent.YIELDED); + assertThat(job.getValue().getBpmnProcessId()).isEqualTo("simpleProcess"); + assertThat(job.getValue().getProcessDefinitionKey()) + .isEqualTo(deployment.getProcesses().get(0).getProcessDefinitionKey()); + assertThat(job.getValue().getRetries()).isEqualTo(3); + assertThat(job.getValue().getType()).isEqualTo("jobType"); + }); + } finally { + stream.cancel(true); + } + } + + private ProcessInstanceEvent createSingleTaskInstance(final Map variables) { + return zeebeClient + .newCreateInstanceCommand() + .bpmnProcessId("simpleProcess") + .latestVersion() + .variables(variables) + .send() + .join(); + } + + private DeploymentEvent deploySingleTaskProcess() { + return zeebeClient + .newDeployResourceCommand() + .addProcessModel( + Bpmn.createExecutableProcess("simpleProcess") + .startEvent() + .serviceTask("task", (task) -> task.zeebeJobType("jobType")) + .endEvent() + .done(), + "simpleProcess.bpmn") + .send() + .join(); + } } diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java index 2bd639573..7f53cdc06 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGatewayTest.java @@ -22,7 +22,7 @@ class GrpcToLogStreamGatewayTest { - static final List UNSUPPORTED_METHODS = List.of("streamActivatedJobs"); + static final List UNSUPPORTED_METHODS = List.of(); static final List IGNORED_METHODS = List.of( From 97a019562f443b4ca65151d194c09cfac8dc9810 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 3 Jan 2024 20:38:06 +0100 Subject: [PATCH 4/4] deps(engine): add used undeclared dependencies --- engine/pom.xml | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/engine/pom.xml b/engine/pom.xml index 59e16a5d8..b778043b6 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -76,6 +76,21 @@ zeebe-db + + io.camunda + zeebe-scheduler + + + + io.camunda + zeebe-stream-platform + + + + com.fasterxml.jackson.core + jackson-databind + + com.google.api.grpc proto-google-common-protos @@ -116,6 +131,11 @@ grpc-stub + + com.google.guava + guava + + org.junit.jupiter @@ -186,10 +206,10 @@ analyze-duplicate - - false + + + org.slf4j:slf4j-simple +