diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 357c9ddb95a16..1ba18951d0f9a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -77,6 +77,10 @@ public class CommonClientConfigs { public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level"; public static final String METRICS_RECORDING_LEVEL_DOC = "The highest recording level for metrics."; + public static final String METRICS_REPLACE_ON_DUPLICATE_CONFIG = "metrics.replace.on.duplicate"; + public static final String METRICS_REPLACE_ON_DUPLICATE_DOC = "If set to true, then multiple sensors registering the same metric will not throw, but will instead log an error. This makes sensor registration errors non fatal."; + + public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index 058c491672a72..f29413239ba13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -92,11 +92,13 @@ public class AdminClientConfig extends AbstractConfig { private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC; public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; + public static final String METRICS_REPLACE_ON_DUPLICATE_CONFIG = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG; public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC; + private static final String METRICS_REPLACE_ON_DUPLICATE_DOC = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC; public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; @@ -158,6 +160,11 @@ public class AdminClientConfig extends AbstractConfig { in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), Importance.LOW, METRICS_RECORDING_LEVEL_DOC) + .define(METRICS_REPLACE_ON_DUPLICATE_CONFIG, + Type.BOOLEAN, + false, + Importance.LOW, + METRICS_REPLACE_ON_DUPLICATE_DOC) // security support .define(SECURITY_PROTOCOL_CONFIG, Type.STRING, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8ddb0c08627ac..ab77b504489ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -344,6 +344,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso .tags(metricTags); reporters.add(new JmxReporter(JMX_PREFIX)); metrics = new Metrics(metricConfig, reporters, time); + metrics.setReplaceOnDuplicateMetric(config.getBoolean(AdminClientConfig.METRICS_REPLACE_ON_DUPLICATE_CONFIG)); String metricGrpPrefix = "admin-client"; channelBuilder = ClientUtils.createChannelBuilder(config); selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index fd81efec5e9dd..98ba062141eab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -197,6 +197,11 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; + /** + * metrics.replace.on.duplicate + */ + public static final String METRIC_REPLACE_ON_DUPLICATE_CONFIG = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG; + /** * check.crcs */ @@ -404,6 +409,11 @@ public class ConsumerConfig extends AbstractConfig { new ConfigDef.NonNullValidator(), Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) + .define(METRIC_REPLACE_ON_DUPLICATE_CONFIG, + Type.BOOLEAN, + false, + Importance.LOW, + CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC) .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5e26d3afb2643..48e74311e4afb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -681,6 +681,7 @@ private KafkaConsumer(ConsumerConfig config, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); + this.metrics.setReplaceOnDuplicateMetric(config.getBoolean(ConsumerConfig.METRIC_REPLACE_ON_DUPLICATE_CONFIG)); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); // load interceptors and make sure they get clientId diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7e432f5f0589e..685ec26feb6d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -356,6 +356,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); + this.metrics.setReplaceOnDuplicateMetric(config.getBoolean(ProducerConfig.METRICS_REPLACE_ON_DUPLICATE_CONFIG)); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index d44dae481e1a4..ff98875c844e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -173,6 +173,11 @@ public class ProducerConfig extends AbstractConfig { */ public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; + /** + * metrics.replace.on.duplicate + */ + public static final String METRICS_REPLACE_ON_DUPLICATE_CONFIG = CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_CONFIG; + /** metric.reporters */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; @@ -290,6 +295,11 @@ public class ProducerConfig extends AbstractConfig { new ConfigDef.NonNullValidator(), Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) + .define(METRICS_REPLACE_ON_DUPLICATE_CONFIG, + Type.BOOLEAN, + false, + Importance.LOW, + CommonClientConfigs.METRICS_REPLACE_ON_DUPLICATE_DOC) .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Type.INT, 5, diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index a6da9f90397d2..286492ad3d273 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -75,6 +75,8 @@ public class Metrics implements Closeable { private final ScheduledThreadPoolExecutor metricsScheduler; private static final Logger log = LoggerFactory.getLogger(Metrics.class); + private volatile boolean replaceOnDuplicate = false; + /** * Create a metrics repository with no metric reporters and default configuration. * Expiration of Sensors is disabled. @@ -528,7 +530,7 @@ public synchronized KafkaMetric removeMetric(MetricName metricName) { try { reporter.metricRemoval(metric); } catch (Exception e) { - log.error("Error when removing metric from " + reporter.getClass().getName(), e); + log.error("Error when removing metric " + metricName + " from " + reporter.getClass().getName(), e); } } log.trace("Removed metric named {}", metricName); @@ -555,8 +557,16 @@ public synchronized void removeReporter(MetricsReporter reporter) { synchronized void registerMetric(KafkaMetric metric) { MetricName metricName = metric.metricName(); - if (this.metrics.containsKey(metricName)) - throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); + + if (this.metrics.containsKey(metricName)) { + if (replaceOnDuplicate) { + log.error("The metric " + metricName + " is being replaced since it had already been registered. Please file a bug report."); + removeMetric(metricName); + } else { + throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); + } + } + this.metrics.put(metricName, metric); for (MetricsReporter reporter : reporters) { try { @@ -631,6 +641,10 @@ public MetricName metricInstance(MetricNameTemplate template, Map