Skip to content

Commit

Permalink
Issue #549: Upgrade kafka-avro-serializer dependency (#573)
Browse files Browse the repository at this point in the history
* Issue #549: Upgrade kafka-avro-serializer dependency

* fixed flaky test

* fixed KafkaEventRouterEnterpriseTSE

* Fixed SchemaRegistryContainer

* Ali feedback

* jackson 2.15.3 doesn't exist, downgrade to 2.15.2

* updated api

* downgrade to jackson 2.14.3

---------

Co-authored-by: Andrea Santurbano <[email protected]>
  • Loading branch information
mroiter-larus and conker84 authored Sep 1, 2023
1 parent 3afaa10 commit fead34b
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private fun convertAvroData(rawValue: Any?): Any? = when (rawValue) {
.mapValues { convertAvroData(it.value) }
is GenericFixed -> rawValue.bytes()
is ByteBuffer -> rawValue.array()
is GenericEnumSymbol, is CharSequence -> rawValue.toString()
is GenericEnumSymbol<*>, is CharSequence -> rawValue.toString()
else -> rawValue
}
fun IndexedRecord.toMap() = this.schema.fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.utils.Time
import org.junit.Test
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
Expand All @@ -25,7 +24,7 @@ class KafkaErrorServiceTest {
val counter = AtomicInteger(0)
Mockito.`when`(producer.send(ArgumentMatchers.any<ProducerRecord<ByteArray, ByteArray>>())).then {
counter.incrementAndGet()
FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, SystemTime())
FutureRecordMetadata(null, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0, SystemTime())
}
val dlqService = KafkaErrorService(producer, ErrorService.ErrorConfig(fail=false,dlqTopic = "dlqTopic"), { s, e -> })
dlqService.report(listOf(dlqData()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class KafkaEventSinkSuiteIT {
* 4.0.x | 1.0.x
* 4.1.x | 1.1.x
* 5.0.x | 2.0.x
* 7.4.X | 3.4.x (We are currently using 3.5.1 which is backward compatible)
*
* Please see also https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package integrations.kafka
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.KafkaContainer.KAFKA_PORT
import org.testcontainers.containers.Network
import org.testcontainers.containers.SocatContainer
import java.util.stream.Stream
Expand All @@ -30,8 +29,9 @@ class SchemaRegistryContainer(version: String): GenericContainer<SchemaRegistryC
return withKafka(kafka.network, kafka.networkAliases.map { "PLAINTEXT://$it:9092" }.joinToString(","))
}

fun withKafka(network: Network, bootstrapServers: String): SchemaRegistryContainer {
fun withKafka(network: Network?, bootstrapServers: String): SchemaRegistryContainer {
withNetwork(network)
withExposedPorts(PORT)
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", bootstrapServers)
return self()
Expand Down
2 changes: 1 addition & 1 deletion kafka-connect-neo4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<confluent.connect.plugin.version>0.11.1</confluent.connect.plugin.version>
<mvn.assembly.plugin.version>3.1.0</mvn.assembly.plugin.version>
<kafka.connect.utils.version>0.3.141</kafka.connect.utils.version>
<google.guava.version>27.0.1-jre</google.guava.version>
<google.guava.version>32.1.1-jre</google.guava.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,16 @@ class Neo4jSourceTaskTest {
task.start(props)
val totalRecords = 10
insertRecords(totalRecords)

task.poll()
var exception: ConnectException? = null
Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
try {
task.poll()
false
} catch (e: ConnectException) {
exception = e
true
}
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
if (exception != null) throw exception as ConnectException
}
}
24 changes: 12 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@
<java.version>1.8</java.version>
<kotlin.version>1.6.10</kotlin.version>
<kotlin.coroutines.version>1.6.0</kotlin.coroutines.version>
<neo4j.version>4.4.3</neo4j.version>
<kafka.version>2.4.1</kafka.version>
<jackson.version>2.13.1</jackson.version>
<neo4j.version>4.4.25</neo4j.version>
<kafka.version>2.6.3</kafka.version>
<jackson.version>2.14.3</jackson.version>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<neo4j.java.driver.version>4.4.2</neo4j.java.driver.version>
<testcontainers.version>1.15.1</testcontainers.version>
<avro.version>1.8.2</avro.version>
<testcontainers.version>1.18.3</testcontainers.version>
<avro.version>1.11.2</avro.version>
<mokito.version>3.3.0</mokito.version>
<junit.version>4.13.2</junit.version>
<kafka.avro.serializer.version>5.2.2</kafka.avro.serializer.version>
<kafka.avro.serializer.version>7.4.0</kafka.avro.serializer.version>
<junit-jupiter.version>5.7.1</junit-jupiter.version>
<hamcrest.version>1.3</hamcrest.version>
<neo4j.configuration-lifecycle.version>ad59084711</neo4j.configuration-lifecycle.version>
Expand Down Expand Up @@ -297,12 +297,12 @@
<scope>test</scope>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-simple</artifactId>-->
<!-- <version>1.7.30</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.neo4j.community</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class KafkaEventRouterEnterpriseTSE {
}
StreamsUtils.ignoreExceptions({
neo4j.withWaitStrategy(LogMessageWaitStrategy()
.withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)] \\[Source\\] Streams Source module initialised\n")
.withRegEx(".*\\[(${DB_NAME_NAMES.joinToString("|")}|neo4j)/\\w+\\] \\[Source\\] Streams Source module initialised\n")
.withTimes(DB_NAME_NAMES.size + 1)
.withStartupTimeout(Duration.ofMinutes(10)))
DB_NAME_NAMES.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class KafkaEventRouterSuiteIT {
* 4.0.x | 1.0.x
* 4.1.x | 1.1.x
* 5.0.x | 2.0.x
* 7.4.x | 3.4.x (We are currently using 3.5.1 which is backward compatible)
*
* Please see also https://docs.confluent.io/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
*/
Expand Down
2 changes: 1 addition & 1 deletion test-support/src/main/kotlin/streams/MavenUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object MavenUtils {

val rt = Runtime.getRuntime()
val mvnw = if (System.getProperty("os.name").startsWith("Windows")) "./mvnw.cmd" else "./mvnw"
val commands = arrayOf(mvnw, "-pl", "!doc,!kafka-connect-neo4j", "-DbuildSubDirectory=containerPlugins") +
val commands = arrayOf(mvnw, "-pl", "!kafka-connect-neo4j", "-DbuildSubDirectory=containerPlugins") +
args.let { if (it.isNullOrEmpty()) arrayOf("package", "-Dmaven.test.skip") else it }
val proc = rt.exec(commands, null, File(path))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt
Unreliables.retryUntilSuccess(startupTimeout.seconds.toInt(), TimeUnit.SECONDS) {
rateLimiter.doWhenReady {
if (databases.isNotEmpty()) {
val databasesStatus = systemSession.beginTransaction()
val databasesStatus = systemSession.beginTransaction()
.use { tx -> tx.run("SHOW DATABASES").list().map { it.get("name").asString() to it.get("currentStatus").asString() }.toMap() }
val notOnline = databasesStatus.filterValues { it != "online" }
if (databasesStatus.size < databases.size || notOnline.isNotEmpty()) {
Expand All @@ -57,7 +57,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt
}

class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContainerExtension>(dockerImage) {
constructor(): this("neo4j:4.1.1-enterprise")
constructor(): this("neo4j:4.4.23-enterprise")
private val logger = LoggerFactory.getLogger(Neo4jContainerExtension::class.java)
var driver: Driver? = null
var session: Session? = null
Expand Down Expand Up @@ -100,7 +100,7 @@ class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContaine
}

fun withKafka(kafka: KafkaContainer): Neo4jContainerExtension {
return withKafka(kafka.network, kafka.networkAliases.map { "$it:9092" }.joinToString(","))
return withKafka(kafka.network!!, kafka.networkAliases.map { "$it:9092" }.joinToString(","))
}

fun withKafka(network: Network, bootstrapServers: String): Neo4jContainerExtension {
Expand Down

0 comments on commit fead34b

Please sign in to comment.