From 265a085eef623ca254f4b93b523186fe127f56a5 Mon Sep 17 00:00:00 2001 From: Dhru Devalia Date: Wed, 31 Jul 2024 14:09:03 +0100 Subject: [PATCH] feat: add connector configuration migrator for 5.1 (#642) * feat: adds configuration migrator for 5.1 connector upgrade * test: add base test for configuration migrator * feat: add config migration on connector shutdown * test: add missing keys test * test: update tests with more config inputs * refactor: move ConfigurationMigratorTest to correct package * refactor: change variable declaration * test: add configuration migration test for quickstart example json * docs: update function comments * test: add map size assertions * refactor: move logging outside ConfigurationMigrator * refactor: add offset to migrated config * refactor: change migration function name --------- Co-authored-by: Eugene Rubanov --- .../connect/common/ConfigurationMigrator.kt | 155 +++++++++++++++ .../kafka/connect/sink/Neo4jSinkConnector.kt | 18 +- .../connect/source/Neo4jSourceConnector.kt | 7 +- .../connect/source/Neo4jSourceService.kt | 13 ++ .../common/ConfigurationMigratorTest.kt | 182 ++++++++++++++++++ .../resources/exampleConfigs/sinkExample.json | 20 ++ .../exampleConfigs/sourceExample.json | 19 ++ 7 files changed, 407 insertions(+), 7 deletions(-) create mode 100644 kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt create mode 100644 kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt create mode 100644 kafka-connect-neo4j/src/test/resources/exampleConfigs/sinkExample.json create mode 100644 kafka-connect-neo4j/src/test/resources/exampleConfigs/sourceExample.json diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt new file mode 100644 index 00000000..13b4c2b7 --- /dev/null +++ 
b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt @@ -0,0 +1,155 @@ +package streams.kafka.connect.common + +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.AUTHENTICATION_BASIC_PASSWORD +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.AUTHENTICATION_BASIC_REALM +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.AUTHENTICATION_BASIC_USERNAME +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.AUTHENTICATION_KERBEROS_TICKET +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.AUTHENTICATION_TYPE +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.BATCH_SIZE +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.BATCH_TIMEOUT_MSECS +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.CONNECTION_MAX_CONNECTION_LIFETIME_MSECS +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.CONNECTION_POOL_MAX_SIZE +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.DATABASE +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.ENCRYPTION_CA_CERTIFICATE_PATH +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.ENCRYPTION_ENABLED +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.ENCRYPTION_TRUST_STRATEGY +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.RETRY_BACKOFF_MSECS +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.RETRY_MAX_ATTEMPTS +import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.SERVER_URI +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig +import 
streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.BATCH_PARALLELIZE +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_CDC_SCHEMA +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_CDC_SOURCE_ID +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_CDC_SOURCE_ID_ID_NAME +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_CDC_SOURCE_ID_LABEL_NAME +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_CUD +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED +import streams.kafka.connect.sink.Neo4jSinkConnectorConfig.Companion.TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.ENFORCE_SCHEMA +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.SOURCE_TYPE +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.SOURCE_TYPE_QUERY +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.STREAMING_FROM +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.STREAMING_POLL_INTERVAL +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.STREAMING_PROPERTY +import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.TOPIC + +/** + * Migrates configuration for the Neo4j Kafka Connector from <5.1 versions to 5.1. + * The connector upgrade includes breaking changes to configuration keys, which have been renamed.
+ * + * @property settings Kafka connect configuration + */ +class ConfigurationMigrator(private val settings: Map) { + + private val log: Logger = LoggerFactory.getLogger(ConfigurationMigrator::class.java) + + /** + * Property converter record + * + * @property updatedConfigKey New configuration key name + * @property migrationHandler Custom function to produce the migrated value for the new key + * @constructor Creates a converter mapping an old configuration key to its new key and value + */ + data class PropertyConverter(val updatedConfigKey: String, val migrationHandler: () -> String) + + private val propertyConverterMap: Map = mutableMapOf( + // Common + DATABASE to PropertyConverter("neo4j.database") { settings[DATABASE] as String }, + SERVER_URI to PropertyConverter("neo4j.uri") { settings[SERVER_URI] as String }, + AUTHENTICATION_TYPE to PropertyConverter("neo4j.authentication.type") { settings[AUTHENTICATION_TYPE] as String }, + AUTHENTICATION_BASIC_USERNAME to PropertyConverter("neo4j.authentication.basic.username") {settings[AUTHENTICATION_BASIC_USERNAME] as String}, + AUTHENTICATION_BASIC_PASSWORD to PropertyConverter("neo4j.authentication.basic.password") {settings[AUTHENTICATION_BASIC_PASSWORD] as String}, + AUTHENTICATION_BASIC_REALM to PropertyConverter("neo4j.authentication.basic.realm") {settings[AUTHENTICATION_BASIC_REALM] as String}, + AUTHENTICATION_KERBEROS_TICKET to PropertyConverter("neo4j.authentication.kerberos.ticket") {settings[AUTHENTICATION_KERBEROS_TICKET] as String}, + BATCH_SIZE to PropertyConverter("neo4j.batch-size") {settings[BATCH_SIZE] as String}, + ENCRYPTION_ENABLED to PropertyConverter("neo4j.security.encrypted") {settings[ENCRYPTION_ENABLED] as String}, + ENCRYPTION_TRUST_STRATEGY to PropertyConverter("neo4j.security.trust-strategy") {settings[ENCRYPTION_TRUST_STRATEGY] as String}, + ENCRYPTION_CA_CERTIFICATE_PATH to PropertyConverter("") {settings[ENCRYPTION_CA_CERTIFICATE_PATH] as String}, + CONNECTION_MAX_CONNECTION_LIFETIME_MSECS to PropertyConverter("neo4j.connection-timeout") {
convertMsecs(settings[CONNECTION_MAX_CONNECTION_LIFETIME_MSECS] as String) }, + CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS to PropertyConverter("neo4j.pool.connection-acquisition-timeout") { convertMsecs(settings[CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS] as String) }, + CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS to PropertyConverter("neo4j.pool.idle-time-before-connection-test") { convertMsecs(settings[CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS] as String) }, + CONNECTION_POOL_MAX_SIZE to PropertyConverter("neo4j.pool.max-connection-pool-size") {settings[CONNECTION_POOL_MAX_SIZE] as String}, + RETRY_BACKOFF_MSECS to PropertyConverter("neo4j.max-retry-time") { convertMsecs(settings[RETRY_BACKOFF_MSECS] as String) }, + RETRY_MAX_ATTEMPTS to PropertyConverter("neo4j.max-retry-attempts") {settings[RETRY_MAX_ATTEMPTS] as String}, + // Sink + TOPIC_CDC_SOURCE_ID to PropertyConverter("neo4j.cdc.source-id.topics") {settings[TOPIC_CDC_SOURCE_ID] as String}, + TOPIC_CDC_SOURCE_ID_LABEL_NAME to PropertyConverter("neo4j.cdc.source-id.label-name") {settings[TOPIC_CDC_SOURCE_ID_LABEL_NAME] as String}, + TOPIC_CDC_SOURCE_ID_ID_NAME to PropertyConverter("neo4j.cdc.source-id.property-name") {settings[TOPIC_CDC_SOURCE_ID_ID_NAME] as String}, + TOPIC_CDC_SCHEMA to PropertyConverter("neo4j.cdc.schema.topics") {settings[TOPIC_CDC_SCHEMA] as String}, + BATCH_PARALLELIZE to PropertyConverter("") {settings[BATCH_PARALLELIZE] as String}, + TOPIC_CUD to PropertyConverter("neo4j.cud.topics") {settings[TOPIC_CUD] as String}, + TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED to PropertyConverter("neo4j.pattern.node.merge-properties") {settings[TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED] as String}, + TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED to PropertyConverter("neo4j.pattern.relationship.merge-properties") {settings[TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED] as String}, + // Source + BATCH_TIMEOUT_MSECS to PropertyConverter("neo4j.batch-timeout") { 
convertMsecs(settings[BATCH_TIMEOUT_MSECS] as String) }, + TOPIC to PropertyConverter("neo4j.query.topic") {settings[TOPIC] as String}, + STREAMING_FROM to PropertyConverter("") {settings[STREAMING_FROM] as String}, + SOURCE_TYPE to PropertyConverter("") {settings[SOURCE_TYPE] as String}, + SOURCE_TYPE_QUERY to PropertyConverter("neo4j.query") {settings[SOURCE_TYPE_QUERY] as String}, + STREAMING_PROPERTY to PropertyConverter("neo4j.query.streaming-property") {settings[STREAMING_PROPERTY] as String}, + STREAMING_POLL_INTERVAL to PropertyConverter("neo4j.query.poll-interval") { convertMsecs(settings[STREAMING_POLL_INTERVAL] as String) }, + ENFORCE_SCHEMA to PropertyConverter("") {settings[ENFORCE_SCHEMA] as String} + ) + + // Configuration properties that have user-defined keys + private val prefixConverterMap: Map = mutableMapOf( + Neo4jSinkConnectorConfig.TOPIC_PATTERN_NODE_PREFIX to "neo4j.pattern.node.topic.", + Neo4jSinkConnectorConfig.TOPIC_PATTERN_RELATIONSHIP_PREFIX to "neo4j.pattern.relationship.topic.", + Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX to "neo4j.cypher.topic." + ) + + /** + * Migrate configuration keys from existing format to v5.1 connector format. + * Configuration properties containing msecs units are converted to new format. 
+ * + * @return updated configuration key-value pairs + */ + fun migrateToV51(): Map { + val updatedConfig: MutableMap = mutableMapOf() + + settings.forEach { (originalKey, value) -> + val propConverter = propertyConverterMap[originalKey] + if (propConverter != null) { + val newKey = propConverter.updatedConfigKey + if (newKey.isBlank()) return@forEach // Configuration option found, but no new equivalent key exists + updatedConfig[newKey] = propConverter.migrationHandler() + log.debug("Migrating configuration {} to {}", originalKey, newKey) + } else if (prefixConverterMap.keys.any { k -> originalKey.startsWith(k) }) { + val prefixMatch = prefixConverterMap.keys.find { k -> originalKey.startsWith(k) } + prefixMatch?.let { prefix -> + val replacement = prefixConverterMap[prefixMatch] + replacement?.let { repl -> + val newKey = originalKey.replace(prefix, repl) + updatedConfig[newKey] = value + log.debug("Migrating configuration prefix key {} to {}", originalKey, newKey) + } + } + } else { + // Configuration option not declared should be copied across + updatedConfig[originalKey] = value + } + } + + return updatedConfig + } + + companion object { + /** + * Converts milliseconds format into new format of time units + * Valid new format units are: `ms`, `s`, `m`, `h` and `d`. + * e.g. 
1000 -> 1000ms + * + * @param msecs Original time value + * @return Migrated configuration time containing units + */ + private fun convertMsecs(msecs: String): String { + return "${msecs}ms" + } + } + +} + diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt index cfbd042e..fa16767e 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt @@ -1,10 +1,17 @@ package streams.kafka.connect.sink -import com.github.jcustenborder.kafka.connect.utils.config.* +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.jcustenborder.kafka.connect.utils.config.Description +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip +import com.github.jcustenborder.kafka.connect.utils.config.TaskConfigs +import com.github.jcustenborder.kafka.connect.utils.config.Title import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.connect.connector.Task import org.apache.kafka.connect.sink.SinkConnector +import org.slf4j.Logger import org.slf4j.LoggerFactory +import streams.kafka.connect.common.ConfigurationMigrator import streams.kafka.connect.utils.PropertiesUtil @Title("Neo4j Sink Connector") @@ -12,8 +19,10 @@ import streams.kafka.connect.utils.PropertiesUtil @DocumentationTip("If you need to control the size of transaction that is submitted to Neo4j you try adjusting the ``consumer.max.poll.records`` setting in the worker.properties for Kafka Connect.") @DocumentationNote("For each topic you can provide a Cypher Template by using the following syntax ``neo4j.topic.cypher.=``") class Neo4jSinkConnector: SinkConnector() { + private val log: Logger = 
LoggerFactory.getLogger(Neo4jSinkConnector::class.java) private lateinit var settings: Map private lateinit var config: Neo4jSinkConnectorConfig + override fun taskConfigs(maxTasks: Int): MutableList> { return TaskConfigs.multiple(settings, maxTasks) } @@ -23,7 +32,12 @@ class Neo4jSinkConnector: SinkConnector() { config = Neo4jSinkConnectorConfig(settings) } - override fun stop() {} + override fun stop() { + val migratedConfig = ConfigurationMigrator(settings).migrateToV51() + val mapper = ObjectMapper() + val jsonConfig = mapper.writeValueAsString(migratedConfig) + log.info("Migrated Sink configuration to v5.1 connector format: {}", jsonConfig) + } override fun version(): String { return PropertiesUtil.getVersion() diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt index 4bf7dbd2..8b5fbd01 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt @@ -1,10 +1,7 @@ package streams.kafka.connect.source -import com.github.jcustenborder.kafka.connect.utils.config.* -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.launch +import com.github.jcustenborder.kafka.connect.utils.config.Description +import com.github.jcustenborder.kafka.connect.utils.config.Title import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.connect.connector.Task import org.apache.kafka.connect.source.SourceConnector diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt index be33ad6c..b3b6035d 100644 --- 
a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt @@ -1,5 +1,6 @@ package streams.kafka.connect.source +import com.fasterxml.jackson.databind.ObjectMapper import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope @@ -16,6 +17,7 @@ import org.neo4j.driver.Record import org.neo4j.driver.Values import org.slf4j.Logger import org.slf4j.LoggerFactory +import streams.kafka.connect.common.ConfigurationMigrator import streams.utils.StreamsUtils import java.util.concurrent.BlockingQueue import java.util.concurrent.LinkedBlockingQueue @@ -172,6 +174,17 @@ class Neo4jSourceService(private val config: Neo4jSourceConnectorConfig, offsetS StreamsUtils.closeSafetely(driver) { log.info("Error while closing Driver instance:", it) } + + val migratedConfig = ConfigurationMigrator(config.originals() as Map).migrateToV51().toMutableMap() + + log.debug("Defaulting v5.1 migrated configuration offset to last checked timestamp: {}", lastCheck) + migratedConfig["neo4j.start-from"] = "USER_PROVIDED" + migratedConfig["neo4j.start-from.value"] = lastCheck + + val mapper = ObjectMapper() + val jsonConfig = mapper.writeValueAsString(migratedConfig) + log.info("Migrated Source configuration to v5.1 connector format: {}", jsonConfig) + log.info("Neo4j Source Service closed successfully") } } \ No newline at end of file diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt new file mode 100644 index 00000000..d96909e7 --- /dev/null +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt @@ -0,0 +1,182 @@ +package streams.kafka.connect.common + +import com.fasterxml.jackson.core.type.TypeReference +import 
com.fasterxml.jackson.databind.ObjectMapper +import org.junit.Assert.assertEquals +import org.junit.Test +import streams.kafka.connect.source.SourceType +import java.io.File + + +class ConfigurationMigratorTest { + + @Test + fun `should migrate keys to new configuration`() { + // Given a configuration containing normal keys + val originals = + mapOf( + "neo4j.topic.pattern.merge.node.properties.enabled" to "true", + "neo4j.server.uri" to "neo4j+s://x.x.x.x", + "neo4j.retry.max.attemps" to "1" + ) + + // When the configuration is migrated + val migratedConfig = ConfigurationMigrator(originals).migrateToV51() + + // Then the keys are updated to new key format containing the original value + assertEquals(originals.size, migratedConfig.size) + assertEquals(migratedConfig["neo4j.pattern.node.merge-properties"], "true") + assertEquals(migratedConfig["neo4j.uri"], "neo4j+s://x.x.x.x") + assertEquals(migratedConfig["neo4j.max-retry-attempts"], "1") + } + + @Test fun `should not migrate keys with no matching configuration key`() { + // Given a configuration which has no equivalent in the updated connector + val originals = mapOf( + "neo4j.encryption.ca.certificate.path" to "./cert.pem", + "neo4j.source.type" to SourceType.QUERY.toString(), + "neo4j.enforce.schema" to "true" + ) + + // When the configuration is migrated + val migratedConfig = ConfigurationMigrator(originals).migrateToV51() + + // Then the existing key is not outputted + assertEquals("Migrated configuration should be empty", 0, migratedConfig.size) + } + + @Test + fun `should migrate time-based keys to new configuration format`() { + // Given a configuration originally defined in milliseconds + val originals = mapOf( + "neo4j.retry.backoff.msecs" to "1200", + "neo4j.connection.max.lifetime.msecs" to "1000", + "neo4j.batch.timeout.msecs" to "500", + "neo4j.streaming.poll.interval.msecs" to "800" + ) + + // When the configuration is migrated + val migratedConfig = ConfigurationMigrator(originals).migrateToV51() 
+ + // Then the new configuration should be labelled with its units + assertEquals(originals.size, migratedConfig.size) + assertEquals(migratedConfig["neo4j.max-retry-time"], "1200ms") + assertEquals(migratedConfig["neo4j.connection-timeout"], "1000ms") + assertEquals(migratedConfig["neo4j.batch-timeout"], "500ms") + assertEquals(migratedConfig["neo4j.query.poll-interval"], "800ms") + } + + @Test + fun `should migrate prefix based keys to new configuration`() { + // Given a configuration containing prefix/user-defined keys + val originals = + mapOf( + "neo4j.topic.cypher.foo" to "CREATE (p:Person{name: event.firstName})", + "neo4j.topic.pattern.node.bar" to "(:Bar{!barId,barName})" + ) + + // When the configuration is migrated + val migratedConfig = ConfigurationMigrator(originals).migrateToV51() + + // Then the keys are updated to new values still containing the user-defined key part + assertEquals(originals.size, migratedConfig.size) + assertEquals( + migratedConfig["neo4j.cypher.topic.foo"], + "CREATE (p:Person{name: event.firstName})", + ) + assertEquals( + migratedConfig["neo4j.pattern.node.topic.bar"], + "(:Bar{!barId,barName})", + ) + } + + @Test + fun `should migrate across unknown configuration options`() { + // Given a configuration with non-defined configuration options + val originals = mapOf( + "connector.class" to "streams.kafka.connect.source.Neo4jSourceConnector", + "key.converter" to "io.confluent.connect.avro.AvroConverter", + "arbitrary.config.key" to "arbitrary.value" + ) + + // When the configuration is migrated + val migratedConfig = ConfigurationMigrator(originals).migrateToV51() + + // Then those options should still be included + assertEquals(originals.size, migratedConfig.size) + assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector") + assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter") + assertEquals(migratedConfig["arbitrary.config.key"], 
"arbitrary.value") + } + + @Test + fun `should migrate keys from full source quickstart configuration example`() { + // Given the configuration from the quickstart example + val quickstartSettings = loadConfiguration("src/test/resources/exampleConfigs/sourceExample.json") + + // When the configuration is migrated + val migratedConfig = ConfigurationMigrator(quickstartSettings).migrateToV51() + + // Then the keys are updated correctly + assertEquals(12, migratedConfig.size) + + assertEquals(migratedConfig["neo4j.query.topic"], "my-topic") + assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector") + assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter") + assertEquals(migratedConfig["key.converter.schema.registry.url"], "http://schema-registry:8081") + assertEquals(migratedConfig["value.converter"], "io.confluent.connect.avro.AvroConverter") + assertEquals(migratedConfig["value.converter.schema.registry.url"], "http://schema-registry:8081") + assertEquals(migratedConfig["neo4j.uri"], "bolt://neo4j:7687") + assertEquals(migratedConfig["neo4j.authentication.basic.username"], "neo4j") + assertEquals(migratedConfig["neo4j.authentication.basic.password"], "password") + assertEquals(migratedConfig["neo4j.query.poll-interval"], "5000ms") + assertEquals(migratedConfig["neo4j.query.streaming-property"], "timestamp") + assertEquals( + migratedConfig["neo4j.query"], + "MATCH (ts:TestSource) WHERE ts.timestamp > \$lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp" + ) + + assertEquals(migratedConfig["neo4j.enforce.schema"], null) + assertEquals(migratedConfig["neo4j.streaming.from"], null) + } + + @Test + fun `should migrate keys from full sink quickstart configuration example`() { + // Given the configuration from the quickstart example + val quickstartSettings = loadConfiguration("src/test/resources/exampleConfigs/sinkExample.json") + + // When the configuration is 
migrated + val migratedConfig = ConfigurationMigrator(quickstartSettings).migrateToV51() + + // Then the keys are updated correctly + assertEquals(15, migratedConfig.size) + + assertEquals(migratedConfig["topics"], "my-topic") + assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.sink.Neo4jSinkConnector") + assertEquals(migratedConfig["key.converter"], "org.apache.kafka.connect.json.JsonConverter") + assertEquals(migratedConfig["key.converter.schemas.enable"], "false") + assertEquals(migratedConfig["value.converter"], "org.apache.kafka.connect.json.JsonConverter") + assertEquals(migratedConfig["value.converter.schemas.enable"], "false") + assertEquals(migratedConfig["errors.retry.timeout"], "-1") + assertEquals(migratedConfig["errors.retry.delay.max.ms"], "1000") + assertEquals(migratedConfig["errors.tolerance"], "all") + assertEquals(migratedConfig["errors.log.enable"], "true") + assertEquals(migratedConfig["errors.log.include.messages"], "true") + assertEquals(migratedConfig["neo4j.uri"], "bolt://neo4j:7687") + assertEquals(migratedConfig["neo4j.authentication.basic.username"], "neo4j") + assertEquals(migratedConfig["neo4j.authentication.basic.password"], "password") + assertEquals(migratedConfig["neo4j.cypher.topic.my-topic"], "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)") + } + + private fun loadConfiguration(path: String): Map { + val file = File(path) + val json = file.readText() + + val mapper = ObjectMapper() + val node = mapper.readTree(json) + val config = node.get("config") + val result: Map? 
= mapper.convertValue(config, object : TypeReference?>() {}) + return result as Map + } + +} diff --git a/kafka-connect-neo4j/src/test/resources/exampleConfigs/sinkExample.json b/kafka-connect-neo4j/src/test/resources/exampleConfigs/sinkExample.json new file mode 100644 index 00000000..1fc0bf72 --- /dev/null +++ b/kafka-connect-neo4j/src/test/resources/exampleConfigs/sinkExample.json @@ -0,0 +1,20 @@ +{ + "name": "Neo4jSinkConnectorJSONString", + "config": { + "topics": "my-topic", + "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": false, + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false, + "errors.retry.timeout": "-1", + "errors.retry.delay.max.ms": "1000", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.log.include.messages": true, + "neo4j.server.uri": "bolt://neo4j:7687", + "neo4j.authentication.basic.username": "neo4j", + "neo4j.authentication.basic.password": "password", + "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)" + } +} \ No newline at end of file diff --git a/kafka-connect-neo4j/src/test/resources/exampleConfigs/sourceExample.json b/kafka-connect-neo4j/src/test/resources/exampleConfigs/sourceExample.json new file mode 100644 index 00000000..4ab339a7 --- /dev/null +++ b/kafka-connect-neo4j/src/test/resources/exampleConfigs/sourceExample.json @@ -0,0 +1,19 @@ +{ + "name": "Neo4jSourceConnectorAVRO", + "config": { + "topic": "my-topic", + "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector", + "key.converter": "io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url": "http://schema-registry:8081", + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": 
"http://schema-registry:8081", + "neo4j.server.uri": "bolt://neo4j:7687", + "neo4j.authentication.basic.username": "neo4j", + "neo4j.authentication.basic.password": "password", + "neo4j.streaming.poll.interval.msecs": 5000, + "neo4j.streaming.property": "timestamp", + "neo4j.streaming.from": "LAST_COMMITTED", + "neo4j.enforce.schema": true, + "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp" + } +} \ No newline at end of file