Skip to content

Commit

Permalink
fixes #537: Kafka sink connector does not work with CUD ingestion str…
Browse files Browse the repository at this point in the history
…ategy and Avro format data
  • Loading branch information
conker84 committed Aug 21, 2023
1 parent 6ab213e commit 32f239c
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ import java.util.Date
import java.util.concurrent.TimeUnit


class Neo4jValueConverter: MapValueConverter<Value>() {
class Neo4jValueConverter: MapValueConverter<Any>() {

companion object {
@JvmStatic private val UTC = ZoneId.of("UTC")
}

override fun setValue(result: MutableMap<String, Value?>?, fieldName: String, value: Any?) {
override fun setValue(result: MutableMap<String, Any?>?, fieldName: String, value: Any?) {
if (result != null) {
result[fieldName] = Values.value(value) ?: Values.NULL
result[fieldName] = value
}
}

override fun newValue(): MutableMap<String, Value?> {
override fun newValue(): MutableMap<String, Any?> {
return mutableMapOf()
}

override fun setDecimalField(result: MutableMap<String, Value?>?, fieldName: String, value: BigDecimal) {
override fun setDecimalField(result: MutableMap<String, Any?>?, fieldName: String, value: BigDecimal) {
val doubleValue = value.toDouble()
val fitsScale = doubleValue != Double.POSITIVE_INFINITY
&& doubleValue != Double.NEGATIVE_INFINITY
Expand All @@ -38,25 +38,25 @@ class Neo4jValueConverter: MapValueConverter<Value>() {
}
}

override fun setTimestampField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
override fun setTimestampField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
val localDate = value.toInstant().atZone(UTC).toLocalDateTime()
setValue(result, fieldName, localDate)

}

override fun setTimeField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
override fun setTimeField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time))
setValue(result, fieldName, time)
}

override fun setDateField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
override fun setDateField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
val localDate = value.toInstant().atZone(UTC).toLocalDate()
setValue(result, fieldName, localDate)
}

