diff --git a/.gitignore b/.gitignore index 596c6c59..49761212 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ Thumbs.db bin doc/node doc/node_modules +*/docker/plugins/* diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt index 6a759ec7..b804cdbf 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt @@ -76,7 +76,11 @@ open class Neo4jConnectorConfig(configDef: ConfigDef, authenticationPassword = getPassword(AUTHENTICATION_BASIC_PASSWORD).value() authenticationKerberosTicket = getPassword(AUTHENTICATION_KERBEROS_TICKET).value() - serverUri = getString(SERVER_URI).split(",").map { URI(it) } + serverUri = getString(SERVER_URI).split(",").map { + val uri = URI(it) + if (uri.port == -1) URI("$it:7687") else uri + } + connectionLivenessCheckTimeout = getLong(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS) connectionMaxConnectionLifetime = getLong(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS) connectionPoolMaxSize = getInt(CONNECTION_POOL_MAX_SIZE) diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt index 4d22d3a6..d056e101 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt @@ -95,6 +95,20 @@ class Neo4jSinkConnectorConfigTest { assertEquals(c, config.serverUri[2].toString()) } + @Test + fun `should return URIs with default port if port does not exist`() { + val a = "bolt://neo4j.com" + val b = "bolt://neo4j2.com" + + val originals = mapOf(SinkConnector.TOPICS_CONFIG to "foo", + "${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})", + Neo4jConnectorConfig.SERVER_URI to "$a,$b") + val config = Neo4jSinkConnectorConfig(originals) + + assertEquals("$a:7687", config.serverUri[0].toString()) + assertEquals("$b:7687", config.serverUri[1].toString()) + } + @Test fun `should return the configuration with shuffled topic order`() { val originals = mapOf(SinkConnector.TOPICS_CONFIG to "bar,foo", diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceConnectorConfigTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceConnectorConfigTest.kt index 570cb184..8101f0df 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceConnectorConfigTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceConnectorConfigTest.kt @@ -2,8 +2,8 @@ package streams.kafka.connect.source import org.apache.kafka.common.config.ConfigException import org.junit.Test +import streams.kafka.connect.common.Neo4jConnectorConfig import kotlin.test.assertEquals -import kotlin.test.assertNull class Neo4jSourceConnectorConfigTest { @@ -61,4 +61,21 @@ class Neo4jSourceConnectorConfigTest { val config = Neo4jSourceConnectorConfig(originals) assertEquals("", config.streamingProperty) } + + @Test + fun `should return URIs with default port if port does not exist`() { + val a = "bolt://neo4j.com" + val b = "bolt://neo4j2.com" + + val originals = mapOf(Neo4jSourceConnectorConfig.SOURCE_TYPE to SourceType.QUERY.toString(), + Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to "MATCH (n) RETURN n", + Neo4jSourceConnectorConfig.TOPIC to "topic", + Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "10", + Neo4jSourceConnectorConfig.STREAMING_FROM to StreamingFrom.NOW.toString(), + Neo4jConnectorConfig.SERVER_URI to "$a,$b") + val config = Neo4jSourceConnectorConfig(originals) + + assertEquals("$a:7687", config.serverUri[0].toString()) + assertEquals("$b:7687", config.serverUri[1].toString()) + } } \ No newline at end of file