From 8d35955f8eb7683cb26346301595c0292a9dcf87 Mon Sep 17 00:00:00 2001 From: Evaldas Buinauskas <7301441+buinauskas@users.noreply.github.com> Date: Tue, 24 Oct 2023 16:18:16 +0300 Subject: [PATCH] Change DLQ reporting (#16) --- README.md | 4 ++-- pom.xml | 2 +- .../kafka/connect/vespa/feeders/VespaFeederHandler.java | 8 +++++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index ccb2bc9..404d155 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,11 @@ This connector has not yet been published to Confluent Hub. To install it, downl install it using `confluent-hub` command line tool. ```sh -wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.7/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip -q +wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.8/vinted-kafka-connect-vespa-1.0.8-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.8-SNAPSHOT.zip -q ``` ```sh -confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.7-SNAPSHOT.zip +confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.8-SNAPSHOT.zip ``` ### Operational modes diff --git a/pom.xml b/pom.xml index 6a5ca3b..67e7f40 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.vinted.kafka.connect.vespa kafka-connect-vespa - 1.0.7-SNAPSHOT + 1.0.8-SNAPSHOT kafka-connect-vespa The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine. https://github.com/vinted/kafka-connect-vespa diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java index 434bd0e..ff2de41 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaFeederHandler.java @@ -29,13 +29,15 @@ public CompletableFuture handle(SinkRecord record, CompletableFuture { if (result == null) { - reporter.report(record, throwable); + // An exception occurred while indexing the document if (!isMalformed(throwable)) { log.error(errorMessage(record), throwable); promise.completeExceptionally(throwable); } else { + reporter.report(record, throwable); + switch (config.behaviorOnMalformedDoc) { case IGNORE: log.info(ignoreMessage(record), throwable); @@ -53,8 +55,12 @@ public CompletableFuture handle(SinkRecord record, CompletableFuture