override fun setStructField(result: MutableMap<String, Value?>?, fieldName: String, value: Struct) {
override fun setStructField(result: MutableMap<String, Any?>?, fieldName: String, value: Struct) {
val converted = convert(value)
.mapValues { it.value?.asObject() }
.mapValues { it.value }
.toMutableMap() as MutableMap<Any?, Any?>
setMap(result, fieldName, null, converted)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class Neo4jValueConverterNestedStructTest {
.put("tns", tnsList)
}

fun getExpectedMap(): Map<String, Value> {
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew).mapValues { Values.value(it.value) }
fun getExpectedMap(): Map<String, Any?> {
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew)
}

fun convertDate(it: Map.Entry<String,Any?>) : Any? =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package streams.kafka.connect.sink

import org.apache.kafka.connect.data.*
import org.apache.kafka.connect.data.ConnectSchema
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp
import org.junit.Test
import org.neo4j.driver.Value
import org.neo4j.driver.Values
import org.neo4j.driver.internal.value.*
import streams.kafka.connect.sink.converters.Neo4jValueConverter
import java.math.BigDecimal
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.ZoneId
import java.util.Date
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertTrue

Expand Down Expand Up @@ -57,45 +63,45 @@ class Neo4jValueConverterTest {
fun `should convert tree with mixes types into map of neo4j values`() {

val utc = ZoneId.of("UTC")
val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map<String, Value?>
val result = Neo4jValueConverter().convert(Struct(TEST_SCHEMA)) as Map<String, Any?>

val target = result["target"]
assertTrue{ target is FloatValue }
assertEquals(123.4, target?.asDouble())
assertTrue { target is Double }
assertEquals(123.4, target)

val largeDecimal = result["largeDecimal"]
assertTrue{ largeDecimal is StringValue }
assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal?.asString())
assertTrue{ largeDecimal is String }
assertEquals(BigDecimal.valueOf(Double.MAX_VALUE).pow(2).toPlainString(), largeDecimal)

val byteArray = result["byteArray"]
assertTrue{ byteArray is BytesValue }
assertEquals("Foo".toByteArray().map { it }, byteArray?.asByteArray()?.map { it })
assertTrue{ byteArray is ByteArray }
assertEquals("Foo", String(byteArray as ByteArray))

val int64 = result["int64"]
assertTrue{ int64 is IntegerValue }
assertEquals(Long.MAX_VALUE, int64?.asLong())
assertTrue{ int64 is Long }
assertEquals(Long.MAX_VALUE, int64)

val int64Timestamp = result["int64Timestamp"]
assertTrue{ int64Timestamp is LocalDateTimeValue }
assertEquals(Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), int64Timestamp?.asLocalDateTime())
assertTrue{ int64Timestamp is LocalDateTime }
assertEquals(Date.from(Instant.ofEpochMilli(789)).toInstant().atZone(utc).toLocalDateTime(), int64Timestamp)

val int32 = result["int32"]
assertTrue{ int32 is IntegerValue }
assertEquals(123, int32?.asInt())
assertTrue{ int32 is Int }
assertEquals(123, int32)

val int32Date = result["int32Date"]
assertTrue{ int32Date is DateValue }
assertEquals(Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date?.asLocalDate())
assertTrue{ int32Date is LocalDate }
assertEquals(Date.from(Instant.ofEpochMilli(456)).toInstant().atZone(utc).toLocalDate(), int32Date)

val int32Time = result["int32Time"]
assertTrue{ int32Time is LocalTimeValue }
assertEquals(Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time?.asLocalTime())
assertTrue{ int32Time is LocalTime }
assertEquals(Date.from(Instant.ofEpochMilli(123)).toInstant().atZone(utc).toLocalTime(), int32Time)

val nullField = result["nullField"]
assertTrue{ nullField is NullValue }
assertTrue{ nullField == null }

val nullFieldBytes = result["nullFieldBytes"]
assertTrue{ nullFieldBytes is NullValue }
assertTrue{ nullFieldBytes == null }

}

Expand All @@ -106,13 +112,13 @@ class Neo4jValueConverterTest {
val result = Neo4jValueConverter().convert(getItemElement(number))
val item = result["item"]

assertTrue{ item is StringValue }
assertEquals(number.toPlainString(), item?.asString())
assertTrue{ item is String }
assertEquals(number.toPlainString(), item)

val result2 = Neo4jValueConverter().convert(getItemElement(null))
val item2 = result2["item"]

assertTrue{ item2 is NullValue }
assertTrue{ item2 == null }
}

@Test
Expand All @@ -122,8 +128,8 @@ class Neo4jValueConverterTest {
val result = Neo4jValueConverter().convert(getItemElement(number))
val item = result["item"]

assertTrue{ item is StringValue }
assertEquals(number.toPlainString(), item?.asString())
assertTrue{ item is String }
assertEquals(number.toPlainString(), item)
}

@Test
Expand All @@ -133,8 +139,8 @@ class Neo4jValueConverterTest {
val result = Neo4jValueConverter().convert(getItemElement(number))
val item = result["item"]

assertTrue{ item is StringValue }
assertEquals(number.toPlainString(), item?.asString())
assertTrue{ item is String }
assertEquals(number.toPlainString(), item)
}

@Test
Expand All @@ -144,8 +150,8 @@ class Neo4jValueConverterTest {
val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number)))
val item = result["item"]

assertTrue{ item is FloatValue }
assertEquals(number, item?.asDouble())
assertTrue{ item is Double }
assertEquals(number, item)
}

@Test
Expand All @@ -155,8 +161,7 @@ class Neo4jValueConverterTest {
val result = Neo4jValueConverter().convert(getItemElement(BigDecimal.valueOf(number)))
val item = result["item"]

assertTrue{ item is FloatValue }
assertEquals(number, item?.asDouble())
assertEquals(number, item)
}

