Skip to content

Commit

Permalink
Use propagation for error handling (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
buinauskas authored Oct 2, 2023
1 parent 8cf3362 commit 71dc337
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.vinted.kafka.connect.vespa.feeders;

import ai.vespa.feed.client.DocumentId;
import ai.vespa.feed.client.OperationParseException;
import ai.vespa.feed.client.Result;
import com.fasterxml.jackson.core.JsonParseException;
import com.vinted.kafka.connect.vespa.VespaReporter;
import com.vinted.kafka.connect.vespa.VespaSinkConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class VespaFeederHandler {
private final Logger log;
private final VespaSinkConfig config;
private final VespaReporter reporter;

public VespaFeederHandler(Logger log, VespaSinkConfig config, VespaReporter reporter) {
this.log = log;
this.config = config;
this.reporter = reporter;
}

public CompletableFuture<Result> handle(SinkRecord record, CompletableFuture<Result> future) {
CompletableFuture<Result> promise = new CompletableFuture<>();

future.whenComplete((result, throwable) -> {
if (result == null) {
reporter.report(record, throwable);

if (!isMalformed(throwable)) {
log.error(errorMessage(record), throwable);

promise.completeExceptionally(throwable);
} else {
switch (config.behaviorOnMalformedDoc) {
case IGNORE:
log.info(ignoreMessage(record), throwable);
promise.complete(ignoredResult());
break;
case WARN:
log.warn(ignoreMessage(record), throwable);
promise.complete(ignoredResult());
break;
case FAIL:
default:
log.error(errorMessage(record), throwable);

promise.completeExceptionally(throwable);
}
}
} else if (result.type() == Result.Type.success) {
promise.complete(result);
} else {
Throwable resultThrowable = new Throwable(result.toString());

reporter.report(record, resultThrowable);

switch (config.behaviorOnMalformedDoc) {
case IGNORE:
log.info(ignoreMessage(record), throwable);
promise.complete(result);
break;
case WARN:
log.warn(ignoreMessage(record), throwable);
promise.complete(result);
break;
case FAIL:
default:
log.error(errorMessage(record), throwable);

promise.completeExceptionally(throwable);
}
}
});

return promise;
}

private static String ignoreMessage(SinkRecord record) {
return String.format("Failed to index document '%s'. Ignoring and will not index it.", record);
}

private static String errorMessage(SinkRecord record) {
return String.format(
"Failed to index document '%s'. To ignore future document like this, change the configuration '%s' to '%s'.",
record,
VespaSinkConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
VespaSinkConfig.BehaviorOnMalformedDoc.IGNORE
);
}

private static boolean isMalformed(Throwable throwable) {
Throwable rootCause = Stream
.iterate(throwable, Throwable::getCause)
.filter(element -> element.getCause() == null)
.findFirst()
.orElse(throwable);

return rootCause.toString().toLowerCase().contains("status 400")
|| rootCause instanceof OperationParseException
|| rootCause instanceof JsonParseException;
}

private static Result ignoredResult() {
return new Result() {
@Override
public Type type() {
return Type.success;
}

@Override
public DocumentId documentId() {
return null;
}

@Override
public Optional<String> resultMessage() {
return Optional.empty();
}

@Override
public Optional<String> traceMessage() {
return Optional.empty();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class VespaRawFeeder implements VespaFeeder {

private final JsonFeeder feeder;
private final ValueConverter valueConverter;
private final VespaResultCallbackHandler resultCallbackHandler;
private final VespaFeederHandler vespaFeederHandler;

public VespaRawFeeder(FeedClient client, OperationParameters parameters, VespaReporter reporter, VespaSinkConfig config) {
JsonFeeder.Builder builder = JsonFeeder.builder(client);
Expand All @@ -32,7 +32,7 @@ public VespaRawFeeder(FeedClient client, OperationParameters parameters, VespaRe

this.feeder = builder.build();
this.valueConverter = new ValueConverter();
this.resultCallbackHandler = new VespaResultCallbackHandler(log, config, reporter);
this.vespaFeederHandler = new VespaFeederHandler(log, config, reporter);
}

@Override
Expand All @@ -46,9 +46,6 @@ public void close() throws IOException {
}

private CompletableFuture<Result> feedSingle(SinkRecord record) {
return CompletableFuture
.supplyAsync(() -> valueConverter.convert(record))
.thenCompose(feeder::feedSingle)
.handle((result, throwable) -> resultCallbackHandler.handle(record, result, throwable));
return vespaFeederHandler.handle(record, feeder.feedSingle(valueConverter.convert(record)));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class VespaUpsertFeeder implements VespaFeeder {
private final KeyConverter keyConverter;
private final ValueConverter valueConverter;
private final VespaReporter reporter;
private final VespaResultCallbackHandler resultCallbackHandler;
private final VespaFeederHandler vespaFeederHandler;

public VespaUpsertFeeder(FeedClient client, OperationParameters parameters, VespaReporter reporter, VespaSinkConfig config) {
this.config = config;
Expand All @@ -38,7 +38,7 @@ public VespaUpsertFeeder(FeedClient client, OperationParameters parameters, Vesp
this.keyConverter = new KeyConverter();
this.valueConverter = new ValueConverter();
this.reporter = reporter;
this.resultCallbackHandler = new VespaResultCallbackHandler(log, config, reporter);
this.vespaFeederHandler = new VespaFeederHandler(log, config, reporter);
}

@Override
Expand All @@ -61,9 +61,7 @@ public void close() {
}

private CompletableFuture<Result> feedSingle(Operation operation) {
return operation
.feed(client, parameters)
.handle((result, throwable) -> resultCallbackHandler.handle(operation.record, result, throwable));
return vespaFeederHandler.handle(operation.record, operation.feed(client, parameters));
}

private Stream<Operation> toOperation(SinkRecord record) {
Expand Down

0 comments on commit 71dc337

Please sign in to comment.