
Commit

Merge remote-tracking branch 'origin/s3-source-release' into polling_efficiency

Claude committed Jan 6, 2025
2 parents cf32e79 + f6d3087 commit a7c2570
Showing 2 changed files with 28 additions and 14 deletions.
@@ -182,41 +182,53 @@ static List<String> consumeByteMessages(final String topic, final int expectedMessageCount,
             String bootstrapServers) {
         final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class,
                 ByteArrayDeserializer.class);
-        final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, consumerProperties);
+        final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60),
+                consumerProperties);
         return objects.stream().map(String::new).collect(Collectors.toList());
     }

     static List<GenericRecord> consumeAvroMessages(final String topic, final int expectedMessageCount,
-            final String bootstrapServers, final String schemaRegistryUrl) {
+            final Duration expectedMaxDuration, final String bootstrapServers, final String schemaRegistryUrl) {
         final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class,
                 KafkaAvroDeserializer.class, schemaRegistryUrl);
-        return consumeMessages(topic, expectedMessageCount, consumerProperties);
+        return consumeMessages(topic, expectedMessageCount, expectedMaxDuration, consumerProperties);
     }

     static List<JsonNode> consumeJsonMessages(final String topic, final int expectedMessageCount,
             final String bootstrapServers) {
         final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class,
                 JsonDeserializer.class);
-        return consumeMessages(topic, expectedMessageCount, consumerProperties);
+        return consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), consumerProperties);
     }

     static <K, V> List<V> consumeMessages(final String topic, final int expectedMessageCount,
-            final Properties consumerProperties) {
+            final Duration expectedMaxDuration, final Properties consumerProperties) {
         try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties)) {
             consumer.subscribe(Collections.singletonList(topic));

             final List<V> recordValues = new ArrayList<>();
-            await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> {
-                final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
-                for (final ConsumerRecord<K, V> record : records) {
-                    recordValues.add(record.value());
-                }
-                assertThat(recordValues).hasSize(expectedMessageCount);
+            await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+                assertThat(consumeRecordsInProgress(consumer, recordValues)).hasSize(expectedMessageCount);
             });
             return recordValues;
         }
     }

+    private static <K, V> List<V> consumeRecordsInProgress(KafkaConsumer<K, V> consumer, List<V> recordValues) {
+        int recordsRetrieved;
+        do {
+            final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
+            recordsRetrieved = records.count();
+            for (final ConsumerRecord<K, V> record : records) {
+                recordValues.add(record.value());
+            }
+            // A threshold of 10 keeps this loop efficient while still letting integration
+            // tests configure a smaller max poll: once a poll returns 10 or fewer records,
+            // control goes back to the Awaitility retry loop instead of spinning here.
+        } while (recordsRetrieved > 10);
+        return recordValues;
+    }

     static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
         // Poll messages from the topic
         final Map<String, Object> messages = new HashMap<>();
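For reference, the drain-and-retry pattern the new helper implements can be shown as a self-contained sketch. It assumes kafka-clients, Awaitility, and AssertJ on the test classpath; the names DrainLoopSketch, drain, and consumeExactly are illustrative, not from the commit:

    import static org.assertj.core.api.Assertions.assertThat;
    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    final class DrainLoopSketch {

        // Drain the backlog: keep polling while poll() returns more than 10
        // records, i.e. while the consumer still appears to be catching up.
        static <K, V> List<V> drain(final KafkaConsumer<K, V> consumer, final List<V> values) {
            int retrieved;
            do {
                final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
                retrieved = records.count();
                for (final ConsumerRecord<K, V> record : records) {
                    values.add(record.value());
                }
            } while (retrieved > 10);
            return values;
        }

        // Retry the drain once per second until the expected count arrives or
        // maxWait elapses; Awaitility turns each failed assertion into a retry.
        static <K, V> List<V> consumeExactly(final String topic, final int expected,
                final Duration maxWait, final Properties props) {
            try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                final List<V> values = new ArrayList<>();
                await().atMost(maxWait)
                        .pollInterval(Duration.ofSeconds(1))
                        .untilAsserted(() -> assertThat(drain(consumer, values)).hasSize(expected));
                return values;
            }
        }
    }

The threshold mirrors the committed while (recordsRetrieved > 10) condition: the inner loop runs only while batches stay full, and otherwise defers to the slower, bounded Awaitility retry.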
@@ -223,9 +223,10 @@ void avroTest(final TestInfo testInfo) throws IOException {
         assertThat(testBucketAccessor.listObjects()).hasSize(5);

         // Poll Avro messages from the Kafka topic and deserialize them
+        // This test waits for 25k Kafka records, so a longer duration is used.
         final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5,
-                connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure this method
-        // deserializes Avro
+                Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
+        // Ensure this method deserializes Avro

         // Verify that the correct data is read from the S3 bucket and pushed to Kafka
         assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message"))))
@@ -263,7 +264,8 @@ void parquetTest(final TestInfo testInfo) throws IOException {
             Files.delete(path);
         }

-        final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, 100,
+        // Waiting for a small number of messages, so a one-minute duration suffices.
+        final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60),
                 connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
         final List<String> expectedRecordNames = IntStream.range(0, 100)
                 .mapToObj(i -> name + i)
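Taken together, the call sites now state both how many records to expect and how long they are willing to wait. A minimal sketch consolidating the two shapes; the topic names and endpoints are placeholders (the real tests take them from connectRunner.getBootstrapServers() and schemaRegistry.getSchemaRegistryUrl()):

    import java.time.Duration;
    import java.util.List;

    import org.apache.avro.generic.GenericRecord;

    class ConsumeTimeoutExamples {
        // Placeholder endpoints, assumed for illustration only.
        private static final String BOOTSTRAP = "localhost:9092";
        private static final String REGISTRY = "http://localhost:8081";

        List<GenericRecord> largeBacklog() {
            // avroTest waits for 25k records, so it allows up to three minutes.
            return IntegrationBase.consumeAvroMessages("avro-topic", 25_000,
                    Duration.ofMinutes(3), BOOTSTRAP, REGISTRY);
        }

        List<GenericRecord> smallBacklog() {
            // parquetTest waits for only 100 records, so one minute suffices.
            return IntegrationBase.consumeAvroMessages("parquet-topic", 100,
                    Duration.ofSeconds(60), BOOTSTRAP, REGISTRY);
        }
    }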
