Skip to content

Commit

Permalink
updated kafka repository to use test containers (#30)
Browse files Browse the repository at this point in the history
* updated kafka repository to use test containers

Signed-off-by: Ayan Sen <[email protected]>

* fixed bugs in topic creation for test

Signed-off-by: Ayan Sen <[email protected]>

---------

Signed-off-by: Ayan Sen <[email protected]>
  • Loading branch information
ayansen authored Jan 10, 2024
1 parent 2e46e5c commit f5c74a7
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 63 deletions.
3 changes: 1 addition & 2 deletions kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
plugins {
id 'ayansen.playground.kotlin-common-conventions'
id 'com.github.davidmc24.gradle.plugin.avro' version '1.3.0'
id 'com.avast.gradle.docker-compose' version '0.16.8'
}
repositories {
maven {
url 'https://packages.confluent.io/maven/'
}
}
dockerCompose.isRequiredBy(test)

dependencies {
implementation 'org.apache.kafka:kafka-streams:2.5.0'
implementation 'org.apache.kafka:kafka-clients:2.6.0'
implementation 'io.confluent:kafka-avro-serializer:7.2.1'
implementation 'org.apache.avro:avro:1.10.0'
testImplementation "org.testcontainers:kafka:1.19.3"
testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.6.0'
}
49 changes: 0 additions & 49 deletions kafka/docker-compose.yml

This file was deleted.

34 changes: 27 additions & 7 deletions kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,40 @@ import ayansen.playground.avro.SampleEvent
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.Network
import org.testcontainers.utility.DockerImageName
import java.util.*

data class KeyValueTimestamp<T, U>(val key: T, val value: U, val timestamp: Long)


object Fixtures {

const val KAFKA_BROKERS = "localhost:19092"
const val SCHEMA_REGISTRY_URL = "http://localhost:8083"
const val APP_GROUP_ID = "test_application"
private fun initializeKafka(): KafkaContainer {
val network: Network = Network.newNetwork()
val kafka = KafkaContainer(DockerImageName.parse(KAFKA_FULL_IMAGE_NAME))
.withKraft().withNetwork(network)
kafka.start()
return kafka
}

fun createTopics(topics: List<NewTopic>) {
val adminClient = AdminClient.create(mapOf(BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers))
adminClient.createTopics(topics)
}

val kafkaContainer = initializeKafka()
val schemaRegistryContainer = SchemaRegistryContainer().withKafka(kafkaContainer)
private const val APP_GROUP_ID = "test_application"
private const val KAFKA_FULL_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0"

fun generateSampleEvents(numberOfEvents: Int, eventName: String): List<KeyValueTimestamp<String, SampleEvent>> =
(1..numberOfEvents).map {
Expand All @@ -47,8 +67,8 @@ object Fixtures {
fun getConsumerProperties(): Properties {
val config = Properties()
config[ConsumerConfig.GROUP_ID_CONFIG] = APP_GROUP_ID
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA_BROKERS
config["schema.registry.url"] = SCHEMA_REGISTRY_URL
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaContainer.bootstrapServers
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl
config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
Expand All @@ -59,8 +79,8 @@ object Fixtures {

fun getProducerProperties(): Properties {
val config = Properties()
config["bootstrap.servers"] = KAFKA_BROKERS
config["schema.registry.url"] = SCHEMA_REGISTRY_URL
config["bootstrap.servers"] = kafkaContainer.bootstrapServers
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl
config["acks"] = "all"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ import kotlin.test.assertEquals
*/
object IntegrationTestUtils {

/**
* Produce the given list of records, waiting for up to `maxWaitMs` for the writes
* to complete.
*
* @param eos Whether or not the producer should use EOS
* @param topic Kafka topic to produce to
* @param partition Kafka partition to produce to
* @param toProduce List of records to produce
* @param maxWaitMs Maximum amount of time to wait for the writes to complete (ms)
* @param <V> Value type of the data records
* @param <K> Key type of the data records
* @return The list of records that were produced
* @throws Exception for any Kafka-related failures
* @throws AssertionError if the expected number of records were not produced
* within the timeout period
* @see .produceSynchronously
* @see .waitUntilMinRecordsReceived
* @see .readRecords
**/

fun <V, K> produceSynchronously(
eos: Boolean,
topic: String,
Expand Down Expand Up @@ -145,8 +165,8 @@ object IntegrationTestUtils {

private fun <K, V> getTestProducer(): KafkaProducer<K, V> {
val config = Properties()
config["bootstrap.servers"] = Fixtures.KAFKA_BROKERS
config["schema.registry.url"] = Fixtures.SCHEMA_REGISTRY_URL
config["bootstrap.servers"] = Fixtures.kafkaContainer.bootstrapServers
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl
config["acks"] = "all"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
Expand All @@ -157,13 +177,16 @@ object IntegrationTestUtils {
val config = Properties()
config[ConsumerConfig.CLIENT_ID_CONFIG] = "integration-test-consumer-${(0..100).random()}"
config[ConsumerConfig.GROUP_ID_CONFIG] = "integration-test-consumers-${(0..100).random()}"
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.KAFKA_BROKERS
config["schema.registry.url"] = Fixtures.SCHEMA_REGISTRY_URL
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.kafkaContainer.bootstrapServers
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl
config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java
return KafkaConsumer(config)
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ayansen.playground.kafka

import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.Network

import org.testcontainers.containers.wait.strategy.Wait


class SchemaRegistryContainer :
GenericContainer<SchemaRegistryContainer>("$SCHEMA_REGISTRY_IMAGE:$CONFLUENT_PLATFORM_VERSION") {
companion object {
const val SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry"
const val SCHEMA_REGISTRY_PORT = 8081
const val CONFLUENT_PLATFORM_VERSION = "5.5.1"
}

lateinit var schemaRegistryUrl: String

init {
waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
withExposedPorts(SCHEMA_REGISTRY_PORT)
}

fun withKafka(kafka: KafkaContainer): SchemaRegistryContainer {
withNetwork(kafka.network)
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:$SCHEMA_REGISTRY_PORT")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://${kafka.networkAliases[0]}:9092")
start()
schemaRegistryUrl = "http://$host:${getMappedPort(SCHEMA_REGISTRY_PORT)}"
return self()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package ayansen.playground.kafka.mirror

import ayansen.playground.avro.SampleEvent
import ayansen.playground.kafka.Fixtures.createTopics
import ayansen.playground.kafka.IntegrationTestUtils
import ayansen.playground.kafka.Fixtures.generateSampleEvents
import ayansen.playground.kafka.Fixtures.getConsumerProperties
import ayansen.playground.kafka.Fixtures.getProducerProperties
import org.apache.kafka.clients.admin.NewTopic
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals

Expand All @@ -39,6 +41,9 @@ class EventOrderingIT {
*/
@Test
fun `test ordering of events for a particular partition assigned based on record key`() {
createTopics(
listOf(NewTopic(appConsumerTopic, 2, 1), NewTopic(appProducerTopic, 2, 1))
)
val firstSample = generateSampleEvents(10, "firstSample")
val secondSample = generateSampleEvents(10, "secondSample")
kafkaMirror.processRecords(500, 2)
Expand Down
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pluginManagement {
rootProject.name = 'playground'
include('graphql')
include('openapi')
include('envoy')
include('envoy')
include('kafka')

0 comments on commit f5c74a7

Please sign in to comment.