Skip to content

Commit

Permalink
fixes #567: ( Impact on BC/DR - Data loss ) - High priority, Nodes wi…
Browse files Browse the repository at this point in the history
…th PointValue property cannot be successfully sink to Neo4j With Neo4j Streams Plugin
  • Loading branch information
conker84 committed Aug 29, 2023
1 parent 7248f2c commit 4c65888
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 11 deletions.
134 changes: 134 additions & 0 deletions common/src/main/kotlin/streams/utils/JSONUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package streams.utils

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
Expand All @@ -13,10 +17,18 @@ import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.neo4j.driver.Value
import org.neo4j.driver.Values
import org.neo4j.driver.internal.value.PointValue
import org.neo4j.driver.types.Node
import org.neo4j.driver.types.Point
import org.neo4j.driver.types.Relationship
import streams.events.EntityType
import streams.events.Meta
import streams.events.NodePayload
import streams.events.Payload
import streams.events.RecordChange
import streams.events.RelationshipPayload
import streams.events.Schema
import streams.events.StreamsTransactionEvent
import streams.events.StreamsTransactionNodeEvent
import streams.events.StreamsTransactionRelationshipEvent
Expand Down Expand Up @@ -94,6 +106,125 @@ class DriverRelationshipSerializer : JsonSerializer<Relationship>() {
}
}

class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionRelationshipEvent, RelationshipPayload>() {
override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent {
return StreamsTransactionRelationshipEvent(meta, payload, schema)
}

override fun convertPayload(payloadMap: JsonNode): RelationshipPayload {
return JSONUtils.convertValue<RelationshipPayload>(payloadMap)
}

override fun fillPayload(payload: RelationshipPayload,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): RelationshipPayload {
return payload.copy(
before = payload.before?.copy(properties = beforeProps),
after = payload.after?.copy(properties = afterProps)
)
}

override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent {
val deserialized = super.deserialize(parser, context)
if (deserialized.payload.type == EntityType.node) {
throw IllegalArgumentException("Relationship event expected, but node type found")
}
return deserialized
}

}

class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionNodeEvent, NodePayload>() {
override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent {
return StreamsTransactionNodeEvent(meta, payload, schema)
}

override fun convertPayload(payloadMap: JsonNode): NodePayload {
return JSONUtils.convertValue<NodePayload>(payloadMap)
}

override fun fillPayload(payload: NodePayload,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): NodePayload {
return payload.copy(
before = payload.before?.copy(properties = beforeProps),
after = payload.after?.copy(properties = afterProps)
)
}

override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent {
val deserialized = super.deserialize(parser, context)
if (deserialized.payload.type == EntityType.relationship) {
throw IllegalArgumentException("Node event expected, but relationship type found")
}
return deserialized
}

}

abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD: Payload> : JsonDeserializer<EVENT>() {

abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT
abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD
abstract fun fillPayload(payload: PAYLOAD,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): PAYLOAD

@Throws(IOException::class, JsonProcessingException::class)
override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT {
val root: JsonNode = parser.codec.readTree(parser)
val meta = JSONUtils.convertValue<Meta>(root["meta"])
val schema = JSONUtils.convertValue<Schema>(root["schema"])
val points = schema.properties.filterValues { it == "PointValue" }.keys
var payload = convertPayload(root["payload"])
if (points.isNotEmpty()) {
val beforeProps = convertPoints(payload.before, points)
val afterProps = convertPoints(payload.after, points)
payload = fillPayload(payload, beforeProps, afterProps)
}
return createEvent(meta, payload, schema)
}

private fun convertPoints(
recordChange: RecordChange?,
points: Set<String>
) = recordChange
?.properties
?.mapValues {
if (points.contains(it.key)) {
val pointMap = it.value as Map<String, Any>
when (pointMap["crs"]) {
"cartesian" -> Values.point(
7203,
pointMap["x"].toString().toDouble(),
pointMap["y"].toString().toDouble()
)
"cartesian-3d" -> Values.point(
9157,
pointMap["x"].toString().toDouble(),
pointMap["y"].toString().toDouble(),
pointMap["z"].toString().toDouble()
)
"wgs-84" -> Values.point(
4326,
pointMap["longitude"].toString().toDouble(),
pointMap["latitude"].toString().toDouble()
)
"wgs-84-3d" -> Values.point(
4979,
pointMap["longitude"].toString().toDouble(),
pointMap["latitude"].toString().toDouble(),
pointMap["height"].toString().toDouble()
)
else -> throw IllegalArgumentException("CRS value: ${pointMap["crs"]} not found")
}
} else {
it.value
}
}

}

