[performance] - TopicOperator capacity test case #10050

Merged (6 commits, May 15, 2024)
20 changes: 19 additions & 1 deletion .packit.yaml
@@ -227,4 +227,22 @@ jobs:
timeout: 1440
test:
tmt:
name: performance-capacity
name: performance-capacity
- job: tests
trigger: pull_request
identifier: "performance-topic-operator-capacity"
targets:
- centos-stream-9-x86_64
- centos-stream-9-aarch64
skip_build: true
manual_trigger: true
env: { IP_FAMILY: ipv4 }
labels:
- performance-topic-operator-capacity
tf_extra_params:
settings:
pipeline:
timeout: 1440
test:
tmt:
name: performance-topic-operator-capacity
@@ -46,6 +46,8 @@ public interface PerformanceConstants {
*/
String TOPIC_OPERATOR_IN_NUMBER_OF_TOPICS_TO_UPDATE = "IN: UPDATE NUMBER OF TOPICS";

String TOPIC_OPERATOR_IN_MAX_QUEUE_SIZE = "IN: MAX QUEUE SIZE";

/**
* OUT constants represent the output metrics or results measured after the performance tests involving the topic operator.
*/
@@ -75,6 +77,8 @@ public interface PerformanceConstants {
*/
String TOPIC_OPERATOR_OUT_UPDATE_TIMES = "OUT: Bob Update Times (ms)";

String TOPIC_OPERATOR_OUT_SUCCESSFUL_KAFKA_TOPICS_CREATED = "OUT: Successful KafkaTopics Created";

// --------------------------------------------------------------------------------
// ------------------------------ USER OPERATOR -----------------------------------
// --------------------------------------------------------------------------------
@@ -150,7 +154,7 @@ public interface PerformanceConstants {
String TOPIC_OPERATOR_BOBS_STREAMING_USE_CASE = "bobStreamingUseCase";
String TOPIC_OPERATOR_ALICE_BULK_USE_CASE = "aliceBulkUseCase";
String USER_OPERATOR_ALICE_BULK_USE_CASE = "aliceBulkUseCase";
String USER_OPERATOR_CAPACITY_USE_CASE = "capacityUseCase";
String GENERAL_CAPACITY_USE_CASE = "capacityUseCase";

/**
* Performance metrics file
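For context, a hedged sketch of how the two new constants could be wired into a performance report: the IN/OUT strings act as keys in the attribute map that the reporter later serializes. The map contents below are illustrative values, not taken from this PR.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical report attributes for a capacity run; only the keys come from PerformanceConstants.
    Map<String, Object> performanceAttributes = new LinkedHashMap<>();
    performanceAttributes.put(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_QUEUE_SIZE, Integer.MAX_VALUE);
    performanceAttributes.put(PerformanceConstants.TOPIC_OPERATOR_OUT_SUCCESSFUL_KAFKA_TOPICS_CREATED, 1200);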
@@ -25,7 +25,7 @@ public class TopicOperatorPerformanceReporter extends BasePerformanceReporter {
protected Path resolveComponentUseCasePathDir(Path performanceLogDir, String useCaseName, Map<String, Object> performanceAttributes) {
final String maxBatchSize = performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_SIZE, "").toString();
final String maxBatchLingerMs = performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_LINGER_MS, "").toString();
final boolean clientsEnabled = !performanceAttributes.get(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES).equals(0);
final boolean clientsEnabled = !performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES, "0").equals(0);

// Use the useCaseName to create a directory specific to the current test case (Alice or Bob)
final Path topicOperatorUseCasePathDir = performanceLogDir.resolve(useCaseName + "/max-batch-size-" + maxBatchSize + "-max-linger-time-" + maxBatchLingerMs + "-with-clients-" + clientsEnabled);
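The switch from get(...) to getOrDefault(...) matters for the capacity use case, where no client instances are recorded: the old lookup would return null and the equals() call would throw a NullPointerException. A minimal sketch of the difference, with an illustrative empty attribute map:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, Object> attrs = new HashMap<>(); // capacity run: client-instances attribute never set

    // Old lookup: attrs.get(...) returns null, so .equals(0) throws a NullPointerException.
    // boolean clientsEnabled = !attrs.get(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES).equals(0);

    // New lookup: the "0" fallback keeps the expression from throwing. Note that the String "0"
    // never equals the Integer 0, so a missing key evaluates to clientsEnabled == true.
    boolean clientsEnabled = !attrs.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES, "0").equals(0);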
@@ -485,7 +485,7 @@ public void deleteResourcesOfTypeWithoutWait(final String resourceKind) {
}

public static <T extends CustomResource<? extends Spec, ? extends Status>> boolean waitForResourceStatus(MixedOperation<T, ?, ?> operation, String kind, String namespace, String name, Enum<?> statusType, ConditionStatus conditionStatus, long resourceTimeoutMs) {
LOGGER.info("Waiting for {}: {}/{} will have desired state: {}", kind, namespace, name, statusType);
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "Waiting for {}: {}/{} will have desired state: {}", kind, namespace, name, statusType);

TestUtils.waitFor(String.format("%s: %s#%s will have desired state: %s", kind, namespace, name, statusType),
TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS, resourceTimeoutMs,
@@ -496,7 +496,7 @@ public void deleteResourcesOfTypeWithoutWait(final String resourceKind) {
.withName(name)
.get()));

LOGGER.info("{}: {}/{} is in desired state: {}", kind, namespace, name, statusType);
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is in desired state: {}", kind, namespace, name, statusType);
return true;
}

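Routing these messages through LOGGER.log(level, ...) lets the resource manager demote per-resource status lines when a capacity test is watching thousands of KafkaTopics. The PR does not show determineLogLevel() here, so the following is only a sketch of the idea, assuming a Log4j2 logger and a hypothetical flag that marks capacity runs:

    import org.apache.logging.log4j.Level;

    // Hypothetical policy; the real determineLogLevel() in ResourceManager may differ.
    class LogLevelPolicy {
        private final boolean capacityRun;

        LogLevelPolicy(boolean capacityRun) {
            this.capacityRun = capacityRun;
        }

        // Demote per-resource wait messages to DEBUG during capacity runs to keep logs readable.
        Level determineLogLevel() {
            return capacityRun ? Level.DEBUG : Level.INFO;
        }
    }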
@@ -17,9 +17,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* This class contains crucial methods to create, modify and check large amount of KafkaTopics
@@ -30,60 +27,60 @@ public class KafkaTopicScalabilityUtils {
private static final Logger LOGGER = LogManager.getLogger(KafkaTopicUtils.class);
private KafkaTopicScalabilityUtils() {}

/**
* Creates Kafka topics in specified batches via Kubernetes within the given namespace.
*
* @param namespaceName the Kubernetes namespace where the topics will be created.
* @param clusterName the name of the Kafka cluster with which the topics are associated.
* @param topicPrefix the prefix string for the topic names, to which a numeric suffix will be added.
* @param start the starting index for the topic creation batch.
* @param end the ending index for the topic creation batch (exclusive).
* @param numberOfPartitions the number of partitions each topic should have.
* @param numberOfReplicas the number of replicas for each topic.
* @param minInSyncReplicas the minimum number of replicas that must be in sync.
*/
public static void createTopicsViaK8s(String namespaceName, String clusterName, String topicPrefix,
int numberOfTopics, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
LOGGER.info("Creating {} Topics via Kubernetes", numberOfTopics);
int start, int end, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
LOGGER.info("Creating topics from {} to {} via Kubernetes", start, end - 1);

for (int i = 0; i < numberOfTopics; i++) {
String currentTopicName = topicPrefix + i;
for (int i = start; i < end; i++) {
String currentTopicName = topicPrefix + "-" + i;
ResourceManager.getInstance().createResourceWithoutWait(KafkaTopicTemplates.topic(
clusterName, currentTopicName, numberOfPartitions, numberOfReplicas, minInSyncReplicas, namespaceName).build());
clusterName, currentTopicName, numberOfPartitions, numberOfReplicas, minInSyncReplicas, namespaceName).build());
}
}

public static void createTopicsViaK8s(String namespaceName, String clusterName, String topicPrefix,
int numberOfTopics, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
createTopicsViaK8s(namespaceName, clusterName, topicPrefix, 0, numberOfTopics, numberOfPartitions, numberOfReplicas, minInSyncReplicas);
}

public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType) {
waitForTopicStatus(namespaceName, topicPrefix, numberOfTopics, conditionType, ConditionStatus.True);
}

public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType, ConditionStatus conditionStatus) {
LOGGER.info("Verifying that {} Topics are in {} state", numberOfTopics, conditionType.toString());

// Determine the appropriate number of threads
int numberOfThreads = Math.min(Runtime.getRuntime().availableProcessors(), numberOfTopics);
ExecutorService customExecutor = Executors.newFixedThreadPool(numberOfThreads);

try {
List<CompletableFuture<?>> topics = new ArrayList<>();

for (int i = 0; i < numberOfTopics; i++) {
final String currentTopic = topicPrefix + i;
topics.add(CompletableFuture.runAsync(() ->
KafkaTopicUtils.waitForKafkaTopicStatus(namespaceName, currentTopic, conditionType, conditionStatus),
customExecutor // Use the custom executor
));
}

CompletableFuture<Void> allTopics = CompletableFuture.allOf(topics.toArray(new CompletableFuture[0]))
.thenRunAsync(() -> LOGGER.info("All Topics are in correct state"), customExecutor);

allTopics.join();
} finally {
// Attempt to shut down now to immediately terminate ongoing tasks
List<Runnable> notExecutedTasks = customExecutor.shutdownNow();
if (!notExecutedTasks.isEmpty()) {
LOGGER.warn("There were {} tasks that did not start", notExecutedTasks.size());
}
try {
// Wait a while for tasks to respond to being cancelled
if (!customExecutor.awaitTermination(30, TimeUnit.SECONDS))
LOGGER.error("Executor did not terminate");
} catch (InterruptedException ie) {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
/**
* Waits for a specific condition to be met for a range of Kafka topics in a given namespace.
*
* @param namespaceName The Kubernetes namespace where the topics reside.
* @param topicPrefix The prefix used for topic names, to which a numeric suffix will be added.
* @param start The starting index for checking topic statuses.
* @param end The ending index for checking topic statuses (exclusive).
* @param conditionType The type of condition to wait for (e.g., READY, NOT_READY).
* @param conditionStatus The desired status of the condition (e.g., TRUE, FALSE, UNKNOWN).
*/
public static void waitForTopicStatus(String namespaceName, String topicPrefix, int start, int end, Enum<?> conditionType, ConditionStatus conditionStatus) {
LOGGER.info("Verifying that topics from {} to {} are in {} state", start, end - 1, conditionType.toString());

for (int i = start; i < end; i++) {
final String currentTopic = topicPrefix + "-" + i;
KafkaTopicUtils.waitForKafkaTopicStatus(namespaceName, currentTopic, conditionType, conditionStatus);
}
}

public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType, ConditionStatus conditionStatus) {
waitForTopicStatus(namespaceName, topicPrefix, 0, numberOfTopics, conditionType, conditionStatus);
}

public static void waitForTopicsReady(String namespaceName, String topicPrefix, int numberOfTopics) {
waitForTopicStatus(namespaceName, topicPrefix, numberOfTopics, CustomResourceStatus.Ready);
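Taken together, the range-based overloads let a capacity test create and verify topics in slices rather than firing one executor task per topic. A hedged sketch of a caller driving them in batches; the namespace, cluster name, prefix, and batch size are illustrative, and the enums are the ones already used in the diff:

    // Hypothetical capacity-test driver: create and verify 2000 KafkaTopics in batches of 500.
    final String namespace = "co-capacity-namespace";    // illustrative
    final String clusterName = "my-cluster";             // illustrative
    final int totalTopics = 2000;
    final int batchSize = 500;

    for (int start = 0; start < totalTopics; start += batchSize) {
        final int end = Math.min(start + batchSize, totalTopics);
        KafkaTopicScalabilityUtils.createTopicsViaK8s(namespace, clusterName, "capacity-topic",
            start, end, 1, 3, 2);                        // partitions, replicas, min.insync.replicas
        KafkaTopicScalabilityUtils.waitForTopicStatus(namespace, "capacity-topic",
            start, end, CustomResourceStatus.Ready, ConditionStatus.True);
    }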
@@ -250,7 +250,9 @@ public static void waitForTopicWithPrefixDeletion(String namespaceName, String t
TestUtils.waitFor("deletion of all topics with prefix: " + topicPrefix, TestConstants.GLOBAL_POLL_INTERVAL, DELETION_TIMEOUT,
() -> {
try {
return getAllKafkaTopicsWithPrefix(namespaceName, topicPrefix).size() == 0;
final int numberOfTopicsToDelete = getAllKafkaTopicsWithPrefix(namespaceName, topicPrefix).size();
LOGGER.info("Remaining KafkaTopic's to delete: {} !", numberOfTopicsToDelete);
return numberOfTopicsToDelete == 0;
} catch (Exception e) {
return e.getMessage().contains("Not Found") || e.getMessage().contains("the server doesn't have a resource type");
}
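The added counter makes long tear-downs observable: every poll now reports how many KafkaTopics still remain. A hedged sketch of how the helper is typically combined with the non-waiting bulk delete shown earlier in this diff; the namespace and prefix are illustrative, and KafkaTopic.RESOURCE_KIND is assumed to be the Strimzi resource-kind constant:

    // Hypothetical cleanup after a capacity run: issue deletions without waiting per topic,
    // then block until the Topic Operator has removed every topic with the test prefix.
    ResourceManager.getInstance().deleteResourcesOfTypeWithoutWait(KafkaTopic.RESOURCE_KIND);
    KafkaTopicUtils.waitForTopicWithPrefixDeletion("co-capacity-namespace", "capacity-topic");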