From a900d30240fbf8cb93a1fcde684b8405f609acd7 Mon Sep 17 00:00:00 2001
From: Maros Orsak
Date: Wed, 15 May 2024 08:51:58 +0200
Subject: [PATCH] [performance] - TopicOperator capacity test case (#10050)

Signed-off-by: see-quick
---
 .packit.yaml                                  |  20 +-
 .../performance/PerformanceConstants.java     |   6 +-
 .../TopicOperatorPerformanceReporter.java     |   2 +-
 .../systemtest/resources/ResourceManager.java |   4 +-
 .../KafkaTopicScalabilityUtils.java           |  85 +++++----
 .../utils/kafkaUtils/KafkaTopicUtils.java     |   4 +-
 .../performance/TopicOperatorPerformance.java | 173 ++++++++++++++++++
 .../performance/UserOperatorPerformance.java  |   2 +-
 systemtest/tmt/plans/main.fmf                 |  13 ++
 systemtest/tmt/tests/strimzi/main.fmf         |  12 +-
 10 files changed, 269 insertions(+), 52 deletions(-)

diff --git a/.packit.yaml b/.packit.yaml
index 4dd682ad939..d0b3c88ea89 100644
--- a/.packit.yaml
+++ b/.packit.yaml
@@ -227,4 +227,22 @@ jobs:
             timeout: 1440
       test:
         tmt:
-          name: performance-capacity
\ No newline at end of file
+          name: performance-capacity
+  - job: tests
+    trigger: pull_request
+    identifier: "performance-topic-operator-capacity"
+    targets:
+      - centos-stream-9-x86_64
+      - centos-stream-9-aarch64
+    skip_build: true
+    manual_trigger: true
+    env: { IP_FAMILY: ipv4 }
+    labels:
+      - performance-topic-operator-capacity
+    tf_extra_params:
+      settings:
+        pipeline:
+          timeout: 1440
+      test:
+        tmt:
+          name: performance-topic-operator-capacity
\ No newline at end of file
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/performance/PerformanceConstants.java b/systemtest/src/main/java/io/strimzi/systemtest/performance/PerformanceConstants.java
index 931462081d4..5854113fdbe 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/performance/PerformanceConstants.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/performance/PerformanceConstants.java
@@ -46,6 +46,8 @@ public interface PerformanceConstants {
      */
     String TOPIC_OPERATOR_IN_NUMBER_OF_TOPICS_TO_UPDATE = "IN: UPDATE NUMBER OF TOPICS";
 
+    String TOPIC_OPERATOR_IN_MAX_QUEUE_SIZE = "IN: MAX QUEUE SIZE";
+
     /**
      * OUT constants represent the output metrics or results measured after the performance tests involving the topic operator.
      */
@@ -75,6 +77,8 @@ public interface PerformanceConstants {
      */
     String TOPIC_OPERATOR_OUT_UPDATE_TIMES = "OUT: Bob Update Times (ms)";
 
+    String TOPIC_OPERATOR_OUT_SUCCESSFUL_KAFKA_TOPICS_CREATED = "OUT: Successful KafkaTopics Created";
+
     // --------------------------------------------------------------------------------
     // ------------------------------ USER OPERATOR -----------------------------------
     // --------------------------------------------------------------------------------
@@ -150,7 +154,7 @@ public interface PerformanceConstants {
     String TOPIC_OPERATOR_BOBS_STREAMING_USE_CASE = "bobStreamingUseCase";
     String TOPIC_OPERATOR_ALICE_BULK_USE_CASE = "aliceBulkUseCase";
     String USER_OPERATOR_ALICE_BULK_USE_CASE = "aliceBulkUseCase";
-    String USER_OPERATOR_CAPACITY_USE_CASE = "capacityUseCase";
+    String GENERAL_CAPACITY_USE_CASE = "capacityUseCase";
 
     /**
      * Performance metrics file
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/performance/report/TopicOperatorPerformanceReporter.java b/systemtest/src/main/java/io/strimzi/systemtest/performance/report/TopicOperatorPerformanceReporter.java
index 43b3c8de6de..b13e549567c 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/performance/report/TopicOperatorPerformanceReporter.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/performance/report/TopicOperatorPerformanceReporter.java
@@ -25,7 +25,7 @@ public class TopicOperatorPerformanceReporter extends BasePerformanceReporter {
     protected Path resolveComponentUseCasePathDir(Path performanceLogDir, String useCaseName, Map<String, Object> performanceAttributes) {
         final String maxBatchSize = performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_SIZE, "").toString();
         final String maxBatchLingerMs = performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_LINGER_MS, "").toString();
-        final boolean clientsEnabled = !performanceAttributes.get(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES).equals(0);
+        final boolean clientsEnabled = !performanceAttributes.getOrDefault(PerformanceConstants.TOPIC_OPERATOR_IN_NUMBER_OF_CLIENT_INSTANCES, 0).equals(0);
 
         // Use the useCaseName to create a directory specific to the current test case (Alice or Bob)
         final Path topicOperatorUseCasePathDir = performanceLogDir.resolve(useCaseName + "/max-batch-size-" + maxBatchSize + "-max-linger-time-" + maxBatchLingerMs + "-with-clients-" + clientsEnabled);
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/resources/ResourceManager.java b/systemtest/src/main/java/io/strimzi/systemtest/resources/ResourceManager.java
index 1842b75367c..e2ecd7e70ca 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/resources/ResourceManager.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/resources/ResourceManager.java
@@ -485,7 +485,7 @@ public void deleteResourcesOfTypeWithoutWait(final String resourceKind) {
     }
 
     public static <T extends CustomResource<?, ?>> boolean waitForResourceStatus(MixedOperation<T, ?, ?> operation, String kind, String namespace, String name, Enum<?> statusType, ConditionStatus conditionStatus, long resourceTimeoutMs) {
-        LOGGER.info("Waiting for {}: {}/{} will have desired state: {}", kind, namespace, name, statusType);
+        LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "Waiting for {}: {}/{} will have desired state: {}", kind, namespace, name, statusType);
 
         TestUtils.waitFor(String.format("%s: %s#%s will have desired state: %s", kind, namespace, name, statusType),
             TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS, resourceTimeoutMs,
@@ -496,7 +496,7 @@ public void deleteResourcesOfTypeWithoutWait(final String resourceKind) {
                 .withName(name)
                 .get()));
 
-        LOGGER.info("{}: {}/{} is in desired state: {}", kind, namespace, name, statusType);
+        LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is in desired state: {}", kind, namespace, name, statusType);
         return true;
     }
 
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicScalabilityUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicScalabilityUtils.java
index 1feec65a1fc..90f39807ebc 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicScalabilityUtils.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicScalabilityUtils.java
@@ -17,9 +17,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 /**
  * This class contains crucial methods to create, modify and check large amount of KafkaTopics
@@ -30,60 +27,60 @@ public class KafkaTopicScalabilityUtils {
     private static final Logger LOGGER = LogManager.getLogger(KafkaTopicUtils.class);
     private KafkaTopicScalabilityUtils() {}
 
+    /**
+     * Creates Kafka topics in specified batches via Kubernetes within the given namespace.
+     *
+     * @param namespaceName      the Kubernetes namespace where the topics will be created.
+     * @param clusterName        the name of the Kafka cluster with which the topics are associated.
+     * @param topicPrefix        the prefix string for the topic names, to which a numeric suffix will be added.
+     * @param start              the starting index for the topic creation batch.
+     * @param end                the ending index for the topic creation batch (exclusive).
+     * @param numberOfPartitions the number of partitions each topic should have.
+     * @param numberOfReplicas   the number of replicas for each topic.
+     * @param minInSyncReplicas  the minimum number of replicas that must be in sync.
+     */
     public static void createTopicsViaK8s(String namespaceName, String clusterName, String topicPrefix,
-                                          int numberOfTopics, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
-        LOGGER.info("Creating {} Topics via Kubernetes", numberOfTopics);
+                                          int start, int end, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
+        LOGGER.info("Creating topics from {} to {} via Kubernetes", start, end - 1);
 
-        for (int i = 0; i < numberOfTopics; i++) {
-            String currentTopicName = topicPrefix + i;
+        for (int i = start; i < end; i++) {
+            String currentTopicName = topicPrefix + "-" + i;
             ResourceManager.getInstance().createResourceWithoutWait(KafkaTopicTemplates.topic(
-                clusterName, currentTopicName, numberOfPartitions, numberOfReplicas, minInSyncReplicas, namespaceName).build());
+                    clusterName, currentTopicName, numberOfPartitions, numberOfReplicas, minInSyncReplicas, namespaceName).build());
         }
     }
 
+    public static void createTopicsViaK8s(String namespaceName, String clusterName, String topicPrefix,
+                                          int numberOfTopics, int numberOfPartitions, int numberOfReplicas, int minInSyncReplicas) {
+        createTopicsViaK8s(namespaceName, clusterName, topicPrefix, 0, numberOfTopics, numberOfPartitions, numberOfReplicas, minInSyncReplicas);
+    }
+
     public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType) {
         waitForTopicStatus(namespaceName, topicPrefix, numberOfTopics, conditionType, ConditionStatus.True);
     }
 
-    public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType, ConditionStatus conditionStatus) {
-        LOGGER.info("Verifying that {} Topics are in {} state", numberOfTopics, conditionType.toString());
-
-        // Determine the appropriate number of threads
-        int numberOfThreads = Math.min(Runtime.getRuntime().availableProcessors(), numberOfTopics);
-        ExecutorService customExecutor = Executors.newFixedThreadPool(numberOfThreads);
-
-        try {
-            List<CompletableFuture<Void>> topics = new ArrayList<>();
-
-            for (int i = 0; i < numberOfTopics; i++) {
-                final String currentTopic = topicPrefix + i;
-                topics.add(CompletableFuture.runAsync(() ->
-                    KafkaTopicUtils.waitForKafkaTopicStatus(namespaceName, currentTopic, conditionType, conditionStatus),
-                    customExecutor // Use the custom executor
-                ));
-            }
-
-            CompletableFuture<Void> allTopics = CompletableFuture.allOf(topics.toArray(new CompletableFuture[0]))
-                .thenRunAsync(() -> LOGGER.info("All Topics are in correct state"), customExecutor);
-
-            allTopics.join();
-        } finally {
-            // Attempt to shut down now to immediately terminate ongoing tasks
-            List<Runnable> notExecutedTasks = customExecutor.shutdownNow();
-            if (!notExecutedTasks.isEmpty()) {
-                LOGGER.warn("There were {} tasks that did not start", notExecutedTasks.size());
-            }
-            try {
-                // Wait a while for tasks to respond to being cancelled
-                if (!customExecutor.awaitTermination(30, TimeUnit.SECONDS))
-                    LOGGER.error("Executor did not terminate");
-            } catch (InterruptedException ie) {
-                // Preserve interrupt status
-                Thread.currentThread().interrupt();
-            }
+    /**
+     * Waits for a specific condition to be met for a range of Kafka topics in a given namespace.
+     *
+     * @param namespaceName   The Kubernetes namespace where the topics reside.
+     * @param topicPrefix     The prefix used for topic names, to which a numeric suffix will be added.
+     * @param start           The starting index for checking topic statuses.
+     * @param end             The ending index for checking topic statuses (exclusive).
+     * @param conditionType   The type of condition to wait for (e.g., READY, NOT_READY).
+     * @param conditionStatus The desired status of the condition (e.g., TRUE, FALSE, UNKNOWN).
+     */
+    public static void waitForTopicStatus(String namespaceName, String topicPrefix, int start, int end, Enum<?> conditionType, ConditionStatus conditionStatus) {
+        LOGGER.info("Verifying that topics from {} to {} are in {} state", start, end - 1, conditionType.toString());
+
+        for (int i = start; i < end; i++) {
+            final String currentTopic = topicPrefix + "-" + i;
+            KafkaTopicUtils.waitForKafkaTopicStatus(namespaceName, currentTopic, conditionType, conditionStatus);
         }
     }
 
+    public static void waitForTopicStatus(String namespaceName, String topicPrefix, int numberOfTopics, Enum<?> conditionType, ConditionStatus conditionStatus) {
+        waitForTopicStatus(namespaceName, topicPrefix, 0, numberOfTopics, conditionType, conditionStatus);
+    }
 
     public static void waitForTopicsReady(String namespaceName, String topicPrefix, int numberOfTopics) {
         waitForTopicStatus(namespaceName, topicPrefix, numberOfTopics, CustomResourceStatus.Ready);
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java
index 5731eee868b..9a5c5c4d8bb 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaTopicUtils.java
@@ -250,7 +250,9 @@ public static void waitForTopicWithPrefixDeletion(String namespaceName, String t
         TestUtils.waitFor("deletion of all topics with prefix: " + topicPrefix, TestConstants.GLOBAL_POLL_INTERVAL, DELETION_TIMEOUT,
             () -> {
                 try {
-                    return getAllKafkaTopicsWithPrefix(namespaceName, topicPrefix).size() == 0;
+                    final int numberOfTopicsToDelete = getAllKafkaTopicsWithPrefix(namespaceName, topicPrefix).size();
+                    LOGGER.info("Remaining KafkaTopics to delete: {}", numberOfTopicsToDelete);
+                    return numberOfTopicsToDelete == 0;
                 } catch (Exception e) {
                     return e.getMessage().contains("Not Found") || e.getMessage().contains("the server doesn't have a resource type");
                 }
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/performance/TopicOperatorPerformance.java b/systemtest/src/test/java/io/strimzi/systemtest/performance/TopicOperatorPerformance.java
index b197f7052f4..3403191c72d 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/performance/TopicOperatorPerformance.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/performance/TopicOperatorPerformance.java
@@ -10,8 +10,11 @@
 import io.strimzi.systemtest.AbstractST;
 import io.strimzi.systemtest.Environment;
 import io.strimzi.systemtest.TestConstants;
+import io.strimzi.systemtest.enums.ConditionStatus;
+import io.strimzi.systemtest.enums.CustomResourceStatus;
 import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
 import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder;
+import io.strimzi.systemtest.logs.LogCollector;
 import io.strimzi.systemtest.performance.gather.collectors.TopicOperatorMetricsCollector;
 import io.strimzi.systemtest.performance.gather.schedulers.TopicOperatorMetricsCollectionScheduler;
 import io.strimzi.systemtest.performance.report.TopicOperatorPerformanceReporter;
@@ -26,6 +29,7 @@
 import io.strimzi.systemtest.utils.ClientUtils;
 import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicScalabilityUtils;
 import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils;
+import io.strimzi.test.WaitException;
 import io.strimzi.test.k8s.KubeClusterResource;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -48,6 +52,7 @@
 import java.util.stream.Stream;
 
 import static io.strimzi.systemtest.TestConstants.PERFORMANCE;
+import static io.strimzi.systemtest.performance.PerformanceConstants.PERFORMANCE_CAPACITY;
 import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
 
 @Tag(PERFORMANCE)
@@ -63,6 +68,7 @@ public class TopicOperatorPerformance extends AbstractST {
     private TopicOperatorMetricsCollector topicOperatorCollector;
     private TopicOperatorMetricsCollectionScheduler topicOperatorMetricsGatherer;
     private TopicOperatorPerformanceReporter topicOperatorPerformanceReporter = new TopicOperatorPerformanceReporter();
+    private LogCollector logCollector;
 
     /**
      * Provides configurations for Alice's bulk batch use case, testing different batch sizes and linger times.
@@ -567,6 +573,173 @@ private void spawnMultipleProducerAndConsumers(final String topicNamePrefix, fin
         }
     }
 
+    /**
+     * Provides a set of configurations for capacity tests, focusing on different levels of batching
+     * to understand the Kafka Topic Operator's performance under varying conditions.
+     * Each configuration is designed to test the impact of different max batch sizes and linger times
+     * on the system's throughput and responsiveness.
+     *
+     * @return Stream of Arguments where each pair consists of:
+     *         - Max Batch Size (String): The maximum number of topic operations (e.g., creation, deletion)
+     *           that the Topic Operator will batch together before processing.
+     *         - Max Batch Linger ms (String): The maximum time, in milliseconds, that the Topic Operator
+     *           will wait before processing a batch, allowing more operations to accumulate and thus increasing
+     *           batching efficiency.
+     *
+     * Configurations include:
+     * - Default configuration: A balanced setup for typical use cases.
+     * - Minimal batching: Configured for high responsiveness with very small batch sizes and minimal linger times.
+     * - Moderate batching: A balance between batching efficiency and responsiveness.
+     * - Heavier batching: Larger batches and linger times aimed at higher throughput.
+     * - Extreme batching: Tests the upper limits of performance with very large batch sizes.
+     * - Maximum possible batching: Pushes the system to its stress limits to evaluate performance under extreme conditions.
+     */
+    private static Stream<Arguments> provideConfigurationsForCapacity() {
+        return Stream.of(
+            Arguments.of("100", "100"),   // Default configuration
+            Arguments.of("10", "1"),      // Minimal batching for high responsiveness
+            Arguments.of("50", "100"),    // Moderate batching for balanced performance
+            Arguments.of("100", "500"),   // Heavier batching for throughput focus
+            Arguments.of("500", "1000"),  // Extreme batching to test upper limits of performance
+            Arguments.of("1000", "2000")  // Maximum possible batching for stress testing
+        );
+    }
+
+    @Tag(PERFORMANCE_CAPACITY)
+    @ParameterizedTest
+    @MethodSource("provideConfigurationsForCapacity")
+    void testCapacity(String maxBatchSize, String maxBatchLingerMs) throws IOException {
+        final int brokerReplicas = 3;
+        final int controllerReplicas = 3;
+        int successfulCreations = 0;
+        final String maxQueueSize = String.valueOf(Integer.MAX_VALUE);
+
+        try {
+            resourceManager.createResourceWithWait(
+                NodePoolsConverter.convertNodePoolsIfNeeded(
+                    KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), brokerReplicas)
+                        .editSpec()
+                            .withNewPersistentClaimStorage()
+                                .withSize("50Gi")
+                            .endPersistentClaimStorage()
+                        .endSpec()
+                        .build(),
+                    KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), controllerReplicas)
+                        .editSpec()
+                            .withNewPersistentClaimStorage()
+                                .withSize("5Gi")
+                            .endPersistentClaimStorage()
+                        .endSpec()
+                        .build()
+                )
+            );
+            resourceManager.createResourceWithWait(
+                KafkaTemplates.kafkaMetricsConfigMap(testStorage.getNamespaceName(), testStorage.getClusterName()),
+                KafkaTemplates.kafkaWithMetrics(testStorage.getNamespaceName(), testStorage.getClusterName(), brokerReplicas, controllerReplicas)
+                    .editSpec()
+                        .editEntityOperator()
+                            .editTopicOperator()
+                                .withReconciliationIntervalSeconds(10)
+                            .endTopicOperator()
+                            .editOrNewTemplate()
+                                .editOrNewTopicOperatorContainer()
+                                    // Finalizers ensure orderly and controlled deletion of KafkaTopic resources.
+                                    // We disable them here because the ResourceManager deletes the KafkaTopics for us.
+                                    .addNewEnv()
+                                        .withName("STRIMZI_USE_FINALIZERS")
+                                        .withValue("false")
+                                    .endEnv()
+                                    .addNewEnv()
+                                        .withName("STRIMZI_ENABLE_ADDITIONAL_METRICS")
+                                        .withValue("true")
+                                    .endEnv()
+                                    .addNewEnv()
+                                        .withName("STRIMZI_MAX_QUEUE_SIZE")
+                                        .withValue(maxQueueSize)
+                                    .endEnv()
+                                    .addNewEnv()
+                                        .withName("STRIMZI_MAX_BATCH_SIZE")
+                                        .withValue(maxBatchSize)
+                                    .endEnv()
+                                    .addNewEnv()
+                                        .withName("STRIMZI_MAX_BATCH_LINGER_MS")
+                                        .withValue(maxBatchLingerMs)
+                                    .endEnv()
+                                .endTopicOperatorContainer()
+                            .endTemplate()
+                        .endEntityOperator()
+                    .endSpec()
+                    .build(),
+                ScraperTemplates.scraperPod(testStorage.getNamespaceName(), testStorage.getScraperName()).build()
+            );
+
+            this.testStorage.addToTestStorage(TestConstants.SCRAPER_POD_KEY,
+                kubeClient().listPodsByPrefixInName(this.testStorage.getNamespaceName(), testStorage.getScraperName()).get(0).getMetadata().getName());
+
+            // -- Metrics POLL --
+            // Assuming 'testStorage' contains necessary details like namespace and scraperPodName
+            this.topicOperatorCollector = new TopicOperatorMetricsCollector.Builder()
+                .withScraperPodName(this.testStorage.getScraperPodName())
+                .withNamespaceName(this.testStorage.getNamespaceName())
+                .withComponentType(ComponentType.TopicOperator)
+                .withComponentName(this.testStorage.getClusterName())
+                .build();
+
+            this.topicOperatorMetricsGatherer = new TopicOperatorMetricsCollectionScheduler(this.topicOperatorCollector, "strimzi.io/cluster=" + this.testStorage.getClusterName());
+            this.topicOperatorMetricsGatherer.startCollecting();
+
+            // create topics incrementally in fixed-size batches
+            final int batchSize = 100;
+
+            while (true) { // loop until a batch fails to become ready
+                int start = successfulCreations;
+                int end = successfulCreations + batchSize;
+
+                try {
+                    // Create topics
+                    KafkaTopicScalabilityUtils.createTopicsViaK8s(testStorage.getNamespaceName(), testStorage.getClusterName(),
+                        testStorage.getTopicName(), start, end, 12, 3, 2);
+                    KafkaTopicScalabilityUtils.waitForTopicStatus(testStorage.getNamespaceName(), testStorage.getTopicName(),
+                        start, end, CustomResourceStatus.Ready, ConditionStatus.True);
+
+                    successfulCreations += batchSize;
+                    LOGGER.info("Successfully created and verified batch from {} to {}", start, end);
+                } catch (WaitException e) {
+                    LOGGER.error("Failed to create Kafka topics from index {} to {}: {}", start, end, e.getMessage());
+
+                    // after a failure, gather logs from all components under test (i.e., Topic Operator, Kafka pods)
+                    // to observe the behaviour and identify possible performance bottlenecks
+                    this.logCollector = new LogCollector();
+                    this.logCollector.collect();
+
+                    break; // Break out of the loop if an error occurs
+                }
+            }
+        } finally {
+            // to speed up the deletion process, delete all KafkaTopics at once; deleting them
+            // one by one can add a delay of around 10 seconds per KafkaTopic
+            LOGGER.info("Starting deletion of KafkaTopics in namespace: {}", testStorage.getNamespaceName());
+            resourceManager.deleteResourcesOfTypeWithoutWait(KafkaTopic.RESOURCE_KIND);
+            KafkaTopicUtils.waitForTopicWithPrefixDeletion(testStorage.getNamespaceName(), testStorage.getTopicName());
+
+            if (this.topicOperatorMetricsGatherer != null) {
+                this.topicOperatorMetricsGatherer.stopCollecting();
+
+                final Map<String, Object> performanceAttributes = new LinkedHashMap<>();
+
+                performanceAttributes.put(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_QUEUE_SIZE, maxQueueSize);
+                performanceAttributes.put(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_SIZE, maxBatchSize);
+                performanceAttributes.put(PerformanceConstants.TOPIC_OPERATOR_IN_MAX_BATCH_LINGER_MS, maxBatchLingerMs);
+
+                performanceAttributes.put(PerformanceConstants.TOPIC_OPERATOR_OUT_SUCCESSFUL_KAFKA_TOPICS_CREATED, successfulCreations);
+
+                performanceAttributes.put(PerformanceConstants.METRICS_HISTORY, this.topicOperatorMetricsGatherer.getMetricsStore()); // Map of metrics history
+
+                this.topicOperatorPerformanceReporter.logPerformanceData(this.testStorage, performanceAttributes, REPORT_DIRECTORY + "/" + PerformanceConstants.GENERAL_CAPACITY_USE_CASE, ACTUAL_TIME, Environment.PERFORMANCE_DIR);
+            }
+        }
+    }
+
     @BeforeEach
     public void setUp(ExtensionContext extensionContext) {
         this.testStorage = new TestStorage(extensionContext, TestConstants.CO_NAMESPACE);
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/performance/UserOperatorPerformance.java b/systemtest/src/test/java/io/strimzi/systemtest/performance/UserOperatorPerformance.java
index b02b9f9920f..844ed4e3c3b 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/performance/UserOperatorPerformance.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/performance/UserOperatorPerformance.java
@@ -418,7 +418,7 @@ void testCapacity(String controllerThreadPoolSize, String cacheRefreshIntervalMs
 
                 performanceAttributes.put(PerformanceConstants.METRICS_HISTORY, this.userOperatorMetricsGatherer.getMetricsStore()); // Map of metrics history
 
-                this.userOperatorPerformanceReporter.logPerformanceData(this.testStorage, performanceAttributes, REPORT_DIRECTORY + "/" + PerformanceConstants.USER_OPERATOR_CAPACITY_USE_CASE, ACTUAL_TIME, Environment.PERFORMANCE_DIR);
+                this.userOperatorPerformanceReporter.logPerformanceData(this.testStorage, performanceAttributes, REPORT_DIRECTORY + "/" + PerformanceConstants.GENERAL_CAPACITY_USE_CASE, ACTUAL_TIME, Environment.PERFORMANCE_DIR);
             }
         }
     }
diff --git a/systemtest/tmt/plans/main.fmf b/systemtest/tmt/plans/main.fmf
index 6e2bc6728e5..72dd0cc05ea 100644
--- a/systemtest/tmt/plans/main.fmf
+++ b/systemtest/tmt/plans/main.fmf
@@ -224,3 +224,16 @@ finish:
       test:
         - performance-capacity
 
+/performance-topic-operator-capacity:
+  summary: Run the Topic Operator performance capacity test suite
+  provision:
+    hardware:
+      memory: ">= 30 GiB"
+      cpu:
+        processors: ">= 8"
+      # the default disk size (i.e., 50 GB) is not enough for capacity tests
+      disk:
+        - size: ">= 60 GB"
+  discover+:
+    test:
+      - performance-topic-operator-capacity
diff --git a/systemtest/tmt/tests/strimzi/main.fmf b/systemtest/tmt/tests/strimzi/main.fmf
index f0be0fe873a..c88cfec8bc0 100644
--- a/systemtest/tmt/tests/strimzi/main.fmf
+++ b/systemtest/tmt/tests/strimzi/main.fmf
@@ -83,4 +83,14 @@ adjust:
     tier: 1
     environment+:
       TEST_PROFILE: performance-capacity
-      PARALLELISM_ENABLED: false
\ No newline at end of file
+      PARALLELISM_ENABLED: false
+
+/performance-topic-operator-capacity:
+  summary: Run the Topic Operator performance capacity test suite
+  tags: [strimzi, kafka, kraft, performance, performance-capacity]
+  duration: 24h
+  tier: 1
+  environment+:
+    TEST_PROFILE: performance-capacity
+    PARALLELISM_ENABLED: false
+    TESTS: TopicOperatorPerformance#testCapacity
\ No newline at end of file
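
For readers unfamiliar with the capacity-measurement pattern used in testCapacity above, here is a minimal, self-contained Java sketch of the same create-in-batches-until-failure loop. It is illustrative only and not part of the patch: createBatch and waitUntilBatchReady are hypothetical stand-ins for KafkaTopicScalabilityUtils.createTopicsViaK8s and waitForTopicStatus, and the simulated capacity limit is an arbitrary placeholder rather than a real Topic Operator limit.

    import java.util.concurrent.TimeoutException;

    public class CapacityLoopSketch {

        // Hypothetical stand-in for KafkaTopicScalabilityUtils.createTopicsViaK8s(namespace, cluster, prefix, start, end, ...)
        static void createBatch(int start, int end) {
            // The real test creates KafkaTopic custom resources with indices [start, end) here.
        }

        // Hypothetical stand-in for KafkaTopicScalabilityUtils.waitForTopicStatus(..., Ready, True);
        // simulated here by failing once an arbitrary capacity limit is exceeded.
        static void waitUntilBatchReady(int start, int end) throws TimeoutException {
            final int simulatedCapacityLimit = 1200; // placeholder, not a real limit
            if (end > simulatedCapacityLimit) {
                throw new TimeoutException("Topics " + start + ".." + (end - 1) + " did not become Ready");
            }
        }

        public static void main(String[] args) {
            final int batchSize = 100;
            int successfulCreations = 0;

            while (true) {
                final int start = successfulCreations;
                final int end = successfulCreations + batchSize;
                try {
                    createBatch(start, end);          // fire-and-forget creation of one batch
                    waitUntilBatchReady(start, end);  // block until the whole batch is Ready
                    successfulCreations += batchSize; // count only fully verified batches
                } catch (TimeoutException e) {
                    // The last fully Ready batch defines the measured capacity; the real test
                    // additionally collects logs and metrics before leaving the loop.
                    break;
                }
            }
            System.out.println("Measured capacity: " + successfulCreations + " topics");
        }
    }

The key design choice this mirrors from the test is that successfulCreations advances only after an entire batch is verified Ready, so the reported capacity is the last known-good topic count, not the number of creation requests issued.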