Skip to content

Commit

Permalink
[performance] - TopicOperator capacity test case (#10050)
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <[email protected]>
  • Loading branch information
see-quick authored May 15, 2024
1 parent 65f1ec2 commit a900d30
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 52 deletions.
20 changes: 19 additions & 1 deletion .packit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,22 @@ jobs:
timeout: 1440
test:
tmt:
name: performance-capacity
name: performance-capacity
- job: tests
trigger: pull_request
identifier: "performance-topic-operator-capacity"
targets:
- centos-stream-9-x86_64
- centos-stream-9-aarch64
skip_build: true
manual_trigger: true
env: { IP_FAMILY: ipv4 }
labels:
- performance-topic-operator-capacity
tf_extra_params:
settings:
pipeline:
timeout: 1440
test:
tmt:
name: performance-topic-operator-capacity
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public interface PerformanceConstants {
*/
String TOPIC_OPERATOR_IN_NUMBER_OF_TOPICS_TO_UPDATE = "IN: UPDATE NUMBER OF TOPICS";

String TOPIC_OPERATOR_IN_MAX_QUEUE_SIZE = "IN: MAX QUEUE SIZE";

/**
* OUT constants represent the output metrics or results measured after the performance tests involving the topic operator.
*/
Expand Down Expand Up @@ -75,6 +77,8 @@ public interface PerformanceConstants {
*/
String TOPIC_OPERATOR_OUT_UPDATE_TIMES = "OUT: Bob Update Times (ms)";

String TOPIC_OPERATOR_OUT_SUCCESSFUL_KAFKA_TOPICS_CREATED = "OUT: Successful KafkaTopics Created";

// --------------------------------------------------------------------------------
// ------------------------------ USER OPERATOR -----------------------------------
// --------------------------------------------------------------------------------
Expand Down Expand Up @@ -150,7 +154,7 @@ public interface PerformanceConstants {
String TOPIC_OPERATOR_BOBS_STREAMING_USE_CASE = "bobStreamingUseCase";
String TOPIC_OPERATOR_ALICE_BULK_USE_CASE = "aliceBulkUseCase";
String USER_OPERATOR_ALICE_BULK_USE_CASE = "aliceBulkUseCase";
String USER_OPERATOR_CAPACITY_USE_CASE = "capacityUseCase";
String GENERAL_CAPACITY_USE_CASE = "capacityUseCase";

/**
* Performance metrics file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class TopicOperatorPerformanceReporter extends BasePerformanceReporter {
protected Path resolveComponentUseCasePathDir(Path performanceLogDir, String useCaseName, Map<String, Object> performanceAttributes) {
final String maxBatchSize = performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_SIZE, "").toString();
final String maxBatchLingerMs = performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_LINGER_MS, "").toString();
final boolean clientsEnabled = !performanceAttributes.get(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES).equals(0);
final boolean clientsEnabled = !performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES, "0").equals(0);

// Use the useCaseName to create a directory specific to the current test case (Alice or Bob)
final Path topicOperatorUseCasePathDir = performanceLogDir.resolve(useCaseName + "/max-batch-size-" + maxBatchSize + "-max-linger-time-" + maxBatchLingerMs + "-with-clients-" + clientsEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void deleteResourcesOfTypeWithoutWait(final String resourceKind) {
}

public static <T extends CustomResource<? extends Spec, ? extends Status>> boolean waitForResourceStatus(MixedOperation<T, ?, ?> operation, String kind, String namespace, String name, Enum<?> statusType, ConditionStatus conditionStatus, long resourceTimeoutMs) {
LOGGER.info("Waiting for {}: {}/{} will have desired state: {}", kind, namespace, name, statusType);
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "Waiting for {}: {}/{} will have desired state: {}", kind, namespace, name, statusType);

TestUtils.waitFor(String.format("%s: %s#%s will have desired state: %s", kind, namespace, name, statusType),
TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS, resourceTimeoutMs,
Expand All @@ -496,7 +496,7 @@ public void deleteResourcesOfTypeWithoutWait(final String resourceKind) {
.withName(name)
.get()));

LOGGER.info("{}: {}/{} is in desired state: {}", kind, namespace, name, statusType);
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is in desired state: {}", kind, namespace, name, statusType);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* This class contains crucial methods to create, modify and check large amount of KafkaTopics
Expand All @@ -30,60 +27,60 @@ public class KafkaTopicScalabilityUtils {
private static final Logger LOGGER = LogManager.getLogger(KafkaTopicUtils.class);
private KafkaTopicScalabilityUtils() {}

/**
* Creates Kafka topics in specified batches via Kubernetes within the given namespace.
*
* @param namespaceName the Kubernetes namespace where the topics will be created.
* @param clusterName the name of the Kafka cluster with which the topics are associated.
* @param topicPrefix the prefix string for the topic names, to which a numeric suffix will be added.
* @param start the starting index for the topic creation batch.
* @param end the ending index for the topic creation batch (exclusive).
* @param numberOfPartitions the number of partitions each topic should have.
* @param numberOfReplicas the number of replicas for each topic.
* @param minInSyncReplicas the minimum number of replicas that must be in sync.
*/
public static void createTopicsViaK8s(String namespaceName, String clusterName, String topicPrefix,
int numberOfTopics, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
LOGGER.info("Creating {} Topics via Kubernetes", numberOfTopics);
int start, int end, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
LOGGER.info("Creating topics from {} to {} via Kubernetes", start, end - 1);

for (int i = 0; i < numberOfTopics; i++) {
String currentTopicName = topicPrefix + i;
for (int i = start; i < end; i++) {
String currentTopicName = topicPrefix + "-" + i;
ResourceManager.getInstance().createResourceWithoutWait(KafkaTopicTemplates.topic(
clusterName, currentTopicName, numberOfPartitions, numberOfReplicas, minInSyncReplicas, namespaceName).build());
clusterName, currentTopicName, numberOfPartitions, numberOfReplicas, minInSyncReplicas, namespaceName).build());
}
}

public static void createTopicsViaK8s(String namespaceName, String clusterName, String topicPrefix,
int numberOfTopics, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
createTopicsViaK8s(namespaceName, clusterName, topicPrefix, 0, numberOfTopics, numberOfPartitions, numberOfReplicas, minInSyncReplicas);
}

public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType) {
waitForTopicStatus(namespaceName, topicPrefix, numberOfTopics, conditionType, ConditionStatus.True);
}

public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType, ConditionStatus conditionStatus) {
LOGGER.info("Verifying that {} Topics are in {} state", numberOfTopics, conditionType.toString());

// Determine the appropriate number of threads
int numberOfThreads = Math.min(Runtime.getRuntime().availableProcessors(), numberOfTopics);
ExecutorService customExecutor = Executors.newFixedThreadPool(numberOfThreads);

try {
List<CompletableFuture<?>> topics = new ArrayList<>();

for (int i = 0; i < numberOfTopics; i++) {
final String currentTopic = topicPrefix + i;
topics.add(CompletableFuture.runAsync(() ->
KafkaTopicUtils.waitForKafkaTopicStatus(namespaceName, currentTopic, conditionType, conditionStatus),
customExecutor // Use the custom executor
));
}

CompletableFuture<Void> allTopics = CompletableFuture.allOf(topics.toArray(new CompletableFuture[0]))
.thenRunAsync(() -> LOGGER.info("All Topics are in correct state"), customExecutor);

allTopics.join();
} finally {
// Attempt to shut down now to immediately terminate ongoing tasks
List<Runnable> notExecutedTasks = customExecutor.shutdownNow();
if (!notExecutedTasks.isEmpty()) {
LOGGER.warn("There were {} tasks that did not start", notExecutedTasks.size());
}
try {
// Wait a while for tasks to respond to being cancelled
if (!customExecutor.awaitTermination(30, TimeUnit.SECONDS))
LOGGER.error("Executor did not terminate");
} catch (InterruptedException ie) {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
/**
* Waits for a specific condition to be met for a range of Kafka topics in a given namespace.
*
* @param namespaceName The Kubernetes namespace where the topics reside.
* @param topicPrefix The prefix used for topic names, to which a numeric suffix will be added.
* @param start The starting index for checking topic statuses.
* @param end The ending index for checking topic statuses (exclusive).
* @param conditionType The type of condition to wait for (e.g., READY, NOT_READY).
* @param conditionStatus The desired status of the condition (e.g., TRUE, FALSE, UNKNOWN).
*/
public static void waitForTopicStatus(String namespaceName, String topicPrefix, int start, int end, Enum<?> conditionType, ConditionStatus conditionStatus) {
LOGGER.info("Verifying that topics from {} to {} are in {} state", start, end - 1, conditionType.toString());

for (int i = start; i < end; i++) {
final String currentTopic = topicPrefix + "-" + i;
KafkaTopicUtils.waitForKafkaTopicStatus(namespaceName, currentTopic, conditionType, conditionStatus);
}
}

public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType, ConditionStatus conditionStatus) {
waitForTopicStatus(namespaceName, topicPrefix, 0, numberOfTopics, conditionType, conditionStatus);
}

public static void waitForTopicsReady(String namespaceName, String topicPrefix, int numberOfTopics) {
waitForTopicStatus(namespaceName, topicPrefix, numberOfTopics, CustomResourceStatus.Ready);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ public static void waitForTopicWithPrefixDeletion(String namespaceName, String t
TestUtils.waitFor("deletion of all topics with prefix: " + topicPrefix, TestConstants.GLOBAL_POLL_INTERVAL, DELETION_TIMEOUT,
() -> {
try {
return getAllKafkaTopicsWithPrefix(namespaceName, topicPrefix).size() == 0;
final int numberOfTopicsToDelete = getAllKafkaTopicsWithPrefix(namespaceName, topicPrefix).size();
LOGGER.info("Remaining KafkaTopic's to delete: {} !", numberOfTopicsToDelete);
return numberOfTopicsToDelete == 0;
} catch (Exception e) {
return e.getMessage().contains("Not Found") || e.getMessage().contains("the server doesn't have a resource type");
}
Expand Down
Loading

0 comments on commit a900d30

Please sign in to comment.