Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix Kafka sink connector to work with CUD ingestion strategy and Avro format data.

See neo4j-contrib/neo4j-streams#582

Co-authored-by: Andrea Santurbano <[email protected]>
  • Loading branch information
fbiville and conker84 authored Nov 3, 2023
1 parent 8a06dc8 commit 0b6895c
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,26 @@ import java.time.ZoneId
import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.kafka.connect.data.Struct
import org.neo4j.driver.Value
import org.neo4j.driver.Values

@Suppress("UNCHECKED_CAST")
class Neo4jValueConverter : MapValueConverter<Value>() {
class Neo4jValueConverter : MapValueConverter<Any>() {

companion object {
@JvmStatic private val UTC = ZoneId.of("UTC")
}

override fun setValue(result: MutableMap<String, Value?>?, fieldName: String, value: Any?) {
override fun setValue(result: MutableMap<String, Any?>?, fieldName: String, value: Any?) {
if (result != null) {
result[fieldName] = Values.value(value) ?: Values.NULL
result[fieldName] = value
}
}

override fun newValue(): MutableMap<String, Value?> {
override fun newValue(): MutableMap<String, Any?> {
return mutableMapOf()
}

override fun setDecimalField(
result: MutableMap<String, Value?>?,
result: MutableMap<String, Any?>?,
fieldName: String,
value: BigDecimal
) {
Expand All @@ -60,31 +58,26 @@ class Neo4jValueConverter : MapValueConverter<Value>() {
}

override fun setTimestampField(
result: MutableMap<String, Value?>?,
result: MutableMap<String, Any?>?,
fieldName: String,
value: Date
) {
val localDate = value.toInstant().atZone(UTC).toLocalDateTime()
setValue(result, fieldName, localDate)
}

override fun setTimeField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
override fun setTimeField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time))
setValue(result, fieldName, time)
}

override fun setDateField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
override fun setDateField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
val localDate = value.toInstant().atZone(UTC).toLocalDate()
setValue(result, fieldName, localDate)
}

override fun setStructField(
result: MutableMap<String, Value?>?,
fieldName: String,
value: Struct
) {
val converted =
convert(value).mapValues { it.value?.asObject() }.toMutableMap() as MutableMap<Any?, Any?>
setMap(result, fieldName, null, converted)
override fun setStructField(result: MutableMap<String, Any?>?, fieldName: String, value: Struct) {
val converted = convert(value).toMutableMap() as MutableMap<Any?, Any?>
setValue(result, fieldName, converted)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ class Neo4jSinkTaskTest {
task.initialize(Mockito.mock(SinkTaskContext::class.java))
}

private val PLACE_SCHEMA =
SchemaBuilder.struct()
.name("com.example.Place")
.field("name", Schema.STRING_SCHEMA)
.field("latitude", Schema.FLOAT32_SCHEMA)
.field("longitude", Schema.FLOAT32_SCHEMA)
.field("modified", Timestamp.SCHEMA)
.build()

private val PERSON_SCHEMA =
SchemaBuilder.struct()
.name("com.example.Person")
Expand All @@ -125,14 +134,7 @@ class Neo4jSinkTaskTest {
.field("float", Schema.OPTIONAL_FLOAT32_SCHEMA)
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
.field("modified", Timestamp.SCHEMA)
.build()

private val PLACE_SCHEMA =
SchemaBuilder.struct()
.name("com.example.Place")
.field("name", Schema.STRING_SCHEMA)
.field("latitude", Schema.FLOAT32_SCHEMA)
.field("longitude", Schema.FLOAT32_SCHEMA)
.field("place", PLACE_SCHEMA)
.build()

@Test
Expand Down Expand Up @@ -1905,6 +1907,62 @@ class Neo4jSinkTaskTest {
assertEquals(expectedDataErrorSize, exception.errorDatas.size)
}

@Test
fun `should successfully parse nested timestamps`() {
val firstTopic = "neotopic"
val secondTopic = "foo"
val props = mutableMapOf<String, String>()
props[Neo4jConfiguration.URI] = neo4j.boltUrl
props["${SinkConfiguration.TOPIC_CYPHER_PREFIX}$firstTopic"] =
"CREATE (n:PersonExt {modified: event.place.modified})"
props["${SinkConfiguration.TOPIC_CYPHER_PREFIX}$secondTopic"] =
"CREATE (n:Person {modified: event.place.modified})"
props[Neo4jConfiguration.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
props[SinkConfiguration.BATCH_SIZE] = 2.toString()
props[SinkTask.TOPICS_CONFIG] = "$firstTopic,$secondTopic"

val place =
Struct(PLACE_SCHEMA)
.put("name", "San Mateo (CA)")
.put("latitude", 37.5629917.toFloat())
.put("longitude", -122.3255254.toFloat())
.put("modified", Date(1474661402123L))
val struct =
Struct(PERSON_SCHEMA)
.put("firstName", "Alex")
.put("lastName", "Smith")
.put("bool", true)
.put("short", 1234.toShort())
.put("byte", (-32).toByte())
.put("long", 12425436L)
.put("float", 2356.3.toFloat())
.put("double", -2436546.56457)
.put("age", 21)
.put("modified", Date(1474661402123L))
.put("place", place)

task.start(props)
val input =
listOf(
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
SinkRecord(secondTopic, 1, null, null, PERSON_SCHEMA, struct, 43))
task.put(input)
session.beginTransaction().use {
val personCount =
it.run("MATCH (p:Person) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong()
val expectedPersonCount = input.filter { it.topic() == secondTopic }.size
assertEquals(expectedPersonCount, personCount.toInt())
val personExtCount =
it.run("MATCH (p:PersonExt) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong()
val expectedPersonExtCount = input.filter { it.topic() == firstTopic }.size
assertEquals(expectedPersonExtCount, personExtCount.toInt())
}
}

private fun countFooPersonEntities(expected: Int) {
val personCount =
session.run("MATCH (p:Person) RETURN count(p) as count").single()["count"].asLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.utils.JSONUtils
import org.neo4j.driver.Value
import org.neo4j.driver.Values

@Suppress("UNCHECKED_CAST")
class Neo4jValueConverterNestedStructTest {
Expand Down Expand Up @@ -125,10 +123,8 @@ class Neo4jValueConverterNestedStructTest {
.put("tns", tnsList)
}

fun getExpectedMap(): Map<String, Value> {
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew).mapValues {
Values.value(it.value)
}
fun getExpectedMap(): Map<String, Any?> {
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew)
}

fun convertDate(it: Map.Entry<String, Any?>): Any? =
Expand Down
Loading

0 comments on commit 0b6895c

Please sign in to comment.