From aae686da0534fc4696d827635e0d157875c3413d Mon Sep 17 00:00:00 2001
From: Evaldas Buinauskas <7301441+buinauskas@users.noreply.github.com>
Date: Thu, 21 Sep 2023 09:15:02 +0300
Subject: [PATCH] Fix document type bug in document construction (#9)
---
.github/workflows/CI.yml | 4 +-
README.md | 6 +--
pom.xml | 2 +-
.../kafka/connect/vespa/VespaSinkConfig.java | 8 ++--
.../vespa/feeders/VespaUpsertFeeder.java | 4 +-
.../vespa/VespaUpsertSinkTaskTest.java | 45 +++++++++++++------
6 files changed, 45 insertions(+), 24 deletions(-)
diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml
index f8d2e6c..6cd9ab1 100644
--- a/.github/workflows/CI.yml
+++ b/.github/workflows/CI.yml
@@ -33,7 +33,7 @@ jobs:
run: mvn package
- uses: actions/upload-artifact@v3
with:
- name: kafka-connect-vespa-${{ steps.project.outputs.version }}
+ name: vinted-kafka-connect-vespa-${{ steps.project.outputs.version }}
path: |
- target/components/kafka-connect-vespa-${{ steps.project.outputs.version }}.jar
+ target/kafka-connect-vespa-${{ steps.project.outputs.version }}-jar-with-dependencies.jar
target/components/packages/vinted-kafka-connect-vespa-${{ steps.project.outputs.version }}.zip
diff --git a/README.md b/README.md
index 89d4c7d..2f30923 100644
--- a/README.md
+++ b/README.md
@@ -105,14 +105,14 @@ accessed and compared to a given string, for instance. An example use
case is visiting a subset of documents. Defaults to topic name if not specified.
- Type: string
-- Default: \"\"
+- Default: null
- Importance: high
`vespa.document.type`
Document type as defined in services.xml and the schema. Defaults to topic name if not specified.
- Type: string
-- Default: \"\"
+- Default: null
- Importance: high
`vespa.operational.mode`
@@ -174,7 +174,7 @@ message.
`vespa.behavior.on.malformed.documents`
How to handle records that Vespa rejects due to document malformation.
-Valid options are ignore', 'warn', and 'fail'.
+Valid options are `ignore`, `warn`, and `fail`.
- Type: string
- Default: FAIL
diff --git a/pom.xml b/pom.xml
index 490b3a1..99c1147 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.vinted.kafka.connect.vespa
kafka-connect-vespa
- 1.0.3-SNAPSHOT
+ 1.0.4-SNAPSHOT
kafka-connect-vespa
The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.
https://github.com/vinted/kafka-connect-vespa
diff --git a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
index 674284b..123e84f 100644
--- a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
+++ b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
@@ -68,13 +68,13 @@ public class VespaSinkConfig extends AbstractConfig {
+ "document selection, where id.namespace can be accessed and compared to a given string, for instance. "
+ "An example use case is visiting a subset of documents. Defaults to topic name if not specified.";
private static final String NAMESPACE_DISPLAY = "Namespace";
- private static final String NAMESPACE_DEFAULT = "";
+ private static final String NAMESPACE_DEFAULT = null;
public static final String DOCUMENT_TYPE_CONFIG = "vespa.document.type";
private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema. "
+ "Defaults to topic name if not specified";
private static final String DOCUMENT_TYPE_DISPLAY = "Document type";
- private static final String DOCUMENT_TYPE_DEFAULT = "";
+ private static final String DOCUMENT_TYPE_DEFAULT = null;
public static final String OPERATIONAL_MODE_CONFIG = "vespa.operational.mode";
private static final String OPERATIONAL_MODE_DOC = "The operational mode of the connector. Valid options are "
@@ -116,7 +116,7 @@ public class VespaSinkConfig extends AbstractConfig {
public static final String BEHAVIOR_ON_MALFORMED_DOCS_CONFIG = "vespa.behavior.on.malformed.documents";
private static final String BEHAVIOR_ON_MALFORMED_DOCS_DOC = "How to handle records that Vespa rejects due to "
- + "document malformation. Valid options are ignore', 'warn', and 'fail'.";
+ + "document malformation. Valid options are `ignore`, `warn`, and `fail`.";
private static final String BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY = "Behavior on malformed documents";
private static final BehaviorOnMalformedDoc BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT = BehaviorOnMalformedDoc.FAIL;
@@ -205,6 +205,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
NAMESPACE_CONFIG,
ConfigDef.Type.STRING,
NAMESPACE_DEFAULT,
+ ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
NAMESPACE_DOC,
CONNECTOR_GROUP,
@@ -215,6 +216,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
DOCUMENT_TYPE_CONFIG,
ConfigDef.Type.STRING,
DOCUMENT_TYPE_DEFAULT,
+ ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
DOCUMENT_TYPE_DOC,
CONNECTOR_GROUP,
diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java
index 689557f..cdbab78 100644
--- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java
+++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java
@@ -89,11 +89,11 @@ private Stream toOperation(SinkRecord record) {
}
private String getNamespace(SinkRecord record) {
- return config.namespace.isEmpty() ? record.topic() : config.namespace;
+ return config.namespace == null ? record.topic() : config.namespace;
}
private String getDocumentType(SinkRecord record) {
- return config.namespace.isEmpty() ? record.topic() : config.documentType;
+ return config.documentType == null ? record.topic() : config.documentType;
}
private static class Operation {
diff --git a/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java b/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java
index bd4c940..6ec7fb2 100644
--- a/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java
+++ b/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java
@@ -5,26 +5,16 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class VespaUpsertSinkTaskTest {
private long offset = 1;
private final Map params = new HashMap<>();
private final MockVespaFeedClient client = new MockVespaFeedClient();
private final VespaSinkTask task = new VespaSinkTask();
-
- @BeforeEach
- void before() {
- params.put(VespaSinkConfig.NAMESPACE_CONFIG, "test_namespace");
- params.put(VespaSinkConfig.DOCUMENT_TYPE_CONFIG, "test_document_type");
- task.start(params, client);
- }
+ private final String topic = UUID.randomUUID().toString();
@AfterEach
void after() {
@@ -33,6 +23,11 @@ void after() {
@Test
void writesDocumentsToVespa() {
+ params.put(VespaSinkConfig.NAMESPACE_CONFIG, "test_namespace");
+ params.put(VespaSinkConfig.DOCUMENT_TYPE_CONFIG, "test_document_type");
+
+ task.start(params, client);
+
List records = Arrays.asList(
record("set1", "{\"field\":\"value1\"}"),
record("set2", "{\"field\":\"value2\"}"),
@@ -73,6 +68,30 @@ void writesDocumentsToVespa() {
);
}
+ @Test
+ void writesDocumentsWithTopicNameToVespa() {
+ task.start(params, client);
+
+ List records = Arrays.asList(
+ record("set1", "{\"field\":\"value1\"}"),
+ record("set2", "{\"field\":\"value2\"}"),
+ record("delete1", null),
+ record("set3", "{\"field\":\"value3\"}"),
+ record("set4", "{\"field\":\"value4_old\"}"),
+ record("set4", "{\"field\":\"value4\"}")
+ );
+
+ task.put(records);
+
+ client.assertAllDocumentIds(
+ String.format("id:%s:%s::set1", topic, topic),
+ String.format("id:%s:%s::delete1", topic, topic),
+ String.format("id:%s:%s::set2", topic, topic),
+ String.format("id:%s:%s::set3", topic, topic),
+ String.format("id:%s:%s::set4", topic, topic)
+ );
+ }
+
private SinkRecord record(String key, String value) {
final Schema keySchema = Schema.STRING_SCHEMA;
@@ -86,6 +105,6 @@ private SinkRecord record(String key, String value) {
}
- return new SinkRecord("topic", 1, keySchema, key, valueSchema, value, offset++);
+ return new SinkRecord(topic, 1, keySchema, key, valueSchema, value, offset++);
}
}