Skip to content

Commit

Permalink
Improve vespa connector resilience and logging (#7)
Browse files Browse the repository at this point in the history
* Update documentation

* Update vespa feed client version

* Update connector

* Downgrade vespa client version

* Update readme examples
  • Loading branch information
buinauskas authored Sep 20, 2023
1 parent 0768647 commit 8155214
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 81 deletions.
41 changes: 21 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ This connector has not yet been published to Confluent Hub. To install it, downl
install it using `confluent-hub` command line tool.

```sh
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.2/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -q
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.3/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -q
```

```sh
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip
```

### Important
Expand Down Expand Up @@ -61,15 +61,15 @@ also warrant a higher number of connections.
- Importance: low

`vespa.max.streams.per.connection`
This determines the maximum number of concurrent, inflight requests for
this Sets the maximum number of streams per HTTP/2 client, which is
maxConnections \* maxStreamsPerConnection. Prefer more streams over more
This determines the maximum number of concurrent, in-flight requests for
these Sets the maximum number of streams per HTTP/2 client, which is
maxConnections \* maxStreamsPerConnection. Prefer more streams to more
connections, when possible. The feed client automatically throttles load
to achieve the best throughput, and the actual number of streams per
connection is usually lower than the maximum.

- Type: int
- Default: 32
- Default: 128
- Valid Values: \[1,...\]
- Importance: low

Expand Down Expand Up @@ -100,21 +100,19 @@ The period of consecutive failures before shutting down.
`vespa.namespace`
User specified part of each document ID in that sense. Namespace can not
be used in queries, other than as part of the full document ID. However,
it can be used for document selection, where id.namespace can be
it can be used for document selection, where namespace can be
accessed and compared to a given string, for instance. An example use
case is visiting a subset of documents.
case is visiting a subset of documents. Defaults to topic name if not specified.

- Type: string
- Default: mynamespace
- Valid Values: non-empty string without ISO control characters
- Default: \"\"
- Importance: high

`vespa.document.type`
Document type as defined in services.xml and the schema.
Document type as defined in services.xml and the schema. Defaults to topic name if not specified.

- Type: string
- Default: mydocumenttype
- Valid Values: non-empty string without ISO control characters
- Default: \"\"
- Importance: high

`vespa.operational.mode`
Expand Down Expand Up @@ -185,6 +183,9 @@ Valid options are ignore', 'warn', and 'fail'.

#### Examples

Connector configuration examples can be found in the [config](config) directory. But here are some quick ones to get
started.

##### Standalone Example

This configuration is used typically along
Expand All @@ -198,8 +199,8 @@ topics=music
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
vespa.endpoint=http://vespa:8080/
vespa.namespace=mynamespace
vespa.document.type=mydocumenttype
vespa.namespace=default
vespa.document.type=music
```

##### Distributed Example
Expand All @@ -218,23 +219,23 @@ the distributed connect worker(s).
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.endpoint": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.document.type": "mydocumenttype"
"vespa.namespace": "default",
"vespa.document.type": "music"
}
}
```

Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8088/` the endpoint of
Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the endpoint of
one of your Kafka Connect worker(s).

Create a new instance.

```bash
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
```

Update an existing instance.

```bash
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors/TestSinkConnector1/config
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config
```
2 changes: 1 addition & 1 deletion config/quickstart-vespa-json-converter.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.namespace": "default",
"vespa.document.type": "music"
}
}
22 changes: 22 additions & 0 deletions config/quickstart-vespa-malformed-documents-with-dlq.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"config": {
"name": "VespaSinkConnector1",
"connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
"tasks.max": "1",
"topics": "music",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "default",
"vespa.document.type": "music",
"vespa.dryrun": "false",
"vespa.drop.invalid.message": "true",
"vespa.behavior.on.malformed.documents": "WARN",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": "true"
}
}
10 changes: 2 additions & 8 deletions config/quickstart-vespa-malformed-documents.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,10 @@
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.namespace": "default",
"vespa.document.type": "music",
"vespa.dryrun": "false",
"vespa.drop.invalid.message": "true",
"vespa.behavior.on.malformed.documents": "WARN",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": "true"
"vespa.behavior.on.malformed.documents": "WARN"
}
}
12 changes: 12 additions & 0 deletions config/quickstart-vespa-raw-operations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"config": {
"name": "VespaSinkConnector1",
"connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
"tasks.max": "1",
"topics": "music",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.operational.mode": "raw"
}
}
12 changes: 12 additions & 0 deletions config/quickstart-vespa-topic-as-documenttype.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"config": {
"name": "VespaSinkConnector1",
"connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
"tasks.max": "1",
"topics": "music",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "default"
}
}
2 changes: 1 addition & 1 deletion config/quickstart-vespa.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.namespace": "default",
"vespa.document.type": "music"
}
}
2 changes: 1 addition & 1 deletion config/quickstart-vespa.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ topics=music
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
vespa.url=http://vespa:8080/
vespa.namespace=mynamespace
vespa.namespace=default
vespa.document.type=music
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.vinted.kafka.connect.vespa</groupId>
<artifactId>kafka-connect-vespa</artifactId>
<version>1.0.2-SNAPSHOT</version>
<version>1.0.3-SNAPSHOT</version>
<name>kafka-connect-vespa</name>
<description>The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.</description>
<url>https://github.com/vinted/kafka-connect-vespa</url>
Expand Down
26 changes: 17 additions & 9 deletions src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,24 @@ public class VespaSinkConfig extends AbstractConfig {
private static final String ENDPOINT_DEFAULT = "http://localhost:8080";

public static final String CONNECTIONS_PER_ENDPOINT_CONFIG = "vespa.connections.per.endpoint";
private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed clients (if more than one) Sets the number of connections this client will use " + "collectively have a number of connections which is a small multiple of the numbers of containers in the " + "cluster to feed, so load can be balanced across these containers. In general, this value should be kept " + "as low as possible, but poor connectivity between feeder and cluster may also warrant a higher number of " + "connections.";
private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed "
+ "clients (if more than one) Sets the number of connections this client will use "
+ "collectively have a number of connections which is a small multiple of the numbers of containers in "
+ "the cluster to feed, so load can be balanced across these containers. In general, this value should "
+ "be kept as low as possible, but poor connectivity between feeder and cluster may also warrant a higher "
+ "number of connections.";
private static final String CONNECTIONS_PER_ENDPOINT_DISPLAY = "Connections per endpoint";

private static final int CONNECTIONS_PER_ENDPOINT_DEFAULT = 8;

public static final String MAX_STREAMS_PER_CONNECTION_CONFIG = "vespa.max.streams.per.connection";
private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, inflight requests for this " + "Sets the maximum number of streams per HTTP/2 " + "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, " + "when possible. The feed client automatically throttles load to achieve the best throughput, and the " + "actual number of streams per connection is usually lower than the maximum.";
private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, "
+ "in-flight requests for this. Sets the maximum number of streams per HTTP/2 "
+ "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, "
+ "when possible. The feed client automatically throttles load to achieve the best throughput, and the "
+ "actual number of streams per connection is usually lower than the maximum.";
private static final String MAX_STREAMS_PER_CONNECTION_DISPLAY = "Max streams per connection";
private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 32;
private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 128;

public static final String DRYRUN_CONFIG = "vespa.dryrun";
private static final String DRYRUN_DOC = "Turns on dryrun mode, where each operation succeeds after a given "
Expand All @@ -57,14 +66,15 @@ public class VespaSinkConfig extends AbstractConfig {
private static final String NAMESPACE_DOC = "User specified part of each document ID in that sense. Namespace can "
+ "not be used in queries, other than as part of the full document ID. However, it can be used for "
+ "document selection, where id.namespace can be accessed and compared to a given string, for instance. "
+ "An example use case is visiting a subset of documents.";
+ "An example use case is visiting a subset of documents. Defaults to topic name if not specified.";
private static final String NAMESPACE_DISPLAY = "Namespace";
private static final String NAMESPACE_DEFAULT = "mynamespace";
private static final String NAMESPACE_DEFAULT = "";

public static final String DOCUMENT_TYPE_CONFIG = "vespa.document.type";
private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema.";
private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema. "
+ "Defaults to topic name if not specified";
private static final String DOCUMENT_TYPE_DISPLAY = "Document type";
private static final String DOCUMENT_TYPE_DEFAULT = "mydocumenttype";
private static final String DOCUMENT_TYPE_DEFAULT = "";

public static final String OPERATIONAL_MODE_CONFIG = "vespa.operational.mode";
private static final String OPERATIONAL_MODE_DOC = "The operational mode of the connector. Valid options are "
Expand Down Expand Up @@ -195,7 +205,6 @@ private static void addConnectorConfigs(ConfigDef configDef) {
NAMESPACE_CONFIG,
ConfigDef.Type.STRING,
NAMESPACE_DEFAULT,
ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
NAMESPACE_DOC,
CONNECTOR_GROUP,
Expand All @@ -206,7 +215,6 @@ private static void addConnectorConfigs(ConfigDef configDef) {
DOCUMENT_TYPE_CONFIG,
ConfigDef.Type.STRING,
DOCUMENT_TYPE_DEFAULT,
ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
DOCUMENT_TYPE_DOC,
CONNECTOR_GROUP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.google.common.collect.ImmutableList;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -35,7 +37,13 @@ public Class<? extends Task> taskClass() {

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.nCopies(maxTasks, props);
final List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);

for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(new HashMap<>(props));
}

return ImmutableList.copyOf(taskConfigs);
}

@Override
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/com/vinted/kafka/connect/vespa/VespaSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,43 @@ protected void start(Map<String, String> props, FeedClient client) {

switch (config.operationalMode) {
case UPSERT:
log.info("Using upsert operational mode.");
feeder = new VespaUpsertFeeder(this.client, parameters, reporter, config);
break;
case RAW:
log.info("Using raw operational mode.");
feeder = new VespaRawFeeder(this.client, parameters, reporter, config);
break;
default:
log.error("Unknown operational mode: {}", config.operationalMode);
throw new ConnectException(String.format("Unknown operational mode: %s", config.operationalMode));
}
}

@Override
public void put(Collection<SinkRecord> collection) {
if (collection.isEmpty()) {
return;
}

log.debug("Putting {} records.", collection.size());

feeder.feed(collection);
try {
feeder.feed(collection);
} catch (Exception e) {
log.error("Error feeding records to Vespa.", e);
throw new ConnectException(e);
}
}

@Override
public void stop() {
log.info("Stopping Vespa Sink Task.");

if (feeder == null) {
return;
}

try {
feeder.close();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private Result handleMalformed(SinkRecord record, Result result, Throwable throw

switch (config.behaviorOnMalformedDoc) {
case IGNORE:
log.debug(ignoreMessage(record), throwable);
log.info(ignoreMessage(record), throwable);
return result;
case WARN:
log.warn(ignoreMessage(record), throwable);
Expand All @@ -105,8 +105,7 @@ private String ignoreMessage(SinkRecord record) {

private String errorMessage(SinkRecord record, Result result) {
return String.format(
"Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. "
+ "To ignore future document like this, change the configuration '%s' to '%s'.",
"Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. To ignore future document like this, change the configuration '%s' to '%s'.",
record,
result.resultMessage(),
result.traceMessage(),
Expand Down
Loading

0 comments on commit 8155214

Please sign in to comment.