From 1788c3596461a5c8e4febbc09fce46d94ac5d502 Mon Sep 17 00:00:00 2001
From: Andrea Santurbano
Date: Tue, 22 Aug 2023 17:11:45 +0200
Subject: [PATCH] fixes #567: ( Impact on BC/DR - Data loss ) - High priority,
 Nodes with PointValue property cannot be successfully sunk to Neo4j with the
 Neo4j Streams Plugin

---
 .../main/kotlin/streams/utils/JSONUtils.kt    | 134 ++++
 .../kafka/KafkaEventSinkCDCTSE.kt             | 579 ++++++++++++++++++
 .../kafka/connect/sink/Neo4jSinkTask.kt       |   1 +
 .../kafka/connect/sink/Neo4jSinkTaskTest.kt   |  96 ++-
 4 files changed, 799 insertions(+), 11 deletions(-)
 create mode 100644 consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt

diff --git a/common/src/main/kotlin/streams/utils/JSONUtils.kt b/common/src/main/kotlin/streams/utils/JSONUtils.kt
index 5e0411b7..52a98ed1 100644
--- a/common/src/main/kotlin/streams/utils/JSONUtils.kt
+++ b/common/src/main/kotlin/streams/utils/JSONUtils.kt
@@ -2,8 +2,12 @@ package streams.utils
 
 import com.fasterxml.jackson.core.JsonGenerator
 import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
 import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.DeserializationContext
 import com.fasterxml.jackson.databind.DeserializationFeature
+import com.fasterxml.jackson.databind.JsonDeserializer
+import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.JsonSerializer
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.databind.SerializationFeature
@@ -13,10 +17,18 @@ import com.fasterxml.jackson.module.kotlin.convertValue
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import com.fasterxml.jackson.module.kotlin.readValue
 import org.neo4j.driver.Value
+import org.neo4j.driver.Values
 import org.neo4j.driver.internal.value.PointValue
 import org.neo4j.driver.types.Node
 import org.neo4j.driver.types.Point
 import org.neo4j.driver.types.Relationship
+import streams.events.EntityType
+import streams.events.Meta
+import streams.events.NodePayload
+import streams.events.Payload
+import streams.events.RecordChange
+import streams.events.RelationshipPayload
+import streams.events.Schema
 import streams.events.StreamsTransactionEvent
 import streams.events.StreamsTransactionNodeEvent
 import streams.events.StreamsTransactionRelationshipEvent
@@ -94,6 +106,125 @@ class DriverRelationshipSerializer : JsonSerializer<Relationship>() {
     }
 }
 
