Commit baab1ec
Increase performance of the S3 integration tests by allowing the polling time to be dependent on the test, and by taking into account whether there are more records left to be retrieved before waiting to collect the next batch of records.

Signed-off-by: Aindriu Lavelle <[email protected]>
aindriu-aiven committed Jan 3, 2025
1 parent b4475c9 commit baab1ec
Showing 2 changed files with 26 additions and 14 deletions.
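For readers who want the new consumption flow in one place before reading the diff, here is a condensed sketch of the pattern this commit introduces. Class and method names here are illustrative, not the committed ones; the committed code is in the diff below.

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class PollingPatternSketch {

    // Keep polling while batches stay large (more than 10 records), so a big
    // backlog is drained within a single Awaitility attempt instead of one
    // batch per retry interval. Mirrors the commit's assertAllRecordsConsumed.
    static <K, V> List<V> drain(final KafkaConsumer<K, V> consumer, final List<V> values) {
        int lastBatchSize;
        do {
            final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
            lastBatchSize = records.count();
            for (final ConsumerRecord<K, V> record : records) {
                values.add(record.value());
            }
        } while (lastBatchSize > 10);
        return values;
    }

    // The overall wait is now a per-test argument (expectedMaxDuration in the
    // commit) rather than a fixed five minutes, and the retry interval drops
    // from five seconds to one.
    static <K, V> List<V> consume(final String topic, final int expectedCount, final Duration maxWait,
            final Properties consumerProperties) {
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singletonList(topic));
            final List<V> values = new ArrayList<>();
            await().atMost(maxWait)
                    .pollInterval(Duration.ofSeconds(1))
                    .untilAsserted(() -> assertThat(drain(consumer, values)).hasSize(expectedCount));
            return values;
        }
    }
}

One subtlety of the greater-than-10 threshold: a drain pass only stops once a poll comes back small, so a large backlog is consumed in quick 500 ms polls without paying the one-second retry interval per batch, while an unmet expected count still fails fast and retries.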
File 1 of 2:

@@ -135,41 +135,50 @@ static List<String> consumeByteMessages(final String topic, final int expectedMessageCount,
             String bootstrapServers) {
         final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class,
                 ByteArrayDeserializer.class);
-        final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, consumerProperties);
+        final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60),
+                consumerProperties);
         return objects.stream().map(String::new).collect(Collectors.toList());
     }

     static List<GenericRecord> consumeAvroMessages(final String topic, final int expectedMessageCount,
-            final String bootstrapServers, final String schemaRegistryUrl) {
+            final Duration expectedMaxDuration, final String bootstrapServers, final String schemaRegistryUrl) {
         final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class,
                 KafkaAvroDeserializer.class, schemaRegistryUrl);
-        return consumeMessages(topic, expectedMessageCount, consumerProperties);
+        return consumeMessages(topic, expectedMessageCount, expectedMaxDuration, consumerProperties);
     }

     static List<JsonNode> consumeJsonMessages(final String topic, final int expectedMessageCount,
             final String bootstrapServers) {
         final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class,
                 JsonDeserializer.class);
-        return consumeMessages(topic, expectedMessageCount, consumerProperties);
+        return consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), consumerProperties);
     }

     static <K, V> List<V> consumeMessages(final String topic, final int expectedMessageCount,
-            final Properties consumerProperties) {
+            final Duration expectedMaxDuration, final Properties consumerProperties) {
         try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties)) {
             consumer.subscribe(Collections.singletonList(topic));

             final List<V> recordValues = new ArrayList<>();
-            await().atMost(Duration.ofMinutes(5)).pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> {
-                final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
-                for (final ConsumerRecord<K, V> record : records) {
-                    recordValues.add(record.value());
-                }
-                assertThat(recordValues).hasSize(expectedMessageCount);
+            await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+                assertThat(assertAllRecordsConsumed(consumer, recordValues)).hasSize(expectedMessageCount);
             });
             return recordValues;
         }
     }

+    private static <K, V> List<V> assertAllRecordsConsumed(KafkaConsumer<K, V> consumer, List<V> recordValues) {
+        int recordsRetrieved = 0;
+        do {
+            final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
+            recordsRetrieved = records.count();
+            for (final ConsumerRecord<K, V> record : records) {
+                recordValues.add(record.value());
+            }
+        } while (recordsRetrieved > 10);
+        return recordValues;
+    }
+
     static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
         // Poll messages from the topic
         final Map<String, Object> messages = new HashMap<>();
File 2 of 2:

@@ -231,8 +231,11 @@ void avroTest(final TestInfo testInfo) throws IOException {

         // Poll Avro messages from the Kafka topic and deserialize them
         final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5,
-                connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure this method
-                                                                                             // deserializes Avro
+                Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure
+                                                                                                                    // this
+                                                                                                                    // method
+                                                                                                                    // deserializes
+                                                                                                                    // Avro

         // Verify that the correct data is read from the S3 bucket and pushed to Kafka
         assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message"))))
@@ -269,7 +272,7 @@ void parquetTest(final TestInfo testInfo) throws IOException {
             Files.delete(path);
         }

-        final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, 100,
+        final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60),
                 connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
         final List<String> expectedRecordNames = IntStream.range(0, 100)
                 .mapToObj(i -> name + i)
