From 0b6895cf8836e0670f42d862c3d412e09a7117e4 Mon Sep 17 00:00:00 2001 From: Florent Biville <445792+fbiville@users.noreply.github.com> Date: Fri, 3 Nov 2023 22:34:09 +0100 Subject: [PATCH] fix: port fix for neo4j-contrib/neo4j-streams/issues/537 (#27) Fix Kafka sink connector to work with CUD ingestion strategy and Avro format data. See https://github.com/neo4j-contrib/neo4j-streams/pull/582 Co-authored-by: Andrea Santurbano --- .../sink/converters/Neo4jValueConverter.kt | 29 +-- .../kafka/sink/Neo4jSinkTaskTest.kt | 74 +++++- .../Neo4jValueConverterNestedStructTest.kt | 8 +- .../converters/Neo4jValueConverterTest.kt | 225 ++++++++++++------ 4 files changed, 237 insertions(+), 99 deletions(-) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverter.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverter.kt index b56828ae..69c2dfa8 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverter.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverter.kt @@ -22,28 +22,26 @@ import java.time.ZoneId import java.util.Date import java.util.concurrent.TimeUnit import org.apache.kafka.connect.data.Struct -import org.neo4j.driver.Value -import org.neo4j.driver.Values @Suppress("UNCHECKED_CAST") -class Neo4jValueConverter : MapValueConverter() { +class Neo4jValueConverter : MapValueConverter() { companion object { @JvmStatic private val UTC = ZoneId.of("UTC") } - override fun setValue(result: MutableMap?, fieldName: String, value: Any?) { + override fun setValue(result: MutableMap?, fieldName: String, value: Any?) { if (result != null) { - result[fieldName] = Values.value(value) ?: Values.NULL + result[fieldName] = value } } - override fun newValue(): MutableMap { + override fun newValue(): MutableMap { return mutableMapOf() } override fun setDecimalField( - result: MutableMap?, + result: MutableMap?, fieldName: String, value: BigDecimal ) { @@ -60,7 +58,7 @@ class Neo4jValueConverter : MapValueConverter() { } override fun setTimestampField( - result: MutableMap?, + result: MutableMap?, fieldName: String, value: Date ) { @@ -68,23 +66,18 @@ class Neo4jValueConverter : MapValueConverter() { setValue(result, fieldName, localDate) } - override fun setTimeField(result: MutableMap?, fieldName: String, value: Date) { + override fun setTimeField(result: MutableMap?, fieldName: String, value: Date) { val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time)) setValue(result, fieldName, time) } - override fun setDateField(result: MutableMap?, fieldName: String, value: Date) { + override fun setDateField(result: MutableMap?, fieldName: String, value: Date) { val localDate = value.toInstant().atZone(UTC).toLocalDate() setValue(result, fieldName, localDate) } - override fun setStructField( - result: MutableMap?, - fieldName: String, - value: Struct - ) { - val converted = - convert(value).mapValues { it.value?.asObject() }.toMutableMap() as MutableMap - setMap(result, fieldName, null, converted) + override fun setStructField(result: MutableMap?, fieldName: String, value: Struct) { + val converted = convert(value).toMutableMap() as MutableMap + setValue(result, fieldName, converted) } } diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTaskTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTaskTest.kt index 3c62b91a..e1454fac 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTaskTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jSinkTaskTest.kt @@ -112,6 +112,15 @@ class Neo4jSinkTaskTest { task.initialize(Mockito.mock(SinkTaskContext::class.java)) } + private val PLACE_SCHEMA = + SchemaBuilder.struct() + .name("com.example.Place") + .field("name", Schema.STRING_SCHEMA) + .field("latitude", Schema.FLOAT32_SCHEMA) + .field("longitude", Schema.FLOAT32_SCHEMA) + .field("modified", Timestamp.SCHEMA) + .build() + private val PERSON_SCHEMA = SchemaBuilder.struct() .name("com.example.Person") @@ -125,14 +134,7 @@ class Neo4jSinkTaskTest { .field("float", Schema.OPTIONAL_FLOAT32_SCHEMA) .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA) .field("modified", Timestamp.SCHEMA) - .build() - - private val PLACE_SCHEMA = - SchemaBuilder.struct() - .name("com.example.Place") - .field("name", Schema.STRING_SCHEMA) - .field("latitude", Schema.FLOAT32_SCHEMA) - .field("longitude", Schema.FLOAT32_SCHEMA) + .field("place", PLACE_SCHEMA) .build() @Test @@ -1905,6 +1907,62 @@ class Neo4jSinkTaskTest { assertEquals(expectedDataErrorSize, exception.errorDatas.size) } + @Test + fun `should successfully parse nested timestamps`() { + val firstTopic = "neotopic" + val secondTopic = "foo" + val props = mutableMapOf() + props[Neo4jConfiguration.URI] = neo4j.boltUrl + props["${SinkConfiguration.TOPIC_CYPHER_PREFIX}$firstTopic"] = + "CREATE (n:PersonExt {modified: event.place.modified})" + props["${SinkConfiguration.TOPIC_CYPHER_PREFIX}$secondTopic"] = + "CREATE (n:Person {modified: event.place.modified})" + props[Neo4jConfiguration.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + props[SinkConfiguration.BATCH_SIZE] = 2.toString() + props[SinkTask.TOPICS_CONFIG] = "$firstTopic,$secondTopic" + + val place = + Struct(PLACE_SCHEMA) + .put("name", "San Mateo (CA)") + .put("latitude", 37.5629917.toFloat()) + .put("longitude", -122.3255254.toFloat()) + .put("modified", Date(1474661402123L)) + val struct = + Struct(PERSON_SCHEMA) + .put("firstName", "Alex") + .put("lastName", "Smith") + .put("bool", true) + .put("short", 1234.toShort()) + .put("byte", (-32).toByte()) + .put("long", 12425436L) + .put("float", 2356.3.toFloat()) + .put("double", -2436546.56457) + .put("age", 21) + .put("modified", Date(1474661402123L)) + .put("place", place) + + task.start(props) + val input = + listOf( + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(secondTopic, 1, null, null, PERSON_SCHEMA, struct, 43)) + task.put(input) + session.beginTransaction().use { + val personCount = + it.run("MATCH (p:Person) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong() + val expectedPersonCount = input.filter { it.topic() == secondTopic }.size + assertEquals(expectedPersonCount, personCount.toInt()) + val personExtCount = + it.run("MATCH (p:PersonExt) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong() + val expectedPersonExtCount = input.filter { it.topic() == firstTopic }.size + assertEquals(expectedPersonExtCount, personExtCount.toInt()) + } + } + private fun countFooPersonEntities(expected: Int) { val personCount = session.run("MATCH (p:Person) RETURN count(p) as count").single()["count"].asLong() diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterNestedStructTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterNestedStructTest.kt index 10897111..23d12872 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterNestedStructTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterNestedStructTest.kt @@ -25,8 +25,6 @@ import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct import org.junit.jupiter.api.Test import org.neo4j.connectors.kafka.utils.JSONUtils -import org.neo4j.driver.Value -import org.neo4j.driver.Values @Suppress("UNCHECKED_CAST") class Neo4jValueConverterNestedStructTest { @@ -125,10 +123,8 @@ class Neo4jValueConverterNestedStructTest { .put("tns", tnsList) } - fun getExpectedMap(): Map { - return JSONUtils.readValue>(data).mapValues(::convertDateNew).mapValues { - Values.value(it.value) - } + fun getExpectedMap(): Map { + return JSONUtils.readValue>(data).mapValues(::convertDateNew) } fun convertDate(it: Map.Entry): Any? = diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterTest.kt index 7633f7b3..d9676511 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/converters/Neo4jValueConverterTest.kt @@ -18,6 +18,9 @@ package org.neo4j.connectors.kafka.sink.converters import java.math.BigDecimal import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime import java.time.ZoneId import java.util.Date import kotlin.test.assertEquals @@ -30,16 +33,6 @@ import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.data.Time import org.apache.kafka.connect.data.Timestamp import org.junit.jupiter.api.Test -import org.neo4j.driver.Value -import org.neo4j.driver.Values -import org.neo4j.driver.internal.value.BytesValue -import org.neo4j.driver.internal.value.DateValue -import org.neo4j.driver.internal.value.FloatValue -import org.neo4j.driver.internal.value.IntegerValue -import org.neo4j.driver.internal.value.LocalDateTimeValue -import org.neo4j.driver.internal.value.LocalTimeValue -import org.neo4j.driver.internal.value.NullValue -import org.neo4j.driver.internal.value.StringValue class Neo4jValueConverterTest { @@ -85,52 +78,49 @@ class Neo4jValueConverterTest { fun `should convert tree with mixes types into map of neo4j values`() { val utc = ZoneId.of("UTC") - val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map + val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map val target = result["target"] - assertTrue { target is FloatValue } - assertEquals(123.4, target?.asDouble()) + assertTrue { target is Double } + assertEquals(123.4, target) val largeDecimal = result["largeDecimal"] - assertTrue { largeDecimal is StringValue } - assertEquals( - BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal?.asString()) + assertTrue { largeDecimal is String } + assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal) val byteArray = result["byteArray"] - assertTrue { byteArray is BytesValue } - assertEquals("Foo".toByteArray().map { it }, byteArray?.asByteArray()?.map { it }) + assertTrue { byteArray is ByteArray } + assertEquals("Foo", String(byteArray as ByteArray)) val int64 = result["int64"] - assertTrue { int64 is IntegerValue } - assertEquals(Long.MAX_VALUE, int64?.asLong()) + assertTrue { int64 is Long } + assertEquals(Long.MAX_VALUE, int64) val int64Timestamp = result["int64Timestamp"] - assertTrue { int64Timestamp is LocalDateTimeValue } + assertTrue { int64Timestamp is LocalDateTime } assertEquals( Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), - int64Timestamp?.asLocalDateTime()) + int64Timestamp) val int32 = result["int32"] - assertTrue { int32 is IntegerValue } - assertEquals(123, int32?.asInt()) + assertTrue { int32 is Int } + assertEquals(123, int32) val int32Date = result["int32Date"] - assertTrue { int32Date is DateValue } + assertTrue { int32Date is LocalDate } assertEquals( - Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), - int32Date?.asLocalDate()) + Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date) val int32Time = result["int32Time"] - assertTrue { int32Time is LocalTimeValue } + assertTrue { int32Time is LocalTime } assertEquals( - Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), - int32Time?.asLocalTime()) + Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time) val nullField = result["nullField"] - assertTrue { nullField is NullValue } + assertTrue { nullField == null } val nullFieldBytes = result["nullFieldBytes"] - assertTrue { nullFieldBytes is NullValue } + assertTrue { nullFieldBytes == null } } @Test @@ -140,13 +130,13 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue { item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue { item is String } + assertEquals(number.toPlainString(), item) val result2 = Neo4jValueConverter().convert(getItemElement(null)) val item2 = result2["item"] - assertTrue { item2 is NullValue } + assertTrue { item2 == null } } @Test @@ -156,8 +146,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue { item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue { item is String } + assertEquals(number.toPlainString(), item) } @Test @@ -167,8 +157,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue { item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue { item is String } + assertEquals(number.toPlainString(), item) } @Test @@ -178,8 +168,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number))) val item = result["item"] - assertTrue { item is FloatValue } - assertEquals(number, item?.asDouble()) + assertTrue { item is Double } + assertEquals(number, item) } @Test @@ -189,8 +179,7 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number))) val item = result["item"] - assertTrue { item is FloatValue } - assertEquals(number, item?.asDouble()) + assertEquals(number, item) } @Test @@ -211,13 +200,124 @@ class Neo4jValueConverterTest { "string" to string, "date" to date)) - assertEquals(double, result["double"]?.asDouble()) - assertEquals(long, result["long"]?.asLong()) - assertEquals(bigDouble.toPlainString(), result["bigDouble"]?.asString()) - assertEquals(string, result["string"]?.asString()) - assertEquals( - date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), - result["date"]?.asLocalDateTime()) + assertEquals(double, result["double"]) + assertEquals(long, result["long"]) + assertEquals(bigDouble.toPlainString(), result["bigDouble"]) + assertEquals(string, result["string"]) + assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]) + } + + @Test + fun `should be able to process a CUD node format AVRO structure`() { + val idsStruct = SchemaBuilder.struct().field("ID_NUM", Schema.INT64_SCHEMA).build() + val propertiesStruct = + SchemaBuilder.struct() + .field("ID_NUM", Schema.INT64_SCHEMA) + .field("EMP_NAME", Schema.STRING_SCHEMA) + .field("PHONE", Schema.STRING_SCHEMA) + .build() + val cudSchema = + SchemaBuilder.struct() + .field("type", Schema.STRING_SCHEMA) + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", idsStruct) + .field("properties", propertiesStruct) + .build() + + val cudStruct = + Struct(cudSchema) + .put("type", "node") + .put("labels", listOf("EMPLOYEE")) + .put("op", "merge") + .put("ids", Struct(idsStruct).put("ID_NUM", 36950L)) + .put( + "properties", + Struct(propertiesStruct) + .put("ID_NUM", 36950L) + .put("EMP_NAME", "Edythe") + .put("PHONE", "3333")) + + val actual = Neo4jValueConverter().convert(cudStruct) as Map<*, *> + val expected = + mapOf( + "type" to "node", + "labels" to listOf("EMPLOYEE"), + "op" to "merge", + "ids" to mapOf("ID_NUM" to 36950L), + "properties" to + mapOf("ID_NUM" to 36950L, "EMP_NAME" to "Edythe", "PHONE" to "3333")) + assertEquals(expected, actual) + } + + @Test + fun `should be able to process a CUD rel format AVRO structure`() { + val fromIdStruct = SchemaBuilder.struct().field("person_id", Schema.INT64_SCHEMA).build() + val toIdStruct = SchemaBuilder.struct().field("product_id", Schema.INT64_SCHEMA).build() + val fromNodeStruct = + SchemaBuilder.struct() + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", fromIdStruct) + .build() + val toNodeStruct = + SchemaBuilder.struct() + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", toIdStruct) + .build() + val propertiesStruct = + SchemaBuilder.struct() + .field("quantity", Schema.INT32_SCHEMA) + .field("year", Schema.STRING_SCHEMA) + .build() + val cudSchema = + SchemaBuilder.struct() + .field("type", Schema.STRING_SCHEMA) + .field("rel_type", Schema.STRING_SCHEMA) + .field("op", Schema.STRING_SCHEMA) + .field("properties", propertiesStruct) + .field("from", fromNodeStruct) + .field("to", toNodeStruct) + .build() + + val cudStruct = + Struct(cudSchema) + .put("type", "relationship") + .put("rel_type", "BOUGHT") + .put("op", "merge") + .put( + "from", + Struct(fromNodeStruct) + .put("ids", Struct(fromIdStruct).put("person_id", 1L)) + .put("op", "match") + .put("labels", listOf("Person"))) + .put( + "to", + Struct(toNodeStruct) + .put("ids", Struct(toIdStruct).put("product_id", 10L)) + .put("op", "merge") + .put("labels", listOf("Product"))) + .put("properties", Struct(propertiesStruct).put("quantity", 10).put("year", "2023")) + + val actual = Neo4jValueConverter().convert(cudStruct) as Map<*, *> + val expected = + mapOf( + "type" to "relationship", + "rel_type" to "BOUGHT", + "op" to "merge", + "from" to + mapOf( + "ids" to mapOf("person_id" to 1L), + "labels" to listOf("Person"), + "op" to "match"), + "to" to + mapOf( + "ids" to mapOf("product_id" to 10L), + "labels" to listOf("Product"), + "op" to "merge"), + "properties" to mapOf("quantity" to 10, "year" to "2023")) + assertEquals(expected, actual) } @Test @@ -356,33 +456,24 @@ class Neo4jValueConverterTest { return Struct(BODY_SCHEMA).put("ul", ulList).put("p", pList) } - fun getExpectedMap(): Map { + fun getExpectedMap(): Map { val firstULMap = mapOf( "value" to listOf( + mapOf("value" to "First UL - First Element", "class" to null), mapOf( - "value" to Values.value("First UL - First Element"), - "class" to Values.NULL), - mapOf( - "value" to Values.value("First UL - Second Element"), - "class" to Values.value(listOf("ClassA", "ClassB"))))) + "value" to "First UL - Second Element", + "class" to listOf("ClassA", "ClassB")))) val secondULMap = mapOf( "value" to listOf( - mapOf( - "value" to Values.value("Second UL - First Element"), - "class" to Values.NULL), - mapOf( - "value" to Values.value("Second UL - Second Element"), - "class" to Values.NULL))) - val ulListMap = Values.value(listOf(firstULMap, secondULMap)) + mapOf("value" to "Second UL - First Element", "class" to null), + mapOf("value" to "Second UL - Second Element", "class" to null))) + val ulListMap = listOf(firstULMap, secondULMap) val pListMap = - Values.value( - listOf( - mapOf("value" to Values.value("First Paragraph")), - mapOf("value" to Values.value("Second Paragraph")))) + listOf(mapOf("value" to "First Paragraph"), mapOf("value" to "Second Paragraph")) return mapOf("ul" to ulListMap, "p" to pListMap) }