diff --git a/common/src/main/kotlin/streams/events/StreamsEvent.kt b/common/src/main/kotlin/streams/events/StreamsEvent.kt index 7b0e7f8e..6fd2deda 100644 --- a/common/src/main/kotlin/streams/events/StreamsEvent.kt +++ b/common/src/main/kotlin/streams/events/StreamsEvent.kt @@ -1,7 +1,5 @@ package streams.events -import org.neo4j.graphdb.schema.ConstraintType - enum class OperationType { created, updated, deleted } data class Meta(val timestamp: Long, diff --git a/common/src/main/kotlin/streams/utils/JSONUtils.kt b/common/src/main/kotlin/streams/utils/JSONUtils.kt index d6c1c41c..67933c77 100644 --- a/common/src/main/kotlin/streams/utils/JSONUtils.kt +++ b/common/src/main/kotlin/streams/utils/JSONUtils.kt @@ -2,16 +2,38 @@ package streams.utils import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonProcessingException -import com.fasterxml.jackson.databind.* +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.JsonDeserializer +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.SerializerProvider import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.module.kotlin.convertValue import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.neo4j.driver.internal.value.PointValue +import org.neo4j.function.ThrowingBiConsumer import org.neo4j.graphdb.spatial.Point +import org.neo4j.values.AnyValue import org.neo4j.values.storable.CoordinateReferenceSystem -import streams.events.* +import org.neo4j.values.storable.Values +import org.neo4j.values.virtual.MapValue +import org.neo4j.values.virtual.MapValueBuilder +import streams.events.EntityType +import streams.events.Meta +import streams.events.NodePayload +import streams.events.Payload +import streams.events.RecordChange +import streams.events.RelationshipPayload +import streams.events.Schema +import streams.events.StreamsTransactionEvent +import streams.events.StreamsTransactionNodeEvent +import streams.events.StreamsTransactionRelationshipEvent import streams.extensions.asStreamsMap import java.io.IOException import java.time.temporal.TemporalAccessor @@ -33,6 +55,13 @@ fun Point.toStreamsPoint(): StreamsPoint { } } +fun Map.toMapValue(): MapValue { + val map = this + val builder = MapValueBuilder() + map.forEach { (t, u) -> builder.add(t, Values.of(u) ) } + return builder.build() +} + fun PointValue.toStreamsPoint(): StreamsPoint { val point = this.asPoint() return when (val crsType = point.srid()) { @@ -121,6 +150,100 @@ class DriverRelationshipSerializer : JsonSerializer() { + override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent { + return StreamsTransactionRelationshipEvent(meta, payload, schema) + } + + override fun convertPayload(payloadMap: JsonNode): RelationshipPayload { + return JSONUtils.convertValue(payloadMap) + } + + override fun fillPayload(payload: RelationshipPayload, + beforeProps: Map?, + afterProps: Map?): RelationshipPayload { + return payload.copy( + before = payload.before?.copy(properties = beforeProps), + after = payload.after?.copy(properties = afterProps) + ) + } + + override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent { + val deserialized = super.deserialize(parser, context) + if (deserialized.payload.type == EntityType.node) { + throw IllegalArgumentException("Relationship event expected, but node type found") + } + return deserialized + } + +} + +class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer() { + override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent { + return StreamsTransactionNodeEvent(meta, payload, schema) + } + + override fun convertPayload(payloadMap: JsonNode): NodePayload { + return JSONUtils.convertValue(payloadMap) + } + + override fun fillPayload(payload: NodePayload, + beforeProps: Map?, + afterProps: Map?): NodePayload { + return payload.copy( + before = payload.before?.copy(properties = beforeProps), + after = payload.after?.copy(properties = afterProps) + ) + } + + override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent { + val deserialized = super.deserialize(parser, context) + if (deserialized.payload.type == EntityType.relationship) { + throw IllegalArgumentException("Node event expected, but relationship type found") + } + return deserialized + } + +} + +abstract class StreamsTransactionEventDeserializer : JsonDeserializer() { + + abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT + abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD + abstract fun fillPayload(payload: PAYLOAD, + beforeProps: Map?, + afterProps: Map?): PAYLOAD + + @Throws(IOException::class, JsonProcessingException::class) + override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT { + val root: JsonNode = parser.codec.readTree(parser) + val meta = JSONUtils.convertValue(root["meta"]) + val schema = JSONUtils.convertValue(root["schema"]) + val points = schema.properties.filterValues { it == "PointValue" }.keys + var payload = convertPayload(root["payload"]) + if (points.isNotEmpty()) { + val beforeProps = convertPoints(payload.before, points) + val afterProps = convertPoints(payload.after, points) + payload = fillPayload(payload, beforeProps, afterProps) + } + return createEvent(meta, payload, schema) + } + + private fun convertPoints( + recordChange: RecordChange?, + points: Set + ) = recordChange + ?.properties + ?.mapValues { + if (points.contains(it.key)) { + org.neo4j.values.storable.PointValue.fromMap((it.value as Map).toMapValue()) + } else { + it.value + } + } + +} + object JSONUtils { private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper() @@ -133,6 +256,8 @@ object JSONUtils { StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Point::class.java, DriverPointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Node::class.java, DriverNodeSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Relationship::class.java, DriverRelationshipSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from + StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from + StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer()) OBJECT_MAPPER.registerModule(module) OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt index c69697b5..e6465e7b 100644 --- a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt +++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt @@ -175,6 +175,156 @@ class KafkaEventSinkCDCTSE: KafkaEventSinkBaseTSE() { }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) } + @Test + fun shouldWriteDataFromSinkWithCDCSchemaTopicAndPointValue() = runBlocking { + val topic = UUID.randomUUID().toString() + db.setConfig("streams.sink.topic.cdc.schema", topic) + db.start() + + val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname"))) + val nodeSchema = Schema( + properties = mapOf( + "name" to "String", + "surname" to "String", + "comp@ny" to "String", + "bornIn2d" to "PointValue", + "bornIn3d" to "PointValue", + "livesIn2d" to "PointValue", + "livesIn3d" to "PointValue", + "worksIn2d" to "PointValue", + "worksIn3d" to "PointValue" + ), + constraints = constraints) + val relSchema = Schema(properties = mapOf( + "since" to "Long", + "where" to "PointValue" + ), constraints = constraints) + val cdcDataStart = StreamsTransactionEvent( + meta = Meta(timestamp = System.currentTimeMillis(), + username = "user", + txId = 1, + txEventId = 0, + txEventsCount = 3, + operation = OperationType.created + ), + payload = NodePayload( + id = "0", + before = null, + after = NodeChange( + properties = mapOf( + "name" to "Andrea", + "surname" to "Santurbano", + "comp@ny" to "LARUS-BA", + "bornIn3d" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 100.0, + ), + "bornIn2d" to mapOf( + "crs" to "wgs-84", + "latitude" to 12.78, + "longitude" to 56.7 + ), + "livesIn3d" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.79, + "longitude" to 56.71, + "height" to 100.0, + ), + "livesIn2d" to mapOf( + "crs" to "wgs-84", + "latitude" to 12.79, + "longitude" to 56.71 + ), + "worksIn2d" to mapOf( + "crs" to "cartesian", + "x" to 1.2, + "y" to 10.1 + ), + "worksIn3d" to mapOf( + "crs" to "cartesian-3d", + "x" to 1.2, + "y" to 10.1, + "z" to 7.1 + ) + ), + labels = listOf("User") + ) + ), + schema = nodeSchema + ) + val cdcDataEnd = StreamsTransactionEvent( + meta = Meta(timestamp = System.currentTimeMillis(), + username = "user", + txId = 1, + txEventId = 1, + txEventsCount = 3, + operation = OperationType.created + ), + payload = NodePayload(id = "1", + before = null, + after = NodeChange(properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"), labels = listOf("User")) + ), + schema = nodeSchema + ) + val cdcDataRelationship = StreamsTransactionEvent( + meta = Meta(timestamp = System.currentTimeMillis(), + username = "user", + txId = 1, + txEventId = 2, + txEventsCount = 3, + operation = OperationType.created + ), + payload = RelationshipPayload( + id = "2", + start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")), + end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")), + after = RelationshipChange(properties = mapOf( + "since" to 2014, + "where" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 80.0, + ) + )), + before = null, + label = "MEET" + ), + schema = relSchema + ) + var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart)) + kafkaProducer.send(producerRecord).get() + producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd)) + kafkaProducer.send(producerRecord).get() + producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship)) + kafkaProducer.send(producerRecord).get() + + Assert.assertEventually(ThrowingSupplier { + val query = """ + |MATCH (s:User{ + | name:'Andrea', + | surname:'Santurbano', + | `comp@ny`:'LARUS-BA', + | bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}), + | bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}), + | livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}), + | livesIn2d: point({longitude: 56.71, latitude: 12.79}), + | worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}), + | worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'}) + |}) + |MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) + |MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t) + |RETURN count(p) AS count + |""".trimMargin() + db.execute(query) { + val result = it.columnAs("count") + result.hasNext() && result.next() == 1L && !result.hasNext() + } + }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + } + @Test fun writeDataFromSinkWithCDCSchemaTopicMultipleConstraintsAndLabels() = runBlocking { val topic = UUID.randomUUID().toString() diff --git a/pom.xml b/pom.xml index 10be1f94..d4e7bb90 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ common test-support - kafka-connect-neo4j + producer consumer distribution diff --git a/producer/src/test/kotlin/streams/integrations/KafkaEventRouterSimpleTSE.kt b/producer/src/test/kotlin/streams/integrations/KafkaEventRouterSimpleTSE.kt index dc2f1ea1..37547918 100644 --- a/producer/src/test/kotlin/streams/integrations/KafkaEventRouterSimpleTSE.kt +++ b/producer/src/test/kotlin/streams/integrations/KafkaEventRouterSimpleTSE.kt @@ -5,6 +5,8 @@ import kotlinx.coroutines.runBlocking import org.hamcrest.Matchers import org.junit.Test import org.neo4j.function.ThrowingSupplier +import org.neo4j.values.storable.CoordinateReferenceSystem +import org.neo4j.values.storable.PointValue import streams.Assert import streams.events.EntityType import streams.events.NodeChange @@ -17,6 +19,7 @@ import streams.KafkaTestUtils import streams.utils.JSONUtils import streams.setConfig import streams.start +import streams.utils.toMapValue import java.util.* import java.util.concurrent.TimeUnit import kotlin.test.assertEquals @@ -44,6 +47,36 @@ class KafkaEventRouterSimpleTSE: KafkaEventRouterBaseTSE() { }) } + @Test + fun testCreateNodeWithPointValue() { + db.start() + kafkaConsumer.subscribe(listOf("neo4j")) + db.execute("CREATE (:Person {name:'John Doe', age:42, bornIn: point({longitude: 12.78, latitude: 56.7, height: 100})})") + val records = kafkaConsumer.poll(5000) + assertEquals(1, records.count()) + assertEquals(true, records.all { + JSONUtils.asStreamsTransactionEvent(it.value()).let { + val after = it.payload.after as NodeChange + val labels = after.labels + val propertiesAfter = after.properties + val expectedProperties = mapOf( + "name" to "John Doe", + "age" to 42, + "bornIn" to PointValue.fromMap(mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 100.0, + ).toMapValue()) + ) + labels == listOf("Person") && propertiesAfter == expectedProperties + && it.meta.operation == OperationType.created + && it.schema.properties == mapOf("name" to "String", "age" to "Long", "bornIn" to "PointValue") + && it.schema.constraints.isEmpty() + } + }) + } + @Test fun testCreateRelationshipWithRelRouting() { db.setConfig("streams.source.topic.relationships.knows", "KNOWS{*}").start()