From 7e9e8c0bb4b4e5bfb054baff3df0cd5b9fd99c69 Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Mon, 21 Aug 2023 17:31:09 +0200 Subject: [PATCH] fixes #537: Kafka sink connector does not work with CUD ingestion strategy and Avro format data --- .../sink/converters/Neo4jValueConverter.kt | 26 +-- .../kafka/connect/sink/Neo4jSinkTaskTest.kt | 76 ++++-- .../Neo4jValueConverterNestedStructTest.kt | 4 +- .../connect/sink/Neo4jValueConverterTest.kt | 218 ++++++++++++++---- 4 files changed, 247 insertions(+), 77 deletions(-) diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt index 60ece516..ac71c523 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt @@ -10,23 +10,23 @@ import java.util.Date import java.util.concurrent.TimeUnit -class Neo4jValueConverter: MapValueConverter() { +class Neo4jValueConverter: MapValueConverter() { companion object { @JvmStatic private val UTC = ZoneId.of("UTC") } - override fun setValue(result: MutableMap?, fieldName: String, value: Any?) { + override fun setValue(result: MutableMap?, fieldName: String, value: Any?) { if (result != null) { - result[fieldName] = Values.value(value) ?: Values.NULL + result[fieldName] = value } } - override fun newValue(): MutableMap { + override fun newValue(): MutableMap { return mutableMapOf() } - override fun setDecimalField(result: MutableMap?, fieldName: String, value: BigDecimal) { + override fun setDecimalField(result: MutableMap?, fieldName: String, value: BigDecimal) { val doubleValue = value.toDouble() val fitsScale = doubleValue != Double.POSITIVE_INFINITY && doubleValue != Double.NEGATIVE_INFINITY @@ -38,26 +38,24 @@ class Neo4jValueConverter: MapValueConverter() { } } - override fun setTimestampField(result: MutableMap?, fieldName: String, value: Date) { + override fun setTimestampField(result: MutableMap?, fieldName: String, value: Date) { val localDate = value.toInstant().atZone(UTC).toLocalDateTime() setValue(result, fieldName, localDate) - } - override fun setTimeField(result: MutableMap?, fieldName: String, value: Date) { + override fun setTimeField(result: MutableMap?, fieldName: String, value: Date) { val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time)) setValue(result, fieldName, time) } - override fun setDateField(result: MutableMap?, fieldName: String, value: Date) { + override fun setDateField(result: MutableMap?, fieldName: String, value: Date) { val localDate = value.toInstant().atZone(UTC).toLocalDate() setValue(result, fieldName, localDate) } - override fun setStructField(result: MutableMap?, fieldName: String, value: Struct) { - val converted = convert(value) - .mapValues { it.value?.asObject() } - .toMutableMap() as MutableMap - setMap(result, fieldName, null, converted) + override fun setStructField(result: MutableMap?, fieldName: String, value: Struct) { + val converted = convert(value).toMutableMap() as MutableMap + setValue(result, fieldName, converted) } + } \ No newline at end of file diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt index bccf9f84..543801f5 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt @@ -82,25 +82,27 @@ class Neo4jSinkTaskTest { task.initialize(mock(SinkTaskContext::class.java)) } - private val PERSON_SCHEMA = SchemaBuilder.struct().name("com.example.Person") - .field("firstName", Schema.STRING_SCHEMA) - .field("lastName", Schema.STRING_SCHEMA) - .field("age", Schema.OPTIONAL_INT32_SCHEMA) - .field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA) - .field("short", Schema.OPTIONAL_INT16_SCHEMA) - .field("byte", Schema.OPTIONAL_INT8_SCHEMA) - .field("long", Schema.OPTIONAL_INT64_SCHEMA) - .field("float", Schema.OPTIONAL_FLOAT32_SCHEMA) - .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA) - .field("modified", Timestamp.SCHEMA) - .build() - private val PLACE_SCHEMA = SchemaBuilder.struct().name("com.example.Place") .field("name", Schema.STRING_SCHEMA) .field("latitude", Schema.FLOAT32_SCHEMA) .field("longitude", Schema.FLOAT32_SCHEMA) + .field("modified", Timestamp.SCHEMA) .build() + private val PERSON_SCHEMA = SchemaBuilder.struct().name("com.example.Person") + .field("firstName", Schema.STRING_SCHEMA) + .field("lastName", Schema.STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA) + .field("short", Schema.OPTIONAL_INT16_SCHEMA) + .field("byte", Schema.OPTIONAL_INT8_SCHEMA) + .field("long", Schema.OPTIONAL_INT64_SCHEMA) + .field("float", Schema.OPTIONAL_FLOAT32_SCHEMA) + .field("double", Schema.OPTIONAL_FLOAT64_SCHEMA) + .field("modified", Timestamp.SCHEMA) + .field("place", PLACE_SCHEMA) + .build() + @Test fun `test array of struct`() { val firstTopic = "neotopic" @@ -1351,6 +1353,54 @@ class Neo4jSinkTaskTest { } } + @Test + fun `should successfully parse nested timestamps`() { + val firstTopic = "neotopic" + val secondTopic = "foo" + val props = mutableMapOf() + props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl + props["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}$firstTopic"] = "CREATE (n:PersonExt {modified: event.place.modified})" + props["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}$secondTopic"] = "CREATE (n:Person {modified: event.place.modified})" + props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + props[Neo4jConnectorConfig.BATCH_SIZE] = 2.toString() + props[SinkTask.TOPICS_CONFIG] = "$firstTopic,$secondTopic" + + val place = Struct(PLACE_SCHEMA) + .put("name", "San Mateo (CA)") + .put("latitude", 37.5629917.toFloat()) + .put("longitude", -122.3255254.toFloat()) + .put("modified", Date(1474661402123L)) + val struct = Struct(PERSON_SCHEMA) + .put("firstName", "Alex") + .put("lastName", "Smith") + .put("bool", true) + .put("short", 1234.toShort()) + .put("byte", (-32).toByte()) + .put("long", 12425436L) + .put("float", 2356.3.toFloat()) + .put("double", -2436546.56457) + .put("age", 21) + .put("modified", Date(1474661402123L)) + .put("place", place) + + task.start(props) + val input = listOf(SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42), + SinkRecord(secondTopic, 1, null, null, PERSON_SCHEMA, struct, 43)) + task.put(input) + session.beginTransaction().use { + val personCount = it.run("MATCH (p:Person) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong() + val expectedPersonCount = input.filter { it.topic() == secondTopic }.size + assertEquals(expectedPersonCount, personCount.toInt()) + val personExtCount = it.run("MATCH (p:PersonExt) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong() + val expectedPersonExtCount = input.filter { it.topic() == firstTopic }.size + assertEquals(expectedPersonExtCount, personExtCount.toInt()) + } + } + private fun countFooPersonEntities(expected: Int) { val personCount = session.run("MATCH (p:Person) RETURN count(p) as count") .single()["count"] diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt index 5c178c5a..ef157eb5 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt @@ -86,8 +86,8 @@ class Neo4jValueConverterNestedStructTest { .put("tns", tnsList) } - fun getExpectedMap(): Map { - return JSONUtils.readValue>(data).mapValues(::convertDateNew).mapValues { Values.value(it.value) } + fun getExpectedMap(): Map { + return JSONUtils.readValue>(data).mapValues(::convertDateNew) } fun convertDate(it: Map.Entry) : Any? = diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt index 1583e948..b86952fb 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt @@ -1,15 +1,21 @@ package streams.kafka.connect.sink -import org.apache.kafka.connect.data.* +import org.apache.kafka.connect.data.ConnectSchema +import org.apache.kafka.connect.data.Decimal +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.data.Time +import org.apache.kafka.connect.data.Timestamp import org.junit.Test -import org.neo4j.driver.Value -import org.neo4j.driver.Values -import org.neo4j.driver.internal.value.* import streams.kafka.connect.sink.converters.Neo4jValueConverter import java.math.BigDecimal import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime import java.time.ZoneId -import java.util.Date +import java.util.* import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -57,45 +63,45 @@ class Neo4jValueConverterTest { fun `should convert tree with mixes types into map of neo4j values`() { val utc = ZoneId.of("UTC") - val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map + val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map val target = result["target"] - assertTrue{ target is FloatValue } - assertEquals(123.4, target?.asDouble()) + assertTrue { target is Double } + assertEquals(123.4, target) val largeDecimal = result["largeDecimal"] - assertTrue{ largeDecimal is StringValue } - assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal?.asString()) + assertTrue{ largeDecimal is String } + assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal) val byteArray = result["byteArray"] - assertTrue{ byteArray is BytesValue } - assertEquals("Foo".toByteArray().map { it }, byteArray?.asByteArray()?.map { it }) + assertTrue{ byteArray is ByteArray } + assertEquals("Foo", String(byteArray as ByteArray)) val int64 = result["int64"] - assertTrue{ int64 is IntegerValue } - assertEquals(Long.MAX_VALUE, int64?.asLong()) + assertTrue{ int64 is Long } + assertEquals(Long.MAX_VALUE, int64) val int64Timestamp = result["int64Timestamp"] - assertTrue{ int64Timestamp is LocalDateTimeValue } - assertEquals(Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), int64Timestamp?.asLocalDateTime()) + assertTrue{ int64Timestamp is LocalDateTime } + assertEquals(Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), int64Timestamp) val int32 = result["int32"] - assertTrue{ int32 is IntegerValue } - assertEquals(123, int32?.asInt()) + assertTrue{ int32 is Int } + assertEquals(123, int32) val int32Date = result["int32Date"] - assertTrue{ int32Date is DateValue } - assertEquals(Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date?.asLocalDate()) + assertTrue{ int32Date is LocalDate } + assertEquals(Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date) val int32Time = result["int32Time"] - assertTrue{ int32Time is LocalTimeValue } - assertEquals(Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time?.asLocalTime()) + assertTrue{ int32Time is LocalTime } + assertEquals(Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time) val nullField = result["nullField"] - assertTrue{ nullField is NullValue } + assertTrue{ nullField == null } val nullFieldBytes = result["nullFieldBytes"] - assertTrue{ nullFieldBytes is NullValue } + assertTrue{ nullFieldBytes == null } } @@ -106,13 +112,13 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue{ item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue{ item is String } + assertEquals(number.toPlainString(), item) val result2 = Neo4jValueConverter().convert(getItemElement(null)) val item2 = result2["item"] - assertTrue{ item2 is NullValue } + assertTrue{ item2 == null } } @Test @@ -122,8 +128,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue{ item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue{ item is String } + assertEquals(number.toPlainString(), item) } @Test @@ -133,8 +139,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue{ item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue{ item is String } + assertEquals(number.toPlainString(), item) } @Test @@ -144,8 +150,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number))) val item = result["item"] - assertTrue{ item is FloatValue } - assertEquals(number, item?.asDouble()) + assertTrue{ item is Double } + assertEquals(number, item) } @Test @@ -155,8 +161,7 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number))) val item = result["item"] - assertTrue{ item is FloatValue } - assertEquals(number, item?.asDouble()) + assertEquals(number, item) } @Test @@ -174,11 +179,128 @@ class Neo4jValueConverterTest { "string" to string, "date" to date)) - assertEquals(double, result["double"]?.asDouble()) - assertEquals(long, result["long"]?.asLong()) - assertEquals(bigDouble.toPlainString(), result["bigDouble"]?.asString()) - assertEquals(string, result["string"]?.asString()) - assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]?.asLocalDateTime()) + assertEquals(double, result["double"]) + assertEquals(long, result["long"]) + assertEquals(bigDouble.toPlainString(), result["bigDouble"]) + assertEquals(string, result["string"]) + assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]) + } + + @Test + fun `should be able to process a CUD node format AVRO structure`() { + val idsStruct = SchemaBuilder.struct() + .field("ID_NUM", Schema.INT64_SCHEMA) + .build() + val propertiesStruct = SchemaBuilder.struct() + .field("ID_NUM", Schema.INT64_SCHEMA) + .field("EMP_NAME", Schema.STRING_SCHEMA) + .field("PHONE", Schema.STRING_SCHEMA) + .build() + val cudSchema = SchemaBuilder.struct() + .field("type", Schema.STRING_SCHEMA) + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", idsStruct) + .field("properties", propertiesStruct) + .build() + + val cudStruct = Struct(cudSchema) + .put("type", "node") + .put("labels", listOf("EMPLOYEE")) + .put("op", "merge") + .put("ids", Struct(idsStruct).put("ID_NUM", 36950L)) + .put("properties", Struct(propertiesStruct) + .put("ID_NUM", 36950L) + .put("EMP_NAME", "Edythe") + .put("PHONE", "3333") + ) + + val actual = Neo4jValueConverter().convert(cudStruct) as Map<*, *> + val expected = mapOf( + "type" to "node", + "labels" to listOf("EMPLOYEE"), + "op" to "merge", + "ids" to mapOf("ID_NUM" to 36950L), + "properties" to mapOf( + "ID_NUM" to 36950L, + "EMP_NAME" to "Edythe", + "PHONE" to "3333" + ) + ) + assertEquals(expected, actual) + } + + @Test + fun `should be able to process a CUD rel format AVRO structure`() { + val fromIdStruct = SchemaBuilder.struct() + .field("person_id", Schema.INT64_SCHEMA) + .build() + val toIdStruct = SchemaBuilder.struct() + .field("product_id", Schema.INT64_SCHEMA) + .build() + val fromNodeStruct = SchemaBuilder.struct() + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", fromIdStruct) + .build() + val toNodeStruct = SchemaBuilder.struct() + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", toIdStruct) + .build() + val propertiesStruct = SchemaBuilder.struct() + .field("quantity", Schema.INT32_SCHEMA) + .field("year", Schema.STRING_SCHEMA) + .build() + val cudSchema = SchemaBuilder.struct() + .field("type", Schema.STRING_SCHEMA) + .field("rel_type", Schema.STRING_SCHEMA) + .field("op", Schema.STRING_SCHEMA) + .field("properties", propertiesStruct) + .field("from", fromNodeStruct) + .field("to", toNodeStruct) + .build() + + val cudStruct = Struct(cudSchema) + .put("type", "relationship") + .put("rel_type", "BOUGHT") + .put("op", "merge") + .put("from", Struct(fromNodeStruct) + .put("ids", Struct(fromIdStruct).put("person_id", 1L)) + .put("op", "match") + .put("labels", listOf("Person")) + ) + .put("to", Struct(toNodeStruct) + .put("ids", Struct(toIdStruct).put("product_id", 10L)) + .put("op", "merge") + .put("labels", listOf("Product")) + ) + .put("properties", Struct(propertiesStruct) + .put("quantity", 10) + .put("year", "2023") + ) + + val actual = Neo4jValueConverter().convert(cudStruct) as Map<*, *> + val expected = mapOf( + "type" to "relationship", + "rel_type" to "BOUGHT", + "op" to "merge", + "from" to mapOf( + "ids" to mapOf("person_id" to 1L), + "labels" to listOf("Person"), + "op" to "match" + ), + "to" to mapOf( + "ids" to mapOf("product_id" to 10L), + "labels" to listOf("Product"), + "op" to "merge" + ), + "properties" to mapOf( + "quantity" to 10, + "year" to "2023" + ) + ) + assertEquals(expected, actual) } @Test @@ -302,16 +424,16 @@ class Neo4jValueConverterTest { .put("p", pList) } - fun getExpectedMap(): Map { + fun getExpectedMap(): Map { val firstULMap = mapOf("value" to listOf( - mapOf("value" to Values.value("First UL - First Element"), "class" to Values.NULL), - mapOf("value" to Values.value("First UL - Second Element"), "class" to Values.value(listOf("ClassA", "ClassB"))))) + mapOf("value" to "First UL - First Element", "class" to null), + mapOf("value" to "First UL - Second Element", "class" to listOf("ClassA", "ClassB")))) val secondULMap = mapOf("value" to listOf( - mapOf("value" to Values.value("Second UL - First Element"), "class" to Values.NULL), - mapOf("value" to Values.value("Second UL - Second Element"), "class" to Values.NULL))) - val ulListMap = Values.value(listOf(firstULMap, secondULMap)) - val pListMap = Values.value(listOf(mapOf("value" to Values.value("First Paragraph")), - mapOf("value" to Values.value("Second Paragraph")))) + mapOf("value" to "Second UL - First Element", "class" to null), + mapOf("value" to "Second UL - Second Element", "class" to null))) + val ulListMap = listOf(firstULMap, secondULMap) + val pListMap = listOf(mapOf("value" to "First Paragraph"), + mapOf("value" to "Second Paragraph")) return mapOf("ul" to ulListMap, "p" to pListMap) }