Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Topic Operator metrics tests #10130

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ private void updateInternal(List<ReconcilableTopic> topics) {
var partitionedByManaged = remainingAfterDeletions.stream().collect(Collectors.partitioningBy(reconcilableTopic -> TopicOperatorUtil.isManaged(reconcilableTopic.kt())));
var unmanaged = partitionedByManaged.get(false);
addOrRemoveFinalizer(useFinalizer, unmanaged).forEach(rt -> putResult(results, rt, Either.ofRight(null)));
metrics.reconciliationsCounter(namespace).increment(unmanaged.size());

// skip reconciliation of paused KafkaTopics
var partitionedByPaused = validateManagedTopics(partitionedByManaged).stream().filter(hasTopicSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ public class ReplicasChangeHandler {
private final ExecutorService httpClientExecutor;
private final ObjectMapper mapper;

/**
* Create a new replicas change client instance.
*
* @param config Topic Operator configuration.
*/
public ReplicasChangeHandler(TopicOperatorConfig config) {
ReplicasChangeHandler(TopicOperatorConfig config) {
this.config = config;
this.httpClientExecutor = Executors.newCachedThreadPool();
this.mapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,50 +27,50 @@ public class TopicOperatorMetricsHolder extends MetricsHolder {
*/
public static final String METRICS_RECONCILIATIONS_MAX_BATCH_SIZE = METRICS_RECONCILIATIONS + ".max.batch.size";
/**
* Metric name for add finalizer duration.
* Metric name for Kubernetes add finalizer duration.
*/
public static final String METRICS_ADD_FINALIZER_DURATION = METRICS_PREFIX + "add.finalizer.duration";
/**
* Metric name for removing finalizer duration.
* Metric name for Kubernetes removing finalizer duration.
*/
public static final String METRICS_REMOVE_FINALIZER_DURATION = METRICS_PREFIX + "remove.finalizer.duration";
/**
* Metric name for create topics duration.
* Metric name for Kafka create topics duration.
*/
public static final String METRICS_CREATE_TOPICS_DURATION = METRICS_PREFIX + "create.topics.duration";
/**
* Metric name for update status duration.
* Metric name for Kubernetes update status duration.
*/
public static final String METRICS_UPDATE_TOPICS_DURATION = METRICS_PREFIX + "update.status.duration";
/**
* Metric name for list reassignments duration.
* Metric name for Kafka list reassignments duration.
*/
public static final String METRICS_LIST_REASSIGNMENTS_DURATION = METRICS_PREFIX + "list.reassignments.duration";
/**
* Metric name for alter configs duration.
* Metric name for Kafka alter configs duration.
*/
public static final String METRICS_ALTER_CONFIGS_DURATION = METRICS_PREFIX + "alter.configs.duration";
/**
* Metric name for create partitions duration.
* Metric name for Kafka create partitions duration.
*/
public static final String METRICS_CREATE_PARTITIONS_DURATION = METRICS_PREFIX + "create.partitions.duration";
/**
* Metric name for describe topics duration.
* Metric name for Kafka describe topics duration.
*/
public static final String METRICS_DESCRIBE_TOPICS_DURATION = METRICS_PREFIX + "describe.topics.duration";
/**
* Metric name for describe configs duration.
* Metric name for Kafka describe configs duration.
*/
public static final String METRICS_DESCRIBE_CONFIGS_DURATION = METRICS_PREFIX + "describe.configs.duration";
/**
* Metric name for delete topics duration.
* Metric name for Kafka delete topics duration.
*/
public static final String METRICS_DELETE_TOPICS_DURATION = METRICS_PREFIX + "delete.topics.duration";

private final Map<MetricKey, AtomicInteger> reconciliationsMaxQueueMap = new ConcurrentHashMap<>(1);
private final Map<MetricKey, AtomicInteger> reconciliationsMaxBatchMap = new ConcurrentHashMap<>(1);

// additional metrics, useful for tuning or monitoring specific internal operations
// additional metrics useful for tuning or monitoring specific internal requests
private final Map<MetricKey, Timer> addFinalizerTimerMap = new ConcurrentHashMap<>(1);
private final Map<MetricKey, Timer> removeFinalizerTimerMap = new ConcurrentHashMap<>(1);
private final Map<MetricKey, Timer> createTopicsTimerMap = new ConcurrentHashMap<>(1);
Expand All @@ -85,9 +85,9 @@ public class TopicOperatorMetricsHolder extends MetricsHolder {
/**
* Constructs the operator metrics holder.
*
* @param kind Kind of the resources for which these metrics apply
* @param selectorLabels Selector labels to select the controller resources
* @param metricsProvider Topic Operator metrics provider
* @param kind Kind of the resources for which these metrics apply.
* @param selectorLabels Selector labels to select the controller resources.
* @param metricsProvider Topic Operator metrics provider.
*/
public TopicOperatorMetricsHolder(String kind,
Labels selectorLabels,
Expand All @@ -99,13 +99,12 @@ public TopicOperatorMetricsHolder(String kind,
* Creates or gets a fine-grained timer-type metric.
* This can be used to measure the duration of internal operations.
*
* @param namespace Namespace of the resources
* @param metricName Name of the metric
* @param metricHelp Help description of the metric
* @param selectorLabels Selector labels to select the controller resources
* @param timerMap Map with timers
*
* @return Timer metric
* @param namespace Namespace of the resources.
* @param metricName Name of the metric.
* @param metricHelp Help description of the metric.
* @param selectorLabels Selector labels to select the controller resources.
* @param timerMap Map with timers.
* @return Timer metric.
*/
private Timer getFineGrainedTimer(String namespace, String metricName, String metricHelp, Optional<String> selectorLabels,
Map<MetricKey, Timer> timerMap) {
Expand All @@ -116,9 +115,8 @@ private Timer getFineGrainedTimer(String namespace, String metricName, String me
/**
* Gauge metric for the max size recorded for the event queue.
*
* @param namespace Namespace of the resources being reconciled
*
* @return Metrics gauge
* @param namespace Namespace of the resources being reconciled.
* @return Metrics gauge.
*/
public AtomicInteger reconciliationsMaxQueueSize(String namespace) {
return getGauge(new MetricKey(kind, namespace), METRICS_RECONCILIATIONS_MAX_QUEUE_SIZE,
Expand All @@ -129,9 +127,8 @@ public AtomicInteger reconciliationsMaxQueueSize(String namespace) {
/**
* Gauge metric for the max size recorded for the event batch.
*
* @param namespace Namespace of the resources being reconciled
*
* @return Metrics gauge
* @param namespace Namespace of the resources being reconciled.
* @return Metrics gauge.
*/
public AtomicInteger reconciliationsMaxBatchSize(String namespace) {
return getGauge(new MetricKey(kind, namespace), METRICS_RECONCILIATIONS_MAX_BATCH_SIZE,
Expand All @@ -140,132 +137,122 @@ public AtomicInteger reconciliationsMaxBatchSize(String namespace) {
}

/**
* Timer which measures how long the addFinalizer Kubernetes operations take.
*
* @param namespace Namespace of the resources being reconciled
* Timer which measures how long the Kubernetes add finalizer request takes to complete.
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer addFinalizerTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_ADD_FINALIZER_DURATION,
"The time the addFinalizer Kubernetes operation takes to complete",
"The time Kubernetes addFinalizer request takes to complete",
Optional.of(getLabelSelectorValues()), addFinalizerTimerMap);
}

/**
* Timer which measures how long the removeFinalizer Kubernetes operations take.
* Timer which measures how long the Kubernetes remove finalizer request takes to complete.
*
* @param namespace Namespace of the resources being reconciled
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer removeFinalizerTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_REMOVE_FINALIZER_DURATION,
"The time the removeFinalizer Kubernetes operation takes to complete",
"The time Kubernetes removeFinalizer request takes to complete",
Optional.of(getLabelSelectorValues()), removeFinalizerTimerMap);
}

/**
* Timer which measures how long the createTopics Kafka operations take.
*
* @param namespace Namespace of the resources being reconciled
* Timer which measures how long the Kafka createTopics request takes to complete.
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer createTopicsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_CREATE_TOPICS_DURATION,
"The time the createTopics Kafka operation takes to complete",
"The time Kafka createTopics request takes to complete",
Optional.of(getLabelSelectorValues()), createTopicsTimerMap);
}

/**
* Timer which measures how long the updateStatus Kubernetes operations take.
*
* @param namespace Namespace of the resources being reconciled
*
* @return Metrics timer
* Timer which measures how long the Kubernetes updateStatus request takes to complete.
*
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer updateStatusTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_UPDATE_TOPICS_DURATION,
"The time the updateStatus Kubernetes operation takes to complete",
"The time Kubernetes updateStatus request takes to complete",
Optional.of(getLabelSelectorValues()), updateStatusTimerMap);
}

/**
* Timer which measures how long the listReassignments Kafka operations take.
*
* @param namespace Namespace of the resources being reconciled
* Timer which measures how long the Kafka listPartitionReassignments request takes to complete.
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer listReassignmentsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_LIST_REASSIGNMENTS_DURATION,
"The time the listPartitionReassignments Kafka operation takes to complete",
"The time Kafka listPartitionReassignments request takes to complete",
Optional.of(getLabelSelectorValues()), listReassignmentsTimerMap);
}

/**
* Timer which measures how long the incrementalAlterConfigs Kafka operations take.
* Timer which measures how long the Kafka incrementalAlterConfigs request takes to complete.
*
* @param namespace Namespace of the resources being reconciled
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer alterConfigsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_ALTER_CONFIGS_DURATION,
"The time the incrementalAlterConfigs Kafka operation takes to complete",
"The time Kafka incrementalAlterConfigs request takes to complete",
Optional.of(getLabelSelectorValues()), alterConfigsTimerMap);
}

/**
* Timer which measures how long the createPartitions Kafka operations take.
*
* @param namespace Namespace of the resources being reconciled
* Timer which measures how long the Kafka createPartitions request takes to complete.
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer createPartitionsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_CREATE_PARTITIONS_DURATION,
"The time the createPartitions Kafka operation takes to complete",
"The time Kafka createPartitions request takes to complete",
Optional.of(getLabelSelectorValues()), createPartitionsTimerMap);
}

/**
* Timer which measures how long the describeTopics Kafka operations take.
* Timer which measures how long the Kafka describeTopics request takes to complete.
*
* @param namespace Namespace of the resources being reconciled
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer describeTopicsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_DESCRIBE_TOPICS_DURATION,
"The time the describeTopics Kafka operation takes to complete",
"The time Kafka describeTopics request takes to complete",
Optional.of(getLabelSelectorValues()), describeTopicsTimerMap);
}

/**
* Timer which measures how long the describeConfigs Kafka operations take.
*
* @param namespace Namespace of the resources being reconciled
* Timer which measures how long the Kafka describeConfigs request takes to complete.
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer describeConfigsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_DESCRIBE_CONFIGS_DURATION,
"The time the describeConfigs Kafka operation takes to complete",
"The time Kafka describeConfigs request takes to complete",
Optional.of(getLabelSelectorValues()), describeConfigsTimerMap);
}

/**
* Timer which measures how long the deleteTopics Kafka operations take.
*
* @param namespace Namespace of the resources being reconciled
* Timer which measures how long the Kafka deleteTopics request takes to complete.
*
* @return Metrics timer
* @param namespace Namespace of the resources being reconciled.
* @return Metrics timer.
*/
public Timer deleteTopicsTimer(String namespace) {
return getFineGrainedTimer(namespace, METRICS_DELETE_TOPICS_DURATION,
"The time the deleteTopics Kafka operation takes to complete",
"The time Kafka deleteTopics request takes to complete",
Optional.of(getLabelSelectorValues()), deleteTopicsTimerMap);
}
}
}
Loading
Loading