Improve vespa connector resilience and logging #7

54 changes: 29 additions & 25 deletions README.md
@@ -10,23 +10,26 @@ This connector has not yet been published to Confluent Hub. To install it, downl
install it using `confluent-hub` command line tool.

```sh
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.2/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -q
wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.3/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -q
```

```sh
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip
confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip
```

### Important

This connector expects records from Kafka to have a key and value. Values can be converted using byte, string or JSON
converters. Topic names are used as document types, use single message transforms to transform topic names into desired
document types.
This connector can work in two modes, `UPSERT` and `RAW`. In upsert mode, the connector expects records from Kafka to
have a key and value. The key is used as the document ID and the value is used as the document body. In raw mode, the
connector executes Kafka messages as Vespa Document API operations in the document JSON format.

### Note

This connector supports deletes. If the record stored in Kafka has a null value, this connector will delete document
with the corresponding key to Vespa.
In upsert mode, the connector supports deletes: if a record stored in Kafka has a null value, the connector deletes
the document with the corresponding key from Vespa.

In upsert mode, document keys are constructed using the following format: `namespace:documenttype:id`. The namespace
and document type are taken from the connector configuration, and the Kafka record key is used as the id.
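
As a rough illustration of the ID construction and the topic-name fallback described in this README, the following is a minimal sketch, not the connector's actual code. The class and method names (`DocumentIdSketch`, `resolve`, `buildId`) are hypothetical, and the sketch assumes the final ID is rendered in Vespa's document ID string form, `id:<namespace>:<document-type>::<user-specified>`.

```java
class DocumentIdSketch {

    // Hypothetical helper: fall back to the topic name when the setting is blank,
    // mirroring the "Defaults to topic name if not specified" behavior.
    static String resolve(String configured, String topic) {
        return (configured == null || configured.trim().isEmpty()) ? topic : configured;
    }

    // Assumption: the ID is rendered in Vespa's document ID string form,
    // id:<namespace>:<document-type>::<user-specified>, with the record key last.
    static String buildId(String namespace, String documentType, String topic, String recordKey) {
        return String.format("id:%s:%s::%s",
                resolve(namespace, topic),
                resolve(documentType, topic),
                recordKey);
    }

    public static void main(String[] args) {
        // Namespace configured, document type left empty: the type falls back to the topic.
        System.out.println(buildId("default", "", "music", "42")); // prints "id:default:music::42"
        // Both left empty: both fall back to the topic name.
        System.out.println(buildId("", "", "music", "42"));        // prints "id:music:music::42"
    }
}
```

With this fallback, a connector reading several topics can route each topic to a document type of the same name without per-topic configuration.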

### Configuration

@@ -58,15 +61,15 @@ also warrant a higher number of connections.
- Importance: low

`vespa.max.streams.per.connection`
This determines the maximum number of concurrent, inflight requests for
this Sets the maximum number of streams per HTTP/2 client, which is
maxConnections \* maxStreamsPerConnection. Prefer more streams over more
Sets the maximum number of streams per HTTP/2 connection for this client.
This determines the maximum number of concurrent, in-flight requests for
this client, which is maxConnections \* maxStreamsPerConnection. Prefer more streams to more
connections, when possible. The feed client automatically throttles load
to achieve the best throughput, and the actual number of streams per
connection is usually lower than the maximum.

- Type: int
- Default: 32
- Default: 128
- Valid Values: \[1,...\]
- Importance: low

@@ -97,21 +100,19 @@ The period of consecutive failures before shutting down.
`vespa.namespace`
User specified part of each document ID in that sense. Namespace can not
be used in queries, other than as part of the full document ID. However,
it can be used for document selection, where id.namespace can be
it can be used for document selection, where namespace can be
accessed and compared to a given string, for instance. An example use
case is visiting a subset of documents.
case is visiting a subset of documents. Defaults to topic name if not specified.

- Type: string
- Default: mynamespace
- Valid Values: non-empty string without ISO control characters
- Default: \"\"
- Importance: high

`vespa.document.type`
Document type as defined in services.xml and the schema.
Document type as defined in services.xml and the schema. Defaults to topic name if not specified.

- Type: string
- Default: mydocumenttype
- Valid Values: non-empty string without ISO control characters
- Default: \"\"
- Importance: high

`vespa.operational.mode`
@@ -182,6 +183,9 @@ Valid options are ignore', 'warn', and 'fail'.

#### Examples

Connector configuration examples can be found in the [config](config) directory. But here are some quick ones to get
started.

##### Standalone Example

This configuration is used typically along
@@ -195,8 +199,8 @@ topics=music
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
vespa.endpoint=http://vespa:8080/
vespa.namespace=mynamespace
vespa.document.type=mydocumenttype
vespa.namespace=default
vespa.document.type=music
```

##### Distributed Example
@@ -215,23 +219,23 @@ the distributed connect worker(s).
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.endpoint": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.document.type": "mydocumenttype"
"vespa.namespace": "default",
"vespa.document.type": "music"
}
}
```

Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8088/` the endpoint of
Use curl to post the configuration to one of the Kafka Connect workers. Change `http://localhost:8083/` to the endpoint of
one of your Kafka Connect worker(s).

Create a new instance.

```bash
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
```

Update an existing instance.

```bash
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors/TestSinkConnector1/config
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config
```
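
The same update call can also be prepared programmatically. The following is a minimal sketch using the JDK's `java.net.http` API (Java 11 or later); the class and method names (`ConnectRequestSketch`, `updateRequest`) are hypothetical, and the request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ConnectRequestSketch {

    // Builds (but does not send) the PUT request that corresponds to the
    // curl "update an existing instance" command.
    // Assumption: configJson already holds the body the worker expects.
    static HttpRequest updateRequest(String workerUrl, String connectorName, String configJson) {
        URI uri = URI.create(workerUrl).resolve("/connectors/" + connectorName + "/config");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configJson))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = updateRequest("http://localhost:8083/", "TestSinkConnector1", "{}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send it, the request can be passed to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.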
2 changes: 1 addition & 1 deletion config/quickstart-vespa-json-converter.json
@@ -8,7 +8,7 @@
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.namespace": "default",
"vespa.document.type": "music"
}
}
22 changes: 22 additions & 0 deletions config/quickstart-vespa-malformed-documents-with-dlq.json
@@ -0,0 +1,22 @@
{
"config": {
"name": "VespaSinkConnector1",
"connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
"tasks.max": "1",
"topics": "music",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "default",
"vespa.document.type": "music",
"vespa.dryrun": "false",
"vespa.drop.invalid.message": "true",
"vespa.behavior.on.malformed.documents": "WARN",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": "true"
}
}
10 changes: 2 additions & 8 deletions config/quickstart-vespa-malformed-documents.json
@@ -7,16 +7,10 @@
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.namespace": "default",
"vespa.document.type": "music",
"vespa.dryrun": "false",
"vespa.drop.invalid.message": "true",
"vespa.behavior.on.malformed.documents": "WARN",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": "true"
"vespa.behavior.on.malformed.documents": "WARN"
}
}
12 changes: 12 additions & 0 deletions config/quickstart-vespa-raw-operations.json
@@ -0,0 +1,12 @@
{
"config": {
"name": "VespaSinkConnector1",
"connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
"tasks.max": "1",
"topics": "music",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.operational.mode": "raw"
}
}
12 changes: 12 additions & 0 deletions config/quickstart-vespa-topic-as-documenttype.json
@@ -0,0 +1,12 @@
{
"config": {
"name": "VespaSinkConnector1",
"connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector",
"tasks.max": "1",
"topics": "music",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "default"
}
}
2 changes: 1 addition & 1 deletion config/quickstart-vespa.json
@@ -7,7 +7,7 @@
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"vespa.url": "http://vespa:8080/",
"vespa.namespace": "mynamespace",
"vespa.namespace": "default",
"vespa.document.type": "music"
}
}
2 changes: 1 addition & 1 deletion config/quickstart-vespa.properties
@@ -5,5 +5,5 @@ topics=music
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
vespa.url=http://vespa:8080/
vespa.namespace=mynamespace
vespa.namespace=default
vespa.document.type=music
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.vinted.kafka.connect.vespa</groupId>
<artifactId>kafka-connect-vespa</artifactId>
<version>1.0.2-SNAPSHOT</version>
<version>1.0.3-SNAPSHOT</version>
<name>kafka-connect-vespa</name>
<description>The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine.</description>
<url>https://github.com/vinted/kafka-connect-vespa</url>
@@ -26,15 +26,24 @@ public class VespaSinkConfig extends AbstractConfig {
private static final String ENDPOINT_DEFAULT = "http://localhost:8080";

public static final String CONNECTIONS_PER_ENDPOINT_CONFIG = "vespa.connections.per.endpoint";
private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed clients (if more than one) Sets the number of connections this client will use " + "collectively have a number of connections which is a small multiple of the numbers of containers in the " + "cluster to feed, so load can be balanced across these containers. In general, this value should be kept " + "as low as possible, but poor connectivity between feeder and cluster may also warrant a higher number of " + "connections.";
private static final String CONNECTIONS_PER_ENDPOINT_DOC = "Sets the number of connections this client will use "
+ "per endpoint. A reasonable value here is a value that lets all feed clients (if more than one) "
+ "collectively have a number of connections which is a small multiple of the number of containers in "
+ "the cluster to feed, so load can be balanced across these containers. In general, this value should "
+ "be kept as low as possible, but poor connectivity between feeder and cluster may also warrant a higher "
+ "number of connections.";
private static final String CONNECTIONS_PER_ENDPOINT_DISPLAY = "Connections per endpoint";

private static final int CONNECTIONS_PER_ENDPOINT_DEFAULT = 8;

public static final String MAX_STREAMS_PER_CONNECTION_CONFIG = "vespa.max.streams.per.connection";
private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, inflight requests for this " + "Sets the maximum number of streams per HTTP/2 " + "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, " + "when possible. The feed client automatically throttles load to achieve the best throughput, and the " + "actual number of streams per connection is usually lower than the maximum.";
private static final String MAX_STREAMS_PER_CONNECTION_DOC = "Sets the maximum number of streams per HTTP/2 "
+ "connection for this client. This determines the maximum number of concurrent, in-flight requests for this "
+ "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, "
+ "when possible. The feed client automatically throttles load to achieve the best throughput, and the "
+ "actual number of streams per connection is usually lower than the maximum.";
private static final String MAX_STREAMS_PER_CONNECTION_DISPLAY = "Max streams per connection";
private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 32;
private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 128;

public static final String DRYRUN_CONFIG = "vespa.dryrun";
private static final String DRYRUN_DOC = "Turns on dryrun mode, where each operation succeeds after a given "
@@ -57,14 +66,15 @@ public class VespaSinkConfig extends AbstractConfig {
private static final String NAMESPACE_DOC = "User specified part of each document ID in that sense. Namespace can "
+ "not be used in queries, other than as part of the full document ID. However, it can be used for "
+ "document selection, where id.namespace can be accessed and compared to a given string, for instance. "
+ "An example use case is visiting a subset of documents.";
+ "An example use case is visiting a subset of documents. Defaults to topic name if not specified.";
private static final String NAMESPACE_DISPLAY = "Namespace";
private static final String NAMESPACE_DEFAULT = "mynamespace";
private static final String NAMESPACE_DEFAULT = "";

public static final String DOCUMENT_TYPE_CONFIG = "vespa.document.type";
private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema.";
private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema. "
+ "Defaults to topic name if not specified";
private static final String DOCUMENT_TYPE_DISPLAY = "Document type";
private static final String DOCUMENT_TYPE_DEFAULT = "mydocumenttype";
private static final String DOCUMENT_TYPE_DEFAULT = "";

public static final String OPERATIONAL_MODE_CONFIG = "vespa.operational.mode";
private static final String OPERATIONAL_MODE_DOC = "The operational mode of the connector. Valid options are "
@@ -195,7 +205,6 @@ private static void addConnectorConfigs(ConfigDef configDef) {
NAMESPACE_CONFIG,
ConfigDef.Type.STRING,
NAMESPACE_DEFAULT,
ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
NAMESPACE_DOC,
CONNECTOR_GROUP,
@@ -206,7 +215,6 @@ private static void addConnectorConfigs(ConfigDef configDef) {
DOCUMENT_TYPE_CONFIG,
ConfigDef.Type.STRING,
DOCUMENT_TYPE_DEFAULT,
ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
DOCUMENT_TYPE_DOC,
CONNECTOR_GROUP,
@@ -5,11 +5,13 @@
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import com.google.common.collect.ImmutableList;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -35,7 +37,13 @@ public Class<? extends Task> taskClass() {

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.nCopies(maxTasks, props);
final List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);

for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(new HashMap<>(props));
}

return ImmutableList.copyOf(taskConfigs);
}
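
The PR does not state why `Collections.nCopies` was replaced. One plausible motivation is that `nCopies` hands every task the same `Map` instance, so a later change to the connector's properties is visible through all task configs. The sketch below illustrates that difference under this assumption. `TaskConfigCopiesSketch` and its methods are hypothetical names, and `Collections.unmodifiableList` stands in for Guava's `ImmutableList.copyOf` to keep the example free of external dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TaskConfigCopiesSketch {

    // Old approach: every list element is the same Map instance.
    static List<Map<String, String>> sharedConfigs(Map<String, String> props, int maxTasks) {
        return Collections.nCopies(maxTasks, props);
    }

    // New approach: each task gets its own copy of the properties.
    // Collections.unmodifiableList is a stand-in for ImmutableList.copyOf.
    static List<Map<String, String>> independentConfigs(Map<String, String> props, int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(props));
        }
        return Collections.unmodifiableList(taskConfigs);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("vespa.namespace", "default");

        List<Map<String, String>> shared = sharedConfigs(props, 2);
        List<Map<String, String>> independent = independentConfigs(props, 2);

        // Mutate the source map after the configs were handed out.
        props.put("vespa.namespace", "changed");

        System.out.println(shared.get(0).get("vespa.namespace"));      // prints "changed"
        System.out.println(independent.get(0).get("vespa.namespace")); // prints "default"
        System.out.println(shared.get(0) == shared.get(1));            // prints "true"
        System.out.println(independent.get(0) == independent.get(1));  // prints "false"
    }
}
```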

@Override
@@ -48,27 +48,43 @@ protected void start(Map<String, String> props, FeedClient client) {

switch (config.operationalMode) {
case UPSERT:
log.info("Using upsert operational mode.");
feeder = new VespaUpsertFeeder(this.client, parameters, reporter, config);
break;
case RAW:
log.info("Using raw operational mode.");
feeder = new VespaRawFeeder(this.client, parameters, reporter, config);
break;
default:
log.error("Unknown operational mode: {}", config.operationalMode);
throw new ConnectException(String.format("Unknown operational mode: %s", config.operationalMode));
}
}

@Override
public void put(Collection<SinkRecord> collection) {
if (collection.isEmpty()) {
return;
}

log.debug("Putting {} records.", collection.size());

feeder.feed(collection);
try {
feeder.feed(collection);
} catch (Exception e) {
log.error("Error feeding records to Vespa.", e);
throw new ConnectException(e);
}
}

@Override
public void stop() {
log.info("Stopping Vespa Sink Task.");

if (feeder == null) {
return;
}

try {
feeder.close();
} catch (IOException e) {
@@ -87,7 +87,7 @@ private Result handleMalformed(SinkRecord record, Result result, Throwable throw

switch (config.behaviorOnMalformedDoc) {
case IGNORE:
log.debug(ignoreMessage(record), throwable);
log.info(ignoreMessage(record), throwable);
return result;
case WARN:
log.warn(ignoreMessage(record), throwable);
@@ -105,8 +105,7 @@ private String ignoreMessage(SinkRecord record) {

private String errorMessage(SinkRecord record, Result result) {
return String.format(
"Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. "
+ "To ignore future document like this, change the configuration '%s' to '%s'.",
"Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. To ignore future document like this, change the configuration '%s' to '%s'.",
record,
result.resultMessage(),
result.traceMessage(),