+class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionRelationshipEvent, RelationshipPayload>() {
+    override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent {
+        return StreamsTransactionRelationshipEvent(meta, payload, schema)
+    }
+
+    override fun convertPayload(payloadMap: JsonNode): RelationshipPayload {
+        return JSONUtils.convertValue(payloadMap)
+    }
+
+    override fun fillPayload(payload: RelationshipPayload,
+                             beforeProps: Map<String, Any>?,
+                             afterProps: Map<String, Any>?): RelationshipPayload {
+        return payload.copy(
+            before = payload.before?.copy(properties = beforeProps),
+            after = payload.after?.copy(properties = afterProps)
+        )
+    }
+
+    override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent {
+        val deserialized = super.deserialize(parser, context)
+        if (deserialized.payload.type == EntityType.node) {
+            throw IllegalArgumentException("Relationship event expected, but node type found")
+        }
+        return deserialized
+    }
+
+}
+
+class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionNodeEvent, NodePayload>() {
+    override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent {
+        return StreamsTransactionNodeEvent(meta, payload, schema)
+    }
+
+    override fun convertPayload(payloadMap: JsonNode): NodePayload {
+        return JSONUtils.convertValue(payloadMap)
+    }
+
+    override fun fillPayload(payload: NodePayload,
+                             beforeProps: Map<String, Any>?,
+                             afterProps: Map<String, Any>?): NodePayload {
+        return payload.copy(
+            before = payload.before?.copy(properties = beforeProps),
+            after = payload.after?.copy(properties = afterProps)
+        )
+    }
+
+    override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent {
+        val deserialized = super.deserialize(parser, context)
+        if (deserialized.payload.type == EntityType.relationship) {
+            throw IllegalArgumentException("Node event expected, but relationship type found")
+        }
+        return deserialized
+    }
+
+}
+
+abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD : Payload> : JsonDeserializer<EVENT>() {
+
+    abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT
+    abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD
+    abstract fun fillPayload(payload: PAYLOAD,
+                             beforeProps: Map<String, Any>?,
+                             afterProps: Map<String, Any>?): PAYLOAD
+
+    @Throws(IOException::class, JsonProcessingException::class)
+    override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT {
+        val root: JsonNode = parser.codec.readTree(parser)
+        val meta = JSONUtils.convertValue<Meta>(root["meta"])
+        val schema = JSONUtils.convertValue<Schema>(root["schema"])
+        val points = schema.properties.filterValues { it == "PointValue" }.keys
+        var payload = convertPayload(root["payload"])
+        if (points.isNotEmpty()) {
+            val beforeProps = convertPoints(payload.before, points)
+            val afterProps = convertPoints(payload.after, points)
+            payload = fillPayload(payload, beforeProps, afterProps)
+        }
+        return createEvent(meta, payload, schema)
+    }
+
+    private fun convertPoints(
+        recordChange: RecordChange?,
+        points: Set<String>
+    ) = recordChange
+        ?.properties
+        ?.mapValues {
+            if (points.contains(it.key)) {
+                val pointMap = it.value as Map<String, Any>
+                when (pointMap["crs"]) {
+                    "cartesian" -> Values.point(
+                        7203,
+                        pointMap["x"].toString().toDouble(),
+                        pointMap["y"].toString().toDouble()
+                    )
+                    "cartesian-3d" -> Values.point(
+                        9157,
+                        pointMap["x"].toString().toDouble(),
+                        pointMap["y"].toString().toDouble(),
+                        pointMap["z"].toString().toDouble()
+                    )
+                    "wgs-84" -> Values.point(
+                        4326,
+                        pointMap["longitude"].toString().toDouble(),
+                        pointMap["latitude"].toString().toDouble()
+                    )
+                    "wgs-84-3d" -> Values.point(
+                        4979,
+                        pointMap["longitude"].toString().toDouble(),
+                        pointMap["latitude"].toString().toDouble(),
+                        pointMap["height"].toString().toDouble()
+                    )
+                    else -> throw IllegalArgumentException("CRS value: ${pointMap["crs"]} not found")
+                }
+            } else {
+                it.value
+            }
+        }
+
+}
+
 object JSONUtils {
 
     private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
@@ -106,10 +237,13 @@ object JSONUtils {
         module.addSerializer(PointValue::class.java, PointValueSerializer())
         module.addSerializer(Node::class.java, DriverNodeSerializer())
        module.addSerializer(Relationship::class.java, DriverRelationshipSerializer())
+        module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer())
+        module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer())
         module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
         OBJECT_MAPPER.registerModule(module)
         OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
         OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+        STRICT_OBJECT_MAPPER.registerModule(module)
     }
 
     fun getObjectMapper(): ObjectMapper = OBJECT_MAPPER
diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
new file mode 100644
index 00000000..576f3896
--- /dev/null
+++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
@@ -0,0 +1,579 @@
+package integrations.kafka
+
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.hamcrest.Matchers
+import org.junit.Test
+import org.neo4j.function.ThrowingSupplier
+import streams.Assert
+import streams.events.Constraint
+import streams.events.Meta
+import streams.events.NodeChange
+import streams.events.NodePayload
+import streams.events.OperationType
+import streams.events.RelationshipChange
+import streams.events.RelationshipNodeChange
+import streams.events.RelationshipPayload
+import streams.events.Schema
+import streams.events.StreamsConstraintType
+import streams.events.StreamsTransactionEvent
+import streams.extensions.execute
+import streams.utils.JSONUtils
+import streams.setConfig
+import streams.start
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+import kotlin.test.assertEquals
+
+class KafkaEventSinkCDCTSE: KafkaEventSinkBaseTSE() {
+
+    @Test
+    fun shouldWriteDataFromSinkWithCDCTopic() = runBlocking {
+        val topic = UUID.randomUUID().toString()
+        db.setConfig("streams.sink.topic.cdc.sourceId", topic)
+        db.setConfig("streams.sink.topic.cdc.sourceId.idName", "customIdN@me")
+        db.setConfig("streams.sink.topic.cdc.sourceId.labelName", "CustomLabelN@me")
+        db.start()
+
+        val cdcDataStart = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 0,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "0",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Andrea", "comp@ny" to "LARUS-BA"), labels = listOf("User"))
+            ),
+            schema = Schema()
+        )
+        val cdcDataEnd = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 1,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "1",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Michael", "comp@ny" to "Neo4j"), labels = listOf("User Ext"))
+            ),
+            schema = Schema()
+        )
+        val cdcDataRelationship = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "3",
+                start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = emptyMap()),
+                end = RelationshipNodeChange(id = "1", labels = listOf("User Ext"), ids = emptyMap()),
+                after = RelationshipChange(properties = mapOf("since" to 2014)),
+                before = null,
+                label = "KNOWS WHO"
+            ),
+            schema = Schema()
+        )
+        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:User:`CustomLabelN@me`{name:'Andrea', `comp@ny`:'LARUS-BA', `customIdN@me`: '0'})-[r:`KNOWS WHO`{since:2014, `customIdN@me`: '3'}]->(e:`User Ext`:`CustomLabelN@me`{name:'Michael', `comp@ny`:'Neo4j', `customIdN@me`: '1'})
+                |RETURN count(p) AS count""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+
+    }
+
+    @Test
+    fun shouldWriteDataFromSinkWithCDCSchemaTopic() = runBlocking {
+        val topic = UUID.randomUUID().toString()
+        db.setConfig("streams.sink.topic.cdc.schema", topic)
+        db.start()
+
+        val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
+        val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraints)
+        val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"),
+            constraints = constraints)
+        val cdcDataStart = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 0,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "0",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"), labels = listOf("User"))
+            ),
+            schema = nodeSchema
+        )
+        val cdcDataEnd = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 1,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "1",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"), labels = listOf("User"))
+            ),
+            schema = nodeSchema
+        )
+        val cdcDataRelationship = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "2",
+                start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
+                end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
+                after = RelationshipChange(properties = mapOf("since" to 2014)),
+                before = null,
+                label = "KNOWS WHO"
+            ),
+            schema = relSchema
+        )
+        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
+                |RETURN count(p) AS count
+                |""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+    }
+
+    @Test
+    fun shouldWriteDataFromSinkWithCDCSchemaTopicAndPointValue() = runBlocking {
+        val topic = UUID.randomUUID().toString()
+        db.setConfig("streams.sink.topic.cdc.schema", topic)
+        db.start()
+
+        val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
+        val nodeSchema = Schema(
+            properties = mapOf(
+                "name" to "String",
+                "surname" to "String",
+                "comp@ny" to "String",
+                "bornIn" to "PointValue"
+            ),
+            constraints = constraints)
+        val relSchema = Schema(properties = mapOf(
+            "since" to "Long",
+            "where" to "PointValue"
+        ), constraints = constraints)
+        val cdcDataStart = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 0,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(
+                id = "0",
+                before = null,
+                after = NodeChange(
+                    properties = mapOf(
+                        "name" to "Andrea",
+                        "surname" to "Santurbano",
+                        "comp@ny" to "LARUS-BA",
+                        "bornIn" to mapOf(
+                            "crs" to "wgs-84-3d",
+                            "latitude" to 12.78,
+                            "longitude" to 56.7,
+                            "height" to 100.0,
+                        )
+                    ),
+                    labels = listOf("User")
+                )
+            ),
+            schema = nodeSchema
+        )
+        val cdcDataEnd = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 1,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "1",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"), labels = listOf("User"))
+            ),
+            schema = nodeSchema
+        )
+        val cdcDataRelationship = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "2",
+                start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
+                end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
+                after = RelationshipChange(properties = mapOf(
+                    "since" to 2014,
+                    "where" to mapOf(
+                        "crs" to "wgs-84-3d",
+                        "latitude" to 12.78,
+                        "longitude" to 56.7,
+                        "height" to 80.0,
+                    )
+                )),
+                before = null,
+                label = "MEET"
+            ),
+            schema = relSchema
+        )
+        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            db.execute("""
+                |MATCH (s:User{
+                |    name:'Andrea',
+                |    surname:'Santurbano',
+                |    `comp@ny`:'LARUS-BA',
+                |    bornIn: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'})
+                |})
+                |MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
+                |MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t)
+                |RETURN count(p) AS count
+                |""".trimMargin()) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+    }
+
+    @Test
+    fun writeDataFromSinkWithCDCSchemaTopicMultipleConstraintsAndLabels() = runBlocking {
+        val topic = UUID.randomUUID().toString()
+        db.setConfig("streams.sink.topic.cdc.schema", topic)
+        db.start()
+
+        val constraintsCharacter = listOf(
+            Constraint(label = "Character", type = StreamsConstraintType.UNIQUE, properties = setOf("surname")),
+            Constraint(label = "Character", type = StreamsConstraintType.UNIQUE, properties = setOf("name")),
+            Constraint(label = "Character", type = StreamsConstraintType.UNIQUE, properties = setOf("country", "address")),
+        )
+        val constraintsWriter = listOf(
+            Constraint(label = "Writer", type = StreamsConstraintType.UNIQUE, properties = setOf("lastName")),
+            Constraint(label = "Writer", type = StreamsConstraintType.UNIQUE, properties = setOf("firstName")),
+        )
+        val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraintsCharacter.plus(constraintsWriter))
+        val nodeSchemaCharacter = Schema(properties = mapOf("name" to "String", "surname" to "String", "country" to "String", "address" to "String"), constraints = constraintsCharacter)
+        val nodeSchemaWriter = Schema(properties = mapOf("firstName" to "String", "lastName" to "String"), constraints = constraintsWriter)
+        val cdcDataStart = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 0,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "0",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Naruto", "surname" to "Uzumaki", "country" to "Japan", "address" to "Land of Leaf"), labels = listOf("Character"))
+            ),
+            schema = nodeSchemaCharacter
+        )
+        val cdcDataEnd = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 1,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "1",
+                before = null,
+                after = NodeChange(properties = mapOf("firstName" to "Masashi", "lastName" to "Kishimoto", "address" to "Dunno"), labels = listOf("Writer"))
+            ),
+            schema = nodeSchemaWriter
+        )
+        val cdcDataRelationship = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "2",
+                // node matching uses the first label alphabetically and, within it, the first constraint ids alphabetically (here `name`), so the two previously created nodes are matched
+                start = RelationshipNodeChange(id = "0", labels = listOf("Character"),
+                    ids = mapOf("name" to "Naruto", "surname" to "Osvaldo", "address" to "Land of Sand")),
+                end = RelationshipNodeChange(id = "1", labels = listOf("Writer"),
+                    ids = mapOf("firstName" to "Masashi", "lastName" to "Franco")),
+                after = RelationshipChange(properties = mapOf("since" to 1999)),
+                before = null,
+                label = "HAS WRITTEN"
+            ),
+            schema = relSchema
+        )
+        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:Character)-[r:`HAS WRITTEN`{since: 1999}]->(e:Writer)
+                |RETURN count(p) AS count
+                |""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+
+        val cypherCountNodes = "MATCH (n) RETURN count(n) AS count"
+        var countNodes = db.execute(cypherCountNodes) { it.columnAs<Long>("count").next() }
+        assertEquals(2L, countNodes)
+
+        // another CDC event whose ids do not match the previously created nodes
+        val cdcDataRelationshipNotMatched = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "2",
+                // matching uses the first ids alphabetically (here `name`); these values match no existing node, so 2 additional nodes are created
+                start = RelationshipNodeChange(id = "1", labels = listOf("Character"), ids = mapOf("name" to "Invalid", "surname" to "Uzumaki")),
+                end = RelationshipNodeChange(id = "2", labels = listOf("Writer"), ids = mapOf("firstName" to "AnotherInvalid", "surname" to "Namikaze")),
+                after = RelationshipChange(properties = mapOf("since" to 2000)),
+                before = null,
+                label = "HAS WRITTEN"
+            ),
+            schema = relSchema
+        )
+
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationshipNotMatched))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:Character)-[r:`HAS WRITTEN`{since:2000}]->(e:Writer)
+                |RETURN count(p) AS count
+                |""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+
+        // the unmatched relationship event created two more nodes
+        countNodes = db.execute(cypherCountNodes) { it.columnAs<Long>("count").next() }
+        assertEquals(4L, countNodes)
+    }
+
+    @Test
+    fun shouldWriteDataFromSinkWithCDCSchemaTopicWithMultipleConstraints() = runBlocking {
+        val topic = UUID.randomUUID().toString()
+        db.setConfig("streams.sink.topic.cdc.schema", topic)
+        db.start()
+
+        val constraints = listOf(
+            Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name")),
+            Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("country", "address")),
+            Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("surname")),
+        )
+        val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraints)
+        val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "country" to "String", "address" to "String"), constraints = constraints)
+        val cdcDataStart = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 0,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "0",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Naruto", "surname" to "Uzumaki", "country" to "Japan", "address" to "Land of Leaf"), labels = listOf("User"))
+            ),
+            schema = nodeSchema
+        )
+        val cdcDataEnd = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 1,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = NodePayload(id = "1",
+                before = null,
+                after = NodeChange(properties = mapOf("name" to "Minato", "surname" to "Namikaze", "country" to "Japan", "address" to "Land of Leaf"), labels = listOf("User"))
+            ),
+            schema = nodeSchema
+        )
+        val cdcDataRelationship = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "2",
+                // matching uses the first ids alphabetically (here `name`), so the two previously created nodes are matched
+                start = RelationshipNodeChange(id = "99", labels = listOf("User"), ids = mapOf("name" to "Naruto", "surname" to "Osvaldo", "address" to "Land of Sand")),
+                end = RelationshipNodeChange(id = "88", labels = listOf("User"), ids = mapOf("name" to "Minato", "surname" to "Franco", "address" to "Land of Fire")),
+                after = RelationshipChange(properties = mapOf("since" to 2014)),
+                before = null,
+                label = "KNOWS WHO"
+            ),
+            schema = relSchema
+        )
+        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:User)-[r:`KNOWS WHO` {since: 2014}]->(e:User)
+                |RETURN count(p) AS count
+                |""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+
+        val cypherCountNodes = "MATCH (n) RETURN count(n) AS count"
+        var countNodes = db.execute(cypherCountNodes) { it.columnAs<Long>("count").next() }
+        assertEquals(2L, countNodes)
+
+        // another CDC event whose ids do not match the previously created nodes
+        val cdcDataRelationshipNotMatched = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 2,
+                txEventsCount = 3,
+                operation = OperationType.created
+            ),
+            payload = RelationshipPayload(
+                id = "2",
+                // matching uses the first ids alphabetically (here `name`); these values match no existing node, so 2 additional nodes are created
+                start = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Invalid", "surname" to "Uzumaki")),
+                end = RelationshipNodeChange(id = "2", labels = listOf("User"), ids = mapOf("name" to "AnotherInvalid", "surname" to "Namikaze")),
+                after = RelationshipChange(properties = mapOf("since" to 2000)),
+                before = null,
+                label = "KNOWS ANOTHER"
+            ),
+            schema = relSchema
+        )
+
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationshipNotMatched))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:User)-[r:`KNOWS ANOTHER` {since:2000}]->(e:User)
+                |RETURN count(p) AS count
+                |""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 1L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+
+        // the unmatched relationship event created two more nodes
+        countNodes = db.execute(cypherCountNodes) { it.columnAs<Long>("count").next() }
+        assertEquals(4L, countNodes)
+    }
+
+    @Test
+    fun shouldDeleteDataFromSinkWithCDCSchemaTopic() = runBlocking {
+        val topic = UUID.randomUUID().toString()
+        db.setConfig("streams.sink.topic.cdc.schema", topic)
+        db.start()
+
+        db.execute("CREATE (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})")
+        val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"),
+            constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname"))))
+        val cdcDataStart = StreamsTransactionEvent(
+            meta = Meta(timestamp = System.currentTimeMillis(),
+                username = "user",
+                txId = 1,
+                txEventId = 0,
+                txEventsCount = 3,
+                operation = OperationType.deleted
+            ),
+            payload = NodePayload(id = "0",
+                after = null,
+                before = NodeChange(properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"), labels = listOf("User"))
+            ),
+            schema = nodeSchema
+        )
+
+        val producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
+        kafkaProducer.send(producerRecord).get()
+
+        Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
+                |RETURN count(p) AS count
+                |""".trimMargin()
+            db.execute(query) {
+                val result = it.columnAs<Long>("count")
+                result.hasNext() && result.next() == 0L && !result.hasNext()
+            }
+        }, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
+    }
+}
\ No newline at end of file
diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt
index a3c34b73..795f059e 100644
--- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt
+++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt
@@ -42,6 +42,7 @@ class Neo4jSinkTask : SinkTask() {
             neo4jSinkService.writeData(data)
         } catch(e:Exception) {
+            e.printStackTrace()
             errorService.report(collection.map {
                 ErrorData(it.topic(), it.timestamp(), it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java, this.config.database, e)
             })
diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt
index bccf9f84..02b21c77 100644
--- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt
+++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt
@@ -329,9 +329,23 @@ class Neo4jSinkTaskTest {
             SinkTask.TOPICS_CONFIG to firstTopic)
 
         val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
-        val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraints)
-        val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"),
-            constraints = constraints)
+        val relSchema = Schema(properties = mapOf(
+            "since" to "Long",
+            "where" to "PointValue"
+        ), constraints = constraints)
+        val nodeSchema = Schema(
+            properties = mapOf(
+                "name" to "String",
+                "surname" to "String",
+                "comp@ny" to "String",
+                "bornIn2d" to "PointValue",
+                "bornIn3d" to "PointValue",
+                "livesIn2d" to "PointValue",
"PointValue", + "livesIn3d" to "PointValue", + "worksIn2d" to "PointValue", + "worksIn3d" to "PointValue" + ), + constraints = constraints) val cdcDataStart = StreamsTransactionEvent( meta = Meta(timestamp = System.currentTimeMillis(), username = "user", @@ -342,7 +356,47 @@ class Neo4jSinkTaskTest { ), payload = NodePayload(id = "0", before = null, - after = NodeChange(properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"), labels = listOf("User")) + after = NodeChange( + properties = mapOf( + "name" to "Andrea", + "surname" to "Santurbano", + "comp@ny" to "LARUS-BA", + "bornIn3d" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 100.0, + ), + "bornIn2d" to mapOf( + "crs" to "wgs-84", + "latitude" to 12.78, + "longitude" to 56.7 + ), + "livesIn3d" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.79, + "longitude" to 56.71, + "height" to 100.0, + ), + "livesIn2d" to mapOf( + "crs" to "wgs-84", + "latitude" to 12.79, + "longitude" to 56.71 + ), + "worksIn2d" to mapOf( + "crs" to "cartesian", + "x" to 1.2, + "y" to 10.1 + ), + "worksIn3d" to mapOf( + "crs" to "cartesian-3d", + "x" to 1.2, + "y" to 10.1, + "z" to 7.1 + ) + ), + labels = listOf("User") + ) ), schema = nodeSchema ) @@ -369,12 +423,20 @@ class Neo4jSinkTaskTest { operation = OperationType.created ), payload = RelationshipPayload( - id = "2", - start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")), - end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")), - after = RelationshipChange(properties = mapOf("since" to 2014)), - before = null, - label = "KNOWS WHO" + id = "2", + start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")), + end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")), + after = RelationshipChange(properties = mapOf( + "since" to 2014, + "where" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 80.0, + ) + )), + before = null, + label = "MEET" ), schema = relSchema ) @@ -386,7 +448,19 @@ class Neo4jSinkTaskTest { task.put(input) session.beginTransaction().use { val query = """ - |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) + |MATCH (s:User{ + | name:'Andrea', + | surname:'Santurbano', + | `comp@ny`:'LARUS-BA', + | bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}), + | bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}), + | livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}), + | livesIn2d: point({longitude: 56.71, latitude: 12.79}), + | worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}), + | worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'}) + |}) + |MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) + |MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t) |RETURN count(p) AS count |""".trimMargin() val result = it.run(query)