object JSONUtils {

private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
Expand All @@ -106,10 +237,13 @@ object JSONUtils {
module.addSerializer(PointValue::class.java, PointValueSerializer())
module.addSerializer(Node::class.java, DriverNodeSerializer())
module.addSerializer(Relationship::class.java, DriverRelationshipSerializer())
module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer())
module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer())
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
OBJECT_MAPPER.registerModule(module)
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
STRICT_OBJECT_MAPPER.registerModule(module)
}

fun getObjectMapper(): ObjectMapper = OBJECT_MAPPER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,23 @@ class Neo4jSinkTaskTest {
SinkTask.TOPICS_CONFIG to firstTopic)

val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraints)
val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"),
constraints = constraints)
val relSchema = Schema(properties = mapOf(
"since" to "Long",
"where" to "PointValue"
), constraints = constraints)
val nodeSchema = Schema(
properties = mapOf(
"name" to "String",
"surname" to "String",
"comp@ny" to "String",
"bornIn2d" to "PointValue",
"bornIn3d" to "PointValue",
"livesIn2d" to "PointValue",
"livesIn3d" to "PointValue",
"worksIn2d" to "PointValue",
"worksIn3d" to "PointValue"
),
constraints = constraints)
val cdcDataStart = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
Expand All @@ -342,7 +356,47 @@ class Neo4jSinkTaskTest {
),
payload = NodePayload(id = "0",
before = null,
after = NodeChange(properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"), labels = listOf("User"))
after = NodeChange(
properties = mapOf(
"name" to "Andrea",
"surname" to "Santurbano",
"comp@ny" to "LARUS-BA",
"bornIn3d" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 100.0,
),
"bornIn2d" to mapOf(
"crs" to "wgs-84",
"latitude" to 12.78,
"longitude" to 56.7
),
"livesIn3d" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.79,
"longitude" to 56.71,
"height" to 100.0,
),
"livesIn2d" to mapOf(
"crs" to "wgs-84",
"latitude" to 12.79,
"longitude" to 56.71
),
"worksIn2d" to mapOf(
"crs" to "cartesian",
"x" to 1.2,
"y" to 10.1
),
"worksIn3d" to mapOf(
"crs" to "cartesian-3d",
"x" to 1.2,
"y" to 10.1,
"z" to 7.1
)
),
labels = listOf("User")
)
),
schema = nodeSchema
)
Expand All @@ -369,12 +423,20 @@ class Neo4jSinkTaskTest {
operation = OperationType.created
),
payload = RelationshipPayload(
id = "2",
start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
after = RelationshipChange(properties = mapOf("since" to 2014)),
before = null,
label = "KNOWS WHO"
id = "2",
start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
after = RelationshipChange(properties = mapOf(
"since" to 2014,
"where" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 80.0,
)
)),
before = null,
label = "MEET"
),
schema = relSchema
)
Expand All @@ -386,7 +448,19 @@ class Neo4jSinkTaskTest {
task.put(input)
session.beginTransaction().use {
val query = """
|MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
|MATCH (s:User{
| name:'Andrea',
| surname:'Santurbano',
| `comp@ny`:'LARUS-BA',
| bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}),
| bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}),
| livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}),
| livesIn2d: point({longitude: 56.71, latitude: 12.79}),
| worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}),
| worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'})
|})
|MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
|MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t)
|RETURN count(p) AS count
|""".trimMargin()
val result = it.run(query)
Expand Down

0 comments on commit 4c65888

Please sign in to comment.