From f5c74a7a090a1818a34272af79718bcd0beadbe5 Mon Sep 17 00:00:00 2001 From: Ayan Sen Date: Wed, 10 Jan 2024 11:09:48 -0800 Subject: [PATCH] updated kafka repository to use test containers (#30) * updated kafka repository to use test containers Signed-off-by: Ayan Sen * fixed bugs in topic creation for test Signed-off-by: Ayan Sen --------- Signed-off-by: Ayan Sen --- kafka/build.gradle | 3 +- kafka/docker-compose.yml | 49 ------------------- .../ayansen/playground/kafka/Fixtures.kt | 34 ++++++++++--- .../playground/kafka/IntegrationTestUtils.kt | 31 ++++++++++-- .../kafka/SchemaRegistryContainer.kt | 35 +++++++++++++ .../kafka/mirror/EventOrderingIT.kt | 5 ++ settings.gradle | 3 +- 7 files changed, 97 insertions(+), 63 deletions(-) delete mode 100644 kafka/docker-compose.yml create mode 100644 kafka/src/test/kotlin/ayansen/playground/kafka/SchemaRegistryContainer.kt diff --git a/kafka/build.gradle b/kafka/build.gradle index aeec87b..cbb7386 100644 --- a/kafka/build.gradle +++ b/kafka/build.gradle @@ -1,19 +1,18 @@ plugins { id 'ayansen.playground.kotlin-common-conventions' id 'com.github.davidmc24.gradle.plugin.avro' version '1.3.0' - id 'com.avast.gradle.docker-compose' version '0.16.8' } repositories { maven { url 'https://packages.confluent.io/maven/' } } -dockerCompose.isRequiredBy(test) dependencies { implementation 'org.apache.kafka:kafka-streams:2.5.0' implementation 'org.apache.kafka:kafka-clients:2.6.0' implementation 'io.confluent:kafka-avro-serializer:7.2.1' implementation 'org.apache.avro:avro:1.10.0' + testImplementation "org.testcontainers:kafka:1.19.3" testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.6.0' } \ No newline at end of file diff --git a/kafka/docker-compose.yml b/kafka/docker-compose.yml deleted file mode 100644 index 5766ab2..0000000 --- a/kafka/docker-compose.yml +++ /dev/null @@ -1,49 +0,0 @@ -# -# Copyright 2020. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -version: "3" -services: - zookeeper: - image: wurstmeister/zookeeper:3.4.6 - ports: - - "2181:2181" - - kafka: - image: wurstmeister/kafka:2.13-2.8.1 - ports: - - "9092:9092" - - "19092:19092" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:19092 - KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:19092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CREATE_TOPICS: "test_topic_v1:2:1,test_topic_mirrored_v1:1:1" - depends_on: - - zookeeper - - schema-registry: - image: confluentinc/cp-schema-registry:5.3.1 - depends_on: - - zookeeper - - kafka - ports: - - 8083:8081 - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092' \ No newline at end of file diff --git a/kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt b/kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt index e954a3c..db06525 100644 --- a/kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt +++ b/kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt @@ -19,10 +19,16 @@ import ayansen.playground.avro.SampleEvent import io.confluent.kafka.serializers.KafkaAvroDeserializer import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig import io.confluent.kafka.serializers.KafkaAvroSerializer +import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.containers.Network +import org.testcontainers.utility.DockerImageName import java.util.* data class KeyValueTimestamp(val key: T, val value: U, val timestamp: Long) @@ -30,9 +36,23 @@ data class KeyValueTimestamp(val key: T, val value: U, val timestamp: Long object Fixtures { - const val KAFKA_BROKERS = "localhost:19092" - const val SCHEMA_REGISTRY_URL = "http://localhost:8083" - const val APP_GROUP_ID = "test_application" + private fun initializeKafka(): KafkaContainer { + val network: Network = Network.newNetwork() + val kafka = KafkaContainer(DockerImageName.parse(KAFKA_FULL_IMAGE_NAME)) + .withKraft().withNetwork(network) + kafka.start() + return kafka + } + + fun createTopics(topics: List) { + val adminClient = AdminClient.create(mapOf(BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers)) + adminClient.createTopics(topics) + } + + val kafkaContainer = initializeKafka() + val schemaRegistryContainer = SchemaRegistryContainer().withKafka(kafkaContainer) + private const val APP_GROUP_ID = "test_application" + private const val KAFKA_FULL_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0" fun generateSampleEvents(numberOfEvents: Int, eventName: String): List> = (1..numberOfEvents).map { @@ -47,8 +67,8 @@ object Fixtures { fun getConsumerProperties(): Properties { val config = Properties() config[ConsumerConfig.GROUP_ID_CONFIG] = APP_GROUP_ID - config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA_BROKERS - config["schema.registry.url"] = SCHEMA_REGISTRY_URL + config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaContainer.bootstrapServers + config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true" @@ -59,8 +79,8 @@ object Fixtures { fun getProducerProperties(): Properties { val config = Properties() - config["bootstrap.servers"] = KAFKA_BROKERS - config["schema.registry.url"] = SCHEMA_REGISTRY_URL + config["bootstrap.servers"] = kafkaContainer.bootstrapServers + config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl config["acks"] = "all" config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java diff --git a/kafka/src/test/kotlin/ayansen/playground/kafka/IntegrationTestUtils.kt b/kafka/src/test/kotlin/ayansen/playground/kafka/IntegrationTestUtils.kt index 3ad2c92..7f54b70 100644 --- a/kafka/src/test/kotlin/ayansen/playground/kafka/IntegrationTestUtils.kt +++ b/kafka/src/test/kotlin/ayansen/playground/kafka/IntegrationTestUtils.kt @@ -41,6 +41,26 @@ import kotlin.test.assertEquals */ object IntegrationTestUtils { + /** + * Produce the given list of records, waiting for up to `maxWaitMs` for the writes + * to complete. + * + * @param eos Whether or not the producer should use EOS + * @param topic Kafka topic to produce to + * @param partition Kafka partition to produce to + * @param toProduce List of records to produce + * @param maxWaitMs Maximum amount of time to wait for the writes to complete (ms) + * @param Value type of the data records + * @param Key type of the data records + * @return The list of records that were produced + * @throws Exception for any Kafka-related failures + * @throws AssertionError if the expected number of records were not produced + * within the timeout period + * @see .produceSynchronously + * @see .waitUntilMinRecordsReceived + * @see .readRecords + **/ + fun produceSynchronously( eos: Boolean, topic: String, @@ -145,8 +165,8 @@ object IntegrationTestUtils { private fun getTestProducer(): KafkaProducer { val config = Properties() - config["bootstrap.servers"] = Fixtures.KAFKA_BROKERS - config["schema.registry.url"] = Fixtures.SCHEMA_REGISTRY_URL + config["bootstrap.servers"] = Fixtures.kafkaContainer.bootstrapServers + config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl config["acks"] = "all" config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java @@ -157,8 +177,8 @@ object IntegrationTestUtils { val config = Properties() config[ConsumerConfig.CLIENT_ID_CONFIG] = "integration-test-consumer-${(0..100).random()}" config[ConsumerConfig.GROUP_ID_CONFIG] = "integration-test-consumers-${(0..100).random()}" - config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.KAFKA_BROKERS - config["schema.registry.url"] = Fixtures.SCHEMA_REGISTRY_URL + config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.kafkaContainer.bootstrapServers + config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true" @@ -166,4 +186,7 @@ object IntegrationTestUtils { config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java return KafkaConsumer(config) } + + + } \ No newline at end of file diff --git a/kafka/src/test/kotlin/ayansen/playground/kafka/SchemaRegistryContainer.kt b/kafka/src/test/kotlin/ayansen/playground/kafka/SchemaRegistryContainer.kt new file mode 100644 index 0000000..f880172 --- /dev/null +++ b/kafka/src/test/kotlin/ayansen/playground/kafka/SchemaRegistryContainer.kt @@ -0,0 +1,35 @@ +package ayansen.playground.kafka + +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.containers.Network + +import org.testcontainers.containers.wait.strategy.Wait + + +class SchemaRegistryContainer : + GenericContainer("$SCHEMA_REGISTRY_IMAGE:$CONFLUENT_PLATFORM_VERSION") { + companion object { + const val SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry" + const val SCHEMA_REGISTRY_PORT = 8081 + const val CONFLUENT_PLATFORM_VERSION = "5.5.1" + } + + lateinit var schemaRegistryUrl: String + + init { + waitingFor(Wait.forHttp("/subjects").forStatusCode(200)) + withExposedPorts(SCHEMA_REGISTRY_PORT) + } + + fun withKafka(kafka: KafkaContainer): SchemaRegistryContainer { + withNetwork(kafka.network) + withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") + withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:$SCHEMA_REGISTRY_PORT") + withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://${kafka.networkAliases[0]}:9092") + start() + schemaRegistryUrl = "http://$host:${getMappedPort(SCHEMA_REGISTRY_PORT)}" + return self() + } + +} \ No newline at end of file diff --git a/kafka/src/test/kotlin/ayansen/playground/kafka/mirror/EventOrderingIT.kt b/kafka/src/test/kotlin/ayansen/playground/kafka/mirror/EventOrderingIT.kt index 53b5a3d..e0fb1cd 100644 --- a/kafka/src/test/kotlin/ayansen/playground/kafka/mirror/EventOrderingIT.kt +++ b/kafka/src/test/kotlin/ayansen/playground/kafka/mirror/EventOrderingIT.kt @@ -16,10 +16,12 @@ package ayansen.playground.kafka.mirror import ayansen.playground.avro.SampleEvent +import ayansen.playground.kafka.Fixtures.createTopics import ayansen.playground.kafka.IntegrationTestUtils import ayansen.playground.kafka.Fixtures.generateSampleEvents import ayansen.playground.kafka.Fixtures.getConsumerProperties import ayansen.playground.kafka.Fixtures.getProducerProperties +import org.apache.kafka.clients.admin.NewTopic import org.junit.jupiter.api.Test import kotlin.test.assertEquals @@ -39,6 +41,9 @@ class EventOrderingIT { */ @Test fun `test ordering of events for a particular partition assigned based on record key`() { + createTopics( + listOf(NewTopic(appConsumerTopic, 2, 1), NewTopic(appProducerTopic, 2, 1)) + ) val firstSample = generateSampleEvents(10, "firstSample") val secondSample = generateSampleEvents(10, "secondSample") kafkaMirror.processRecords(500, 2) diff --git a/settings.gradle b/settings.gradle index 0472bb9..bea354b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,4 +7,5 @@ pluginManagement { rootProject.name = 'playground' include('graphql') include('openapi') -include('envoy') \ No newline at end of file +include('envoy') +include('kafka') \ No newline at end of file