From 94f9387aebf0b670bc38609062f4e50a1abbb012 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Thu, 14 Sep 2023 11:08:23 +0200 Subject: [PATCH] Produce many events in the kafka test (#196) --- .../kotlin/dev/restate/e2e/KafkaIngressTest.kt | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt b/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt index 7e7cce95..81f57327 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt @@ -71,7 +71,7 @@ abstract class BaseKafkaIngressTest { bootstrapServer: String, topic: String, key: String, - value: String + values: List ) { val props = Properties() props["bootstrap.servers"] = bootstrapServer @@ -79,7 +79,9 @@ abstract class BaseKafkaIngressTest { props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" val producer: Producer = KafkaProducer(props) - producer.send(ProducerRecord(topic, key, value)) + for (value in values) { + producer.send(ProducerRecord(topic, key, value)) + } producer.close() } @@ -91,9 +93,6 @@ abstract class BaseKafkaIngressTest { ) { val counter = UUID.randomUUID().toString() - // Produce message to kafka - produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, "123") - // Create subscription val subscriptionsClient = SubscriptionsClient(ObjectMapper(), metaURL.toString(), OkHttpClient()) @@ -107,13 +106,16 @@ abstract class BaseKafkaIngressTest { .extracting { it.statusCode } .isEqualTo(201) + // Produce message to kafka + produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3")) + // Now wait for the update to be visible await untilCallTo { counterClient.get(counterRequest { counterName = counter }) } matches { num -> - num!!.value == 123L + num!!.value == 6L } } }