-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8cf3362
commit 6d5cafb
Showing
4 changed files
with
137 additions
and
127 deletions.
There are no files selected for viewing
131 changes: 131 additions & 0 deletions
131
src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package com.vinted.kafka.connect.vespa.feeders; | ||
|
||
import ai.vespa.feed.client.DocumentId; | ||
import ai.vespa.feed.client.OperationParseException; | ||
import ai.vespa.feed.client.Result; | ||
import com.fasterxml.jackson.core.JsonParseException; | ||
import com.vinted.kafka.connect.vespa.VespaReporter; | ||
import com.vinted.kafka.connect.vespa.VespaSinkConfig; | ||
import org.apache.kafka.connect.sink.SinkRecord; | ||
import org.slf4j.Logger; | ||
|
||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.stream.Stream; | ||
|
||
public class VespaFeederHandler { | ||
private final Logger log; | ||
private final VespaSinkConfig config; | ||
private final VespaReporter reporter; | ||
|
||
public VespaFeederHandler(Logger log, VespaSinkConfig config, VespaReporter reporter) { | ||
this.log = log; | ||
this.config = config; | ||
this.reporter = reporter; | ||
} | ||
|
||
public CompletableFuture<Result> handle(SinkRecord record, CompletableFuture<Result> future) { | ||
CompletableFuture<Result> promise = new CompletableFuture<>(); | ||
|
||
future.whenComplete((result, throwable) -> { | ||
if (result == null) { | ||
reporter.report(record, throwable); | ||
|
||
if (!isMalformed(throwable)) { | ||
log.error(errorMessage(record), throwable); | ||
|
||
promise.completeExceptionally(throwable); | ||
} else { | ||
switch (config.behaviorOnMalformedDoc) { | ||
case IGNORE: | ||
log.info(ignoreMessage(record), throwable); | ||
promise.complete(ignoredResult()); | ||
break; | ||
case WARN: | ||
log.warn(ignoreMessage(record), throwable); | ||
promise.complete(ignoredResult()); | ||
break; | ||
case FAIL: | ||
default: | ||
log.error(errorMessage(record), throwable); | ||
|
||
promise.completeExceptionally(throwable); | ||
} | ||
} | ||
} else if (result.type() == Result.Type.success) { | ||
promise.complete(result); | ||
} else { | ||
Throwable resultThrowable = new Throwable(result.toString()); | ||
|
||
reporter.report(record, resultThrowable); | ||
|
||
switch (config.behaviorOnMalformedDoc) { | ||
case IGNORE: | ||
log.info(ignoreMessage(record), throwable); | ||
promise.complete(result); | ||
break; | ||
case WARN: | ||
log.warn(ignoreMessage(record), throwable); | ||
promise.complete(result); | ||
break; | ||
case FAIL: | ||
default: | ||
log.error(errorMessage(record), throwable); | ||
|
||
promise.completeExceptionally(throwable); | ||
} | ||
} | ||
}); | ||
|
||
return promise; | ||
} | ||
|
||
private static String ignoreMessage(SinkRecord record) { | ||
return String.format("Failed to index document '%s'. Ignoring and will not index it.", record); | ||
} | ||
|
||
private static String errorMessage(SinkRecord record) { | ||
return String.format( | ||
"Failed to index document '%s'. To ignore future document like this, change the configuration '%s' to '%s'.", | ||
record, | ||
VespaSinkConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, | ||
VespaSinkConfig.BehaviorOnMalformedDoc.IGNORE | ||
); | ||
} | ||
|
||
private static boolean isMalformed(Throwable throwable) { | ||
Throwable rootCause = Stream | ||
.iterate(throwable, Throwable::getCause) | ||
.filter(element -> element.getCause() == null) | ||
.findFirst() | ||
.orElse(throwable); | ||
|
||
return rootCause.toString().toLowerCase().contains("status 400") | ||
|| rootCause instanceof OperationParseException | ||
|| rootCause instanceof JsonParseException; | ||
} | ||
|
||
private static Result ignoredResult() { | ||
return new Result() { | ||
@Override | ||
public Type type() { | ||
return Type.success; | ||
} | ||
|
||
@Override | ||
public DocumentId documentId() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public Optional<String> resultMessage() { | ||
return Optional.empty(); | ||
} | ||
|
||
@Override | ||
public Optional<String> traceMessage() { | ||
return Optional.empty(); | ||
} | ||
}; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
116 changes: 0 additions & 116 deletions
116
src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters