From 4ece50958b8f9bd68e8d84533fe84b8b0ade4471 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Sat, 18 May 2024 17:53:16 +0200 Subject: [PATCH] Fix Topic Operator metrics tests Fixes unmanaged topic metrics, where the operator should update total and successful reconciliations. Fixes a test issue where non-existent metrics were silently ignored, instead of failing the test. Renames and completes the original tests with the missing metrics. Reduces the timeout used to wait for metric appearance from 120s to 30s (this change should be verified on Azure). Signed-off-by: Federico Valeri --- .../topic/BatchingTopicController.java | 1 + .../operator/topic/ReplicasChangeHandler.java | 7 +- .../metrics/TopicOperatorMetricsHolder.java | 145 +++++----- .../topic/TopicOperatorMetricsTest.java | 252 +++++++++++------- 4 files changed, 220 insertions(+), 185 deletions(-) diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java index ee03ed3c106..8f0316cb35e 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java @@ -465,6 +465,7 @@ private void updateInternal(List topics) { var partitionedByManaged = remainingAfterDeletions.stream().collect(Collectors.partitioningBy(reconcilableTopic -> TopicOperatorUtil.isManaged(reconcilableTopic.kt()))); var unmanaged = partitionedByManaged.get(false); addOrRemoveFinalizer(useFinalizer, unmanaged).forEach(rt -> putResult(results, rt, Either.ofRight(null))); + metrics.reconciliationsCounter(namespace).increment(unmanaged.size()); // skip reconciliation of paused KafkaTopics var partitionedByPaused = validateManagedTopics(partitionedByManaged).stream().filter(hasTopicSpec) diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java 
b/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java index b0be4b03c1a..fc6c89deb7d 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/ReplicasChangeHandler.java @@ -73,12 +73,7 @@ public class ReplicasChangeHandler { private final ExecutorService httpClientExecutor; private final ObjectMapper mapper; - /** - * Create a new replicas change client instance. - * - * @param config Topic Operator configuration. - */ - public ReplicasChangeHandler(TopicOperatorConfig config) { + ReplicasChangeHandler(TopicOperatorConfig config) { this.config = config; this.httpClientExecutor = Executors.newCachedThreadPool(); this.mapper = new ObjectMapper(); diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/metrics/TopicOperatorMetricsHolder.java b/topic-operator/src/main/java/io/strimzi/operator/topic/metrics/TopicOperatorMetricsHolder.java index 46b1991e5c7..25b2c866374 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/metrics/TopicOperatorMetricsHolder.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/metrics/TopicOperatorMetricsHolder.java @@ -27,50 +27,50 @@ public class TopicOperatorMetricsHolder extends MetricsHolder { */ public static final String METRICS_RECONCILIATIONS_MAX_BATCH_SIZE = METRICS_RECONCILIATIONS + ".max.batch.size"; /** - * Metric name for add finalizer duration. + * Metric name for Kubernetes add finalizer duration. */ public static final String METRICS_ADD_FINALIZER_DURATION = METRICS_PREFIX + "add.finalizer.duration"; /** - * Metric name for removing finalizer duration. + * Metric name for Kubernetes removing finalizer duration. */ public static final String METRICS_REMOVE_FINALIZER_DURATION = METRICS_PREFIX + "remove.finalizer.duration"; /** - * Metric name for create topics duration. + * Metric name for Kafka create topics duration. 
*/ public static final String METRICS_CREATE_TOPICS_DURATION = METRICS_PREFIX + "create.topics.duration"; /** - * Metric name for update status duration. + * Metric name for Kubernetes update status duration. */ public static final String METRICS_UPDATE_TOPICS_DURATION = METRICS_PREFIX + "update.status.duration"; /** - * Metric name for list reassignments duration. + * Metric name for Kafka list reassignments duration. */ public static final String METRICS_LIST_REASSIGNMENTS_DURATION = METRICS_PREFIX + "list.reassignments.duration"; /** - * Metric name for alter configs duration. + * Metric name for Kafka alter configs duration. */ public static final String METRICS_ALTER_CONFIGS_DURATION = METRICS_PREFIX + "alter.configs.duration"; /** - * Metric name for create partitions duration. + * Metric name for Kafka create partitions duration. */ public static final String METRICS_CREATE_PARTITIONS_DURATION = METRICS_PREFIX + "create.partitions.duration"; /** - * Metric name for describe topics duration. + * Metric name for Kafka describe topics duration. */ public static final String METRICS_DESCRIBE_TOPICS_DURATION = METRICS_PREFIX + "describe.topics.duration"; /** - * Metric name for describe configs duration. + * Metric name for Kafka describe configs duration. */ public static final String METRICS_DESCRIBE_CONFIGS_DURATION = METRICS_PREFIX + "describe.configs.duration"; /** - * Metric name for delete topics duration. + * Metric name for Kafka delete topics duration. 
*/ public static final String METRICS_DELETE_TOPICS_DURATION = METRICS_PREFIX + "delete.topics.duration"; private final Map reconciliationsMaxQueueMap = new ConcurrentHashMap<>(1); private final Map reconciliationsMaxBatchMap = new ConcurrentHashMap<>(1); - // additional metrics, useful for tuning or monitoring specific internal operations + // additional metrics useful for tuning or monitoring specific internal requests private final Map addFinalizerTimerMap = new ConcurrentHashMap<>(1); private final Map removeFinalizerTimerMap = new ConcurrentHashMap<>(1); private final Map createTopicsTimerMap = new ConcurrentHashMap<>(1); @@ -85,9 +85,9 @@ public class TopicOperatorMetricsHolder extends MetricsHolder { /** * Constructs the operator metrics holder. * - * @param kind Kind of the resources for which these metrics apply - * @param selectorLabels Selector labels to select the controller resources - * @param metricsProvider Topic Operator metrics provider + * @param kind Kind of the resources for which these metrics apply. + * @param selectorLabels Selector labels to select the controller resources. + * @param metricsProvider Topic Operator metrics provider. */ public TopicOperatorMetricsHolder(String kind, Labels selectorLabels, @@ -99,13 +99,12 @@ public TopicOperatorMetricsHolder(String kind, * Creates or gets a fine-grained timer-type metric. * This can be used to measure the duration of internal operations. * - * @param namespace Namespace of the resources - * @param metricName Name of the metric - * @param metricHelp Help description of the metric - * @param selectorLabels Selector labels to select the controller resources - * @param timerMap Map with timers - * - * @return Timer metric + * @param namespace Namespace of the resources. + * @param metricName Name of the metric. + * @param metricHelp Help description of the metric. + * @param selectorLabels Selector labels to select the controller resources. + * @param timerMap Map with timers. 
+ * @return Timer metric. */ private Timer getFineGrainedTimer(String namespace, String metricName, String metricHelp, Optional selectorLabels, Map timerMap) { @@ -116,9 +115,8 @@ private Timer getFineGrainedTimer(String namespace, String metricName, String me /** * Gauge metric for the max size recorded for the event queue. * - * @param namespace Namespace of the resources being reconciled - * - * @return Metrics gauge + * @param namespace Namespace of the resources being reconciled. + * @return Metrics gauge. */ public AtomicInteger reconciliationsMaxQueueSize(String namespace) { return getGauge(new MetricKey(kind, namespace), METRICS_RECONCILIATIONS_MAX_QUEUE_SIZE, @@ -129,9 +127,8 @@ public AtomicInteger reconciliationsMaxQueueSize(String namespace) { /** * Gauge metric for the max size recorded for the event batch. * - * @param namespace Namespace of the resources being reconciled - * - * @return Metrics gauge + * @param namespace Namespace of the resources being reconciled. + * @return Metrics gauge. */ public AtomicInteger reconciliationsMaxBatchSize(String namespace) { return getGauge(new MetricKey(kind, namespace), METRICS_RECONCILIATIONS_MAX_BATCH_SIZE, @@ -140,132 +137,122 @@ public AtomicInteger reconciliationsMaxBatchSize(String namespace) { } /** - * Timer which measures how long the addFinalizer Kubernetes operations take. - * - * @param namespace Namespace of the resources being reconciled + * Timer which measures how long the Kubernetes add finalizer request takes to complete. * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. 
*/ public Timer addFinalizerTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_ADD_FINALIZER_DURATION, - "The time the addFinalizer Kubernetes operation takes to complete", + "The time Kubernetes addFinalizer request takes to complete", Optional.of(getLabelSelectorValues()), addFinalizerTimerMap); } /** - * Timer which measures how long the removeFinalizer Kubernetes operations take. + * Timer which measures how long the Kubernetes remove finalizer request takes to complete. * - * @param namespace Namespace of the resources being reconciled - * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer removeFinalizerTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_REMOVE_FINALIZER_DURATION, - "The time the removeFinalizer Kubernetes operation takes to complete", + "The time Kubernetes removeFinalizer request takes to complete", Optional.of(getLabelSelectorValues()), removeFinalizerTimerMap); } /** - * Timer which measures how long the createTopics Kafka operations take. - * - * @param namespace Namespace of the resources being reconciled + * Timer which measures how long the Kafka createTopics request takes to complete. * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer createTopicsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_CREATE_TOPICS_DURATION, - "The time the createTopics Kafka operation takes to complete", + "The time Kafka createTopics request takes to complete", Optional.of(getLabelSelectorValues()), createTopicsTimerMap); } /** - * Timer which measures how long the updateStatus Kubernetes operations take. - * - * @param namespace Namespace of the resources being reconciled - * - * @return Metrics timer + * Timer which measures how long the Kubernetes updateStatus request takes to complete. 
+ * + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer updateStatusTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_UPDATE_TOPICS_DURATION, - "The time the updateStatus Kubernetes operation takes to complete", + "The time Kubernetes updateStatus request takes to complete", Optional.of(getLabelSelectorValues()), updateStatusTimerMap); } /** - * Timer which measures how long the listReassignments Kafka operations take. - * - * @param namespace Namespace of the resources being reconciled + * Timer which measures how long the Kafka listPartitionReassignments request takes to complete. * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer listReassignmentsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_LIST_REASSIGNMENTS_DURATION, - "The time the listPartitionReassignments Kafka operation takes to complete", + "The time Kafka listPartitionReassignments request takes to complete", Optional.of(getLabelSelectorValues()), listReassignmentsTimerMap); } /** - * Timer which measures how long the incrementalAlterConfigs Kafka operations take. + * Timer which measures how long the Kafka incrementalAlterConfigs request takes to complete. * - * @param namespace Namespace of the resources being reconciled - * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer alterConfigsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_ALTER_CONFIGS_DURATION, - "The time the incrementalAlterConfigs Kafka operation takes to complete", + "The time Kafka incrementalAlterConfigs request takes to complete", Optional.of(getLabelSelectorValues()), alterConfigsTimerMap); } /** - * Timer which measures how long the createPartitions Kafka operations take. 
- * - * @param namespace Namespace of the resources being reconciled + * Timer which measures how long the Kafka createPartitions request takes to complete. * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer createPartitionsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_CREATE_PARTITIONS_DURATION, - "The time the createPartitions Kafka operation takes to complete", + "The time Kafka createPartitions request takes to complete", Optional.of(getLabelSelectorValues()), createPartitionsTimerMap); } /** - * Timer which measures how long the describeTopics Kafka operations take. + * Timer which measures how long the Kafka describeTopics request takes to complete. * - * @param namespace Namespace of the resources being reconciled - * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer describeTopicsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_DESCRIBE_TOPICS_DURATION, - "The time the describeTopics Kafka operation takes to complete", + "The time Kafka describeTopics request takes to complete", Optional.of(getLabelSelectorValues()), describeTopicsTimerMap); } /** - * Timer which measures how long the describeConfigs Kafka operations take. - * - * @param namespace Namespace of the resources being reconciled + * Timer which measures how long the Kafka describeConfigs request takes to complete. * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. 
*/ public Timer describeConfigsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_DESCRIBE_CONFIGS_DURATION, - "The time the describeConfigs Kafka operation takes to complete", + "The time Kafka describeConfigs request takes to complete", Optional.of(getLabelSelectorValues()), describeConfigsTimerMap); } /** - * Timer which measures how long the deleteTopics Kafka operations take. - * - * @param namespace Namespace of the resources being reconciled + * Timer which measures how long the Kafka deleteTopics request takes to complete. * - * @return Metrics timer + * @param namespace Namespace of the resources being reconciled. + * @return Metrics timer. */ public Timer deleteTopicsTimer(String namespace) { return getFineGrainedTimer(namespace, METRICS_DELETE_TOPICS_DURATION, - "The time the deleteTopics Kafka operation takes to complete", + "The time Kafka deleteTopics request takes to complete", Optional.of(getLabelSelectorValues()), deleteTopicsTimerMap); } -} \ No newline at end of file +} diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorMetricsTest.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorMetricsTest.java index f096eede91a..30efac65976 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorMetricsTest.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorMetricsTest.java @@ -10,7 +10,6 @@ import io.kroxylicious.testing.kafka.api.KafkaCluster; import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension; import io.micrometer.core.instrument.search.MeterNotFoundException; -import io.micrometer.core.instrument.search.RequiredSearch; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.model.topic.KafkaTopic; @@ -21,6 +20,8 @@ import io.strimzi.operator.topic.metrics.TopicOperatorMetricsProvider; import org.apache.kafka.clients.admin.Admin; import 
org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -29,10 +30,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION; import static io.strimzi.api.kafka.model.topic.KafkaTopic.RESOURCE_KIND; @@ -45,79 +46,76 @@ @ExtendWith(KafkaClusterExtension.class) public class TopicOperatorMetricsTest { + private static final Logger LOGGER = LogManager.getLogger(TopicOperatorMetricsTest.class); private static final String NAMESPACE = "topic-operator-test"; private static final int MAX_QUEUE_SIZE = 200; private static final int MAX_BATCH_SIZE = 10; private static final int MAX_THREADS = 2; private static final long MAX_BATCH_LINGER_MS = 10_000; - private static KubernetesClient client; - private static TopicOperatorMetricsHolder metrics; + private static KubernetesClient kubeClient; + private static TopicOperatorMetricsHolder metricsHolder; @BeforeAll public static void beforeAll(TestInfo testInfo) { TopicOperatorTestUtil.setupKubeCluster(testInfo, NAMESPACE); - client = new KubernetesClientBuilder().build(); - TopicOperatorMetricsProvider metricsProvider = new TopicOperatorMetricsProvider(new SimpleMeterRegistry()); - metrics = new TopicOperatorMetricsHolder(RESOURCE_KIND, null, metricsProvider); + kubeClient = new KubernetesClientBuilder().build(); + metricsHolder = new TopicOperatorMetricsHolder(RESOURCE_KIND, null, + new TopicOperatorMetricsProvider(new SimpleMeterRegistry())); } @AfterAll public static void afterAll(TestInfo testInfo) { - TopicOperatorTestUtil.cleanupNamespace(client, testInfo, NAMESPACE); + 
TopicOperatorTestUtil.cleanupNamespace(kubeClient, testInfo, NAMESPACE); TopicOperatorTestUtil.teardownKubeCluster(NAMESPACE); - client.close(); + kubeClient.close(); } @Test - public void shouldHaveMetricsAfterSomeEvents() throws InterruptedException { + public void eventHandlerMetrics() throws InterruptedException { var config = TopicOperatorConfig.buildFromMap(Map.of( TopicOperatorConfig.BOOTSTRAP_SERVERS.key(), "localhost:9092", TopicOperatorConfig.NAMESPACE.key(), NAMESPACE)); BatchingLoop mockQueue = mock(BatchingLoop.class); - TopicOperatorEventHandler eventHandler = new TopicOperatorEventHandler(config, mockQueue, metrics); + TopicOperatorEventHandler eventHandler = new TopicOperatorEventHandler(config, mockQueue, metricsHolder); + int numOfTestResources = 100; for (int i = 0; i < numOfTestResources; i++) { - KafkaTopic kt = createKafkaTopic("t" + i, "100100"); + KafkaTopic kt = buildTopicWithVersion("t" + i); eventHandler.onAdd(kt); } - String[] tags = new String[]{"kind", RESOURCE_KIND, "namespace", NAMESPACE}; - assertMetricMatches(MetricsHolder.METRICS_RESOURCES, tags, "gauge", is(Double.valueOf(numOfTestResources))); + assertMetricMatches(MetricsHolder.METRICS_RESOURCES, "gauge", is(Double.valueOf(numOfTestResources))); for (int i = 0; i < numOfTestResources; i++) { - KafkaTopic kt = createKafkaTopic("t" + i, "100100"); + KafkaTopic kt = buildTopicWithVersion("t" + i); eventHandler.onDelete(kt, false); } - assertMetricMatches(MetricsHolder.METRICS_RESOURCES, tags, "gauge", is(0.0)); + assertMetricMatches(MetricsHolder.METRICS_RESOURCES, "gauge", is(0.0)); - KafkaTopic foo1 = createKafkaTopic("my-topic", "100100"); - eventHandler.onAdd(foo1); - assertMetricMatches(MetricsHolder.METRICS_RESOURCES_PAUSED, tags, "gauge", is(0.0)); + KafkaTopic t1 = buildTopicWithVersion("t1"); + KafkaTopic t2 = buildTopicWithVersion("t2"); + t2.getMetadata().setAnnotations(Map.of(ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true")); + eventHandler.onUpdate(t1, t2); + 
assertMetricMatches(MetricsHolder.METRICS_RESOURCES_PAUSED, "gauge", is(1.0)); - KafkaTopic foo2 = createKafkaTopic("my-topic", "100100"); - foo2.getMetadata().setAnnotations(Map.of(ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true")); - eventHandler.onUpdate(foo1, foo2); - assertMetricMatches(MetricsHolder.METRICS_RESOURCES_PAUSED, tags, "gauge", is(1.0)); - - eventHandler.onUpdate(foo1, foo1); - assertMetricMatches(MetricsHolder.METRICS_RESOURCES_PAUSED, tags, "gauge", is(1.0)); - - KafkaTopic foo3 = createKafkaTopic("my-topic", "100100"); - foo3.getMetadata().setAnnotations(Map.of(ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "false")); - eventHandler.onUpdate(foo2, foo3); - assertMetricMatches(MetricsHolder.METRICS_RESOURCES_PAUSED, tags, "gauge", is(0.0)); + KafkaTopic t3 = buildTopicWithVersion("t3"); + t3.getMetadata().setAnnotations(Map.of(ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "false")); + eventHandler.onUpdate(t2, t3); + assertMetricMatches(MetricsHolder.METRICS_RESOURCES_PAUSED, "gauge", is(0.0)); } - private static KafkaTopic createKafkaTopic(String name, String version) { - KafkaTopic kt = new KafkaTopic(); - kt.getMetadata().setNamespace(NAMESPACE); - kt.getMetadata().setName(name); - kt.getMetadata().setResourceVersion(version); - return kt; + private KafkaTopic buildTopicWithVersion(String name) { + return new KafkaTopicBuilder() + .editOrNewMetadata() + .withNamespace(NAMESPACE) + .withName(name) + .withResourceVersion("100100") + .endMetadata() + .build(); } @Test - public void shouldHaveMetricsAfterSomeUpserts() throws InterruptedException { + public void batchingLoopMetrics() throws InterruptedException { BatchingLoop batchingLoop = createAndStartBatchingLoop(); int numOfTestResources = 100; for (int i = 0; i < numOfTestResources; i++) { @@ -127,17 +125,16 @@ public void shouldHaveMetricsAfterSomeUpserts() throws InterruptedException { batchingLoop.offer(new TopicUpsert(0, NAMESPACE, "t" + i, "100100")); } } - - String[] tags = new String[]{"kind", 
RESOURCE_KIND, "namespace", NAMESPACE}; - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_QUEUE_SIZE, tags, "gauge", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_QUEUE_SIZE, tags, "gauge", lessThanOrEqualTo(Double.valueOf(MAX_QUEUE_SIZE))); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_BATCH_SIZE, tags, "gauge", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_BATCH_SIZE, tags, "gauge", lessThanOrEqualTo(Double.valueOf(MAX_BATCH_SIZE))); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_LOCKED, tags, "counter", greaterThan(0.0)); + + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_QUEUE_SIZE, "gauge", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_QUEUE_SIZE, "gauge", lessThanOrEqualTo(Double.valueOf(MAX_QUEUE_SIZE))); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_BATCH_SIZE, "gauge", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_MAX_BATCH_SIZE, "gauge", lessThanOrEqualTo(Double.valueOf(MAX_BATCH_SIZE))); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_LOCKED, "counter", greaterThan(0.0)); batchingLoop.stop(); } - private static BatchingLoop createAndStartBatchingLoop() throws InterruptedException { + private BatchingLoop createAndStartBatchingLoop() { BatchingTopicController controller = mock(BatchingTopicController.class); ItemStore itemStore = mock(ItemStore.class); Runnable stop = mock(Runnable.class); @@ -149,75 +146,128 @@ private static BatchingLoop createAndStartBatchingLoop() throws InterruptedExcep MAX_BATCH_LINGER_MS, itemStore, stop, - metrics, + metricsHolder, NAMESPACE); batchingLoop.start(); return batchingLoop; } - + @Test - public void shouldHaveMetricsAfterSomeReconciliations(KafkaCluster cluster) 
throws InterruptedException { - Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers())); + public void batchingTopicControllerMetrics(KafkaCluster cluster) throws InterruptedException { + var admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers())); var config = Mockito.mock(TopicOperatorConfig.class); Mockito.doReturn(NAMESPACE).when(config).namespace(); Mockito.doReturn(true).when(config).useFinalizer(); - Mockito.doReturn(false).when(config).enableAdditionalMetrics(); + Mockito.doReturn(true).when(config).enableAdditionalMetrics(); + var replicasChangeHandler = Mockito.mock(ReplicasChangeHandler.class); - BatchingTopicController controller = new BatchingTopicController(config, Map.of("key", "VALUE"), admin, client, metrics, replicasChangeHandler); - - KafkaTopic t1 = createResource(client, "t1", "t1"); - KafkaTopic t2 = createResource(client, "t2", "t1"); - List updateBatch = List.of( - new ReconcilableTopic(new Reconciliation("test", RESOURCE_KIND, NAMESPACE, TopicOperatorUtil.topicName(t1)), t1, TopicOperatorUtil.topicName(t1)), - new ReconcilableTopic(new Reconciliation("test", RESOURCE_KIND, NAMESPACE, TopicOperatorUtil.topicName(t2)), t2, TopicOperatorUtil.topicName(t2)) - ); - controller.onUpdate(updateBatch); - List deleteBatch = List.of( - new ReconcilableTopic(new Reconciliation("test", RESOURCE_KIND, NAMESPACE, TopicOperatorUtil.topicName(t1)), t1, TopicOperatorUtil.topicName(t1)) - ); - controller.onDelete(deleteBatch); - - String[] tags = new String[]{"kind", RESOURCE_KIND, "namespace", NAMESPACE}; - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS, tags, "counter", is(2.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_SUCCESSFUL, tags, "counter", is(2.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_FAILED, tags, "counter", is(1.0)); - 
assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_DURATION, tags, "timer", greaterThan(0.0)); - - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_ADD_FINALIZER_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_REMOVE_FINALIZER_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_CREATE_TOPICS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_UPDATE_TOPICS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_LIST_REASSIGNMENTS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_ALTER_CONFIGS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_CREATE_PARTITIONS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_DESCRIBE_TOPICS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_DESCRIBE_CONFIGS_DURATION, tags, "timer", greaterThan(0.0)); - assertMetricMatches(TopicOperatorMetricsHolder.METRICS_DELETE_TOPICS_DURATION, tags, "timer", greaterThan(0.0)); - } + var controller = new BatchingTopicController(config, Map.of("key", "VALUE"), admin, kubeClient, metricsHolder, replicasChangeHandler); + + // create topics (success) + var t1 = createTopic("t1", 2, 1); + var t2 = createTopic("t2", 2, 1); + var t3 = createTopic("t3", 2, 1); + controller.onUpdate(List.of(reconcilableTopic(t1), reconcilableTopic(t2), reconcilableTopic(t3))); + + // update configuration + var t1ConfigChanged = updateTopic("t1", kt -> { + kt.getSpec().setConfig(Map.of("retention.ms", "86400000")); + return kt; + }); + controller.onUpdate(List.of(reconcilableTopic(t1ConfigChanged))); + + // increase partitions (success) + var t2PartIncreased = updateTopic("t2", kt -> { + 
kt.getSpec().setPartitions(5); + return kt; + }); + controller.onUpdate(List.of(reconcilableTopic(t2PartIncreased))); + + // decrease partitions (fail) + var t2PartDecreased = updateTopic("t2", kt -> { + kt.getSpec().setPartitions(4); + return kt; + }); + controller.onUpdate(List.of(reconcilableTopic(t2PartDecreased))); + + // increase replicas (fail) + var t2ReplIncreased = updateTopic("t2", kt -> { + kt.getSpec().setReplicas(2); + return kt; + }); + controller.onUpdate(List.of(reconcilableTopic(t2ReplIncreased))); + + // unmanage topic (success) + // should increase total and successful reconciliations metrics + var t1Unmanaged = updateTopic("t1", kt -> { + kt.getMetadata().setAnnotations(Map.of(TopicOperatorUtil.MANAGED, "false")); + return kt; + }); + controller.onUpdate(List.of(reconcilableTopic(t1Unmanaged))); + + // delete managed topics (success) + controller.onDelete(List.of(reconcilableTopic(t2))); - private KafkaTopic createResource(KubernetesClient client, String resourceName, String topicName) { - var kt = Crds.topicOperation(client). 
- resource(new KafkaTopicBuilder() + // delete unmanaged topic (success) + // should increase remove finalizer metric + var t3Unmanaged = updateTopic("t3", kt -> { + kt.getMetadata().setAnnotations(Map.of(TopicOperatorUtil.MANAGED, "false")); + return kt; + }); + controller.onDelete(List.of(reconcilableTopic(t3Unmanaged))); + + // check standard metrics + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS, "counter", is(9.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_SUCCESSFUL, "counter", is(7.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_FAILED, "counter", is(2.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_RECONCILIATIONS_DURATION, "timer", greaterThan(0.0)); + + // check additional metrics + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_ADD_FINALIZER_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_REMOVE_FINALIZER_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_CREATE_TOPICS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_UPDATE_TOPICS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_LIST_REASSIGNMENTS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_ALTER_CONFIGS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_CREATE_PARTITIONS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_DESCRIBE_TOPICS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_DESCRIBE_CONFIGS_DURATION, "timer", greaterThan(0.0)); + assertMetricMatches(TopicOperatorMetricsHolder.METRICS_DELETE_TOPICS_DURATION, "timer", greaterThan(0.0)); + } + + private KafkaTopic createTopic(String name, int partitions, int replicas) { + 
return Crds.topicOperation(kubeClient).inNamespace(NAMESPACE). + resource(new KafkaTopicBuilder() .withNewMetadata() - .withName(resourceName) + .withName(name) .withNamespace(NAMESPACE) .addToLabels("key", "VALUE") .endMetadata() .withNewSpec() - .withTopicName(topicName) - .withPartitions(2) - .withReplicas(1) - .endSpec().build()).create(); - return kt; + .withPartitions(partitions) + .withReplicas(replicas) + .endSpec() + .build()).create(); } - - private static void assertMetricMatches(String name, String[] tags, String type, Matcher matcher) throws InterruptedException { - // wait some time because events are queued, and processing may be delayed - int timeoutSec = 120; - RequiredSearch requiredSearch = null; - while (requiredSearch == null && timeoutSec-- > 0) { + + private KafkaTopic updateTopic(String name, UnaryOperator changer) { + var kt = Crds.topicOperation(kubeClient).inNamespace(NAMESPACE).withName(name).get(); + return TopicOperatorTestUtil.modifyTopic(kubeClient, kt, changer); + } + + private ReconcilableTopic reconcilableTopic(KafkaTopic kafkaTopic) { + return new ReconcilableTopic(new Reconciliation("test", RESOURCE_KIND, NAMESPACE, + TopicOperatorUtil.topicName(kafkaTopic)), kafkaTopic, TopicOperatorUtil.topicName(kafkaTopic)); + } + + private void assertMetricMatches(String name, String type, Matcher matcher) throws InterruptedException { + var found = false; + var timeoutSec = 30; + while (!found && --timeoutSec > 0) { try { - requiredSearch = metrics.metricsProvider().meterRegistry().get(name).tags(tags); + LOGGER.info("Searching for metric {}", name); + var requiredSearch = metricsHolder.metricsProvider().meterRegistry().get(name) + .tags("kind", RESOURCE_KIND, "namespace", NAMESPACE); switch (type) { case "counter": assertThat(requiredSearch.counter().count(), matcher); @@ -231,12 +281,14 @@ private static void assertMetricMatches(String name, String[] tags, String type, default: throw new RuntimeException(format("Unknown metric type %s", 
type)); } + found = true; } catch (MeterNotFoundException mnfe) { + LOGGER.info("Metric {} not found", name); TimeUnit.SECONDS.sleep(1); } } - if (requiredSearch == null) { - throw new RuntimeException(format("Unable to find metric %s with tags %s", name, Arrays.toString(tags))); + if (!found) { + throw new RuntimeException(format("Unable to find metric %s", name)); } } }