Skip to content

Commit

Permalink
MINOR: Add extended cluster readiness check for Connect's embedded Ka…
Browse files Browse the repository at this point in the history
…fka cluster (#16757)

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
C0urante authored Aug 1, 2024
1 parent bc4df73 commit a524609
Showing 1 changed file with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class EmbeddedKafkaCluster {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);

private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
private static final long GROUP_COORDINATOR_AVAILABILITY_DURATION_MS = TimeUnit.MINUTES.toMillis(2);

private final KafkaClusterTestKit cluster;
private final Properties brokerConfig;
Expand Down Expand Up @@ -153,6 +154,49 @@ public void start() {
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());

verifyClusterReadiness();
}

/**
* Perform an extended check to ensure that the primary APIs of the cluster are available, including:
* <ul>
* <li>Ability to create a topic</li>
* <li>Ability to produce to a topic</li>
* <li>Ability to form a consumer group</li>
* <li>Ability to consume from a topic</li>
* </ul>
* If this method completes successfully, all resources created to verify the cluster health
* (such as topics and consumer groups) will be cleaned up before it returns.
* <p>
* This provides extra guarantees compared to other cluster readiness checks such as
* {@link ConnectAssertions#assertExactlyNumBrokersAreUp(int, String)} and
* {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that brokers have
* completed startup and joined the cluster, but do not verify that the internal consumer
* offsets topic has been created or that it's actually possible for users to create and
* interact with topics.
*/
public void verifyClusterReadiness() {
String consumerGroupId = UUID.randomUUID().toString();
Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
String topic = "consumer-warmup-" + consumerGroupId;

createTopic(topic);
produce(topic, "warmup message key", "warmup message value");

try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) {
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(GROUP_COORDINATOR_AVAILABILITY_DURATION_MS));
if (records.isEmpty()) {
throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time");
}
}

try (Admin admin = createAdminClient()) {
admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30, TimeUnit.SECONDS);
admin.deleteTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AssertionError("Failed to clean up cluster health check resource(s)", e);
}
}

/**
Expand All @@ -163,6 +207,7 @@ public void start() {
*/
public void restartOnlyBrokers() {
cluster.brokers().values().forEach(BrokerServer::startup);
verifyClusterReadiness();
}

/**
Expand Down

0 comments on commit a524609

Please sign in to comment.