From baab1ecbb8ea2f493d53cbae449144c1038d50dc Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Fri, 3 Jan 2025 08:35:28 +0000 Subject: [PATCH 1/2] Increase performance of the S3 Integration tests by allowing the polling time to be depndent on the test and to take into account if their are more records left to be retrieved before waiting to collect the next batch of records Signed-off-by: Aindriu Lavelle --- .../connect/s3/source/IntegrationBase.java | 31 ++++++++++++------- .../connect/s3/source/IntegrationTest.java | 9 ++++-- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 6b505b99..932923e8 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -135,41 +135,50 @@ static List consumeByteMessages(final String topic, final int expectedMe String bootstrapServers) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class, ByteArrayDeserializer.class); - final List objects = consumeMessages(topic, expectedMessageCount, consumerProperties); + final List objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), + consumerProperties); return objects.stream().map(String::new).collect(Collectors.toList()); } static List consumeAvroMessages(final String topic, final int expectedMessageCount, - final String bootstrapServers, final String schemaRegistryUrl) { + final Duration expectedMaxDuration, final String bootstrapServers, final String schemaRegistryUrl) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class, KafkaAvroDeserializer.class, schemaRegistryUrl); - return consumeMessages(topic, expectedMessageCount, consumerProperties); + return consumeMessages(topic, expectedMessageCount, expectedMaxDuration, consumerProperties); } static List consumeJsonMessages(final String topic, final int expectedMessageCount, final String bootstrapServers) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class, JsonDeserializer.class); - return consumeMessages(topic, expectedMessageCount, consumerProperties); + return consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), consumerProperties); } static List consumeMessages(final String topic, final int expectedMessageCount, - final Properties consumerProperties) { + final Duration expectedMaxDuration, final Properties consumerProperties) { try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singletonList(topic)); final List recordValues = new ArrayList<>(); - await().atMost(Duration.ofMinutes(5)).pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(500L)); - for (final ConsumerRecord record : records) { - recordValues.add(record.value()); - } - assertThat(recordValues).hasSize(expectedMessageCount); + await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + assertThat(assertAllRecordsConsumed(consumer, recordValues)).hasSize(expectedMessageCount); }); return recordValues; } } + private static List assertAllRecordsConsumed(KafkaConsumer consumer, List recordValues) { + int recordsRetrieved = 0; + do { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500L)); + recordsRetrieved = records.count(); + for (final ConsumerRecord record : records) { + recordValues.add(record.value()); + } + } while (recordsRetrieved > 10); + return recordValues; + } + static Map consumeOffsetMessages(KafkaConsumer consumer) throws IOException { // Poll messages from the topic final Map messages = new HashMap<>(); diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 884051e3..1991b356 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -231,8 +231,11 @@ void avroTest(final TestInfo testInfo) throws IOException { // Poll Avro messages from the Kafka topic and deserialize them final List records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5, - connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure this method - // deserializes Avro + Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure + // this + // method + // deserializes + // Avro // Verify that the correct data is read from the S3 bucket and pushed to Kafka assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message")))) @@ -269,7 +272,7 @@ void parquetTest(final TestInfo testInfo) throws IOException { Files.delete(path); } - final List records = IntegrationBase.consumeAvroMessages(topicName, 100, + final List records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); final List expectedRecordNames = IntStream.range(0, 100) .mapToObj(i -> name + i) From 826748972d20a2f250409c7d60fea2c14600913a Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Fri, 3 Jan 2025 10:34:13 +0000 Subject: [PATCH 2/2] Add comments explaining changes and improve the method name for consuming messages. Signed-off-by: Aindriu Lavelle --- .../aiven/kafka/connect/s3/source/IntegrationBase.java | 9 ++++++--- .../aiven/kafka/connect/s3/source/IntegrationTest.java | 9 ++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 932923e8..442993bf 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -161,20 +161,23 @@ static List consumeMessages(final String topic, final int expectedMess final List recordValues = new ArrayList<>(); await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { - assertThat(assertAllRecordsConsumed(consumer, recordValues)).hasSize(expectedMessageCount); + assertThat(consumeRecordsInProgress(consumer, recordValues)).hasSize(expectedMessageCount); }); return recordValues; } } - private static List assertAllRecordsConsumed(KafkaConsumer consumer, List recordValues) { - int recordsRetrieved = 0; + private static List consumeRecordsInProgress(KafkaConsumer consumer, List recordValues) { + int recordsRetrieved; do { final ConsumerRecords records = consumer.poll(Duration.ofMillis(500L)); recordsRetrieved = records.count(); for (final ConsumerRecord record : records) { recordValues.add(record.value()); } + // Choosing 10 records as it allows for integration tests with a smaller max poll to be added + // while maintaining efficiency, a slightly larger number could be added but this is slightly more efficient + // than larger numbers. } while (recordsRetrieved > 10); return recordValues; } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 1991b356..5a573395 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -230,12 +230,10 @@ void avroTest(final TestInfo testInfo) throws IOException { assertThat(testBucketAccessor.listObjects()).hasSize(5); // Poll Avro messages from the Kafka topic and deserialize them + // Waiting for 25k kafka records in this test so a longer Duration is added. final List records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5, - Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure - // this - // method - // deserializes - // Avro + Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); + // Ensure this method deserializes Avro // Verify that the correct data is read from the S3 bucket and pushed to Kafka assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message")))) @@ -272,6 +270,7 @@ void parquetTest(final TestInfo testInfo) throws IOException { Files.delete(path); } + // Waiting for a small number of messages so using a smaller Duration of a minute final List records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); final List expectedRecordNames = IntStream.range(0, 100)