From 815521427e46ee59028e16fc84be9bc35db8ee92 Mon Sep 17 00:00:00 2001 From: Evaldas Buinauskas <7301441+buinauskas@users.noreply.github.com> Date: Wed, 20 Sep 2023 14:31:14 +0300 Subject: [PATCH] Improve vespa connector resilience and logging (#7) * Update documentation * Update vespa feed client version * Update connector * Downgrade vespa client version * Update readme examples --- README.md | 41 ++++++++++--------- config/quickstart-vespa-json-converter.json | 2 +- ...rt-vespa-malformed-documents-with-dlq.json | 22 ++++++++++ .../quickstart-vespa-malformed-documents.json | 10 +---- config/quickstart-vespa-raw-operations.json | 12 ++++++ ...uickstart-vespa-topic-as-documenttype.json | 12 ++++++ config/quickstart-vespa.json | 2 +- config/quickstart-vespa.properties | 2 +- pom.xml | 2 +- .../kafka/connect/vespa/VespaSinkConfig.java | 26 ++++++++---- .../connect/vespa/VespaSinkConnector.java | 12 +++++- .../kafka/connect/vespa/VespaSinkTask.java | 18 +++++++- .../feeders/VespaResultCallbackHandler.java | 5 +-- .../vespa/feeders/VespaUpsertFeeder.java | 13 ++++-- .../connect/vespa/VespaRawSinkTaskTest.java | 25 ++++------- .../vespa/VespaUpsertSinkTaskTest.java | 25 +++++------ 16 files changed, 148 insertions(+), 81 deletions(-) create mode 100644 config/quickstart-vespa-malformed-documents-with-dlq.json create mode 100644 config/quickstart-vespa-raw-operations.json create mode 100644 config/quickstart-vespa-topic-as-documenttype.json diff --git a/README.md b/README.md index 600be53..89d4c7d 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,11 @@ This connector has not yet been published to Confluent Hub. To install it, downl install it using `confluent-hub` command line tool. ```sh -wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.2/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip -q +wget https://github.com/vinted/kafka-connect-vespa/releases/download/v1.0.3/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -O /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip -q ``` ```sh -confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.2-SNAPSHOT.zip +confluent-hub install --no-prompt /tmp/vinted-kafka-connect-vespa-1.0.3-SNAPSHOT.zip ``` ### Important @@ -61,15 +61,15 @@ also warrant a higher number of connections. - Importance: low `vespa.max.streams.per.connection` -This determines the maximum number of concurrent, inflight requests for -this Sets the maximum number of streams per HTTP/2 client, which is -maxConnections \* maxStreamsPerConnection. Prefer more streams over more +This determines the maximum number of concurrent, in-flight requests for +these Sets the maximum number of streams per HTTP/2 client, which is +maxConnections \* maxStreamsPerConnection. Prefer more streams to more connections, when possible. The feed client automatically throttles load to achieve the best throughput, and the actual number of streams per connection is usually lower than the maximum. - Type: int -- Default: 32 +- Default: 128 - Valid Values: \[1,...\] - Importance: low @@ -100,21 +100,19 @@ The period of consecutive failures before shutting down. `vespa.namespace` User specified part of each document ID in that sense. Namespace can not be used in queries, other than as part of the full document ID. However, -it can be used for document selection, where id.namespace can be +it can be used for document selection, where namespace can be accessed and compared to a given string, for instance. An example use -case is visiting a subset of documents. +case is visiting a subset of documents. Defaults to topic name if not specified. - Type: string -- Default: mynamespace -- Valid Values: non-empty string without ISO control characters +- Default: \"\" - Importance: high `vespa.document.type` -Document type as defined in services.xml and the schema. +Document type as defined in services.xml and the schema. Defaults to topic name if not specified. - Type: string -- Default: mydocumenttype -- Valid Values: non-empty string without ISO control characters +- Default: \"\" - Importance: high `vespa.operational.mode` @@ -185,6 +183,9 @@ Valid options are ignore', 'warn', and 'fail'. #### Examples +Connector configuration examples can be found in the [config](config) directory. But here are some quick ones to get +started. + ##### Standalone Example This configuration is used typically along @@ -198,8 +199,8 @@ topics=music key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter vespa.endpoint=http://vespa:8080/ -vespa.namespace=mynamespace -vespa.document.type=mydocumenttype +vespa.namespace=default +vespa.document.type=music ``` ##### Distributed Example @@ -218,23 +219,23 @@ the distributed connect worker(s). "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "vespa.endpoint": "http://vespa:8080/", - "vespa.namespace": "mynamespace", - "vespa.document.type": "mydocumenttype" + "vespa.namespace": "default", + "vespa.document.type": "music" } } ``` -Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8088/` the endpoint of +Use curl to post the configuration to one of the Kafka Connect Workers. Change `http://localhost:8083/` the endpoint of one of your Kafka Connect worker(s). Create a new instance. ```bash -curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors +curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors ``` Update an existing instance. ```bash -curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8088/connectors/TestSinkConnector1/config +curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config ``` diff --git a/config/quickstart-vespa-json-converter.json b/config/quickstart-vespa-json-converter.json index 0741a15..4b728fd 100644 --- a/config/quickstart-vespa-json-converter.json +++ b/config/quickstart-vespa-json-converter.json @@ -8,7 +8,7 @@ "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "vespa.url": "http://vespa:8080/", - "vespa.namespace": "mynamespace", + "vespa.namespace": "default", "vespa.document.type": "music" } } diff --git a/config/quickstart-vespa-malformed-documents-with-dlq.json b/config/quickstart-vespa-malformed-documents-with-dlq.json new file mode 100644 index 0000000..a31928e --- /dev/null +++ b/config/quickstart-vespa-malformed-documents-with-dlq.json @@ -0,0 +1,22 @@ +{ + "config": { + "name": "VespaSinkConnector1", + "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector", + "tasks.max": "1", + "topics": "music", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "vespa.url": "http://vespa:8080/", + "vespa.namespace": "default", + "vespa.document.type": "music", + "vespa.dryrun": "false", + "vespa.drop.invalid.message": "true", + "vespa.behavior.on.malformed.documents": "WARN", + "errors.tolerance": "all", + "errors.log.enable": "true", + "errors.log.include.messages": "true", + "errors.deadletterqueue.topic.name": "dlq", + "errors.deadletterqueue.topic.replication.factor": 1, + "errors.deadletterqueue.context.headers.enable": "true" + } +} diff --git a/config/quickstart-vespa-malformed-documents.json b/config/quickstart-vespa-malformed-documents.json index 2794069..7d226fa 100644 --- a/config/quickstart-vespa-malformed-documents.json +++ b/config/quickstart-vespa-malformed-documents.json @@ -7,16 +7,10 @@ "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "vespa.url": "http://vespa:8080/", - "vespa.namespace": "mynamespace", + "vespa.namespace": "default", "vespa.document.type": "music", "vespa.dryrun": "false", "vespa.drop.invalid.message": "true", - "vespa.behavior.on.malformed.documents": "WARN", - "errors.tolerance": "all", - "errors.log.enable": "true", - "errors.log.include.messages": "true", - "errors.deadletterqueue.topic.name": "dlq", - "errors.deadletterqueue.topic.replication.factor": 1, - "errors.deadletterqueue.context.headers.enable": "true" + "vespa.behavior.on.malformed.documents": "WARN" } } diff --git a/config/quickstart-vespa-raw-operations.json b/config/quickstart-vespa-raw-operations.json new file mode 100644 index 0000000..ecfbb08 --- /dev/null +++ b/config/quickstart-vespa-raw-operations.json @@ -0,0 +1,12 @@ +{ + "config": { + "name": "VespaSinkConnector1", + "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector", + "tasks.max": "1", + "topics": "music", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "vespa.url": "http://vespa:8080/", + "vespa.operational.mode": "raw" + } +} diff --git a/config/quickstart-vespa-topic-as-documenttype.json b/config/quickstart-vespa-topic-as-documenttype.json new file mode 100644 index 0000000..03e8c90 --- /dev/null +++ b/config/quickstart-vespa-topic-as-documenttype.json @@ -0,0 +1,12 @@ +{ + "config": { + "name": "VespaSinkConnector1", + "connector.class": "com.vinted.kafka.connect.vespa.VespaSinkConnector", + "tasks.max": "1", + "topics": "music", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "vespa.url": "http://vespa:8080/", + "vespa.namespace": "default" + } +} diff --git a/config/quickstart-vespa.json b/config/quickstart-vespa.json index 04ca4d1..94aab04 100644 --- a/config/quickstart-vespa.json +++ b/config/quickstart-vespa.json @@ -7,7 +7,7 @@ "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "vespa.url": "http://vespa:8080/", - "vespa.namespace": "mynamespace", + "vespa.namespace": "default", "vespa.document.type": "music" } } diff --git a/config/quickstart-vespa.properties b/config/quickstart-vespa.properties index 9bcfa70..aaa634b 100644 --- a/config/quickstart-vespa.properties +++ b/config/quickstart-vespa.properties @@ -5,5 +5,5 @@ topics=music key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter vespa.url=http://vespa:8080/ -vespa.namespace=mynamespace +vespa.namespace=default vespa.document.type=music diff --git a/pom.xml b/pom.xml index f44dfa8..490b3a1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.vinted.kafka.connect.vespa kafka-connect-vespa - 1.0.2-SNAPSHOT + 1.0.3-SNAPSHOT kafka-connect-vespa The Vespa Sink Connector is used to write data from Kafka to a Vespa search engine. https://github.com/vinted/kafka-connect-vespa diff --git a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java index ca556df..674284b 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConfig.java @@ -26,15 +26,24 @@ public class VespaSinkConfig extends AbstractConfig { private static final String ENDPOINT_DEFAULT = "http://localhost:8080"; public static final String CONNECTIONS_PER_ENDPOINT_CONFIG = "vespa.connections.per.endpoint"; - private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed clients (if more than one) Sets the number of connections this client will use " + "collectively have a number of connections which is a small multiple of the numbers of containers in the " + "cluster to feed, so load can be balanced across these containers. In general, this value should be kept " + "as low as possible, but poor connectivity between feeder and cluster may also warrant a higher number of " + "connections."; + private static final String CONNECTIONS_PER_ENDPOINT_DOC = "A reasonable value here is a value that lets all feed " + + "clients (if more than one) Sets the number of connections this client will use " + + "collectively have a number of connections which is a small multiple of the numbers of containers in " + + "the cluster to feed, so load can be balanced across these containers. In general, this value should " + + "be kept as low as possible, but poor connectivity between feeder and cluster may also warrant a higher " + + "number of connections."; private static final String CONNECTIONS_PER_ENDPOINT_DISPLAY = "Connections per endpoint"; private static final int CONNECTIONS_PER_ENDPOINT_DEFAULT = 8; public static final String MAX_STREAMS_PER_CONNECTION_CONFIG = "vespa.max.streams.per.connection"; - private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, inflight requests for this " + "Sets the maximum number of streams per HTTP/2 " + "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, " + "when possible. The feed client automatically throttles load to achieve the best throughput, and the " + "actual number of streams per connection is usually lower than the maximum."; + private static final String MAX_STREAMS_PER_CONNECTION_DOC = "This determines the maximum number of concurrent, " + + "in-flight requests for this. Sets the maximum number of streams per HTTP/2 " + + "client, which is maxConnections * maxStreamsPerConnection. Prefer more streams over more connections, " + + "when possible. The feed client automatically throttles load to achieve the best throughput, and the " + + "actual number of streams per connection is usually lower than the maximum."; private static final String MAX_STREAMS_PER_CONNECTION_DISPLAY = "Max streams per connection"; - private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 32; + private static final int MAX_STREAMS_PER_CONNECTION_DEFAULT = 128; public static final String DRYRUN_CONFIG = "vespa.dryrun"; private static final String DRYRUN_DOC = "Turns on dryrun mode, where each operation succeeds after a given " @@ -57,14 +66,15 @@ public class VespaSinkConfig extends AbstractConfig { private static final String NAMESPACE_DOC = "User specified part of each document ID in that sense. Namespace can " + "not be used in queries, other than as part of the full document ID. However, it can be used for " + "document selection, where id.namespace can be accessed and compared to a given string, for instance. " - + "An example use case is visiting a subset of documents."; + + "An example use case is visiting a subset of documents. Defaults to topic name if not specified."; private static final String NAMESPACE_DISPLAY = "Namespace"; - private static final String NAMESPACE_DEFAULT = "mynamespace"; + private static final String NAMESPACE_DEFAULT = ""; public static final String DOCUMENT_TYPE_CONFIG = "vespa.document.type"; - private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema."; + private static final String DOCUMENT_TYPE_DOC = "Document type as defined in services.xml and the schema. " + + "Defaults to topic name if not specified"; private static final String DOCUMENT_TYPE_DISPLAY = "Document type"; - private static final String DOCUMENT_TYPE_DEFAULT = "mydocumenttype"; + private static final String DOCUMENT_TYPE_DEFAULT = ""; public static final String OPERATIONAL_MODE_CONFIG = "vespa.operational.mode"; private static final String OPERATIONAL_MODE_DOC = "The operational mode of the connector. Valid options are " @@ -195,7 +205,6 @@ private static void addConnectorConfigs(ConfigDef configDef) { NAMESPACE_CONFIG, ConfigDef.Type.STRING, NAMESPACE_DEFAULT, - ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, NAMESPACE_DOC, CONNECTOR_GROUP, @@ -206,7 +215,6 @@ private static void addConnectorConfigs(ConfigDef configDef) { DOCUMENT_TYPE_CONFIG, ConfigDef.Type.STRING, DOCUMENT_TYPE_DEFAULT, - ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, DOCUMENT_TYPE_DOC, CONNECTOR_GROUP, diff --git a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java index a00345b..181f438 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkConnector.java @@ -5,11 +5,13 @@ import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant; import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote; import com.github.jcustenborder.kafka.connect.utils.config.Title; +import com.google.common.collect.ImmutableList; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; -import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,7 +37,13 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { - return Collections.nCopies(maxTasks, props); + final List> taskConfigs = new ArrayList<>(maxTasks); + + for (int i = 0; i < maxTasks; i++) { + taskConfigs.add(new HashMap<>(props)); + } + + return ImmutableList.copyOf(taskConfigs); } @Override diff --git a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkTask.java b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkTask.java index 203458a..35ed4e4 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkTask.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/VespaSinkTask.java @@ -48,27 +48,43 @@ protected void start(Map props, FeedClient client) { switch (config.operationalMode) { case UPSERT: + log.info("Using upsert operational mode."); feeder = new VespaUpsertFeeder(this.client, parameters, reporter, config); break; case RAW: + log.info("Using raw operational mode."); feeder = new VespaRawFeeder(this.client, parameters, reporter, config); break; default: + log.error("Unknown operational mode: {}", config.operationalMode); throw new ConnectException(String.format("Unknown operational mode: %s", config.operationalMode)); } } @Override public void put(Collection collection) { + if (collection.isEmpty()) { + return; + } + log.debug("Putting {} records.", collection.size()); - feeder.feed(collection); + try { + feeder.feed(collection); + } catch (Exception e) { + log.error("Error feeding records to Vespa.", e); + throw new ConnectException(e); + } } @Override public void stop() { log.info("Stopping Vespa Sink Task."); + if (feeder == null) { + return; + } + try { feeder.close(); } catch (IOException e) { diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java index c599f29..0e4ddba 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaResultCallbackHandler.java @@ -87,7 +87,7 @@ private Result handleMalformed(SinkRecord record, Result result, Throwable throw switch (config.behaviorOnMalformedDoc) { case IGNORE: - log.debug(ignoreMessage(record), throwable); + log.info(ignoreMessage(record), throwable); return result; case WARN: log.warn(ignoreMessage(record), throwable); @@ -105,8 +105,7 @@ private String ignoreMessage(SinkRecord record) { private String errorMessage(SinkRecord record, Result result) { return String.format( - "Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. " - + "To ignore future document like this, change the configuration '%s' to '%s'.", + "Failed to index document '%s'. ResultMessage: '%s'. Trace: '%s'. To ignore future document like this, change the configuration '%s' to '%s'.", record, result.resultMessage(), result.traceMessage(), diff --git a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java index f3a1aad..689557f 100644 --- a/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java +++ b/src/main/java/com/vinted/kafka/connect/vespa/feeders/VespaUpsertFeeder.java @@ -14,7 +14,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -57,7 +56,7 @@ public void feed(Collection collection) { } @Override - public void close() throws IOException { + public void close() { client.close(); } @@ -72,7 +71,7 @@ private Stream toOperation(SinkRecord record) { String key = keyConverter.convert(record); String value = valueConverter.convert(record); - DocumentId documentId = DocumentId.of(config.namespace, config.documentType, key); + DocumentId documentId = DocumentId.of(getNamespace(record), getDocumentType(record), key); Operation operation = new Operation(record, documentId, value); return Stream.of(operation); @@ -89,6 +88,14 @@ private Stream toOperation(SinkRecord record) { } } + private String getNamespace(SinkRecord record) { + return config.namespace.isEmpty() ? record.topic() : config.namespace; + } + + private String getDocumentType(SinkRecord record) { + return config.namespace.isEmpty() ? record.topic() : config.documentType; + } + private static class Operation { public final SinkRecord record; public final DocumentId documentId; diff --git a/src/test/java/com/vinted/kafka/connect/vespa/VespaRawSinkTaskTest.java b/src/test/java/com/vinted/kafka/connect/vespa/VespaRawSinkTaskTest.java index f3bde39..af0b967 100644 --- a/src/test/java/com/vinted/kafka/connect/vespa/VespaRawSinkTaskTest.java +++ b/src/test/java/com/vinted/kafka/connect/vespa/VespaRawSinkTaskTest.java @@ -12,24 +12,24 @@ public class VespaRawSinkTaskTest { private long offset = 1; - private SinkRecord lastRecord; - private Map params; - private MockVespaFeedClient client; - private VespaSinkTask task; + private final Map params = new HashMap<>(); + private final MockVespaFeedClient client = new MockVespaFeedClient(); + private final VespaSinkTask task = new VespaSinkTask(); @BeforeEach void before() { - params = new HashMap<>(); params.put(VespaSinkConfig.NAMESPACE_CONFIG, "test_namespace"); params.put(VespaSinkConfig.DOCUMENT_TYPE_CONFIG, "test_document_type"); params.put(VespaSinkConfig.OPERATIONAL_MODE_CONFIG, VespaSinkConfig.OperationalMode.RAW.name()); params.put(VespaSinkConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, VespaSinkConfig.BehaviorOnMalformedDoc.WARN.name()); - - client = new MockVespaFeedClient(); - task = new VespaSinkTask(); task.start(params, client); } + @AfterEach + void after() { + this.task.stop(); + } + @Test void writesDocumentsToVespa() { List records = Collections.singletonList( @@ -71,13 +71,6 @@ void handlesMalformedPayloads() { task.put(records); } - @AfterEach - void after() { - if (this.task != null) { - this.task.stop(); - } - } - private SinkRecord record(String key, String value) { final Schema keySchema = Schema.STRING_SCHEMA; final Schema valueSchema; @@ -89,6 +82,6 @@ private SinkRecord record(String key, String value) { valueSchema = Schema.STRING_SCHEMA; } - return lastRecord = new SinkRecord("topic", 1, keySchema, key, valueSchema, value, offset++); + return new SinkRecord("topic", 1, keySchema, key, valueSchema, value, offset++); } } diff --git a/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java b/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java index bd61546..bd4c940 100644 --- a/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java +++ b/src/test/java/com/vinted/kafka/connect/vespa/VespaUpsertSinkTaskTest.java @@ -15,22 +15,22 @@ public class VespaUpsertSinkTaskTest { private long offset = 1; - private SinkRecord lastRecord; - private Map params; - private MockVespaFeedClient client; - private VespaSinkTask task; + private final Map params = new HashMap<>(); + private final MockVespaFeedClient client = new MockVespaFeedClient(); + private final VespaSinkTask task = new VespaSinkTask(); @BeforeEach void before() { - params = new HashMap<>(); params.put(VespaSinkConfig.NAMESPACE_CONFIG, "test_namespace"); params.put(VespaSinkConfig.DOCUMENT_TYPE_CONFIG, "test_document_type"); - - client = new MockVespaFeedClient(); - task = new VespaSinkTask(); task.start(params, client); } + @AfterEach + void after() { + this.task.stop(); + } + @Test void writesDocumentsToVespa() { List records = Arrays.asList( @@ -73,12 +73,6 @@ void writesDocumentsToVespa() { ); } - @AfterEach - void after() { - if (this.task != null) { - this.task.stop(); - } - } private SinkRecord record(String key, String value) { final Schema keySchema = Schema.STRING_SCHEMA; @@ -91,6 +85,7 @@ private SinkRecord record(String key, String value) { valueSchema = Schema.STRING_SCHEMA; } - return lastRecord = new SinkRecord("topic", 1, keySchema, key, valueSchema, value, offset++); + + return new SinkRecord("topic", 1, keySchema, key, valueSchema, value, offset++); } }