diff --git a/common/src/main/kotlin/streams/utils/JSONUtils.kt b/common/src/main/kotlin/streams/utils/JSONUtils.kt index 5e0411b7..52a98ed1 100644 --- a/common/src/main/kotlin/streams/utils/JSONUtils.kt +++ b/common/src/main/kotlin/streams/utils/JSONUtils.kt @@ -2,8 +2,12 @@ package streams.utils import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonProcessingException +import com.fasterxml.jackson.databind.DeserializationContext import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.JsonDeserializer +import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonSerializer import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature @@ -13,10 +17,18 @@ import com.fasterxml.jackson.module.kotlin.convertValue import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.neo4j.driver.Value +import org.neo4j.driver.Values import org.neo4j.driver.internal.value.PointValue import org.neo4j.driver.types.Node import org.neo4j.driver.types.Point import org.neo4j.driver.types.Relationship +import streams.events.EntityType +import streams.events.Meta +import streams.events.NodePayload +import streams.events.Payload +import streams.events.RecordChange +import streams.events.RelationshipPayload +import streams.events.Schema import streams.events.StreamsTransactionEvent import streams.events.StreamsTransactionNodeEvent import streams.events.StreamsTransactionRelationshipEvent @@ -94,6 +106,125 @@ class DriverRelationshipSerializer : JsonSerializer() { } } +class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer() { + override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent { + return StreamsTransactionRelationshipEvent(meta, payload, schema) + } + + override fun convertPayload(payloadMap: JsonNode): RelationshipPayload { + return JSONUtils.convertValue(payloadMap) + } + + override fun fillPayload(payload: RelationshipPayload, + beforeProps: Map?, + afterProps: Map?): RelationshipPayload { + return payload.copy( + before = payload.before?.copy(properties = beforeProps), + after = payload.after?.copy(properties = afterProps) + ) + } + + override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent { + val deserialized = super.deserialize(parser, context) + if (deserialized.payload.type == EntityType.node) { + throw IllegalArgumentException("Relationship event expected, but node type found") + } + return deserialized + } + +} + +class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer() { + override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent { + return StreamsTransactionNodeEvent(meta, payload, schema) + } + + override fun convertPayload(payloadMap: JsonNode): NodePayload { + return JSONUtils.convertValue(payloadMap) + } + + override fun fillPayload(payload: NodePayload, + beforeProps: Map?, + afterProps: Map?): NodePayload { + return payload.copy( + before = payload.before?.copy(properties = beforeProps), + after = payload.after?.copy(properties = afterProps) + ) + } + + override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent { + val deserialized = super.deserialize(parser, context) + if (deserialized.payload.type == EntityType.relationship) { + throw IllegalArgumentException("Node event expected, but relationship type found") + } + return deserialized + } + +} + +abstract class StreamsTransactionEventDeserializer : JsonDeserializer() { + + abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT + abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD + abstract fun fillPayload(payload: PAYLOAD, + beforeProps: Map?, + afterProps: Map?): PAYLOAD + + @Throws(IOException::class, JsonProcessingException::class) + override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT { + val root: JsonNode = parser.codec.readTree(parser) + val meta = JSONUtils.convertValue(root["meta"]) + val schema = JSONUtils.convertValue(root["schema"]) + val points = schema.properties.filterValues { it == "PointValue" }.keys + var payload = convertPayload(root["payload"]) + if (points.isNotEmpty()) { + val beforeProps = convertPoints(payload.before, points) + val afterProps = convertPoints(payload.after, points) + payload = fillPayload(payload, beforeProps, afterProps) + } + return createEvent(meta, payload, schema) + } + + private fun convertPoints( + recordChange: RecordChange?, + points: Set + ) = recordChange + ?.properties + ?.mapValues { + if (points.contains(it.key)) { + val pointMap = it.value as Map + when (pointMap["crs"]) { + "cartesian" -> Values.point( + 7203, + pointMap["x"].toString().toDouble(), + pointMap["y"].toString().toDouble() + ) + "cartesian-3d" -> Values.point( + 9157, + pointMap["x"].toString().toDouble(), + pointMap["y"].toString().toDouble(), + pointMap["z"].toString().toDouble() + ) + "wgs-84" -> Values.point( + 4326, + pointMap["longitude"].toString().toDouble(), + pointMap["latitude"].toString().toDouble() + ) + "wgs-84-3d" -> Values.point( + 4979, + pointMap["longitude"].toString().toDouble(), + pointMap["latitude"].toString().toDouble(), + pointMap["height"].toString().toDouble() + ) + else -> throw IllegalArgumentException("CRS value: ${pointMap["crs"]} not found") + } + } else { + it.value + } + } + +} + object JSONUtils { private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper() @@ -106,10 +237,13 @@ object JSONUtils { module.addSerializer(PointValue::class.java, PointValueSerializer()) module.addSerializer(Node::class.java, DriverNodeSerializer()) module.addSerializer(Relationship::class.java, DriverRelationshipSerializer()) + module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer()) + module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer()) module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer()) OBJECT_MAPPER.registerModule(module) OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + STRICT_OBJECT_MAPPER.registerModule(module) } fun getObjectMapper(): ObjectMapper = OBJECT_MAPPER diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt index 543801f5..a0420250 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt @@ -331,9 +331,23 @@ class Neo4jSinkTaskTest { SinkTask.TOPICS_CONFIG to firstTopic) val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname"))) - val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraints) - val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"), - constraints = constraints) + val relSchema = Schema(properties = mapOf( + "since" to "Long", + "where" to "PointValue" + ), constraints = constraints) + val nodeSchema = Schema( + properties = mapOf( + "name" to "String", + "surname" to "String", + "comp@ny" to "String", + "bornIn2d" to "PointValue", + "bornIn3d" to "PointValue", + "livesIn2d" to "PointValue", + "livesIn3d" to "PointValue", + "worksIn2d" to "PointValue", + "worksIn3d" to "PointValue" + ), + constraints = constraints) val cdcDataStart = StreamsTransactionEvent( meta = Meta(timestamp = System.currentTimeMillis(), username = "user", @@ -344,7 +358,47 @@ class Neo4jSinkTaskTest { ), payload = NodePayload(id = "0", before = null, - after = NodeChange(properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"), labels = listOf("User")) + after = NodeChange( + properties = mapOf( + "name" to "Andrea", + "surname" to "Santurbano", + "comp@ny" to "LARUS-BA", + "bornIn3d" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 100.0, + ), + "bornIn2d" to mapOf( + "crs" to "wgs-84", + "latitude" to 12.78, + "longitude" to 56.7 + ), + "livesIn3d" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.79, + "longitude" to 56.71, + "height" to 100.0, + ), + "livesIn2d" to mapOf( + "crs" to "wgs-84", + "latitude" to 12.79, + "longitude" to 56.71 + ), + "worksIn2d" to mapOf( + "crs" to "cartesian", + "x" to 1.2, + "y" to 10.1 + ), + "worksIn3d" to mapOf( + "crs" to "cartesian-3d", + "x" to 1.2, + "y" to 10.1, + "z" to 7.1 + ) + ), + labels = listOf("User") + ) ), schema = nodeSchema ) @@ -371,12 +425,20 @@ class Neo4jSinkTaskTest { operation = OperationType.created ), payload = RelationshipPayload( - id = "2", - start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")), - end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")), - after = RelationshipChange(properties = mapOf("since" to 2014)), - before = null, - label = "KNOWS WHO" + id = "2", + start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")), + end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")), + after = RelationshipChange(properties = mapOf( + "since" to 2014, + "where" to mapOf( + "crs" to "wgs-84-3d", + "latitude" to 12.78, + "longitude" to 56.7, + "height" to 80.0, + ) + )), + before = null, + label = "MEET" ), schema = relSchema ) @@ -388,7 +450,19 @@ class Neo4jSinkTaskTest { task.put(input) session.beginTransaction().use { val query = """ - |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) + |MATCH (s:User{ + | name:'Andrea', + | surname:'Santurbano', + | `comp@ny`:'LARUS-BA', + | bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}), + | bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}), + | livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}), + | livesIn2d: point({longitude: 56.71, latitude: 12.79}), + | worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}), + | worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'}) + |}) + |MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) + |MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t) |RETURN count(p) AS count |""".trimMargin() val result = it.run(query)