Skip to content

Commit

Permalink
fixes #567: ( Impact on BC/DR - Data loss ) - High priority, Nodes wi…
Browse files Browse the repository at this point in the history
…th PointValue property cannot be successfully sink to Neo4j With Neo4j Streams Plugin
  • Loading branch information
conker84 committed Sep 20, 2023
1 parent fead34b commit c06a262
Show file tree
Hide file tree
Showing 58 changed files with 329 additions and 5,450 deletions.
6 changes: 6 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-configuration</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
2 changes: 0 additions & 2 deletions common/src/main/kotlin/streams/events/StreamsEvent.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package streams.events

import org.neo4j.graphdb.schema.ConstraintType

enum class OperationType { created, updated, deleted }

data class Meta(val timestamp: Long,
Expand Down
129 changes: 127 additions & 2 deletions common/src/main/kotlin/streams/utils/JSONUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,38 @@ package streams.utils

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.neo4j.driver.internal.value.PointValue
import org.neo4j.function.ThrowingBiConsumer
import org.neo4j.graphdb.spatial.Point
import org.neo4j.values.AnyValue
import org.neo4j.values.storable.CoordinateReferenceSystem
import streams.events.*
import org.neo4j.values.storable.Values
import org.neo4j.values.virtual.MapValue
import org.neo4j.values.virtual.MapValueBuilder
import streams.events.EntityType
import streams.events.Meta
import streams.events.NodePayload
import streams.events.Payload
import streams.events.RecordChange
import streams.events.RelationshipPayload
import streams.events.Schema
import streams.events.StreamsTransactionEvent
import streams.events.StreamsTransactionNodeEvent
import streams.events.StreamsTransactionRelationshipEvent
import streams.extensions.asStreamsMap
import java.io.IOException
import java.time.temporal.TemporalAccessor
Expand All @@ -33,6 +55,13 @@ fun Point.toStreamsPoint(): StreamsPoint {
}
}

fun Map<String, Any>.toMapValue(): MapValue {
val map = this
val builder = MapValueBuilder()
map.forEach { (t, u) -> builder.add(t, Values.of(u)) }
return builder.build()
}

fun PointValue.toStreamsPoint(): StreamsPoint {
val point = this.asPoint()
return when (val crsType = point.srid()) {
Expand Down Expand Up @@ -121,6 +150,100 @@ class DriverRelationshipSerializer : JsonSerializer<org.neo4j.driver.types.Relat
}
}

class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionRelationshipEvent, RelationshipPayload>() {
override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent {
return StreamsTransactionRelationshipEvent(meta, payload, schema)
}

override fun convertPayload(payloadMap: JsonNode): RelationshipPayload {
return JSONUtils.convertValue<RelationshipPayload>(payloadMap)
}

override fun fillPayload(payload: RelationshipPayload,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): RelationshipPayload {
return payload.copy(
before = payload.before?.copy(properties = beforeProps),
after = payload.after?.copy(properties = afterProps)
)
}

override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent {
val deserialized = super.deserialize(parser, context)
if (deserialized.payload.type == EntityType.node) {
throw IllegalArgumentException("Relationship event expected, but node type found")
}
return deserialized
}

}

class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionNodeEvent, NodePayload>() {
override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent {
return StreamsTransactionNodeEvent(meta, payload, schema)
}

override fun convertPayload(payloadMap: JsonNode): NodePayload {
return JSONUtils.convertValue<NodePayload>(payloadMap)
}

override fun fillPayload(payload: NodePayload,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): NodePayload {
return payload.copy(
before = payload.before?.copy(properties = beforeProps),
after = payload.after?.copy(properties = afterProps)
)
}

override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent {
val deserialized = super.deserialize(parser, context)
if (deserialized.payload.type == EntityType.relationship) {
throw IllegalArgumentException("Node event expected, but relationship type found")
}
return deserialized
}

}

abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD: Payload> : JsonDeserializer<EVENT>() {

abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT
abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD
abstract fun fillPayload(payload: PAYLOAD,
beforeProps: Map<String, Any>?,
afterProps: Map<String, Any>?): PAYLOAD

@Throws(IOException::class, JsonProcessingException::class)
override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT {
val root: JsonNode = parser.codec.readTree(parser)
val meta = JSONUtils.convertValue<Meta>(root["meta"])
val schema = JSONUtils.convertValue<Schema>(root["schema"])
val points = schema.properties.filterValues { it == "PointValue" }.keys
var payload = convertPayload(root["payload"])
if (points.isNotEmpty()) {
val beforeProps = convertPoints(payload.before, points)
val afterProps = convertPoints(payload.after, points)
payload = fillPayload(payload, beforeProps, afterProps)
}
return createEvent(meta, payload, schema)
}

private fun convertPoints(
recordChange: RecordChange?,
points: Set<String>
) = recordChange
?.properties
?.mapValues {
if (points.contains(it.key)) {
org.neo4j.values.storable.PointValue.fromMap((it.value as Map<String, Any>).toMapValue())
} else {
it.value
}
}

}

