diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java new file mode 100644 index 0000000..d864a56 --- /dev/null +++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java @@ -0,0 +1,131 @@ +package com.vinted.kafka.connect.vespa.feeders; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.OperationParseException; +import ai.vespa.feed.client.Result; +import com.fasterxml.jackson.core.JsonParseException; +import com.vinted.kafka.connect.vespa.VespaReporter; +import com.vinted.kafka.connect.vespa.VespaSinkConfig; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +public class VespaFeederHandler { + private final Logger log; + private final VespaSinkConfig config; + private final VespaReporter reporter; + + public VespaFeederHandler(Logger log, VespaSinkConfig config, VespaReporter reporter) { + this.log = log; + this.config = config; + this.reporter = reporter; + } + + public CompletableFuture handle(SinkRecord record, CompletableFuture future) { + CompletableFuture promise = new CompletableFuture<>(); + + future.whenComplete((result, throwable) -> { + if (result == null) { + reporter.report(record, throwable); + + if (!isMalformed(throwable)) { + log.error(errorMessage(record), throwable); + + promise.completeExceptionally(throwable); + } else { + switch (config.behaviorOnMalformedDoc) { + case IGNORE: + log.info(ignoreMessage(record), throwable); + promise.complete(ignoredResult()); + break; + case WARN: + log.warn(ignoreMessage(record), throwable); + promise.complete(ignoredResult()); + break; + case FAIL: + default: + log.error(errorMessage(record), throwable); + + promise.completeExceptionally(throwable); + } + } + } else if (result.type() == Result.Type.success) { + promise.complete(result); + } else { + Throwable resultThrowable = new Throwable(result.toString()); + + reporter.report(record, resultThrowable); + + switch (config.behaviorOnMalformedDoc) { + case IGNORE: + log.info(ignoreMessage(record), throwable); + promise.complete(result); + break; + case WARN: + log.warn(ignoreMessage(record), throwable); + promise.complete(result); + break; + case FAIL: + default: + log.error(errorMessage(record), throwable); + + promise.completeExceptionally(throwable); + } + } + }); + + return promise; + } + + private static String ignoreMessage(SinkRecord record) { + return String.format("Failed to index document '%s'. Ignoring and will not index it.", record); + } + + private static String errorMessage(SinkRecord record) { + return String.format( + "Failed to index document '%s'. To ignore future document like this, change the configuration '%s' to '%s'.", + record, + VespaSinkConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, + VespaSinkConfig.BehaviorOnMalformedDoc.IGNORE + ); + } + + private static boolean isMalformed(Throwable throwable) { + Throwable rootCause = Stream + .iterate(throwable, Throwable::getCause) + .filter(element -> element.getCause() == null) + .findFirst() + .orElse(throwable); + + return rootCause.toString().toLowerCase().contains("status 400") + || rootCause instanceof OperationParseException + || rootCause instanceof JsonParseException; + } + + private static Result ignoredResult() { + return new Result() { + @Override + public Type type() { + return Type.success; + } + + @Override + public DocumentId documentId() { + return null; + } + + @Override + public Optional resultMessage() { + return Optional.empty(); + } + + @Override + public Optional traceMessage() { + return Optional.empty(); + } + }; + } +} diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaRawFeeder.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaRawFeeder.java index 8c3fb76..1af3e08 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaRawFeeder.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaRawFeeder.java @@ -21,7 +21,7 @@ public class VespaRawFeeder implements VespaFeeder { private final JsonFeeder feeder; private final ValueConverter valueConverter; - private final VespaResultCallbackHandler resultCallbackHandler; + private final VespaFeederHandler vespaFeederHandler; public VespaRawFeeder(FeedClient client, OperationParameters parameters, VespaReporter reporter, VespaSinkConfig config) { JsonFeeder.Builder builder = JsonFeeder.builder(client); @@ -32,7 +32,7 @@ public VespaRawFeeder(FeedClient client, OperationParameters parameters, VespaRe this.feeder = builder.build(); this.valueConverter = new ValueConverter(); - this.resultCallbackHandler = new VespaResultCallbackHandler(log, config, reporter); + this.vespaFeederHandler = new VespaFeederHandler(log, config, reporter); } @Override @@ -46,9 +46,6 @@ public void close() throws IOException { } private CompletableFuture feedSingle(SinkRecord record) { - return CompletableFuture - .supplyAsync(() -> valueConverter.convert(record)) - .thenCompose(feeder::feedSingle) - .handle((result, throwable) -> resultCallbackHandler.handle(record, result, throwable)); + return vespaFeederHandler.handle(record, feeder.feedSingle(valueConverter.convert(record))); } } diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java deleted file mode 100644 index 0e4ddba..0000000 --- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.vinted.kafka.connect.vespa.feeders; - -import ai.vespa.feed.client.DocumentId; -import ai.vespa.feed.client.OperationParseException; -import ai.vespa.feed.client.Result; -import com.fasterxml.jackson.core.JsonParseException; -import com.vinted.kafka.connect.vespa.VespaReporter; -import com.vinted.kafka.connect.vespa.VespaSinkConfig; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; - -import java.util.Optional; -import java.util.stream.Stream; - -public class VespaResultCallbackHandler { - private final Logger log; - private final VespaSinkConfig config; - private final VespaReporter reporter; - - public VespaResultCallbackHandler(Logger log, VespaSinkConfig config, VespaReporter reporter) { - this.log = log; - this.config = config; - this.reporter = reporter; - } - - public Result handle(SinkRecord record, Result result, Throwable throwable) { - if (result == null) { - return handleThrowable(record, throwable); - } else { - return handleResult(record, result); - } - } - - private Result handleResult(SinkRecord record, Result result) { - if (result.type() == Result.Type.success) { - return result; - } - - return handleMalformed(record, result, new ConnectException(result.toString())); - } - - private Result handleThrowable(SinkRecord record, Throwable throwable) { - Throwable rootCause = Stream - .iterate(throwable, Throwable::getCause) - .filter(element -> element.getCause() == null) - .findFirst() - .orElse(throwable); - - Result success = new Result() { - @Override - public Type type() { - return Type.success; - } - - @Override - public DocumentId documentId() { - return null; - } - - @Override - public Optional resultMessage() { - return Optional.empty(); - } - - @Override - public Optional traceMessage() { - return Optional.empty(); - } - }; - - boolean isMalformed = rootCause.toString().toLowerCase().contains("status 400") - || rootCause instanceof OperationParseException - || rootCause instanceof JsonParseException; - - if (isMalformed) { - return handleMalformed(record, success, throwable); - } - - log.error(errorMessage(record, success), throwable); - - throw new ConnectException(throwable); - } - - private Result handleMalformed(SinkRecord record, Result result, Throwable throwable) { - reporter.report(record, throwable); - - switch (config.behaviorOnMalformedDoc) { - case IGNORE: - log.info(ignoreMessage(record), throwable); - return result; - case WARN: - log.warn(ignoreMessage(record), throwable); - return result; - case FAIL: - default: - log.error(errorMessage(record, result), throwable); - throw new ConnectException(throwable); - } - } - - private String ignoreMessage(SinkRecord record) { - return String.format("Failed to index document '%s'. Ignoring and will not index it.", record); - } - - private String errorMessage(SinkRecord record, Result result) { - return String.format( - "Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. To ignore future document like this, change the configuration '%s' to '%s'.", - record, - result.resultMessage(), - result.traceMessage(), - VespaSinkConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, - VespaSinkConfig.BehaviorOnMalformedDoc.IGNORE - ); - } -} diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java index cdbab78..4512e56 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java @@ -29,7 +29,7 @@ public class VespaUpsertFeeder implements VespaFeeder { private final KeyConverter keyConverter; private final ValueConverter valueConverter; private final VespaReporter reporter; - private final VespaResultCallbackHandler resultCallbackHandler; + private final VespaFeederHandler vespaFeederHandler; public VespaUpsertFeeder(FeedClient client, OperationParameters parameters, VespaReporter reporter, VespaSinkConfig config) { this.config = config; @@ -38,7 +38,7 @@ public VespaUpsertFeeder(FeedClient client, OperationParameters parameters, Vesp this.keyConverter = new KeyConverter(); this.valueConverter = new ValueConverter(); this.reporter = reporter; - this.resultCallbackHandler = new VespaResultCallbackHandler(log, config, reporter); + this.vespaFeederHandler = new VespaFeederHandler(log, config, reporter); } @Override @@ -61,9 +61,7 @@ public void close() { } private CompletableFuture feedSingle(Operation operation) { - return operation - .feed(client, parameters) - .handle((result, throwable) -> resultCallbackHandler.handle(operation.record, result, throwable)); + return vespaFeederHandler.handle(operation.record, operation.feed(client, parameters)); } private Stream toOperation(SinkRecord record) {