Skip to content

Commit

Permalink
updated kafka repository to use test containers
Browse files Browse the repository at this point in the history
Signed-off-by: Ayan Sen <[email protected]>
  • Loading branch information
ayansen committed Jan 10, 2024
1 parent 2e46e5c commit f4b876b
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 62 deletions.
3 changes: 1 addition & 2 deletions kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
plugins {
id 'ayansen.playground.kotlin-common-conventions'
id 'com.github.davidmc24.gradle.plugin.avro' version '1.3.0'
id 'com.avast.gradle.docker-compose' version '0.16.8'
}
repositories {
maven {
url 'https://packages.confluent.io/maven/'
}
}
dockerCompose.isRequiredBy(test)

dependencies {
implementation 'org.apache.kafka:kafka-streams:2.5.0'
implementation 'org.apache.kafka:kafka-clients:2.6.0'
implementation 'io.confluent:kafka-avro-serializer:7.2.1'
implementation 'org.apache.avro:avro:1.10.0'
testImplementation "org.testcontainers:kafka:1.19.3"
testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.6.0'
}
49 changes: 0 additions & 49 deletions kafka/docker-compose.yml

This file was deleted.

33 changes: 26 additions & 7 deletions kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,39 @@ import ayansen.playground.avro.SampleEvent
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.Network
import org.testcontainers.utility.DockerImageName
import java.util.*

data class KeyValueTimestamp<T, U>(val key: T, val value: U, val timestamp: Long)


object Fixtures {

const val KAFKA_BROKERS = "localhost:19092"
const val SCHEMA_REGISTRY_URL = "http://localhost:8083"
const val APP_GROUP_ID = "test_application"
private fun initializeKafka(): KafkaContainer {
val network: Network = Network.newNetwork()
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
kafka.withNetwork(network).start()
return kafka
}

fun createTopics(topics:List<String>) {
val newTopics = topics.map { NewTopic(it, 1, 1) }
val adminClient = AdminClient.create(mapOf(BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers))
adminClient.createTopics(newTopics)
}

val kafkaContainer = initializeKafka()
val schemaRegistryContainer = SchemaRegistryContainer().withKafka(kafkaContainer)
private const val APP_GROUP_ID = "test_application"

fun generateSampleEvents(numberOfEvents: Int, eventName: String): List<KeyValueTimestamp<String, SampleEvent>> =
(1..numberOfEvents).map {
Expand All @@ -47,8 +66,8 @@ object Fixtures {
fun getConsumerProperties(): Properties {
val config = Properties()
config[ConsumerConfig.GROUP_ID_CONFIG] = APP_GROUP_ID
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA_BROKERS
config["schema.registry.url"] = SCHEMA_REGISTRY_URL
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaContainer.bootstrapServers
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl()
config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
Expand All @@ -59,8 +78,8 @@ object Fixtures {

fun getProducerProperties(): Properties {
val config = Properties()
config["bootstrap.servers"] = KAFKA_BROKERS
config["schema.registry.url"] = SCHEMA_REGISTRY_URL
config["bootstrap.servers"] = kafkaContainer.bootstrapServers
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl()
config["acks"] = "all"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ import kotlin.test.assertEquals
*/
object IntegrationTestUtils {

/**
* Produce the given list of records, waiting for up to `maxWaitMs` for the writes
* to complete.
*
* @param eos Whether or not the producer should use EOS
* @param topic Kafka topic to produce to
* @param partition Kafka partition to produce to
* @param toProduce List of records to produce
* @param maxWaitMs Maximum amount of time to wait for the writes to complete (ms)
* @param <V> Value type of the data records
* @param <K> Key type of the data records
* @return The list of records that were produced
* @throws Exception for any Kafka-related failures
* @throws AssertionError if the expected number of records were not produced
* within the timeout period
* @see .produceSynchronously
* @see .waitUntilMinRecordsReceived
* @see .readRecords
**/

fun <V, K> produceSynchronously(
eos: Boolean,
topic: String,
Expand Down Expand Up @@ -145,8 +165,8 @@ object IntegrationTestUtils {

private fun <K, V> getTestProducer(): KafkaProducer<K, V> {
val config = Properties()
config["bootstrap.servers"] = Fixtures.KAFKA_BROKERS
config["schema.registry.url"] = Fixtures.SCHEMA_REGISTRY_URL
config["bootstrap.servers"] = Fixtures.kafkaContainer.bootstrapServers
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl()
config["acks"] = "all"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
Expand All @@ -157,13 +177,16 @@ object IntegrationTestUtils {
val config = Properties()
config[ConsumerConfig.CLIENT_ID_CONFIG] = "integration-test-consumer-${(0..100).random()}"
config[ConsumerConfig.GROUP_ID_CONFIG] = "integration-test-consumers-${(0..100).random()}"
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.KAFKA_BROKERS
config["schema.registry.url"] = Fixtures.SCHEMA_REGISTRY_URL
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.kafkaContainer.bootstrapServers
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl()
config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java
return KafkaConsumer(config)
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package ayansen.playground.kafka

import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.Network

import org.testcontainers.containers.wait.strategy.Wait


class SchemaRegistryContainer :
GenericContainer<SchemaRegistryContainer>("$SCHEMA_REGISTRY_IMAGE:$CONFLUENT_PLATFORM_VERSION") {
companion object {
const val SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry"
const val SCHEMA_REGISTRY_PORT = 8081
const val CONFLUENT_PLATFORM_VERSION = "5.5.1"
}

init {
waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
withExposedPorts(SCHEMA_REGISTRY_PORT)
}

fun withKafka(kafka: KafkaContainer): SchemaRegistryContainer {
return withKafka(kafka.network, kafka.networkAliases[0] + ":9092")
}

fun withKafka(network: Network?, bootstrapServers: String): SchemaRegistryContainer {
withNetwork(network)
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:$SCHEMA_REGISTRY_PORT")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://$bootstrapServers")
start()
return self()
}

fun schemaRegistryUrl(): String = "http://$host:${getMappedPort(SCHEMA_REGISTRY_PORT)}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ayansen.playground.kafka.mirror

import ayansen.playground.avro.SampleEvent
import ayansen.playground.kafka.Fixtures.createTopics
import ayansen.playground.kafka.IntegrationTestUtils
import ayansen.playground.kafka.Fixtures.generateSampleEvents
import ayansen.playground.kafka.Fixtures.getConsumerProperties
Expand All @@ -39,6 +40,7 @@ class EventOrderingIT {
*/
@Test
fun `test ordering of events for a particular partition assigned based on record key`() {
createTopics(listOf(appConsumerTopic, appProducerTopic))
val firstSample = generateSampleEvents(10, "firstSample")
val secondSample = generateSampleEvents(10, "secondSample")
kafkaMirror.processRecords(500, 2)
Expand Down

0 comments on commit f4b876b

Please sign in to comment.