Skip to content

Commit

Permalink
Merge pull request #71 from camunda-community-hub/fm-exponential-backoff
Browse files Browse the repository at this point in the history
Add backoff to retrying
  • Loading branch information
berndruecker authored Jan 26, 2022
2 parents 793c126 + 02a8a1e commit d601d66
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.camunda.community</groupId>
<artifactId>community-hub-release-parent</artifactId>
<version>1.2.2</version>
<version>1.2.2</version>
<relativePath />
</parent>

Expand All @@ -40,7 +40,7 @@
<version.junit>5.5.2</version.junit>
<version.log4j>2.17.1</version.log4j>
<version.slf4j>1.7.26</version.slf4j>
<version.zeebe>1.2.1</version.zeebe>
<version.zeebe>1.3.1</version.zeebe>

<!-- plugin version -->
<plugin.version.assembly>3.1.1</plugin.version.assembly>
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/io/zeebe/kafka/connect/sink/ZeebeSinkFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,8 +55,15 @@ class ZeebeSinkFuture extends CompletableFuture<PublishMessageResponse> {

private final FinalCommandStep<PublishMessageResponse> command;

ZeebeSinkFuture(final FinalCommandStep<PublishMessageResponse> command) {
// TODO inject reusable executor
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private BackoffSupplier backoffSupplier;
private long currentRetryDelay = 50L;

ZeebeSinkFuture(
final FinalCommandStep<PublishMessageResponse> command, BackoffSupplier backoffSupplier) {
this.command = command;
this.backoffSupplier = backoffSupplier;
}

@SuppressWarnings("unchecked")
Expand All @@ -73,7 +84,9 @@ CompletableFuture<PublishMessageResponse> executeAsync() {
if (SUCCESS_CODES.contains(code)) {
complete(null);
} else if (RETRIABLE_CODES.contains(code)) {
executeAsync();
currentRetryDelay = backoffSupplier.supplyRetryDelay(currentRetryDelay);
LOGGER.trace("Retry " + command + " after seeing " + code + ", backoff/delay: " + currentRetryDelay + " ms");
executor.schedule(this::executeAsync, currentRetryDelay, TimeUnit.MILLISECONDS);
} else if (FAILURE_CODES.contains(code)) {
completeExceptionally(throwable);
} else {
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/zeebe/kafka/connect/sink/ZeebeSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep3;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.camunda.zeebe.client.impl.worker.ExponentialBackoffBuilderImpl;
import io.zeebe.kafka.connect.sink.message.JsonRecordParser;
import io.zeebe.kafka.connect.sink.message.JsonRecordParser.Builder;
import io.zeebe.kafka.connect.sink.message.Message;
Expand All @@ -41,12 +43,20 @@ public class ZeebeSinkTask extends SinkTask {

private ManagedClient managedClient;
private JsonRecordParser parser;
private BackoffSupplier backoffSupplier;

@Override
public void start(final Map<String, String> props) {
final ZeebeSinkConnectorConfig config = new ZeebeSinkConnectorConfig(props);
managedClient = new ManagedClient(ZeebeClientHelper.buildClient(config));
parser = buildParser(config);
backoffSupplier =
new ExponentialBackoffBuilderImpl()
.maxDelay(1000L)
.minDelay(50L)
.backoffFactor(1.5)
.jitterFactor(0.2)
.build();
}

// The documentation specifies that we probably shouldn't block here but I'm not sure what the
Expand Down Expand Up @@ -82,7 +92,7 @@ private CompletableFuture<Void> publishMessages(
sinkRecords
.stream()
.map(r -> this.preparePublishRequest(client, r))
.map(ZeebeSinkFuture::new)
.map(command -> new ZeebeSinkFuture(command, backoffSupplier))
.map(ZeebeSinkFuture::executeAsync)
.toArray(CompletableFuture[]::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.client.api.ZeebeFuture;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.client.impl.worker.ExponentialBackoffBuilderImpl;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -130,7 +131,7 @@ private ZeebeSinkFuture create(final RuntimeException e) {
}

private ZeebeSinkFuture create(final Runnable r) {
return new ZeebeSinkFuture(new FinalStepMock(r));
return new ZeebeSinkFuture(new FinalStepMock(r), new ExponentialBackoffBuilderImpl().build());
}

static class FinalStepMock implements FinalCommandStep<PublishMessageResponse> {
Expand Down

0 comments on commit d601d66

Please sign in to comment.