diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java index d41cd7e848..72c0694b41 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.plugins.kafka.service.TopicService; import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaProducerMetrics; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics; import org.opensearch.dataprepper.plugins.kafka.util.RestUtils; @@ -80,12 +81,14 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce Serializer valueSerializer = (Serializer) serializationFactory.getSerializer(dataConfig); final KafkaProducer producer = new KafkaProducer<>(properties, keyDeserializer, valueSerializer); final KafkaTopicProducerMetrics topicMetrics = new KafkaTopicProducerMetrics(topic.getName(), pluginMetrics, topicNameInMetrics); + KafkaProducerMetrics.registerProducer(pluginMetrics, producer); final String topicName = ObjectUtils.isEmpty(kafkaProducerConfig.getTopic()) ? 
null : kafkaProducerConfig.getTopic().getName(); final SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build(); return new KafkaCustomProducer(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService); } + private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) { checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig, maxRequestSize); final SchemaConfig schemaConfig = kafkaProducerConfig.getSchemaConfig(); @@ -114,7 +117,5 @@ private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor(), maxMessageBytes); topicService.closeAdminClient(); } - - } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaProducerMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaProducerMetrics.java new file mode 100644 index 0000000000..478b743623 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaProducerMetrics.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +import java.util.Map; + +/** + * Metrics for a Kafka producer. These span all topics, which makes it distinct from + * the {@link KafkaTopicProducerMetrics}. 
+ */ +public final class KafkaProducerMetrics { + static final Map<String, String> METRICS_NAME_MAP = Map.of( + "record-queue-time-avg", "recordQueueTimeAvg", + "record-queue-time-max", "recordQueueTimeMax", + "buffer-exhausted-rate", "bufferExhaustedRate", + "buffer-available-bytes", "bufferAvailableBytes" + ); + + private KafkaProducerMetrics() { } + + public static void registerProducer(final PluginMetrics pluginMetrics, final KafkaProducer kafkaProducer) { + final Map<MetricName, ? extends Metric> kafkaProducerMetrics = kafkaProducer.metrics(); + for (final Map.Entry<MetricName, ? extends Metric> metricNameEntry : kafkaProducerMetrics.entrySet()) { + final MetricName kafkaMetricName = metricNameEntry.getKey(); + + final String dataPrepperMetricName = METRICS_NAME_MAP.get(kafkaMetricName.name()); + if(dataPrepperMetricName == null) + continue; + + final Metric metric = metricNameEntry.getValue(); + + pluginMetrics.gauge(dataPrepperMetricName, metric, m -> (double) m.metricValue()); + } + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaProducerMetricsTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaProducerMetricsTest.java new file mode 100644 index 0000000000..adcd89d3d7 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaProducerMetricsTest.java @@ -0,0 +1,112 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; 
+import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.function.ToDoubleFunction; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class KafkaProducerMetricsTest { + @Mock + private PluginMetrics pluginMetrics; + @Mock + private KafkaProducer kafkaProducer; + + private Map<MetricName, Metric> kafkaMetricsMap; + private Map<String, Metric> knownMetricsMap; + + @BeforeEach + void setUp() { + kafkaMetricsMap = new HashMap<>(); + knownMetricsMap = new HashMap<>(); + when(kafkaProducer.metrics()).thenReturn((Map) kafkaMetricsMap); + + KafkaProducerMetrics.METRICS_NAME_MAP.keySet() + .stream() + .map(KafkaProducerMetricsTest::createKafkaMetric) + .forEach(metricName -> { + final Metric metric = mock(Metric.class); + knownMetricsMap.put(metricName.name(), metric); + kafkaMetricsMap.put(metricName, metric); + }); + IntStream.range(0, 5) + .mapToObj(ignored -> UUID.randomUUID().toString()) + .map(KafkaProducerMetricsTest::createKafkaMetric) + .forEach(metricName -> kafkaMetricsMap.put(metricName, mock(Metric.class))); + } + + @Test + void registerProducer_creates_gauges_for_each_metric_from_the_map() { + 
KafkaProducerMetrics.registerProducer(pluginMetrics, kafkaProducer); + + verify(pluginMetrics, times(KafkaProducerMetrics.METRICS_NAME_MAP.size())).gauge(anyString(), any(), any()); + } + + @ParameterizedTest + @ArgumentsSource(RegisteredMetricsArgumentsProvider.class) + void registerProducer_creates_expected_gauge(final String kafkaName, final String expectedDataPrepperName) { + KafkaProducerMetrics.registerProducer(pluginMetrics, kafkaProducer); + + final Metric metric = knownMetricsMap.get(kafkaName); + final ArgumentCaptor<ToDoubleFunction> metricFunctionArgumentCaptor = ArgumentCaptor.forClass(ToDoubleFunction.class); + + verify(pluginMetrics).gauge(eq(expectedDataPrepperName), eq(metric), metricFunctionArgumentCaptor.capture()); + + final ToDoubleFunction<Metric> actualMetricDoubleFunction = metricFunctionArgumentCaptor.getValue(); + + final Random random = new Random(); + final double metricValue = random.nextDouble(); + when(metric.metricValue()).thenReturn(metricValue); + assertThat(actualMetricDoubleFunction.applyAsDouble(metric), equalTo(metricValue)); + } + + private static MetricName createKafkaMetric(final String name) { + final MetricName metricName = mock(MetricName.class); + when(metricName.name()).thenReturn(name); + return metricName; + } + + static class RegisteredMetricsArgumentsProvider implements ArgumentsProvider { + @Override + public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception { + return KafkaProducerMetrics.METRICS_NAME_MAP.entrySet() + .stream() + .map(entry -> arguments(entry.getKey(), entry.getValue())); + } + } +} \ No newline at end of file