From 32f239c896a73d70bae3b38226d642ea4799522d Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Mon, 21 Aug 2023 17:31:09 +0200 Subject: [PATCH] fixes #537: Kafka sink connector does not work with CUD ingestion strategy and Avro format data --- .../sink/converters/Neo4jValueConverter.kt | 20 +-- .../Neo4jValueConverterNestedStructTest.kt | 4 +- .../connect/sink/Neo4jValueConverterTest.kt | 145 ++++++++++++------ 3 files changed, 109 insertions(+), 60 deletions(-) diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt index 60ece516..51be839e 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt @@ -10,23 +10,23 @@ import java.util.Date import java.util.concurrent.TimeUnit -class Neo4jValueConverter: MapValueConverter() { +class Neo4jValueConverter: MapValueConverter() { companion object { @JvmStatic private val UTC = ZoneId.of("UTC") } - override fun setValue(result: MutableMap?, fieldName: String, value: Any?) { + override fun setValue(result: MutableMap?, fieldName: String, value: Any?) { if (result != null) { - result[fieldName] = Values.value(value) ?: Values.NULL + result[fieldName] = value } } - override fun newValue(): MutableMap { + override fun newValue(): MutableMap { return mutableMapOf() } - override fun setDecimalField(result: MutableMap?, fieldName: String, value: BigDecimal) { + override fun setDecimalField(result: MutableMap?, fieldName: String, value: BigDecimal) { val doubleValue = value.toDouble() val fitsScale = doubleValue != Double.POSITIVE_INFINITY && doubleValue != Double.NEGATIVE_INFINITY @@ -38,25 +38,25 @@ class Neo4jValueConverter: MapValueConverter() { } } - override fun setTimestampField(result: MutableMap?, fieldName: String, value: Date) { + override fun setTimestampField(result: MutableMap?, fieldName: String, value: Date) { val localDate = value.toInstant().atZone(UTC).toLocalDateTime() setValue(result, fieldName, localDate) } - override fun setTimeField(result: MutableMap?, fieldName: String, value: Date) { + override fun setTimeField(result: MutableMap?, fieldName: String, value: Date) { val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time)) setValue(result, fieldName, time) } - override fun setDateField(result: MutableMap?, fieldName: String, value: Date) { + override fun setDateField(result: MutableMap?, fieldName: String, value: Date) { val localDate = value.toInstant().atZone(UTC).toLocalDate() setValue(result, fieldName, localDate) } - override fun setStructField(result: MutableMap?, fieldName: String, value: Struct) { + override fun setStructField(result: MutableMap?, fieldName: String, value: Struct) { val converted = convert(value) - .mapValues { it.value?.asObject() } + .mapValues { it.value } .toMutableMap() as MutableMap setMap(result, fieldName, null, converted) } diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt index 5c178c5a..ef157eb5 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt @@ -86,8 +86,8 @@ class Neo4jValueConverterNestedStructTest { .put("tns", tnsList) } - fun getExpectedMap(): Map { - return JSONUtils.readValue>(data).mapValues(::convertDateNew).mapValues { Values.value(it.value) } + fun getExpectedMap(): Map { + return JSONUtils.readValue>(data).mapValues(::convertDateNew) } fun convertDate(it: Map.Entry) : Any? = diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt index 1583e948..b281dcd7 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt @@ -1,15 +1,21 @@ package streams.kafka.connect.sink -import org.apache.kafka.connect.data.* +import org.apache.kafka.connect.data.ConnectSchema +import org.apache.kafka.connect.data.Decimal +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.data.Time +import org.apache.kafka.connect.data.Timestamp import org.junit.Test -import org.neo4j.driver.Value -import org.neo4j.driver.Values -import org.neo4j.driver.internal.value.* import streams.kafka.connect.sink.converters.Neo4jValueConverter import java.math.BigDecimal import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime import java.time.ZoneId -import java.util.Date +import java.util.* import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -57,45 +63,45 @@ class Neo4jValueConverterTest { fun `should convert tree with mixes types into map of neo4j values`() { val utc = ZoneId.of("UTC") - val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map + val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map val target = result["target"] - assertTrue{ target is FloatValue } - assertEquals(123.4, target?.asDouble()) + assertTrue { target is Double } + assertEquals(123.4, target) val largeDecimal = result["largeDecimal"] - assertTrue{ largeDecimal is StringValue } - assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal?.asString()) + assertTrue{ largeDecimal is String } + assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal) val byteArray = result["byteArray"] - assertTrue{ byteArray is BytesValue } - assertEquals("Foo".toByteArray().map { it }, byteArray?.asByteArray()?.map { it }) + assertTrue{ byteArray is ByteArray } + assertEquals("Foo", String(byteArray as ByteArray)) val int64 = result["int64"] - assertTrue{ int64 is IntegerValue } - assertEquals(Long.MAX_VALUE, int64?.asLong()) + assertTrue{ int64 is Long } + assertEquals(Long.MAX_VALUE, int64) val int64Timestamp = result["int64Timestamp"] - assertTrue{ int64Timestamp is LocalDateTimeValue } - assertEquals(Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), int64Timestamp?.asLocalDateTime()) + assertTrue{ int64Timestamp is LocalDateTime } + assertEquals(Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), int64Timestamp) val int32 = result["int32"] - assertTrue{ int32 is IntegerValue } - assertEquals(123, int32?.asInt()) + assertTrue{ int32 is Int } + assertEquals(123, int32) val int32Date = result["int32Date"] - assertTrue{ int32Date is DateValue } - assertEquals(Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date?.asLocalDate()) + assertTrue{ int32Date is LocalDate } + assertEquals(Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date) val int32Time = result["int32Time"] - assertTrue{ int32Time is LocalTimeValue } - assertEquals(Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time?.asLocalTime()) + assertTrue{ int32Time is LocalTime } + assertEquals(Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time) val nullField = result["nullField"] - assertTrue{ nullField is NullValue } + assertTrue{ nullField == null } val nullFieldBytes = result["nullFieldBytes"] - assertTrue{ nullFieldBytes is NullValue } + assertTrue{ nullFieldBytes == null } } @@ -106,13 +112,13 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue{ item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue{ item is String } + assertEquals(number.toPlainString(), item) val result2 = Neo4jValueConverter().convert(getItemElement(null)) val item2 = result2["item"] - assertTrue{ item2 is NullValue } + assertTrue{ item2 == null } } @Test @@ -122,8 +128,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue{ item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue{ item is String } + assertEquals(number.toPlainString(), item) } @Test @@ -133,8 +139,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(number)) val item = result["item"] - assertTrue{ item is StringValue } - assertEquals(number.toPlainString(), item?.asString()) + assertTrue{ item is String } + assertEquals(number.toPlainString(), item) } @Test @@ -144,8 +150,8 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number))) val item = result["item"] - assertTrue{ item is FloatValue } - assertEquals(number, item?.asDouble()) + assertTrue{ item is Double } + assertEquals(number, item) } @Test @@ -155,8 +161,7 @@ class Neo4jValueConverterTest { val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number))) val item = result["item"] - assertTrue{ item is FloatValue } - assertEquals(number, item?.asDouble()) + assertEquals(number, item) } @Test @@ -174,11 +179,55 @@ class Neo4jValueConverterTest { "string" to string, "date" to date)) - assertEquals(double, result["double"]?.asDouble()) - assertEquals(long, result["long"]?.asLong()) - assertEquals(bigDouble.toPlainString(), result["bigDouble"]?.asString()) - assertEquals(string, result["string"]?.asString()) - assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]?.asLocalDateTime()) + assertEquals(double, result["double"]) + assertEquals(long, result["long"]) + assertEquals(bigDouble.toPlainString(), result["bigDouble"]) + assertEquals(string, result["string"]) + assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]) + } + + @Test + fun `should be able to process a CUD format AVRO structure`() { + val idsStruct = SchemaBuilder.struct() + .field("ID_NUM", Schema.INT64_SCHEMA) + .build() + val propertiesStruct = SchemaBuilder.struct() + .field("ID_NUM", Schema.INT64_SCHEMA) + .field("EMP_NAME", Schema.STRING_SCHEMA) + .field("PHONE", Schema.STRING_SCHEMA) + .build() + val cudSchema = SchemaBuilder.struct() + .field("type", Schema.STRING_SCHEMA) + .field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("op", Schema.STRING_SCHEMA) + .field("ids", idsStruct) + .field("properties", propertiesStruct) + .build() + + val cudStruct = Struct(cudSchema) + .put("type", "node") + .put("labels", listOf("EMPLOYEE")) + .put("op", "merge") + .put("ids", Struct(idsStruct).put("ID_NUM", 36950L)) + .put("properties", Struct(propertiesStruct) + .put("ID_NUM", 36950L) + .put("EMP_NAME", "Edythe") + .put("PHONE", "3333") + ) + + val actual = Neo4jValueConverter().convert(cudStruct) as Map<*, *> + val expected = mapOf( + "type" to "node", + "labels" to listOf("EMPLOYEE"), + "op" to "merge", + "ids" to mapOf("ID_NUM" to 36950L), + "properties" to mapOf( + "ID_NUM" to 36950L, + "EMP_NAME" to "Edythe", + "PHONE" to "3333" + ) + ) + assertEquals(expected, actual) } @Test @@ -302,16 +351,16 @@ class Neo4jValueConverterTest { .put("p", pList) } - fun getExpectedMap(): Map { + fun getExpectedMap(): Map { val firstULMap = mapOf("value" to listOf( - mapOf("value" to Values.value("First UL - First Element"), "class" to Values.NULL), - mapOf("value" to Values.value("First UL - Second Element"), "class" to Values.value(listOf("ClassA", "ClassB"))))) + mapOf("value" to "First UL - First Element", "class" to null), + mapOf("value" to "First UL - Second Element", "class" to listOf("ClassA", "ClassB")))) val secondULMap = mapOf("value" to listOf( - mapOf("value" to Values.value("Second UL - First Element"), "class" to Values.NULL), - mapOf("value" to Values.value("Second UL - Second Element"), "class" to Values.NULL))) - val ulListMap = Values.value(listOf(firstULMap, secondULMap)) - val pListMap = Values.value(listOf(mapOf("value" to Values.value("First Paragraph")), - mapOf("value" to Values.value("Second Paragraph")))) + mapOf("value" to "Second UL - First Element", "class" to null), + mapOf("value" to "Second UL - Second Element", "class" to null))) + val ulListMap = listOf(firstULMap, secondULMap) + val pListMap = listOf(mapOf("value" to "First Paragraph"), + mapOf("value" to "Second Paragraph")) return mapOf("ul" to ulListMap, "p" to pListMap) }