diff --git a/common/pom.xml b/common/pom.xml
index ce28b3f8..6c605547 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -34,6 +34,12 @@
             <artifactId>kafka-clients</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.neo4j</groupId>
+            <artifactId>neo4j-configuration</artifactId>
+            <scope>provided</scope>
+        </dependency>
diff --git a/common/src/main/kotlin/streams/events/StreamsEvent.kt b/common/src/main/kotlin/streams/events/StreamsEvent.kt
index 7b0e7f8e..6fd2deda 100644
--- a/common/src/main/kotlin/streams/events/StreamsEvent.kt
+++ b/common/src/main/kotlin/streams/events/StreamsEvent.kt
@@ -1,7 +1,5 @@
package streams.events
-import org.neo4j.graphdb.schema.ConstraintType
-
enum class OperationType { created, updated, deleted }
data class Meta(val timestamp: Long,
diff --git a/common/src/main/kotlin/streams/utils/JSONUtils.kt b/common/src/main/kotlin/streams/utils/JSONUtils.kt
index d6c1c41c..283314b5 100644
--- a/common/src/main/kotlin/streams/utils/JSONUtils.kt
+++ b/common/src/main/kotlin/streams/utils/JSONUtils.kt
@@ -2,16 +2,38 @@ package streams.utils
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonProcessingException
-import com.fasterxml.jackson.databind.*
+import com.fasterxml.jackson.databind.DeserializationContext
+import com.fasterxml.jackson.databind.DeserializationFeature
+import com.fasterxml.jackson.databind.JsonDeserializer
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.JsonSerializer
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.SerializationFeature
+import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.neo4j.driver.internal.value.PointValue
+import org.neo4j.function.ThrowingBiConsumer
import org.neo4j.graphdb.spatial.Point
+import org.neo4j.values.AnyValue
import org.neo4j.values.storable.CoordinateReferenceSystem
-import streams.events.*
+import org.neo4j.values.storable.Values
+import org.neo4j.values.virtual.MapValue
+import org.neo4j.values.virtual.MapValueBuilder
+import streams.events.EntityType
+import streams.events.Meta
+import streams.events.NodePayload
+import streams.events.Payload
+import streams.events.RecordChange
+import streams.events.RelationshipPayload
+import streams.events.Schema
+import streams.events.StreamsTransactionEvent
+import streams.events.StreamsTransactionNodeEvent
+import streams.events.StreamsTransactionRelationshipEvent
import streams.extensions.asStreamsMap
import java.io.IOException
import java.time.temporal.TemporalAccessor
@@ -33,6 +55,13 @@ fun Point.toStreamsPoint(): StreamsPoint {
}
}
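+// converts a Kotlin Map into a Neo4j MapValue, which is what PointValue.fromMap expects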
+fun Map<String, Any>.toMapValue(): MapValue {
+    val map = this
+    val builder = MapValueBuilder()
+    map.forEach { (t, u) -> builder.add(t, Values.of(u)) }
+    return builder.build()
+}
+
fun PointValue.toStreamsPoint(): StreamsPoint {
val point = this.asPoint()
return when (val crsType = point.srid()) {
@@ -121,6 +150,100 @@ class DriverRelationshipSerializer : JsonSerializer<Relationship>() {
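+// Jackson deserializer for relationship CDC events: restores point properties and rejects node payloads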
+class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionRelationshipEvent, RelationshipPayload>() {
+    override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent {
+        return StreamsTransactionRelationshipEvent(meta, payload, schema)
+    }
+
+    override fun convertPayload(payloadMap: JsonNode): RelationshipPayload {
+        return JSONUtils.convertValue<RelationshipPayload>(payloadMap)
+    }
+
+    override fun fillPayload(payload: RelationshipPayload,
+                             beforeProps: Map<String, Any>?,
+                             afterProps: Map<String, Any>?): RelationshipPayload {
+        return payload.copy(
+            before = payload.before?.copy(properties = beforeProps),
+            after = payload.after?.copy(properties = afterProps)
+        )
+    }
+
+    override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent {
+        val deserialized = super.deserialize(parser, context)
+        if (deserialized.payload.type == EntityType.node) {
+            throw IllegalArgumentException("Relationship event expected, but node type found")
+        }
+        return deserialized
+    }
+
+}
+
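+// Jackson deserializer for node CDC events: restores point properties and rejects relationship payloads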
+class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionNodeEvent, NodePayload>() {
+    override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent {
+        return StreamsTransactionNodeEvent(meta, payload, schema)
+    }
+
+    override fun convertPayload(payloadMap: JsonNode): NodePayload {
+        return JSONUtils.convertValue<NodePayload>(payloadMap)
+    }
+
+    override fun fillPayload(payload: NodePayload,
+                             beforeProps: Map<String, Any>?,
+                             afterProps: Map<String, Any>?): NodePayload {
+        return payload.copy(
+            before = payload.before?.copy(properties = beforeProps),
+            after = payload.after?.copy(properties = afterProps)
+        )
+    }
+
+    override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent {
+        val deserialized = super.deserialize(parser, context)
+        if (deserialized.payload.type == EntityType.relationship) {
+            throw IllegalArgumentException("Node event expected, but relationship type found")
+        }
+        return deserialized
+    }
+
+}
+
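+// Base deserializer for CDC transaction events. Properties whose schema type is "PointValue" arrive as plain
+// maps after JSON serialization, so they are converted back into PointValue instances before the event is built.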
+abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD : Payload> : JsonDeserializer<EVENT>() {
+
+    abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT
+    abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD
+    abstract fun fillPayload(payload: PAYLOAD,
+                             beforeProps: Map<String, Any>?,
+                             afterProps: Map<String, Any>?): PAYLOAD
+
+    @Throws(IOException::class, JsonProcessingException::class)
+    override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT {
+        val root: JsonNode = parser.codec.readTree(parser)
+        val meta = JSONUtils.convertValue<Meta>(root["meta"])
+        val schema = JSONUtils.convertValue<Schema>(root["schema"])
+        val points = schema.properties.filterValues { it == "PointValue" }.keys
+        var payload = convertPayload(root["payload"])
+        if (points.isNotEmpty()) {
+            val beforeProps = convertPoints(payload.before, points)
+            val afterProps = convertPoints(payload.after, points)
+            payload = fillPayload(payload, beforeProps, afterProps)
+        }
+        return createEvent(meta, payload, schema)
+    }
+
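+    // converts the map representation of point properties back into PointValue, leaving other properties as-is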
+    private fun convertPoints(
+        recordChange: RecordChange?,
+        points: Set<String>
+    ) = recordChange
+        ?.properties
+        ?.mapValues {
+            if (points.contains(it.key)) {
+                org.neo4j.values.storable.PointValue.fromMap((it.value as Map<String, Any>).toMapValue())
+            } else {
+                it.value
+            }
+        }
+
+}
+
object JSONUtils {
private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
@@ -133,6 +256,8 @@ object JSONUtils {
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Point::class.java, DriverPointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Node::class.java, DriverNodeSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Relationship::class.java, DriverRelationshipSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
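+ // register the CDC event deserializers so that point properties are rebuilt when events are read back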
+ StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
+ StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
OBJECT_MAPPER.registerModule(module)
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
diff --git a/common/src/main/kotlin/streams/utils/ProcedureUtils.kt b/common/src/main/kotlin/streams/utils/ProcedureUtils.kt
index 33477f90..63b96be0 100644
--- a/common/src/main/kotlin/streams/utils/ProcedureUtils.kt
+++ b/common/src/main/kotlin/streams/utils/ProcedureUtils.kt
@@ -1,7 +1,5 @@
package streams.utils
-import org.neo4j.configuration.Config
-import org.neo4j.configuration.GraphDatabaseSettings
import org.neo4j.dbms.api.DatabaseManagementService
import org.neo4j.exceptions.UnsatisfiedDependencyException
import org.neo4j.kernel.impl.factory.DbmsInfo
diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
index c69697b5..e6465e7b 100644
--- a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
+++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
@@ -175,6 +175,156 @@ class KafkaEventSinkCDCTSE: KafkaEventSinkBaseTSE() {
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
}
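+ // verifies that CDC schema events carrying 2D/3D geographic and cartesian point properties are written to Neo4j with their point values intact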
+ @Test
+ fun shouldWriteDataFromSinkWithCDCSchemaTopicAndPointValue() = runBlocking {
+ val topic = UUID.randomUUID().toString()
+ db.setConfig("streams.sink.topic.cdc.schema", topic)
+ db.start()
+
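+ // properties typed as "PointValue" in the schema are rebuilt as points by the event deserializer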
+ val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
+ val nodeSchema = Schema(
+ properties = mapOf(
+ "name" to "String",
+ "surname" to "String",
+ "comp@ny" to "String",
+ "bornIn2d" to "PointValue",
+ "bornIn3d" to "PointValue",
+ "livesIn2d" to "PointValue",
+ "livesIn3d" to "PointValue",
+ "worksIn2d" to "PointValue",
+ "worksIn3d" to "PointValue"
+ ),
+ constraints = constraints)
+ val relSchema = Schema(properties = mapOf(
+ "since" to "Long",
+ "where" to "PointValue"
+ ), constraints = constraints)
+ val cdcDataStart = StreamsTransactionEvent(
+ meta = Meta(timestamp = System.currentTimeMillis(),
+ username = "user",
+ txId = 1,
+ txEventId = 0,
+ txEventsCount = 3,
+ operation = OperationType.created
+ ),
+ payload = NodePayload(
+ id = "0",
+ before = null,
+ after = NodeChange(
+ properties = mapOf(
+ "name" to "Andrea",
+ "surname" to "Santurbano",
+ "comp@ny" to "LARUS-BA",
+ "bornIn3d" to mapOf(
+ "crs" to "wgs-84-3d",
+ "latitude" to 12.78,
+ "longitude" to 56.7,
+ "height" to 100.0,
+ ),
+ "bornIn2d" to mapOf(
+ "crs" to "wgs-84",
+ "latitude" to 12.78,
+ "longitude" to 56.7
+ ),
+ "livesIn3d" to mapOf(
+ "crs" to "wgs-84-3d",
+ "latitude" to 12.79,
+ "longitude" to 56.71,
+ "height" to 100.0,
+ ),
+ "livesIn2d" to mapOf(
+ "crs" to "wgs-84",
+ "latitude" to 12.79,
+ "longitude" to 56.71
+ ),
+ "worksIn2d" to mapOf(
+ "crs" to "cartesian",
+ "x" to 1.2,
+ "y" to 10.1
+ ),
+ "worksIn3d" to mapOf(
+ "crs" to "cartesian-3d",
+ "x" to 1.2,
+ "y" to 10.1,
+ "z" to 7.1
+ )
+ ),
+ labels = listOf("User")
+ )
+ ),
+ schema = nodeSchema
+ )
+ val cdcDataEnd = StreamsTransactionEvent(
+ meta = Meta(timestamp = System.currentTimeMillis(),
+ username = "user",
+ txId = 1,
+ txEventId = 1,
+ txEventsCount = 3,
+ operation = OperationType.created
+ ),
+ payload = NodePayload(id = "1",
+ before = null,
+ after = NodeChange(properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"), labels = listOf("User"))
+ ),
+ schema = nodeSchema
+ )
+ val cdcDataRelationship = StreamsTransactionEvent(
+ meta = Meta(timestamp = System.currentTimeMillis(),
+ username = "user",
+ txId = 1,
+ txEventId = 2,
+ txEventsCount = 3,
+ operation = OperationType.created
+ ),
+ payload = RelationshipPayload(
+ id = "2",
+ start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
+ end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
+ after = RelationshipChange(properties = mapOf(
+ "since" to 2014,
+ "where" to mapOf(
+ "crs" to "wgs-84-3d",
+ "latitude" to 12.78,
+ "longitude" to 56.7,
+ "height" to 80.0,
+ )
+ )),
+ before = null,
+ label = "MEET"
+ ),
+ schema = relSchema
+ )
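+ // publish the three CDC events (two node events and one relationship event) to the schema topic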
+ var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+ kafkaProducer.send(producerRecord).get()
+ producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
+ kafkaProducer.send(producerRecord).get()
+ producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
+ kafkaProducer.send(producerRecord).get()
+
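+ // verify that every point property round-tripped into the expected Cypher point() value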
+ Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+ val query = """
+ |MATCH (s:User{
+ | name:'Andrea',
+ | surname:'Santurbano',
+ | `comp@ny`:'LARUS-BA',
+ | bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}),
+ | bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}),
+ | livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}),
+ | livesIn2d: point({longitude: 56.71, latitude: 12.79}),
+ | worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}),
+ | worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'})
+ |})
+ |MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
+ |MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t)
+ |RETURN count(p) AS count
+ |""".trimMargin()
+ db.execute(query) {
+ val result = it.columnAs<Long>("count")
+ result.hasNext() && result.next() == 1L && !result.hasNext()
+ }
+ }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+ }
+
@Test
fun writeDataFromSinkWithCDCSchemaTopicMultipleConstraintsAndLabels() = runBlocking {
val topic = UUID.randomUUID().toString()
diff --git a/kafka-connect-neo4j/README.md b/kafka-connect-neo4j/README.md
deleted file mode 100644
index 154945b3..00000000
--- a/kafka-connect-neo4j/README.md
+++ /dev/null
@@ -1,15 +0,0 @@
-# Introduction
-
-Welcome to your Kafka Connect Neo4j Connector!
-
-# Build it locally
-
-Build the project by running the following command:
-
- $ mvn clean install
-
-Inside the directory `/kafka-connect-neo4j/target/component/packages` you'll find a file named `neo4j-kafka-connect-neo4j-.zip`
-
-# Run with docker
-
-Please refer to this file [readme.adoc](doc/readme.adoc)
diff --git a/kafka-connect-neo4j/assets/neo4j-logo.png b/kafka-connect-neo4j/assets/neo4j-logo.png
deleted file mode 100644
index 9b568bce..00000000
Binary files a/kafka-connect-neo4j/assets/neo4j-logo.png and /dev/null differ
diff --git a/kafka-connect-neo4j/config/sink-quickstart.properties b/kafka-connect-neo4j/config/sink-quickstart.properties
deleted file mode 100644
index 07a5db9c..00000000
--- a/kafka-connect-neo4j/config/sink-quickstart.properties
+++ /dev/null
@@ -1,14 +0,0 @@
-# A simple configuration properties, is the same as contrib.sink.avro.neo4j.json
-name=Neo4jSinkConnector
-topics=my-topic
-connector.class=streams.kafka.connect.sink.Neo4jSinkConnector
-errors.retry.timeout=-1
-errors.retry.delay.max.ms=1000
-errors.tolerance=all
-errors.log.enable=true
-errors.log.include.messages=true
-neo4j.server.uri=bolt://neo4j:7687
-neo4j.authentication.basic.username=neo4j
-neo4j.authentication.basic.password=connect
-neo4j.encryption.enabled=false
-neo4j.topic.cypher.my-topic=MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)
\ No newline at end of file
diff --git a/kafka-connect-neo4j/doc/contrib.sink.avro.neo4j.json b/kafka-connect-neo4j/doc/contrib.sink.avro.neo4j.json
deleted file mode 100644
index 9a9ffc89..00000000
--- a/kafka-connect-neo4j/doc/contrib.sink.avro.neo4j.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "name": "Neo4jSinkConnector",
- "config": {
- "topics": "my-topic",
- "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
- "errors.retry.timeout": "-1",
- "errors.retry.delay.max.ms": "1000",
- "errors.tolerance": "all",
- "errors.log.enable": true,
- "errors.log.include.messages": true,
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'AVRO'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
- }
-}
diff --git a/kafka-connect-neo4j/doc/contrib.sink.string-json.neo4j.json b/kafka-connect-neo4j/doc/contrib.sink.string-json.neo4j.json
deleted file mode 100644
index d23e8c9d..00000000
--- a/kafka-connect-neo4j/doc/contrib.sink.string-json.neo4j.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "name": "Neo4jSinkConnectorJSON",
- "config": {
- "key.converter": "org.apache.kafka.connect.storage.StringConverter",
- "value.converter": "org.apache.kafka.connect.json.JsonConverter",
- "value.converter.schemas.enable": false,
- "topics": "my-topic",
- "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
- "errors.retry.timeout": "-1",
- "errors.retry.delay.max.ms": "1000",
- "errors.tolerance": "all",
- "errors.log.enable": true,
- "errors.log.include.messages": true,
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'JSON'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
- }
-}
diff --git a/kafka-connect-neo4j/doc/contrib.source.avro.neo4j.json b/kafka-connect-neo4j/doc/contrib.source.avro.neo4j.json
deleted file mode 100644
index 280ae06a..00000000
--- a/kafka-connect-neo4j/doc/contrib.source.avro.neo4j.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "name": "Neo4jSourceConnector",
- "config": {
- "topic": "my-topic",
- "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
- "key.converter": "io.confluent.connect.avro.AvroConverter",
- "value.converter": "io.confluent.connect.avro.AvroConverter",
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.streaming.poll.interval.msecs": 5000,
- "neo4j.streaming.property": "timestamp",
- "neo4j.streaming.from": "LAST_COMMITTED",
- "neo4j.enforce.schema": true,
- "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/doc/contrib.source.string-json.neo4j.json b/kafka-connect-neo4j/doc/contrib.source.string-json.neo4j.json
deleted file mode 100644
index c9d8346c..00000000
--- a/kafka-connect-neo4j/doc/contrib.source.string-json.neo4j.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "name": "Neo4jSourceConnector",
- "config": {
- "topic": "my-topic",
- "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
- "key.converter": "org.apache.kafka.connect.json.JsonConverter",
- "value.converter": "org.apache.kafka.connect.json.JsonConverter",
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.streaming.poll.interval.msecs": 5000,
- "neo4j.streaming.property": "timestamp",
- "neo4j.streaming.from": "LAST_COMMITTED",
- "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/doc/contrib.source.string.neo4j.json b/kafka-connect-neo4j/doc/contrib.source.string.neo4j.json
deleted file mode 100644
index def80fd0..00000000
--- a/kafka-connect-neo4j/doc/contrib.source.string.neo4j.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "name": "Neo4jSourceConnectorString",
- "config": {
- "topic": "my-topic",
- "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
- "key.converter": "org.apache.kafka.connect.storage.StringConverter",
- "value.converter": "org.apache.kafka.connect.storage.StringConverter",
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.streaming.poll.interval.msecs": 5000,
- "neo4j.streaming.property": "timestamp",
- "neo4j.streaming.from": "LAST_COMMITTED",
- "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/doc/docker-compose.yml b/kafka-connect-neo4j/doc/docker-compose.yml
deleted file mode 100644
index 5e45aed2..00000000
--- a/kafka-connect-neo4j/doc/docker-compose.yml
+++ /dev/null
@@ -1,122 +0,0 @@
----
-version: '2'
-services:
- neo4j:
- image: neo4j:4.3-enterprise
- hostname: neo4j
- container_name: neo4j
- ports:
- - "7474:7474"
- - "7687:7687"
- environment:
- NEO4J_kafka_bootstrap_servers: broker:9093
- NEO4J_AUTH: neo4j/connect
- NEO4J_dbms_memory_heap_max__size: 8G
- NEO4J_ACCEPT_LICENSE_AGREEMENT: yes
-
- zookeeper:
- image: confluentinc/cp-zookeeper
- hostname: zookeeper
- container_name: zookeeper
- ports:
- - "2181:2181"
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
- ZOOKEEPER_TICK_TIME: 2000
-
- broker:
- image: confluentinc/cp-enterprise-kafka
- hostname: broker
- container_name: broker
- depends_on:
- - zookeeper
- ports:
- - "9092:9092"
- expose:
- - "9093"
- environment:
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
- CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
-
- # workaround if we change to a custom name the schema_registry fails to start
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
- CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
- CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
- CONFLUENT_METRICS_ENABLE: 'true'
- CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
-
- schema_registry:
- image: confluentinc/cp-schema-registry
- hostname: schema_registry
- container_name: schema_registry
- depends_on:
- - zookeeper
- - broker
- ports:
- - "8081:8081"
- environment:
- SCHEMA_REGISTRY_HOST_NAME: schema_registry
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-
- connect:
- image: confluentinc/cp-kafka-connect
- hostname: connect
- container_name: connect
- depends_on:
- - zookeeper
- - broker
- - schema_registry
- ports:
- - "8083:8083"
- volumes:
- - ./plugins:/tmp/connect-plugins
- environment:
- CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
- CONNECT_REST_ADVERTISED_HOST_NAME: connect
- CONNECT_REST_PORT: 8083
- CONNECT_GROUP_ID: compose-connect-group
- CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
- CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
- CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
- CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
- CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
- CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
- CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
- CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
- CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
-
- control-center:
- image: confluentinc/cp-enterprise-control-center
- hostname: control-center
- container_name: control-center
- depends_on:
- - zookeeper
- - broker
- - schema_registry
- - connect
- ports:
- - "9021:9021"
- environment:
- CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
- CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
- CONTROL_CENTER_REPLICATION_FACTOR: 1
- CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
- CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
- CONFLUENT_METRICS_TOPIC_REPLICATION: 1
- PORT: 9021
\ No newline at end of file
diff --git a/kafka-connect-neo4j/doc/readme.adoc b/kafka-connect-neo4j/doc/readme.adoc
deleted file mode 100644
index 6db97828..00000000
--- a/kafka-connect-neo4j/doc/readme.adoc
+++ /dev/null
@@ -1,201 +0,0 @@
-= Build it locally
-
-Build the project by running the following command:
-
- mvn clean install
-
-Inside the directory `/kafka-connect-neo4j/target/component/packages` you'll find a file named `neo4j-kafka-connect-neo4j-.zip`
-
-== Sink
-
-=== Configuring the stack
-
-Create a directory `plugins` at the same level of the compose file and unzip the file `neo4j-kafka-connect-neo4j-.zip` inside it, then start the compose file
-
- docker-compose up -d
-
-Create the Sink instance:
-
-We'll define the Sink configuration as follows:
-
-[source,json]
-----
-include::contrib.sink.avro.neo4j.json[]
-----
-
-In particular this line:
-
-----
-"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
-----
-
-defines that all the data that comes from the topic `neo4j` will be unpacked by the Sink into Neo4j with the following Cypher query:
-
-[source,cypher]
-----
-MERGE (p:Person{name: event.name, surname: event.surname})
-MERGE (f:Family{name: event.surname})
-MERGE (p)-[:BELONGS_TO]->(f)
-----
-
-
-Under the hood the Sink inject the event object in this way
-
-[source,cypher]
-----
-UNWIND {batch} AS event
-MERGE (p:Person{name: event.name, surname: event.surname})
-MERGE (f:Family{name: event.surname})
-MERGE (p)-[:BELONGS_TO]->(f)
-----
-
-Where `{batch}` is a list of event objects.
-
-You can change the query or remove the property and add your own, but you must follow the following convention:
-
-[source,javascript]
-----
-"neo4j.topic.cypher.": ""
-----
-
-Let's load the configuration into the Confluent Platform with this REST call:
-
-[source,shell]
-----
-curl -X POST http://localhost:8083/connectors \
- -H 'Content-Type:application/json' \
- -H 'Accept:application/json' \
- -d @contrib.sink.avro.neo4j.json
-----
-
-The file `contrib.sink.string-json.neo4j.json` contains a configuration that manage a simple JSON producer example
-
-Please check that everything is fine by going into:
-
-http://localhost:9021/management/connect
-
-and click to the **Sink** tab. You must find a table just like this:
-
-[cols="4*",options="header"]
-|===
-|Status
-|Active Tasks
-|Name
-|Topics
-
-|Running
-|1
-|Neo4jSinkConnector
-|my-topic
-|===
-
-=== Use the data generator
-
-You can download and use the https://github.com/conker84/neo4j-streams-sink-tester/releases/download/1/neo4j-streams-sink-tester-1.0.jar[neo4j-streams-sink-tester-1.0.jar] in order to generate a sample dataset.
-
-This package sends records to the Neo4j Kafka Sink by using the following in two data formats:
-
-JSON example:
-
-[source,json]
-----
-{"name": "Name", "surname": "Surname"}
-----
-
-AVRO, with the schema:
-
-[source,json]
-----
-{
- "type":"record",
- "name":"User",
- "fields":[{"name":"name","type":"string"}, {"name":"surname","type":"string"}]
-}
-----
-
-Please type:
-
-----
-java -jar neo4j-streams-sink-tester-1.0.jar -h
-----
-
-to print the option list with default values.
-
-In order to choose the data format please use the `-f` flag: `-f AVRO` or `-f JSON` (the default value).
-So:
-
-----
-java -jar neo4j-streams-sink-tester-1.0.jar -f AVRO
-----
-
-Will send data in AVRO format.
-
-For a complete overview of the **Neo4j Steams Sink Tester** please refer to https://github.com/conker84/neo4j-streams-sink-tester[this repo]
-
-== Source
-
-=== Configuring the stack
-
-Create a directory `plugins` at the same level of the compose file and unzip the file `neo4j-kafka-connect-neo4j-.zip` inside it, then start the compose file
-
- docker-compose up -d
-
-=== Create the Source instance:
-
-In this chapter we'll discuss about how the Source instance works
-
-You can create a new Source instance with this REST call:
-
-[source,shell]
-----
-curl -X POST http://localhost:8083/connectors \
- -H 'Content-Type:application/json' \
- -H 'Accept:application/json' \
- -d @contrib.source.avro.neo4j.json
-----
-
-Let's look at the `contrib.source.avro.neo4j.json` file:
-
-[source,json]
-----
-include::contrib.source.avro.neo4j.json[]
-----
-
-This will create a Kafka Connect Source instance that will send `AVRO` message over the topic named `my-topic`. Every message in the
-topic will have the following structure:
-
-[source,json]
-----
-{"name": , "timestamp": }
-----
-
-**Nb.** Please check the <> for a detailed guide about the supported configuration
-parameters
-
-=== How the Source module pushes the data to the defined Kafka topic
-
-We use the query provided in the `neo4j.source.query` field by polling the database every value is into the
-`neo4j.streaming.poll.interval.msecs` field.
-
-So given the JSON configuration we have that we'll perform:
-
-[source,cypher]
-----
-MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp
-----
-
-every 5000 milliseconds by publishing events like:
-
-[source,json]
-----
-{"name":{"string":"John Doe"},"timestamp":{"long":1624551349362}}
-----
-
-In this case we use `neo4j.enforce.schema=true` and this means that we will attach a schema for each record, in case
-you want to stream pure simple JSON strings just use the relative serializer with `neo4j.enforce.schema=false` with the
-following output:
-
-[source,json]
-----
-{"name": "John Doe", "timestamp": 1624549598834}
-----
\ No newline at end of file
diff --git a/kafka-connect-neo4j/docker/contrib.sink.avro.neo4j.json b/kafka-connect-neo4j/docker/contrib.sink.avro.neo4j.json
deleted file mode 100644
index fa4d4e96..00000000
--- a/kafka-connect-neo4j/docker/contrib.sink.avro.neo4j.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "name": "Neo4jSinkConnector",
- "config": {
- "topics": "my-topic",
- "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
- "errors.retry.timeout": "-1",
- "errors.retry.delay.max.ms": "1000",
- "errors.tolerance": "all",
- "errors.log.enable": true,
- "errors.deadletterqueue.topic.name": "test-error-topic",
- "errors.log.include.messages": true,
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/docker/contrib.sink.string-json.neo4j.json b/kafka-connect-neo4j/docker/contrib.sink.string-json.neo4j.json
deleted file mode 100644
index cda74429..00000000
--- a/kafka-connect-neo4j/docker/contrib.sink.string-json.neo4j.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "name": "Neo4jSinkConnector",
- "config": {
- "topics": "my-topic",
- "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
- "key.converter": "org.apache.kafka.connect.json.JsonConverter",
- "key.converter.schemas.enable": false,
- "value.converter": "org.apache.kafka.connect.json.JsonConverter",
- "value.converter.schemas.enable": false,
- "errors.retry.timeout": "-1",
- "errors.retry.delay.max.ms": "1000",
- "errors.tolerance": "all",
- "errors.log.enable": true,
- "errors.log.include.messages": true,
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/docker/contrib.source.avro.neo4j.json b/kafka-connect-neo4j/docker/contrib.source.avro.neo4j.json
deleted file mode 100644
index 58015c81..00000000
--- a/kafka-connect-neo4j/docker/contrib.source.avro.neo4j.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "name": "Neo4jSourceConnectorAVRO",
- "config": {
- "topic": "my-topic",
- "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
- "key.converter": "io.confluent.connect.avro.AvroConverter",
- "value.converter": "io.confluent.connect.avro.AvroConverter",
- "key.converter.schema.registry.url": "http://schema_registry:8081",
- "value.converter.schema.registry.url": "http://schema_registry:8081",
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.streaming.poll.interval.msecs": 5000,
- "neo4j.streaming.property": "timestamp",
- "neo4j.streaming.from": "LAST_COMMITTED",
- "neo4j.enforce.schema": true,
- "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/docker/contrib.source.string-json.neo4j.json b/kafka-connect-neo4j/docker/contrib.source.string-json.neo4j.json
deleted file mode 100644
index 4fed8eaf..00000000
--- a/kafka-connect-neo4j/docker/contrib.source.string-json.neo4j.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "name": "Neo4jSourceConnectorJSON",
- "config": {
- "topic": "my-topic",
- "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
- "key.converter": "org.apache.kafka.connect.json.JsonConverter",
- "value.converter": "org.apache.kafka.connect.json.JsonConverter",
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.streaming.poll.interval.msecs": 5000,
- "neo4j.streaming.property": "timestamp",
- "neo4j.streaming.from": "LAST_COMMITTED",
- "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/docker/contrib.source.string.neo4j.json b/kafka-connect-neo4j/docker/contrib.source.string.neo4j.json
deleted file mode 100644
index def80fd0..00000000
--- a/kafka-connect-neo4j/docker/contrib.source.string.neo4j.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "name": "Neo4jSourceConnectorString",
- "config": {
- "topic": "my-topic",
- "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
- "key.converter": "org.apache.kafka.connect.storage.StringConverter",
- "value.converter": "org.apache.kafka.connect.storage.StringConverter",
- "neo4j.server.uri": "bolt://neo4j:7687",
- "neo4j.authentication.basic.username": "neo4j",
- "neo4j.authentication.basic.password": "connect",
- "neo4j.encryption.enabled": false,
- "neo4j.streaming.poll.interval.msecs": 5000,
- "neo4j.streaming.property": "timestamp",
- "neo4j.streaming.from": "LAST_COMMITTED",
- "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/docker/docker-compose.yml b/kafka-connect-neo4j/docker/docker-compose.yml
deleted file mode 100644
index d2dae351..00000000
--- a/kafka-connect-neo4j/docker/docker-compose.yml
+++ /dev/null
@@ -1,129 +0,0 @@
----
-version: '2'
-
-services:
- neo4j:
- image: neo4j:4.3-enterprise
- hostname: neo4j
- container_name: neo4j
- ports:
- - "7474:7474"
- - "7687:7687"
- environment:
- NEO4J_kafka_bootstrap_servers: broker:9093
- NEO4J_AUTH: neo4j/connect
- NEO4J_dbms_memory_heap_max__size: 8G
- NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'
-
- zookeeper:
- image: confluentinc/cp-zookeeper
- hostname: zookeeper
- container_name: zookeeper
- ports:
- - "2181:2181"
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
- ZOOKEEPER_TICK_TIME: 2000
-
- broker:
- image: confluentinc/cp-enterprise-kafka
- hostname: broker
- container_name: broker
- depends_on:
- - zookeeper
- ports:
- - "9092:9092"
- expose:
- - "9093"
- environment:
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
- CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
-
- # workaround if we change to a custom name the schema_registry fails to start
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
- CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
- CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
- CONFLUENT_METRICS_ENABLE: 'true'
- CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
-
- schema_registry:
- image: confluentinc/cp-schema-registry
- hostname: schema_registry
- container_name: schema_registry
- depends_on:
- - zookeeper
- - broker
- ports:
- - "8081:8081"
- environment:
- SCHEMA_REGISTRY_HOST_NAME: schema_registry
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-
- connect:
- image: confluentinc/cp-kafka-connect
- hostname: connect
- container_name: connect
- depends_on:
- - zookeeper
- - broker
- - schema_registry
- ports:
- - "8083:8083"
- volumes:
- - ./plugins:/tmp/connect-plugins
- environment:
- CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
- CONNECT_REST_ADVERTISED_HOST_NAME: connect
- CONNECT_REST_PORT: 8083
- CONNECT_GROUP_ID: compose-connect-group
- CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
- CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
- CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
- CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
- CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
- CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
- CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
- CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins
- CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
- command:
-# - bash
-# - -c
-# - |
-# confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
- /etc/confluent/docker/run
-
- control-center:
- image: confluentinc/cp-enterprise-control-center
- hostname: control-center
- container_name: control-center
- depends_on:
- - zookeeper
- - broker
- - schema_registry
- - connect
- ports:
- - "9021:9021"
- environment:
- CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
- CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
- CONTROL_CENTER_REPLICATION_FACTOR: 1
- CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
- CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
- CONFLUENT_METRICS_TOPIC_REPLICATION: 1
- PORT: 9021
diff --git a/kafka-connect-neo4j/docker/readme.adoc b/kafka-connect-neo4j/docker/readme.adoc
deleted file mode 100644
index 4804aac9..00000000
--- a/kafka-connect-neo4j/docker/readme.adoc
+++ /dev/null
@@ -1,121 +0,0 @@
-
-==== Configuration parameters
-:environment: neo4j
-:id: neo4j
-
-You can set the following configuration values via Confluent Connect UI, or via REST endpoint
-
-[cols="3*",subs="attributes",options="header"]
-|===
-|Field|Type|Description
-
-|{environment}.server.uri|String|The Bolt URI (default bolt://localhost:7687)
-|{environment}.authentication.type|enum[NONE, BASIC, KERBEROS]| The authentication type (default BASIC)
-|{environment}.batch.size|Int|The max number of events processed by the Cypher query (default 1000)
-|{environment}.batch.timeout.msecs|Long|The execution timeout for the cypher query (default 30000)
-|{environment}.authentication.basic.username|String| The authentication username
-|{environment}.authentication.basic.password|String| The authentication password
-|{environment}.authentication.basic.realm|String| The authentication realm
-|{environment}.authentication.kerberos.ticket|String| The Kerberos ticket
-|{environment}.encryption.enabled|Boolean| If the encryption is enabled (default false)
-|{environment}.encryption.trust.strategy|enum[TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES]| The Neo4j trust strategy (default TRUST_ALL_CERTIFICATES)
-|{environment}.encryption.ca.certificate.path|String| The path of the certificate
-|{environment}.connection.max.lifetime.msecs|Long| The max Neo4j connection lifetime (default 1 hour)
-|{environment}.connection.acquisition.timeout.msecs|Long| The max Neo4j acquisition timeout (default 1 hour)
-|{environment}.connection.liveness.check.timeout.msecs|Long| The max Neo4j liveness check timeout (default 1 hour)
-|{environment}.connection.max.pool.size|Int| The max pool size (default 100)
-|{environment}.load.balance.strategy|enum[ROUND_ROBIN, LEAST_CONNECTED]| The Neo4j load balance strategy (default LEAST_CONNECTED)
-|{environment}.batch.parallelize|Boolean|(default true) While concurrent batch processing improves throughput, it might cause out-of-order handling of events. Set to `false` if you need application of messages with strict ordering, e.g. for change-data-capture (CDC) events.
-|===
-
-==== Configuring the stack
-
-Start the compose file
-
-[source,bash]
-----
-docker-compose up -d
-----
-
-You can access your Neo4j instance under: http://localhost:7474, log in with `neo4j` as username and `connect` as password (see the docker-compose file to change it).
-
-===== Plugin installation
-
-You can choose your preferred way in order to install the plugin:
-
-* *Build it locally*
-+
---
-Build the project by running the following command:
-
-[source,bash]
-----
-mvn clean install
-----
-
-Create a directory `plugins` at the same level of the compose file and unzip the file `neo4j-kafka-connect-neo4j-.zip` inside it.
---
-
-* *Download the zip from the Confluent Hub*
-
-+
---
-Please go to the Confluent Hub page of the plugin:
-
-https://www.confluent.io/connector/kafka-connect-neo4j-sink/
-
-And click to the **Download Connector** button.
-
-Create a directory `plugins` at the same level of the compose file and unzip the file `neo4j-kafka-connect-neo4j-.zip` inside it.
---
-
-* *Download and install the plugin via Confluent Hub client*
-+
---
-If you are using the provided compose file you can easily install the plugin by using the Confluent Hub.
-
-Once the compose file is up and running you can install the plugin by executing the following command:
-
-[source,bash]
-----
-docker exec -it connect confluent-hub install neo4j/kafka-connect-neo4j:
-----
-
-When the installation will ask:
-
-[source,bash]
-----
-The component can be installed in any of the following Confluent Platform installations:
-----
-
-Please prefer the solution `(where this tool is installed)` and then go ahead with the default options.
-
-At the end of the process the plugin is automatically installed.
---
-
-==== Multi Database Support
-
-Neo4j 4.0 Enterprise has https://neo4j.com/docs/operations-manual/4.0/manage-databases/[multi-tenancy support],
-in order to support this feature you can define into the json (or via the Confluent UI)
-a param named `neo4j.database` which is the targeted database name.
-
-*N.b.* If no value is specified the connector will use the Neo4j's default db.
-
-==== Create the Sink Instance
-
-To create the Sink instance and configure your preferred ingestion strategy, you can follow instructions described
-into <> and <>
-sections.
-
-==== Create the Source Instance
-
-To create the Source instance and configure your preferred ingestion strategy, you can follow instructions described
-into <> section.
-
-===== Use the Kafka Connect Datagen
-
-In order to generate a sample dataset you can use Kafka Connect Datagen as explained in <> section.
-
-[NOTE]
-Before start using the data generator please create indexes in Neo4j (in order to speed-up the import process)
-
diff --git a/kafka-connect-neo4j/pom.xml b/kafka-connect-neo4j/pom.xml
deleted file mode 100644
index 33e8662b..00000000
--- a/kafka-connect-neo4j/pom.xml
+++ /dev/null
@@ -1,160 +0,0 @@
-
- 4.0.0
-
- org.neo4j
- kafka-connect-neo4j
- 2.0.2
- jar
-
- Kafka Connect Neo4j
- A Kafka Connect Neo4j Connector for kafka-connect-neo4j
-
-
- org.neo4j
- neo4j-streams-parent
- 4.1.2
-
-
-
- 5.0.0
- 0.11.1
- 3.1.0
- 0.3.141
- 32.1.1-jre
-
-
-
-
- confluent
- http://packages.confluent.io/maven/
-
-
-
-
-
- org.apache.kafka
- connect-api
- ${kafka.version}
- provided
-
-
- com.github.jcustenborder.kafka.connect
- connect-utils
- ${kafka.connect.utils.version}
-
-
-
- com.google.guava
- guava
- ${google.guava.version}
- provided
-
-
-
- org.neo4j
- neo4j-streams-common
- ${project.parent.version}
-
-
-
- org.neo4j
- neo4j-streams-test-support
- ${project.parent.version}
- test
-
-
-
- org.neo4j.driver
- neo4j-java-driver
-
-
-
-
-
-
- maven-resources-plugin
-
- ${project.build.outputDirectory}
-
-
- src/main/resources
- true
-
-
-
-
-
- io.confluent
- kafka-connect-maven-plugin
- ${confluent.connect.plugin.version}
-
-
-
- kafka-connect
-
-
-
-
- ${project.basedir}
- doc/
-
- README*
- LICENSE*
- NOTICE*
- licenses/
- docker/
-
-
-
-
- sink
- source
-
- neo4j
- organization
- Neo4j, Inc.
- https://neo4j.com/
- Neo4j Connector
- https://neo4j-contrib.github.io/neo4j-streams/#_kafka_connect
- It's a basic Apache Kafka Connect Neo4j Connector which allows moving data from Kafka topics into Neo4j via Cypher templated queries and vice versa.
- assets/neo4j-logo.png
- Neo4j Labs]]>
- https://github.com/neo4j-contrib/neo4j-streams/tree/master/kafka-connect-neo4j
- ${project.issueManagement.url}
- true
-
- neo4j
- nosql
- json
- graph
- nodes
- relationships
- cypher
-
-
-
-
-
-
-
-
-
-
- oss-kafka-connect
-
-
- com.google.guava
- guava
- ${google.guava.version}
- compile
-
-
-
- false
-
-
-
-
-
diff --git a/kafka-connect-neo4j/src/main/assembly/package.xml b/kafka-connect-neo4j/src/main/assembly/package.xml
deleted file mode 100644
index 5305539e..00000000
--- a/kafka-connect-neo4j/src/main/assembly/package.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-
-
- package
-
- dir
-
- false
-
-
- ${project.basedir}
- share/doc/${project.name}/
-
- README*
- LICENSE*
- NOTICE*
- licenses/
-
-
-
- ${project.basedir}/config
- etc/${project.name}
-
- *
-
-
-
-
-
- share/kotlin/${project.name}
- true
- true
-
- org.apache.kafka:connect-api
-
-
-
-
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt
deleted file mode 100644
index 6a759ec7..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt
+++ /dev/null
@@ -1,351 +0,0 @@
-package streams.kafka.connect.common
-
-import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder
-import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils
-import com.github.jcustenborder.kafka.connect.utils.config.ValidEnum
-import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders
-import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators
-import com.github.jcustenborder.kafka.connect.utils.config.validators.filesystem.ValidFile
-import org.apache.kafka.common.config.AbstractConfig
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.common.config.ConfigException
-import org.neo4j.driver.*
-import org.neo4j.driver.internal.async.pool.PoolSettings
-import org.neo4j.driver.net.ServerAddress
-import streams.kafka.connect.sink.AuthenticationType
-import streams.kafka.connect.utils.PropertiesUtil
-import java.io.File
-import java.net.URI
-import java.time.Duration
-import java.util.concurrent.TimeUnit
-
-object ConfigGroup {
- const val ENCRYPTION = "Encryption"
- const val CONNECTION = "Connection"
- const val AUTHENTICATION = "Authentication"
- const val TOPIC_CYPHER_MAPPING = "Topic Cypher Mapping"
- const val ERROR_REPORTING = "Error Reporting"
- const val BATCH = "Batch Management"
- const val RETRY = "Retry Strategy"
- const val DEPRECATED = "Deprecated Properties (please check the documentation)"
-}
-
-enum class ConnectorType { SINK, SOURCE }
-
-open class Neo4jConnectorConfig(configDef: ConfigDef,
- originals: Map<*, *>,
- private val type: ConnectorType): AbstractConfig(configDef, originals) {
- val encryptionEnabled: Boolean
- val encryptionTrustStrategy: Config.TrustStrategy.Strategy
- var encryptionCACertificateFile: File? = null
-
- val authenticationType: AuthenticationType
- val authenticationUsername: String
- val authenticationPassword: String
- val authenticationRealm: String
- val authenticationKerberosTicket: String
-
- val serverUri: List
- val connectionMaxConnectionLifetime: Long
- val connectionLivenessCheckTimeout: Long
- val connectionPoolMaxSize: Int
- val connectionAcquisitionTimeout: Long
-
- val retryBackoff: Long
- val retryMaxAttempts: Int
-
- val batchTimeout: Long
- val batchSize: Int
-
- val database: String
-
- init {
- database = getString(DATABASE)
- encryptionEnabled = getBoolean(ENCRYPTION_ENABLED)
- encryptionTrustStrategy = ConfigUtils
- .getEnum(Config.TrustStrategy.Strategy::class.java, this, ENCRYPTION_TRUST_STRATEGY)
- val encryptionCACertificatePATH = getString(ENCRYPTION_CA_CERTIFICATE_PATH) ?: ""
- if (encryptionCACertificatePATH != "") {
- encryptionCACertificateFile = File(encryptionCACertificatePATH)
- }
-
- authenticationType = ConfigUtils
- .getEnum(AuthenticationType::class.java, this, AUTHENTICATION_TYPE)
- authenticationRealm = getString(AUTHENTICATION_BASIC_REALM)
- authenticationUsername = getString(AUTHENTICATION_BASIC_USERNAME)
- authenticationPassword = getPassword(AUTHENTICATION_BASIC_PASSWORD).value()
- authenticationKerberosTicket = getPassword(AUTHENTICATION_KERBEROS_TICKET).value()
-
- serverUri = getString(SERVER_URI).split(",").map { URI(it) }
- connectionLivenessCheckTimeout = getLong(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS)
- connectionMaxConnectionLifetime = getLong(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS)
- connectionPoolMaxSize = getInt(CONNECTION_POOL_MAX_SIZE)
- connectionAcquisitionTimeout = getLong(CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS)
-
- retryBackoff = getLong(RETRY_BACKOFF_MSECS)
- retryMaxAttempts = getInt(RETRY_MAX_ATTEMPTS)
-
- batchTimeout = getLong(BATCH_TIMEOUT_MSECS)
- batchSize = getInt(BATCH_SIZE)
- }
-
- fun hasSecuredURI() = serverUri.any { it.scheme.endsWith("+s", true) || it.scheme.endsWith("+ssc", true) }
-
- fun createDriver(): Driver {
- val configBuilder = Config.builder()
- configBuilder.withUserAgent("neo4j-kafka-connect-$type/${PropertiesUtil.getVersion()}")
-
- if (!this.hasSecuredURI()) {
- if (this.encryptionEnabled) {
- configBuilder.withEncryption()
- val trustStrategy: Config.TrustStrategy = when (this.encryptionTrustStrategy) {
- Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES -> Config.TrustStrategy.trustAllCertificates()
- Config.TrustStrategy.Strategy.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES -> Config.TrustStrategy.trustSystemCertificates()
- Config.TrustStrategy.Strategy.TRUST_CUSTOM_CA_SIGNED_CERTIFICATES -> Config.TrustStrategy.trustCustomCertificateSignedBy(this.encryptionCACertificateFile)
- else -> {
- throw ConfigException(ENCRYPTION_TRUST_STRATEGY, this.encryptionTrustStrategy.toString(), "Encryption Trust Strategy is not supported.")
- }
- }
- configBuilder.withTrustStrategy(trustStrategy)
- } else {
- configBuilder.withoutEncryption()
- }
- }
-
- val authToken = when (this.authenticationType) {
- AuthenticationType.NONE -> AuthTokens.none()
- AuthenticationType.BASIC -> {
- if (this.authenticationRealm != "") {
- AuthTokens.basic(this.authenticationUsername, this.authenticationPassword, this.authenticationRealm)
- } else {
- AuthTokens.basic(this.authenticationUsername, this.authenticationPassword)
- }
- }
- AuthenticationType.KERBEROS -> AuthTokens.kerberos(this.authenticationKerberosTicket)
- }
- configBuilder.withMaxConnectionPoolSize(this.connectionPoolMaxSize)
- configBuilder.withMaxConnectionLifetime(this.connectionMaxConnectionLifetime, TimeUnit.MILLISECONDS)
- configBuilder.withConnectionAcquisitionTimeout(this.connectionAcquisitionTimeout, TimeUnit.MILLISECONDS)
- configBuilder.withMaxTransactionRetryTime(this.retryBackoff, TimeUnit.MILLISECONDS)
- configBuilder.withConnectionLivenessCheckTimeout(this.connectionLivenessCheckTimeout, TimeUnit.MINUTES)
- configBuilder.withResolver { address -> this.serverUri.map { ServerAddress.of(it.host, it.port) }.toSet() }
- val neo4jConfig = configBuilder.build()
-
- return GraphDatabase.driver(this.serverUri.firstOrNull(), authToken, neo4jConfig)
- }
-
- fun createSessionConfig(bookmarks: List = emptyList()): SessionConfig {
- val sessionConfigBuilder = SessionConfig.builder()
- if (this.database.isNotBlank()) {
- sessionConfigBuilder.withDatabase(this.database)
- }
- val accessMode = if (type == ConnectorType.SOURCE) {
- AccessMode.READ
- } else {
- AccessMode.WRITE
- }
- sessionConfigBuilder.withDefaultAccessMode(accessMode)
- sessionConfigBuilder.withBookmarks(bookmarks)
- return sessionConfigBuilder.build()
- }
-
- fun createTransactionConfig(): TransactionConfig {
- val batchTimeout = this.batchTimeout
- return if (batchTimeout > 0) {
- TransactionConfig.builder()
- .withTimeout(Duration.ofMillis(batchTimeout))
- .build()
- } else {
- TransactionConfig.empty()
- }
- }
-
- companion object {
- const val SERVER_URI = "neo4j.server.uri"
- const val DATABASE = "neo4j.database"
-
- const val AUTHENTICATION_TYPE = "neo4j.authentication.type"
- const val AUTHENTICATION_BASIC_USERNAME = "neo4j.authentication.basic.username"
- const val AUTHENTICATION_BASIC_PASSWORD = "neo4j.authentication.basic.password"
- const val AUTHENTICATION_BASIC_REALM = "neo4j.authentication.basic.realm"
- const val AUTHENTICATION_KERBEROS_TICKET = "neo4j.authentication.kerberos.ticket"
-
- const val ENCRYPTION_ENABLED = "neo4j.encryption.enabled"
- const val ENCRYPTION_TRUST_STRATEGY = "neo4j.encryption.trust.strategy"
- const val ENCRYPTION_CA_CERTIFICATE_PATH = "neo4j.encryption.ca.certificate.path"
-
- const val CONNECTION_MAX_CONNECTION_LIFETIME_MSECS = "neo4j.connection.max.lifetime.msecs"
- const val CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS = "neo4j.connection.acquisition.timeout.msecs"
- const val CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS = "neo4j.connection.liveness.check.timeout.msecs"
- const val CONNECTION_POOL_MAX_SIZE = "neo4j.connection.max.pool.size"
-
- const val BATCH_SIZE = "neo4j.batch.size"
- const val BATCH_TIMEOUT_MSECS = "neo4j.batch.timeout.msecs"
-
- const val RETRY_BACKOFF_MSECS = "neo4j.retry.backoff.msecs"
- const val RETRY_MAX_ATTEMPTS = "neo4j.retry.max.attemps"
-
- const val CONNECTION_POOL_MAX_SIZE_DEFAULT = 100
- val BATCH_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(0L)
- const val BATCH_SIZE_DEFAULT = 1000
- val RETRY_BACKOFF_DEFAULT = TimeUnit.SECONDS.toMillis(30L)
- const val RETRY_MAX_ATTEMPTS_DEFAULT = 5
-
- // Default values optimizations for Aura please look at: https://aura.support.neo4j.com/hc/en-us/articles/1500002493281-Neo4j-Java-driver-settings-for-Aura
- val CONNECTION_MAX_CONNECTION_LIFETIME_MSECS_DEFAULT = Duration.ofMinutes(8).toMillis()
- val CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS_DEFAULT = Duration.ofMinutes(2).toMillis()
-
-
- fun isValidQuery(session: Session, query: String) = try {
- session.run("EXPLAIN $query")
- true
- } catch (e: Exception) {
- false
- }
-
- fun config(): ConfigDef = ConfigDef()
- .define(ConfigKeyBuilder
- .of(AUTHENTICATION_TYPE, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(AUTHENTICATION_TYPE))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue(AuthenticationType.BASIC.toString())
- .group(ConfigGroup.AUTHENTICATION)
- .validator(ValidEnum.of(AuthenticationType::class.java))
- .build())
- .define(ConfigKeyBuilder
- .of(AUTHENTICATION_BASIC_USERNAME, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(AUTHENTICATION_BASIC_USERNAME))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue("")
- .group(ConfigGroup.AUTHENTICATION)
- .recommender(Recommenders.visibleIf(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()))
- .build())
- .define(ConfigKeyBuilder
- .of(AUTHENTICATION_BASIC_PASSWORD, ConfigDef.Type.PASSWORD)
- .documentation(PropertiesUtil.getProperty(AUTHENTICATION_BASIC_PASSWORD))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue("")
- .group(ConfigGroup.AUTHENTICATION)
- .recommender(Recommenders.visibleIf(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()))
- .build())
- .define(ConfigKeyBuilder
- .of(AUTHENTICATION_BASIC_REALM, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(AUTHENTICATION_BASIC_REALM))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue("")
- .group(ConfigGroup.AUTHENTICATION)
- .recommender(Recommenders.visibleIf(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString()))
- .build())
- .define(ConfigKeyBuilder
- .of(AUTHENTICATION_KERBEROS_TICKET, ConfigDef.Type.PASSWORD)
- .documentation(PropertiesUtil.getProperty(AUTHENTICATION_KERBEROS_TICKET))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue("")
- .group(ConfigGroup.AUTHENTICATION)
- .recommender(Recommenders.visibleIf(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString()))
- .build())
- .define(ConfigKeyBuilder
- .of(SERVER_URI, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(SERVER_URI))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue("bolt://localhost:7687")
- .group(ConfigGroup.CONNECTION)
- .validator(Validators.validURI("bolt", "bolt+routing", "bolt+s", "bolt+ssc","neo4j", "neo4j+s", "neo4j+ssc"))
- .build())
- .define(ConfigKeyBuilder
- .of(CONNECTION_POOL_MAX_SIZE, ConfigDef.Type.INT)
- .documentation(PropertiesUtil.getProperty(CONNECTION_POOL_MAX_SIZE))
- .importance(ConfigDef.Importance.LOW)
- .defaultValue(CONNECTION_POOL_MAX_SIZE_DEFAULT)
- .group(ConfigGroup.CONNECTION)
- .validator(ConfigDef.Range.atLeast(1))
- .build())
- .define(ConfigKeyBuilder
- .of(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS, ConfigDef.Type.LONG)
- .documentation(PropertiesUtil.getProperty(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS))
- .importance(ConfigDef.Importance.LOW)
- .defaultValue(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS_DEFAULT)
- .group(ConfigGroup.CONNECTION)
- .validator(ConfigDef.Range.atLeast(1))
- .build())
- .define(ConfigKeyBuilder
- .of(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS, ConfigDef.Type.LONG)
- .documentation(PropertiesUtil.getProperty(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS))
- .importance(ConfigDef.Importance.LOW)
- .defaultValue(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS_DEFAULT)
- .group(ConfigGroup.CONNECTION)
- .validator(ConfigDef.Range.atLeast(1))
- .build())
- .define(ConfigKeyBuilder
- .of(CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS, ConfigDef.Type.LONG)
- .documentation(PropertiesUtil.getProperty(CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS))
- .importance(ConfigDef.Importance.LOW)
- .defaultValue(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT)
- .group(ConfigGroup.CONNECTION)
- .validator(ConfigDef.Range.atLeast(1))
- .build())
- .define(ConfigKeyBuilder
- .of(ENCRYPTION_ENABLED, ConfigDef.Type.BOOLEAN)
- .documentation(PropertiesUtil.getProperty(ENCRYPTION_ENABLED))
- .importance(ConfigDef.Importance.HIGH)
- .defaultValue(false)
- .group(ConfigGroup.ENCRYPTION).build())
- .define(ConfigKeyBuilder
- .of(ENCRYPTION_TRUST_STRATEGY, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(ENCRYPTION_TRUST_STRATEGY))
- .importance(ConfigDef.Importance.MEDIUM)
- .defaultValue(Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES.toString())
- .group(ConfigGroup.ENCRYPTION)
- .validator(ValidEnum.of(Config.TrustStrategy.Strategy::class.java))
- .recommender(Recommenders.visibleIf(ENCRYPTION_ENABLED, true))
- .build())
- .define(ConfigKeyBuilder
- .of(ENCRYPTION_CA_CERTIFICATE_PATH, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(ENCRYPTION_CA_CERTIFICATE_PATH))
- .importance(ConfigDef.Importance.MEDIUM)
- .defaultValue("")
- .group(ConfigGroup.ENCRYPTION)
- .validator(Validators.blankOr(ValidFile.of())) // TODO check
- .recommender(Recommenders.visibleIf(
- ENCRYPTION_TRUST_STRATEGY,
- Config.TrustStrategy.Strategy.TRUST_CUSTOM_CA_SIGNED_CERTIFICATES.toString()))
- .build())
- .define(ConfigKeyBuilder
- .of(BATCH_SIZE, ConfigDef.Type.INT)
- .documentation(PropertiesUtil.getProperty(BATCH_SIZE))
- .importance(ConfigDef.Importance.LOW)
- .defaultValue(BATCH_SIZE_DEFAULT)
- .group(ConfigGroup.BATCH)
- .validator(ConfigDef.Range.atLeast(1))
- .build())
- .define(ConfigKeyBuilder
- .of(BATCH_TIMEOUT_MSECS, ConfigDef.Type.LONG)
- .documentation(PropertiesUtil.getProperty(BATCH_TIMEOUT_MSECS))
- .importance(ConfigDef.Importance.LOW)
- .defaultValue(BATCH_TIMEOUT_DEFAULT)
- .group(ConfigGroup.BATCH)
- .validator(ConfigDef.Range.atLeast(0)).build())
- .define(ConfigKeyBuilder
- .of(RETRY_BACKOFF_MSECS, ConfigDef.Type.LONG)
- .documentation(PropertiesUtil.getProperty(RETRY_BACKOFF_MSECS))
- .importance(ConfigDef.Importance.MEDIUM)
- .defaultValue(RETRY_BACKOFF_DEFAULT)
- .group(ConfigGroup.RETRY)
- .validator(ConfigDef.Range.atLeast(1))
- .build())
- .define(ConfigKeyBuilder
- .of(RETRY_MAX_ATTEMPTS, ConfigDef.Type.INT)
- .documentation(PropertiesUtil.getProperty(RETRY_MAX_ATTEMPTS))
- .importance(ConfigDef.Importance.MEDIUM)
- .defaultValue(RETRY_MAX_ATTEMPTS_DEFAULT)
- .group(ConfigGroup.RETRY)
- .validator(ConfigDef.Range.atLeast(1)).build())
- .define(ConfigKeyBuilder
- .of(DATABASE, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(DATABASE))
- .importance(ConfigDef.Importance.HIGH)
- .group(ConfigGroup.CONNECTION)
- .defaultValue("")
- .build())
- }
-}
\ No newline at end of file
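
As a side note on the Aura-oriented defaults above: the sketch below is not part of the removed class; it only illustrates how those constants (8-minute max connection lifetime, 2-minute liveness check, pool size 100) would map onto the Java driver's Config builder. The function name auraTunedDriverConfig is purely illustrative.

import org.neo4j.driver.Config
import java.util.concurrent.TimeUnit

// Mirrors CONNECTION_MAX_CONNECTION_LIFETIME_MSECS_DEFAULT,
// CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS_DEFAULT and CONNECTION_POOL_MAX_SIZE_DEFAULT.
fun auraTunedDriverConfig(): Config = Config.builder()
        .withMaxConnectionLifetime(8, TimeUnit.MINUTES)
        .withConnectionLivenessCheckTimeout(2, TimeUnit.MINUTES)
        .withMaxConnectionPoolSize(100)
        .build()
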
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/EventBuilder.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/EventBuilder.kt
deleted file mode 100644
index ce3cedaf..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/EventBuilder.kt
+++ /dev/null
@@ -1,33 +0,0 @@
-package streams.kafka.connect.sink
-
-import org.apache.kafka.connect.sink.SinkRecord
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import streams.kafka.connect.utils.toStreamsSinkEntity
-import streams.service.StreamsSinkEntity
-
-class EventBuilder {
- private var batchSize: Int? = null
- private lateinit var sinkRecords: Collection<SinkRecord>
-
- fun withBatchSize(batchSize: Int): EventBuilder {
- this.batchSize = batchSize
- return this
- }
-
- fun withSinkRecords(sinkRecords: Collection<SinkRecord>): EventBuilder {
- this.sinkRecords = sinkRecords
- return this
- }
-
- fun build(): Map<String, List<List<StreamsSinkEntity>>> { // <Topic, List<List<StreamsSinkEntity>>>
- val batchSize = this.batchSize!!
- return this.sinkRecords
- .groupBy { it.topic() }
- .mapValues { entry ->
- val value = entry.value.map { it.toStreamsSinkEntity() }
- if (batchSize > value.size) listOf(value) else value.chunked(batchSize)
- }
- }
-
-}
\ No newline at end of file
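
For reference, the grouping performed by the removed EventBuilder.build() (group sink records by topic, then split each group into batches of at most batchSize, or a single batch when fewer records arrive) can be reproduced with a stdlib-only sketch; plain strings stand in for SinkRecord and StreamsSinkEntity here.

fun groupIntoBatches(records: List<Pair<String, String>>, batchSize: Int): Map<String, List<List<String>>> =
        records.groupBy({ it.first }, { it.second })
                .mapValues { (_, values) ->
                    if (batchSize > values.size) listOf(values) else values.chunked(batchSize)
                }

fun main() {
    val records = listOf("topicA" to "e1", "topicA" to "e2", "topicA" to "e3", "topicB" to "e4")
    println(groupIntoBatches(records, 2)) // {topicA=[[e1, e2], [e3]], topicB=[[e4]]}
}
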
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt
deleted file mode 100644
index cfbd042e..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-package streams.kafka.connect.sink
-
-import com.github.jcustenborder.kafka.connect.utils.config.*
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.connect.connector.Task
-import org.apache.kafka.connect.sink.SinkConnector
-import org.slf4j.LoggerFactory
-import streams.kafka.connect.utils.PropertiesUtil
-
-@Title("Neo4j Sink Connector")
-@Description("The Neo4j Sink connector reads data from Kafka and and writes the data to Neo4j using a Cypher Template")
-@DocumentationTip("If you need to control the size of transaction that is submitted to Neo4j you try adjusting the ``consumer.max.poll.records`` setting in the worker.properties for Kafka Connect.")
-@DocumentationNote("For each topic you can provide a Cypher Template by using the following syntax ``neo4j.topic.cypher.=``")
-class Neo4jSinkConnector: SinkConnector() {
- private lateinit var settings: Map<String, String>
- private lateinit var config: Neo4jSinkConnectorConfig
- override fun taskConfigs(maxTasks: Int): MutableList<MutableMap<String, String>> {
- return TaskConfigs.multiple(settings, maxTasks)
- }
-
- override fun start(props: MutableMap<String, String>?) {
- settings = props!!
- config = Neo4jSinkConnectorConfig(settings)
- }
-
- override fun stop() {}
-
- override fun version(): String {
- return PropertiesUtil.getVersion()
- }
-
- override fun taskClass(): Class<out Task> {
- return Neo4jSinkTask::class.java
- }
-
- override fun config(): ConfigDef {
- return Neo4jSinkConnectorConfig.config()
- }
-
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt
deleted file mode 100644
index f7b4da7b..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt
+++ /dev/null
@@ -1,102 +0,0 @@
-package streams.kafka.connect.sink
-
-import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.connect.sink.SinkTask
-import streams.kafka.connect.common.ConfigGroup
-import streams.kafka.connect.common.ConnectorType
-import streams.kafka.connect.common.Neo4jConnectorConfig
-import streams.kafka.connect.utils.PropertiesUtil
-import streams.service.TopicType
-import streams.service.TopicUtils
-import streams.service.Topics
-import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
-
-enum class AuthenticationType {
- NONE, BASIC, KERBEROS
-}
-
-class Neo4jSinkConnectorConfig(originals: Map<*, *>): Neo4jConnectorConfig(config(), originals, ConnectorType.SINK) {
-
- val parallelBatches: Boolean
-
- val topics: Topics
-
- val strategyMap: Map<TopicType, Any>
-
- val kafkaBrokerProperties: Map<String, Any?>
-
- init {
- val sourceIdStrategyConfig = SourceIdIngestionStrategyConfig(getString(TOPIC_CDC_SOURCE_ID_LABEL_NAME), getString(TOPIC_CDC_SOURCE_ID_ID_NAME))
- topics = Topics.from(originals as Map<String, Any?>, "streams.sink." to "neo4j.")
- strategyMap = TopicUtils.toStrategyMap(topics, sourceIdStrategyConfig)
-
- parallelBatches = getBoolean(BATCH_PARALLELIZE)
- val kafkaPrefix = "kafka."
- kafkaBrokerProperties = originals
- .filterKeys { it.startsWith(kafkaPrefix) }
- .mapKeys { it.key.substring(kafkaPrefix.length) }
- validateAllTopics(originals)
- }
-
- private fun validateAllTopics(originals: Map<*, *>) {
- TopicUtils.validate(this.topics)
- val topics = if (originals.containsKey(SinkTask.TOPICS_CONFIG)) {
- originals[SinkTask.TOPICS_CONFIG].toString()
- .split(",")
- .map { it.trim() }
- .sorted()
- } else { // TODO manage regexp
- emptyList()
- }
- val allTopics = this.topics
- .allTopics()
- .sorted()
- if (topics != allTopics) {
- throw ConfigException("There is a mismatch between topics defined into the property `${SinkTask.TOPICS_CONFIG}` ($topics) and configured topics ($allTopics)")
- }
- }
-
- companion object {
-
- const val BATCH_PARALLELIZE = "neo4j.batch.parallelize"
-
- const val TOPIC_CYPHER_PREFIX = "neo4j.topic.cypher."
- const val TOPIC_CDC_SOURCE_ID = "neo4j.topic.cdc.sourceId"
- const val TOPIC_CDC_SOURCE_ID_LABEL_NAME = "neo4j.topic.cdc.sourceId.labelName"
- const val TOPIC_CDC_SOURCE_ID_ID_NAME = "neo4j.topic.cdc.sourceId.idName"
- const val TOPIC_PATTERN_NODE_PREFIX = "neo4j.topic.pattern.node."
- const val TOPIC_PATTERN_RELATIONSHIP_PREFIX = "neo4j.topic.pattern.relationship."
- const val TOPIC_CDC_SCHEMA = "neo4j.topic.cdc.schema"
- const val TOPIC_CUD = "neo4j.topic.cud"
-
- private val sourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()
-
- fun config(): ConfigDef = Neo4jConnectorConfig.config()
- .define(ConfigKeyBuilder.of(TOPIC_CDC_SOURCE_ID, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(TOPIC_CDC_SOURCE_ID)).importance(ConfigDef.Importance.HIGH)
- .defaultValue("").group(ConfigGroup.TOPIC_CYPHER_MAPPING)
- .build())
- .define(ConfigKeyBuilder.of(TOPIC_CDC_SOURCE_ID_LABEL_NAME, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(TOPIC_CDC_SOURCE_ID_LABEL_NAME)).importance(ConfigDef.Importance.HIGH)
- .defaultValue(sourceIdIngestionStrategyConfig.labelName).group(ConfigGroup.TOPIC_CYPHER_MAPPING)
- .build())
- .define(ConfigKeyBuilder.of(TOPIC_CDC_SOURCE_ID_ID_NAME, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(TOPIC_CDC_SOURCE_ID_ID_NAME)).importance(ConfigDef.Importance.HIGH)
- .defaultValue(sourceIdIngestionStrategyConfig.idName).group(ConfigGroup.TOPIC_CYPHER_MAPPING)
- .build())
- .define(ConfigKeyBuilder.of(TOPIC_CDC_SCHEMA, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(TOPIC_CDC_SCHEMA)).importance(ConfigDef.Importance.HIGH)
- .defaultValue("").group(ConfigGroup.TOPIC_CYPHER_MAPPING)
- .build())
- .define(ConfigKeyBuilder.of(BATCH_PARALLELIZE, ConfigDef.Type.BOOLEAN)
- .documentation(PropertiesUtil.getProperty(BATCH_PARALLELIZE)).importance(ConfigDef.Importance.MEDIUM)
- .defaultValue(true).group(ConfigGroup.BATCH)
- .build())
- .define(ConfigKeyBuilder.of(TOPIC_CUD, ConfigDef.Type.STRING)
- .documentation(PropertiesUtil.getProperty(TOPIC_CUD)).importance(ConfigDef.Importance.HIGH)
- .defaultValue("").group(ConfigGroup.TOPIC_CYPHER_MAPPING)
- .build())
- }
-}
\ No newline at end of file
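
The removed validateAllTopics() enforces that the worker-level `topics` property and the topics referenced by the connector's own `neo4j.topic.*` mappings line up exactly. A simplified, self-contained version of that comparison (function and parameter names are illustrative, not the connector's API) is:

fun topicsMatch(topicsProperty: String, configuredTopics: Collection<String>): Boolean {
    val declared = topicsProperty.split(",").map { it.trim() }.sorted()
    return declared == configuredTopics.sorted()
}

fun main() {
    println(topicsMatch("orders, customers", listOf("customers", "orders"))) // true
    println(topicsMatch("orders", listOf("customers", "orders")))            // false -> would raise a ConfigException in the connector
}
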
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkService.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkService.kt
deleted file mode 100644
index 33fda096..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkService.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-package streams.kafka.connect.sink
-
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.ObsoleteCoroutinesApi
-import kotlinx.coroutines.async
-import kotlinx.coroutines.awaitAll
-import kotlinx.coroutines.runBlocking
-import org.apache.kafka.connect.errors.ConnectException
-import org.neo4j.driver.Bookmark
-import org.neo4j.driver.Driver
-import org.neo4j.driver.TransactionConfig
-import org.neo4j.driver.exceptions.ClientException
-import org.neo4j.driver.exceptions.TransientException
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import streams.extensions.errors
-import streams.service.StreamsSinkEntity
-import streams.service.StreamsSinkService
-import streams.utils.StreamsUtils
-import streams.utils.retryForException
-import kotlin.streams.toList
-
-
-class Neo4jSinkService(private val config: Neo4jSinkConnectorConfig):
- StreamsSinkService(Neo4jStrategyStorage(config)) {
-
- private val log: Logger = LoggerFactory.getLogger(Neo4jSinkService::class.java)
-
- private val driver: Driver = config.createDriver()
- private val transactionConfig: TransactionConfig = config.createTransactionConfig()
-
- private val bookmarks = mutableListOf<Bookmark>()
-
- fun close() {
- StreamsUtils.closeSafetely(driver) {
- log.info("Error while closing Driver instance:", it)
- }
- }
-
- override fun write(query: String, events: Collection<Any>) {
- val data = mapOf("events" to events)
- driver.session(config.createSessionConfig(bookmarks)).use { session ->
- try {
- runBlocking {
- retryForException(exceptions = arrayOf(ClientException::class.java, TransientException::class.java),
- retries = config.retryMaxAttempts, delayTime = 0) { // we use the delayTime = 0, because we delegate the retryBackoff to the Neo4j Java Driver
-
- session.writeTransaction({
- val result = it.run(query, data)
- if (log.isDebugEnabled) {
- val summary = result.consume()
- log.debug("Successfully executed query: `$query`. Summary: $summary")
- }
- }, transactionConfig)
- }
- }
- } catch (e: Exception) {
- bookmarks += session.lastBookmark()
- if (log.isDebugEnabled) {
- val subList = events.stream()
- .limit(5.coerceAtMost(events.size).toLong())
- .toList()
- log.debug("Exception `${e.message}` while executing query: `$query`, with data: `$subList` total-records ${events.size}")
- }
- throw e
- }
- }
- }
-
- fun writeData(data: Map<String, List<List<StreamsSinkEntity>>>) {
- val errors = if (config.parallelBatches) writeDataAsync(data) else writeDataSync(data)
- if (errors.isNotEmpty()) {
- throw ConnectException(errors.map { it.message }.toSet()
- .joinToString("\n", "Errors executing ${data.values.map { it.size }.sum()} jobs:\n"))
- }
- }
-
- @ExperimentalCoroutinesApi
- @ObsoleteCoroutinesApi
- private fun writeDataAsync(data: Map<String, List<List<StreamsSinkEntity>>>) = runBlocking {
- val jobs = data
- .flatMap { (topic, records) ->
- records.map { async (Dispatchers.IO) { writeForTopic(topic, it) } }
- }
-
- // timeout starts in writeTransaction()
- jobs.awaitAll()
- jobs.mapNotNull { it.errors() }
- }
-
- private fun writeDataSync(data: Map<String, List<List<StreamsSinkEntity>>>) =
- data.flatMap { (topic, records) ->
- records.mapNotNull {
- try {
- writeForTopic(topic, it)
- null
- } catch (e: Exception) {
- e
- }
- }
- }
-}
\ No newline at end of file
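
The removed Neo4jSinkService switches between a sequential and a coroutine-based parallel write path depending on `neo4j.batch.parallelize`, collecting per-batch failures either way. The sketch below shows only that error-collection pattern; `writeBatch` is a dummy stand-in for the real per-topic write, not the connector's API.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Dummy write: fails on empty batches so both paths have something to collect.
fun writeBatch(topic: String, batch: List<String>) {
    if (batch.isEmpty()) throw IllegalStateException("empty batch for $topic")
}

// Sequential path: run batches one by one, keep the exceptions.
fun writeSync(data: Map<String, List<List<String>>>): List<Throwable> =
        data.flatMap { (topic, batches) ->
            batches.mapNotNull { batch -> runCatching { writeBatch(topic, batch) }.exceptionOrNull() }
        }

// Parallel path: one coroutine per batch on Dispatchers.IO, collect failures after awaiting all.
fun writeAsync(data: Map<String, List<List<String>>>): List<Throwable> = runBlocking {
    data.flatMap { (topic, batches) ->
        batches.map { batch -> async(Dispatchers.IO) { runCatching { writeBatch(topic, batch) } } }
    }.awaitAll().mapNotNull { it.exceptionOrNull() }
}
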
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt
deleted file mode 100644
index a3c34b73..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt
+++ /dev/null
@@ -1,55 +0,0 @@
-package streams.kafka.connect.sink
-
-import com.github.jcustenborder.kafka.connect.utils.VersionUtil
-import org.apache.kafka.connect.sink.SinkRecord
-import org.apache.kafka.connect.sink.SinkTask
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import streams.extensions.asProperties
-import streams.service.errors.ErrorData
-import streams.service.errors.ErrorService
-import streams.service.errors.KafkaErrorService
-import streams.utils.StreamsUtils
-
-
-class Neo4jSinkTask : SinkTask() {
- private val log: Logger = LoggerFactory.getLogger(Neo4jSinkTask::class.java)
- private lateinit var config: Neo4jSinkConnectorConfig
- private lateinit var neo4jSinkService: Neo4jSinkService
- private lateinit var errorService: ErrorService
-
- override fun version(): String {
- return VersionUtil.version(this.javaClass as Class<*>)
- }
-
- override fun start(map: Map<String, String>) {
- this.config = Neo4jSinkConnectorConfig(map)
- this.neo4jSinkService = Neo4jSinkService(this.config)
- this.errorService = KafkaErrorService(this.config.kafkaBrokerProperties.asProperties(),
- ErrorService.ErrorConfig.from(map.asProperties()),
- log::error)
- }
-
- override fun put(collection: Collection<SinkRecord>) {
- if (collection.isEmpty()) {
- return
- }
- try {
- val data = EventBuilder()
- .withBatchSize(config.batchSize)
- .withSinkRecords(collection)
- .build()
-
- neo4jSinkService.writeData(data)
- } catch(e:Exception) {
- errorService.report(collection.map {
- ErrorData(it.topic(), it.timestamp(), it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java, this.config.database, e)
- })
- }
- }
-
- override fun stop() {
- log.info("Stop() - Neo4j Sink Service")
- StreamsUtils.ignoreExceptions({ neo4jSinkService.close() }, UninitializedPropertyAccessException::class.java)
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jStrategyStorage.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jStrategyStorage.kt
deleted file mode 100644
index 2d028696..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jStrategyStorage.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-package streams.kafka.connect.sink
-
-import streams.service.StreamsStrategyStorage
-import streams.service.TopicType
-import streams.service.sink.strategy.CUDIngestionStrategy
-import streams.service.sink.strategy.CypherTemplateStrategy
-import streams.service.sink.strategy.IngestionStrategy
-import streams.service.sink.strategy.NodePatternIngestionStrategy
-import streams.service.sink.strategy.RelationshipPatternIngestionStrategy
-import streams.service.sink.strategy.SchemaIngestionStrategy
-import streams.service.sink.strategy.SourceIdIngestionStrategy
-import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
-
-class Neo4jStrategyStorage(val config: Neo4jSinkConnectorConfig): StreamsStrategyStorage() {
- private val topicConfigMap = config.topics.asMap()
-
- override fun getTopicType(topic: String): TopicType? = TopicType.values().firstOrNull { topicType ->
- when (val topicConfig = topicConfigMap.getOrDefault(topicType, emptyList())) {
- is Collection<*> -> topicConfig.contains(topic)
- is Map<*, *> -> topicConfig.containsKey(topic)
- else -> false
- }
- }
-
- override fun getStrategy(topic: String): IngestionStrategy = when (val topicType = getTopicType(topic)) {
- TopicType.CDC_SOURCE_ID -> config.strategyMap[topicType] as SourceIdIngestionStrategy
- TopicType.CDC_SCHEMA -> SchemaIngestionStrategy()
- TopicType.CUD -> CUDIngestionStrategy()
- TopicType.PATTERN_NODE -> NodePatternIngestionStrategy(config.topics.nodePatternTopics.getValue(topic))
- TopicType.PATTERN_RELATIONSHIP -> RelationshipPatternIngestionStrategy(config.topics.relPatternTopics.getValue(topic))
- TopicType.CYPHER -> CypherTemplateStrategy(config.topics.cypherTopics.getValue(topic))
- null -> throw RuntimeException("Topic Type not Found")
- }
-}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/MapValueConverter.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/MapValueConverter.kt
deleted file mode 100644
index 13b925df..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/MapValueConverter.kt
+++ /dev/null
@@ -1,105 +0,0 @@
-package streams.kafka.connect.sink.converters
-
-import com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter
-import org.apache.kafka.connect.data.Schema
-import org.apache.kafka.connect.data.Struct
-import java.math.BigDecimal
-import java.util.*
-
-open class MapValueConverter<T>: AbstractConverter<MutableMap<String, T?>>() {
-
- open fun setValue(result: MutableMap<String, T?>?, fieldName: String, value: Any?) {
- if (result != null) {
- result[fieldName] = value as T
- }
- }
-
- override fun newValue(): MutableMap<String, T?> {
- return mutableMapOf()
- }
-
- override fun setBytesField(result: MutableMap<String, T?>?, fieldName: String, value: ByteArray?) {
- setValue(result, fieldName, value)
- }
-
- override fun setStringField(result: MutableMap<String, T?>?, fieldName: String, value: String?) {
- setValue(result, fieldName, value)
- }
-
- override fun setFloat32Field(result: MutableMap<String, T?>?, fieldName: String, value: Float?) {
- setValue(result, fieldName, value)
- }
-
- override fun setInt32Field(result: MutableMap<String, T?>?, fieldName: String, value: Int?) {
- setValue(result, fieldName, value)
- }
-
- override fun setArray(result: MutableMap<String, T?>?, fieldName: String, schema: Schema?, array: MutableList<Any?>?) {
- val convertedArray = array?.map { convertInner(it) }
- setValue(result, fieldName, convertedArray)
- }
-
- override fun setTimestampField(result: MutableMap<String, T?>?, fieldName: String, value: Date) {
- setValue(result, fieldName, value)
-
- }
-
- override fun setTimeField(result: MutableMap<String, T?>?, fieldName: String, value: Date) {
- setValue(result, fieldName, value)
- }
-
- override fun setInt8Field(result: MutableMap<String, T?>?, fieldName: String, value: Byte) {
- setValue(result, fieldName, value)
- }
-
- override fun setStructField(result: MutableMap<String, T?>?, fieldName: String, value: Struct) {
- val converted = convert(value) as MutableMap<Any?, Any?>
- setMap(result, fieldName, null, converted)
- }
-
- override fun setMap(result: MutableMap<String, T?>?, fieldName: String, schema: Schema?, value: MutableMap<*, *>?) {
- if (value != null) {
- val converted = convert(value) as MutableMap<Any?, Any?>
- setValue(result, fieldName, converted)
- } else {
- setNullField(result, fieldName)
- }
- }
-
- override fun setNullField(result: MutableMap<String, T?>?, fieldName: String) {
- setValue(result, fieldName, null)
- }
-
- override fun setFloat64Field(result: MutableMap<String, T?>?, fieldName: String, value: Double) {
- setValue(result, fieldName, value)
- }
-
- override fun setInt16Field(result: MutableMap<String, T?>?, fieldName: String, value: Short) {
- setValue(result, fieldName, value)
- }
-
- override fun setInt64Field(result: MutableMap<String, T?>?, fieldName: String, value: Long) {
- setValue(result, fieldName, value)
- }
-
- override fun setBooleanField(result: MutableMap<String, T?>?, fieldName: String, value: Boolean) {
- setValue(result, fieldName, value)
- }
-
- override fun setDecimalField(result: MutableMap<String, T?>?, fieldName: String, value: BigDecimal) {
- setValue(result, fieldName, value)
- }
-
- override fun setDateField(result: MutableMap<String, T?>?, fieldName: String, value: Date) {
- setValue(result, fieldName, value)
- }
-
- open fun convertInner(value: Any?): Any? {
- return when (value) {
- is Struct, is Map<*, *> -> convert(value)
- is Collection<*> -> value.map(::convertInner)
- is Array<*> -> if (value.javaClass.componentType.isPrimitive) value else value.map(::convertInner)
- else -> value
- }
- }
-}
\ No newline at end of file
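
The recursive descent done by the removed convertInner() (nested maps and collections converted element by element, scalar leaves passed through) can be illustrated with the stdlib-only sketch below; the Struct branch is intentionally left out and the function is renamed to make clear it is not the converter itself.

fun convertNested(value: Any?): Any? = when (value) {
    is Map<*, *> -> value.mapValues { convertNested(it.value) }
    is Collection<*> -> value.map(::convertNested)
    else -> value
}

fun main() {
    println(convertNested(mapOf("a" to listOf(1, mapOf("b" to 2))))) // {a=[1, {b=2}]}
}
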
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt
deleted file mode 100644
index 60ece516..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt
+++ /dev/null
@@ -1,63 +0,0 @@
-package streams.kafka.connect.sink.converters
-
-import org.apache.kafka.connect.data.Struct
-import org.neo4j.driver.Value
-import org.neo4j.driver.Values
-import java.math.BigDecimal
-import java.time.LocalTime
-import java.time.ZoneId
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-
-class Neo4jValueConverter: MapValueConverter<Value>() {
-
- companion object {
- @JvmStatic private val UTC = ZoneId.of("UTC")
- }
-
- override fun setValue(result: MutableMap<String, Value?>?, fieldName: String, value: Any?) {
- if (result != null) {
- result[fieldName] = Values.value(value) ?: Values.NULL
- }
- }
-
- override fun newValue(): MutableMap<String, Value?> {
- return mutableMapOf()
- }
-
- override fun setDecimalField(result: MutableMap<String, Value?>?, fieldName: String, value: BigDecimal) {
- val doubleValue = value.toDouble()
- val fitsScale = doubleValue != Double.POSITIVE_INFINITY
- && doubleValue != Double.NEGATIVE_INFINITY
- && value.compareTo(doubleValue.let { BigDecimal.valueOf(it) }) == 0
- if (fitsScale) {
- setValue(result, fieldName, doubleValue)
- } else {
- setValue(result, fieldName, value.toPlainString())
- }
- }
-
- override fun setTimestampField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
- val localDate = value.toInstant().atZone(UTC).toLocalDateTime()
- setValue(result, fieldName, localDate)
-
- }
-
- override fun setTimeField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
- val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time))
- setValue(result, fieldName, time)
- }
-
- override fun setDateField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
- val localDate = value.toInstant().atZone(UTC).toLocalDate()
- setValue(result, fieldName, localDate)
- }
-
- override fun setStructField(result: MutableMap<String, Value?>?, fieldName: String, value: Struct) {
- val converted = convert(value)
- .mapValues { it.value?.asObject() }
- .toMutableMap() as MutableMap<Any?, Any?>
- setMap(result, fieldName, null, converted)
- }
-}
\ No newline at end of file
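
The lossless-double check in the removed setDecimalField() stores a BigDecimal as a Double only when the round-trip through Double preserves the exact value, and falls back to the plain string representation otherwise. A standalone illustration of that decision (the function name is illustrative):

import java.math.BigDecimal

fun decimalToStorableValue(value: BigDecimal): Any {
    val d = value.toDouble()
    val fitsDouble = d != Double.POSITIVE_INFINITY &&
            d != Double.NEGATIVE_INFINITY &&
            value.compareTo(BigDecimal.valueOf(d)) == 0
    return if (fitsDouble) d else value.toPlainString()
}

fun main() {
    println(decimalToStorableValue(BigDecimal("2.5")))                      // 2.5, kept as a Double
    println(decimalToStorableValue(BigDecimal("12345678901234567890.123"))) // plain string, does not fit a Double exactly
}
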
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt
deleted file mode 100644
index 4bf7dbd2..00000000
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceConnector.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-package streams.kafka.connect.source
-
-import com.github.jcustenborder.kafka.connect.utils.config.*
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.launch
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.connect.connector.Task
-import org.apache.kafka.connect.source.SourceConnector
-import streams.kafka.connect.utils.PropertiesUtil
-
-@Title("Neo4j Source Connector")
-@Description("The Neo4j Source connector reads data from Neo4j and and writes the data to a Kafka Topic")
-class Neo4jSourceConnector: SourceConnector() {
- private lateinit var settings: Map<String, String>
- private lateinit var config: Neo4jSourceConnectorConfig
-
- // TODO Add monitor thread when we want to have schema on LABELS and RELATIONSHIP query type
-
- // TODO: for now we support just one task; we need to implement
- // a SKIP/LIMIT mechanism in case we want to parallelize
- override fun taskConfigs(maxTasks: Int): List