Skip to content

Commit

Permalink
fixed bugs in topic creation for test
Browse files Browse the repository at this point in the history
Signed-off-by: Ayan Sen <[email protected]>
  • Loading branch information
ayansen committed Jan 10, 2024
1 parent f4b876b commit e97217b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 18 deletions.
15 changes: 8 additions & 7 deletions kafka/src/test/kotlin/ayansen/playground/kafka/Fixtures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,21 @@ object Fixtures {

private fun initializeKafka(): KafkaContainer {
val network: Network = Network.newNetwork()
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
kafka.withNetwork(network).start()
val kafka = KafkaContainer(DockerImageName.parse(KAFKA_FULL_IMAGE_NAME))
.withKraft().withNetwork(network)
kafka.start()
return kafka
}

fun createTopics(topics:List<String>) {
val newTopics = topics.map { NewTopic(it, 1, 1) }
fun createTopics(topics: List<NewTopic>) {
val adminClient = AdminClient.create(mapOf(BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers))
adminClient.createTopics(newTopics)
adminClient.createTopics(topics)
}

val kafkaContainer = initializeKafka()
val schemaRegistryContainer = SchemaRegistryContainer().withKafka(kafkaContainer)
private const val APP_GROUP_ID = "test_application"
private const val KAFKA_FULL_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0"

fun generateSampleEvents(numberOfEvents: Int, eventName: String): List<KeyValueTimestamp<String, SampleEvent>> =
(1..numberOfEvents).map {
Expand All @@ -67,7 +68,7 @@ object Fixtures {
val config = Properties()
config[ConsumerConfig.GROUP_ID_CONFIG] = APP_GROUP_ID
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaContainer.bootstrapServers
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl()
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl
config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
Expand All @@ -79,7 +80,7 @@ object Fixtures {
fun getProducerProperties(): Properties {
val config = Properties()
config["bootstrap.servers"] = kafkaContainer.bootstrapServers
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl()
config["schema.registry.url"] = schemaRegistryContainer.schemaRegistryUrl
config["acks"] = "all"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ object IntegrationTestUtils {
private fun <K, V> getTestProducer(): KafkaProducer<K, V> {
val config = Properties()
config["bootstrap.servers"] = Fixtures.kafkaContainer.bootstrapServers
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl()
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl
config["acks"] = "all"
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
Expand All @@ -178,7 +178,7 @@ object IntegrationTestUtils {
config[ConsumerConfig.CLIENT_ID_CONFIG] = "integration-test-consumer-${(0..100).random()}"
config[ConsumerConfig.GROUP_ID_CONFIG] = "integration-test-consumers-${(0..100).random()}"
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = Fixtures.kafkaContainer.bootstrapServers
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl()
config["schema.registry.url"] = Fixtures.schemaRegistryContainer.schemaRegistryUrl
config[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,21 @@ class SchemaRegistryContainer :
const val CONFLUENT_PLATFORM_VERSION = "5.5.1"
}

lateinit var schemaRegistryUrl: String

init {
waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
withExposedPorts(SCHEMA_REGISTRY_PORT)
}

fun withKafka(kafka: KafkaContainer): SchemaRegistryContainer {
return withKafka(kafka.network, kafka.networkAliases[0] + ":9092")
}

fun withKafka(network: Network?, bootstrapServers: String): SchemaRegistryContainer {
withNetwork(network)
withNetwork(kafka.network)
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:$SCHEMA_REGISTRY_PORT")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://$bootstrapServers")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://${kafka.networkAliases[0]}:9092")
start()
schemaRegistryUrl = "http://$host:${getMappedPort(SCHEMA_REGISTRY_PORT)}"
return self()
}

fun schemaRegistryUrl(): String = "http://$host:${getMappedPort(SCHEMA_REGISTRY_PORT)}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ayansen.playground.kafka.IntegrationTestUtils
import ayansen.playground.kafka.Fixtures.generateSampleEvents
import ayansen.playground.kafka.Fixtures.getConsumerProperties
import ayansen.playground.kafka.Fixtures.getProducerProperties
import org.apache.kafka.clients.admin.NewTopic
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals

Expand All @@ -40,7 +41,9 @@ class EventOrderingIT {
*/
@Test
fun `test ordering of events for a particular partition assigned based on record key`() {
createTopics(listOf(appConsumerTopic, appProducerTopic))
createTopics(
listOf(NewTopic(appConsumerTopic, 2, 1), NewTopic(appProducerTopic, 2, 1))
)
val firstSample = generateSampleEvents(10, "firstSample")
val secondSample = generateSampleEvents(10, "secondSample")
kafkaMirror.processRecords(500, 2)
Expand Down
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pluginManagement {
rootProject.name = 'playground'
include('graphql')
include('openapi')
include('envoy')
include('envoy')
include('kafka')

0 comments on commit e97217b

Please sign in to comment.