diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 6b505b99..442993bf 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -135,41 +135,53 @@ static List consumeByteMessages(final String topic, final int expectedMe String bootstrapServers) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class, ByteArrayDeserializer.class); - final List objects = consumeMessages(topic, expectedMessageCount, consumerProperties); + final List objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), + consumerProperties); return objects.stream().map(String::new).collect(Collectors.toList()); } static List consumeAvroMessages(final String topic, final int expectedMessageCount, - final String bootstrapServers, final String schemaRegistryUrl) { + final Duration expectedMaxDuration, final String bootstrapServers, final String schemaRegistryUrl) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class, KafkaAvroDeserializer.class, schemaRegistryUrl); - return consumeMessages(topic, expectedMessageCount, consumerProperties); + return consumeMessages(topic, expectedMessageCount, expectedMaxDuration, consumerProperties); } static List consumeJsonMessages(final String topic, final int expectedMessageCount, final String bootstrapServers) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class, JsonDeserializer.class); - return consumeMessages(topic, expectedMessageCount, consumerProperties); + return consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), consumerProperties); } 
static List consumeMessages(final String topic, final int expectedMessageCount, - final Properties consumerProperties) { + final Duration expectedMaxDuration, final Properties consumerProperties) { try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singletonList(topic)); final List recordValues = new ArrayList<>(); - await().atMost(Duration.ofMinutes(5)).pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(500L)); - for (final ConsumerRecord record : records) { - recordValues.add(record.value()); - } - assertThat(recordValues).hasSize(expectedMessageCount); + await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + assertThat(consumeRecordsInProgress(consumer, recordValues)).hasSize(expectedMessageCount); }); return recordValues; } } + private static List consumeRecordsInProgress(KafkaConsumer consumer, List recordValues) { + int recordsRetrieved; + do { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500L)); + recordsRetrieved = records.count(); + for (final ConsumerRecord record : records) { + recordValues.add(record.value()); + } + // A threshold of 10 records keeps draining the topic while batches are still large, yet allows + // integration tests that use a smaller max poll size; a larger threshold would also work but offers + // no efficiency benefit.
+ } while (recordsRetrieved > 10); + return recordValues; + } + static Map consumeOffsetMessages(KafkaConsumer consumer) throws IOException { // Poll messages from the topic final Map messages = new HashMap<>(); diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 884051e3..5a573395 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -230,9 +230,10 @@ void avroTest(final TestInfo testInfo) throws IOException { assertThat(testBucketAccessor.listObjects()).hasSize(5); // Poll Avro messages from the Kafka topic and deserialize them + // Waiting for 25k kafka records in this test so a longer Duration is added. final List records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5, - connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure this method - // deserializes Avro + Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); + // Ensure this method deserializes Avro // Verify that the correct data is read from the S3 bucket and pushed to Kafka assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message")))) @@ -269,7 +270,8 @@ void parquetTest(final TestInfo testInfo) throws IOException { Files.delete(path); } - final List records = IntegrationBase.consumeAvroMessages(topicName, 100, + // Waiting for a small number of messages so using a smaller Duration of a minute + final List records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); final List expectedRecordNames = IntStream.range(0, 100) .mapToObj(i -> 
name + i)