object JSONUtils {

private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
Expand All @@ -133,6 +256,8 @@ object JSONUtils {
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Point::class.java, DriverPointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Node::class.java, DriverNodeSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Relationship::class.java, DriverRelationshipSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
StreamsUtils.ignoreExceptions({ module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
OBJECT_MAPPER.registerModule(module)
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
Expand Down
2 changes: 0 additions & 2 deletions common/src/main/kotlin/streams/utils/ProcedureUtils.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package streams.utils

import org.neo4j.configuration.Config
import org.neo4j.configuration.GraphDatabaseSettings
import org.neo4j.dbms.api.DatabaseManagementService
import org.neo4j.exceptions.UnsatisfiedDependencyException
import org.neo4j.kernel.impl.factory.DbmsInfo
Expand Down
150 changes: 150 additions & 0 deletions consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCDCTSE.kt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,156 @@ class KafkaEventSinkCDCTSE: KafkaEventSinkBaseTSE() {
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
}

@Test
fun shouldWriteDataFromSinkWithCDCSchemaTopicAndPointValue() = runBlocking {
val topic = UUID.randomUUID().toString()
db.setConfig("streams.sink.topic.cdc.schema", topic)
db.start()

val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
val nodeSchema = Schema(
properties = mapOf(
"name" to "String",
"surname" to "String",
"comp@ny" to "String",
"bornIn2d" to "PointValue",
"bornIn3d" to "PointValue",
"livesIn2d" to "PointValue",
"livesIn3d" to "PointValue",
"worksIn2d" to "PointValue",
"worksIn3d" to "PointValue"
),
constraints = constraints)
val relSchema = Schema(properties = mapOf(
"since" to "Long",
"where" to "PointValue"
), constraints = constraints)
val cdcDataStart = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
txId = 1,
txEventId = 0,
txEventsCount = 3,
operation = OperationType.created
),
payload = NodePayload(
id = "0",
before = null,
after = NodeChange(
properties = mapOf(
"name" to "Andrea",
"surname" to "Santurbano",
"comp@ny" to "LARUS-BA",
"bornIn3d" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 100.0,
),
"bornIn2d" to mapOf(
"crs" to "wgs-84",
"latitude" to 12.78,
"longitude" to 56.7
),
"livesIn3d" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.79,
"longitude" to 56.71,
"height" to 100.0,
),
"livesIn2d" to mapOf(
"crs" to "wgs-84",
"latitude" to 12.79,
"longitude" to 56.71
),
"worksIn2d" to mapOf(
"crs" to "cartesian",
"x" to 1.2,
"y" to 10.1
),
"worksIn3d" to mapOf(
"crs" to "cartesian-3d",
"x" to 1.2,
"y" to 10.1,
"z" to 7.1
)
),
labels = listOf("User")
)
),
schema = nodeSchema
)
val cdcDataEnd = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
txId = 1,
txEventId = 1,
txEventsCount = 3,
operation = OperationType.created
),
payload = NodePayload(id = "1",
before = null,
after = NodeChange(properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"), labels = listOf("User"))
),
schema = nodeSchema
)
val cdcDataRelationship = StreamsTransactionEvent(
meta = Meta(timestamp = System.currentTimeMillis(),
username = "user",
txId = 1,
txEventId = 2,
txEventsCount = 3,
operation = OperationType.created
),
payload = RelationshipPayload(
id = "2",
start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
after = RelationshipChange(properties = mapOf(
"since" to 2014,
"where" to mapOf(
"crs" to "wgs-84-3d",
"latitude" to 12.78,
"longitude" to 56.7,
"height" to 80.0,
)
)),
before = null,
label = "MEET"
),
schema = relSchema
)
var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataStart))
kafkaProducer.send(producerRecord).get()
producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataEnd))
kafkaProducer.send(producerRecord).get()
producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(cdcDataRelationship))
kafkaProducer.send(producerRecord).get()

Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
val query = """
|MATCH (s:User{
| name:'Andrea',
| surname:'Santurbano',
| `comp@ny`:'LARUS-BA',
| bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}),
| bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}),
| livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}),
| livesIn2d: point({longitude: 56.71, latitude: 12.79}),
| worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}),
| worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'})
|})
|MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
|MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t)
|RETURN count(p) AS count
|""".trimMargin()
db.execute(query) {
val result = it.columnAs<Long>("count")
result.hasNext() && result.next() == 1L && !result.hasNext()
}
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
}

@Test
fun writeDataFromSinkWithCDCSchemaTopicMultipleConstraintsAndLabels() = runBlocking {
val topic = UUID.randomUUID().toString()
Expand Down
15 changes: 0 additions & 15 deletions kafka-connect-neo4j/README.md

This file was deleted.

Binary file removed kafka-connect-neo4j/assets/neo4j-logo.png
Binary file not shown.
14 changes: 0 additions & 14 deletions kafka-connect-neo4j/config/sink-quickstart.properties

This file was deleted.

Loading

0 comments on commit c06a262

Please sign in to comment.