@Test
Expand All @@ -174,11 +179,55 @@ class Neo4jValueConverterTest {
"string" to string,
"date" to date))

assertEquals(double, result["double"]?.asDouble())
assertEquals(long, result["long"]?.asLong())
assertEquals(bigDouble.toPlainString(), result["bigDouble"]?.asString())
assertEquals(string, result["string"]?.asString())
assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]?.asLocalDateTime())
assertEquals(double, result["double"])
assertEquals(long, result["long"])
assertEquals(bigDouble.toPlainString(), result["bigDouble"])
assertEquals(string, result["string"])
assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"])
}

@Test
fun `should be able to process a CUD format AVRO structure`() {
val idsStruct = SchemaBuilder.struct()
.field("ID_NUM", Schema.INT64_SCHEMA)
.build()
val propertiesStruct = SchemaBuilder.struct()
.field("ID_NUM", Schema.INT64_SCHEMA)
.field("EMP_NAME", Schema.STRING_SCHEMA)
.field("PHONE", Schema.STRING_SCHEMA)
.build()
val cudSchema = SchemaBuilder.struct()
.field("type", Schema.STRING_SCHEMA)
.field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA))
.field("op", Schema.STRING_SCHEMA)
.field("ids", idsStruct)
.field("properties", propertiesStruct)
.build()

val cudStruct = Struct(cudSchema)
.put("type", "node")
.put("labels", listOf("EMPLOYEE"))
.put("op", "merge")
.put("ids", Struct(idsStruct).put("ID_NUM", 36950L))
.put("properties", Struct(propertiesStruct)
.put("ID_NUM", 36950L)
.put("EMP_NAME", "Edythe")
.put("PHONE", "3333")
)

val actual = Neo4jValueConverter().convert(cudStruct) as Map<*, *>
val expected = mapOf<String, Any?>(
"type" to "node",
"labels" to listOf("EMPLOYEE"),
"op" to "merge",
"ids" to mapOf<String, Any?>("ID_NUM" to 36950L),
"properties" to mapOf<String, Any?>(
"ID_NUM" to 36950L,
"EMP_NAME" to "Edythe",
"PHONE" to "3333"
)
)
assertEquals(expected, actual)
}

@Test
Expand Down Expand Up @@ -302,16 +351,16 @@ class Neo4jValueConverterTest {
.put("p", pList)
}

fun getExpectedMap(): Map<String, Value> {
fun getExpectedMap(): Map<String, Any?> {
val firstULMap = mapOf("value" to listOf(
mapOf("value" to Values.value("First UL - First Element"), "class" to Values.NULL),
mapOf("value" to Values.value("First UL - Second Element"), "class" to Values.value(listOf("ClassA", "ClassB")))))
mapOf("value" to "First UL - First Element", "class" to null),
mapOf("value" to "First UL - Second Element", "class" to listOf("ClassA", "ClassB"))))
val secondULMap = mapOf("value" to listOf(
mapOf("value" to Values.value("Second UL - First Element"), "class" to Values.NULL),
mapOf("value" to Values.value("Second UL - Second Element"), "class" to Values.NULL)))
val ulListMap = Values.value(listOf(firstULMap, secondULMap))
val pListMap = Values.value(listOf(mapOf("value" to Values.value("First Paragraph")),
mapOf("value" to Values.value("Second Paragraph"))))
mapOf("value" to "Second UL - First Element", "class" to null),
mapOf("value" to "Second UL - Second Element", "class" to null)))
val ulListMap = listOf(firstULMap, secondULMap)
val pListMap = listOf(mapOf("value" to "First Paragraph"),
mapOf("value" to "Second Paragraph"))
return mapOf("ul" to ulListMap, "p" to pListMap)
}

Expand Down

0 comments on commit 32f239c

Please sign in to comment.