diff --git a/README.md b/README.md
index 46596b6..89d4c7d 100644
--- a/README.md
+++ b/README.md
@@ -10,23 +10,26 @@ This connector has not yet been published to Confluent Hub. To install it, downl
install it using `confluent-hub` command line tool.
```sh
-wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.2/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -q
+wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.3/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -q
```
```sh
-confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip
+confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip
```
### Important
-This connector expects records from Kafka to have a key and value. Values can be converted using byte, string or JSON
-converters. Topic names are used as document types, use single message transforms to transform topic names into desired
-document types.
+This connectors can work in two modes, `UPSERT` and `RAW`. In upsert mode, the connector expects records from Kafka to
+have a key and value. The key is used as the document ID and the value is used as the document body. In raw mode, the
+connector will execute kafka messages as vespa document api operations using document JSON format.
### Note
-This connector supports deletes. If the record stored in Kafka has a null value, this connector will delete document
-with the corresponding key to Vespa.
+Under upsert mode connector supports deletes, if the record stored in Kafka has a null value, this connector will
+delete document with the corresponding key to Vespa.
+
+Under upsert mode, document keys are constructed using the following format: `namespace:documenttype:id`. Namespace and
+document type are taken from the connector and kafka record is used as the id.
### Configuration
@@ -58,15 +61,15 @@ also warrant a higher number of connections.
- Importance: low
`vespa.max.streams.per.connection`
-This determines the maximum number of concurrent, inflight requests for
-this Sets the maximum number of streams per HTTP/2 client, which is
-maxConnections \* maxStreamsPerConnection. Prefer more streams over more
+This determines the maximum number of concurrent, in-flight requests for
+these Sets the maximum number of streams per HTTP/2 client, which is
+maxConnections \* maxStreamsPerConnection. Prefer more streams to more
connections, when possible. The feed client automatically throttles load
to achieve the best throughput, and the actual number of streams per
connection is usually lower than the maximum.
- Type: int
-- Default: 32
+- Default: 128
- Valid Values: \[1,...\]
- Importance: low
@@ -97,21 +100,19 @@ The period of consecutive failures before shutting down.
`vespa.namespace`
User specified part of each document ID in that sense. Namespace can not
be used in queries, other than as part of the full document ID. However,
-it can be used for document selection, where id.namespace can be
+it can be used for document selection, where namespace can be
accessed and compared to a given string, for instance. An example use
-case is visiting a subset of documents.
+case is visiting a subset of documents. Defaults to topic name if not specified.
- Type: string
-- Default: mynamespace
-- Valid Values: non-empty string without ISO control characters
+- Default: \"\"
- Importance: high
`vespa.document.type`
-Document type as defined in services.xml and the schema.
+Document type as defined in services.xml and the schema. Defaults to topic name if not specified.
- Type: string
-- Default: mydocumenttype
-- Valid Values: non-empty string without ISO control characters
+- Default: \"\"
- Importance: high
`vespa.operational.mode`
@@ -182,6 +183,9 @@ Valid options are ignore', 'warn', and 'fail'.
#### Examples
+Connector configuration examples can be found in the [config](config) directory. But here are some quick ones to get
+started.
+
##### Standalone Example
This configuration is used typically along
@@ -195,8 +199,8 @@ topics=music
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
vespa.endpoint=http://vespa:8080/
-vespa.namespace=mynamespace
-vespa.document.type=mydocumenttype
+vespa.namespace=default
+vespa.document.type=music
```
##### Distributed Example
@@ -215,23 +219,23 @@ the distributed connect worker(s).
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.endpoint": "http://vespa:8080/",
- "vespa.namespace": "mynamespace",
- "vespa.document.type": "mydocumenttype"
+ "vespa.namespace": "default",
+ "vespa.document.type": "music"
}
}
```
-Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8088/` the endpoint of
+Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the endpoint of
one of your Kafka Connect worker(s).
Create a new instance.
```bash
-curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors
+curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
```
Update an existing instance.
```bash
-curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors/TestSinkConnector1/config
+curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config
```
diff --git a/config/quickstart-vespa-json-converter.json b/config/quickstart-vespa-json-converter.json
index 0741a15..4b728fd 100644
--- a/config/quickstart-vespa-json-converter.json
+++ b/config/quickstart-vespa-json-converter.json
@@ -8,7 +8,7 @@
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"vespa.url": "http://vespa:8080/",
- "vespa.namespace": "mynamespace",
+ "vespa.namespace": "default",
"vespa.document.type": "music"
}
}
diff --git a/config/quickstart-vespa-malformed-documents-with-dlq.json b/config/quickstart-vespa-malformed-documents-with-dlq.json
new file mode 100644
index 0000000..a31928e
--- /dev/null
+++ b/config/quickstart-vespa-malformed-documents-with-dlq.json
@@ -0,0 +1,22 @@
+{
+ "config": {
+ "name": "VespaSinkConnector1",
+ "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
+ "tasks.max": "1",
+ "topics": "music",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "vespa.url": "http://vespa:8080/",
+ "vespa.namespace": "default",
+ "vespa.document.type": "music",
+ "vespa.dryrun": "false",
+ "vespa.drop.invalid.message": "true",
+ "vespa.behavior.on.malformed.documents": "WARN",
+ "errors.tolerance": "all",
+ "errors.log.enable": "true",
+ "errors.log.include.messages": "true",
+ "errors.deadletterqueue.topic.name": "dlq",
+ "errors.deadletterqueue.topic.replication.factor": 1,
+ "errors.deadletterqueue.context.headers.enable": "true"
+ }
+}
diff --git a/config/quickstart-vespa-malformed-documents.json b/config/quickstart-vespa-malformed-documents.json
index 2794069..7d226fa 100644
--- a/config/quickstart-vespa-malformed-documents.json
+++ b/config/quickstart-vespa-malformed-documents.json
@@ -7,16 +7,10 @@
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
- "vespa.namespace": "mynamespace",
+ "vespa.namespace": "default",
"vespa.document.type": "music",
"vespa.dryrun": "false",
"vespa.drop.invalid.message": "true",
- "vespa.behavior.on.malformed.documents": "WARN",
- "errors.tolerance": "all",
- "errors.log.enable": "true",
- "errors.log.include.messages": "true",
- "errors.deadletterqueue.topic.name": "dlq",
- "errors.deadletterqueue.topic.replication.factor": 1,
- "errors.deadletterqueue.context.headers.enable": "true"
+ "vespa.behavior.on.malformed.documents": "WARN"
}
}
diff --git a/config/quickstart-vespa-raw-operations.json b/config/quickstart-vespa-raw-operations.json
new file mode 100644
index 0000000..ecfbb08
--- /dev/null
+++ b/config/quickstart-vespa-raw-operations.json
@@ -0,0 +1,12 @@
+{
+ "config": {
+ "name": "VespaSinkConnector1",
+ "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
+ "tasks.max": "1",
+ "topics": "music",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "vespa.url": "http://vespa:8080/",
+ "vespa.operational.mode": "raw"
+ }
+}
diff --git a/config/quickstart-vespa-topic-as-documenttype.json b/config/quickstart-vespa-topic-as-documenttype.json
new file mode 100644
index 0000000..03e8c90
--- /dev/null
+++ b/config/quickstart-vespa-topic-as-documenttype.json
@@ -0,0 +1,12 @@
+{
+ "config": {
+ "name": "VespaSinkConnector1",
+ "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
+ "tasks.max": "1",
+ "topics": "music",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "vespa.url": "http://vespa:8080/",
+ "vespa.namespace": "default"
+ }
+}
diff --git a/config/quickstart-vespa.json b/config/quickstart-vespa.json
index 04ca4d1..94aab04 100644
--- a/config/quickstart-vespa.json
+++ b/config/quickstart-vespa.json
@@ -7,7 +7,7 @@
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
- "vespa.namespace": "mynamespace",
+ "vespa.namespace": "default",
"vespa.document.type": "music"
}
}
diff --git a/config/quickstart-vespa.properties b/config/quickstart-vespa.properties
index 9bcfa70..aaa634b 100644
--- a/config/quickstart-vespa.properties
+++ b/config/quickstart-vespa.properties
@@ -5,5 +5,5 @@ topics=music
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
vespa.url=http://vespa:8080/
-vespa.namespace=mynamespace
+vespa.namespace=default
vespa.document.type=music
diff --git a/pom.xml b/pom.xml
index f44dfa8..490b3a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.vinted.kafka.connect.vespa
kafka-connect-vespa
- 1.0.2-SNAPSHOT
+ 1.0.3-SNAPSHOT
kafka-connect-vespa
The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.
https://github.com/vinted/kafka-connect-vespa
diff --git a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
index ca556df..674284b 100644
--- a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
+++ b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
@@ -26,15 +26,24 @@ public class VespaSinkConfig extends AbstractConfig {
private static final String ENDPOINT_DEFAULT = "http://localhost:8080";
public static final String CONNECTIONS_PER_ENDPOINT_CONFIG = "vespa.connections.per.endpoint";
- private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed clients (if more than one) Sets the number of connections this client will use " + "collectively have a number of connections which is a small multiple of the numbers of containers in the " + "cluster to feed, so load can be balanced across these containers. In general, this value should be kept " + "as low as possible, but poor connectivity between feeder and cluster may also warrant a higher number of " + "connections.";
+ private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed "
+ + "clients (if more than one) Sets the number of connections this client will use "
+ + "collectively have a number of connections which is a small multiple of the numbers of containers in "
+ + "the cluster to feed, so load can be balanced across these containers. In general, this value should "
+ + "be kept as low as possible, but poor connectivity between feeder and cluster may also warrant a higher "
+ + "number of connections.";
private static final String CONNECTIONS_PER_ENDPOINT_DISPLAY = "Connections per endpoint";
private static final int CONNECTIONS_PER_ENDPOINT_DEFAULT = 8;
public static final String MAX_STREAMS_PER_CONNECTION_CONFIG = "vespa.max.streams.per.connection";
- private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, inflight requests for this " + "Sets the maximum number of streams per HTTP/2 " + "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, " + "when possible. The feed client automatically throttles load to achieve the best throughput, and the " + "actual number of streams per connection is usually lower than the maximum.";
+ private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, "
+ + "in-flight requests for this. Sets the maximum number of streams per HTTP/2 "
+ + "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, "
+ + "when possible. The feed client automatically throttles load to achieve the best throughput, and the "
+ + "actual number of streams per connection is usually lower than the maximum.";
private static final String MAX_STREAMS_PER_CONNECTION_DISPLAY = "Max streams per connection";
- private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 32;
+ private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 128;
public static final String DRYRUN_CONFIG = "vespa.dryrun";
private static final String DRYRUN_DOC = "Turns on dryrun mode, where each operation succeeds after a given "
@@ -57,14 +66,15 @@ public class VespaSinkConfig extends AbstractConfig {
private static final String NAMESPACE_DOC = "User specified part of each document ID in that sense. Namespace can "
+ "not be used in queries, other than as part of the full document ID. However, it can be used for "
+ "document selection, where id.namespace can be accessed and compared to a given string, for instance. "
- + "An example use case is visiting a subset of documents.";
+ + "An example use case is visiting a subset of documents. Defaults to topic name if not specified.";
private static final String NAMESPACE_DISPLAY = "Namespace";
- private static final String NAMESPACE_DEFAULT = "mynamespace";
+ private static final String NAMESPACE_DEFAULT = "";
public static final String DOCUMENT_TYPE_CONFIG = "vespa.document.type";
- private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema.";
+ private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema. "
+ + "Defaults to topic name if not specified";
private static final String DOCUMENT_TYPE_DISPLAY = "Document type";
- private static final String DOCUMENT_TYPE_DEFAULT = "mydocumenttype";
+ private static final String DOCUMENT_TYPE_DEFAULT = "";
public static final String OPERATIONAL_MODE_CONFIG = "vespa.operational.mode";
private static final String OPERATIONAL_MODE_DOC = "The operational mode of the connector. Valid options are "
@@ -195,7 +205,6 @@ private static void addConnectorConfigs(ConfigDef configDef) {
NAMESPACE_CONFIG,
ConfigDef.Type.STRING,
NAMESPACE_DEFAULT,
- ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
NAMESPACE_DOC,
CONNECTOR_GROUP,
@@ -206,7 +215,6 @@ private static void addConnectorConfigs(ConfigDef configDef) {
DOCUMENT_TYPE_CONFIG,
ConfigDef.Type.STRING,
DOCUMENT_TYPE_DEFAULT,
- ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
DOCUMENT_TYPE_DOC,
CONNECTOR_GROUP,
diff --git a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java
index a00345b..181f438 100644
--- a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java
+++ b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java
@@ -5,11 +5,13 @@
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
+import com.google.common.collect.ImmutableList;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,7 +37,13 @@ public Class extends Task> taskClass() {
@Override
public List