Skip to content

Commit

Permalink
Registers some common producer metrics in the Kafka buffer. (opensearch-project#4139)
Browse files Browse the repository at this point in the history

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Feb 19, 2024
1 parent 4c0e944 commit 845569c
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaProducerMetrics;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
import org.opensearch.dataprepper.plugins.kafka.util.RestUtils;
Expand Down Expand Up @@ -80,12 +81,14 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
Serializer<Object> valueSerializer = (Serializer<Object>) serializationFactory.getSerializer(dataConfig);
final KafkaProducer<Object, Object> producer = new KafkaProducer<>(properties, keyDeserializer, valueSerializer);
final KafkaTopicProducerMetrics topicMetrics = new KafkaTopicProducerMetrics(topic.getName(), pluginMetrics, topicNameInMetrics);
KafkaProducerMetrics.registerProducer(pluginMetrics, producer);
final String topicName = ObjectUtils.isEmpty(kafkaProducerConfig.getTopic()) ? null : kafkaProducerConfig.getTopic().getName();
final SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build();
return new KafkaCustomProducer(producer,
kafkaProducerConfig, dlqSink,
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService);
}

private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {
checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig, maxRequestSize);
final SchemaConfig schemaConfig = kafkaProducerConfig.getSchemaConfig();
Expand Down Expand Up @@ -114,7 +117,5 @@ private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig
topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor(), maxMessageBytes);
topicService.closeAdminClient();
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import java.util.Map;

/**
 * Registers producer-level Kafka metrics as Data Prepper gauges. These metrics
 * span all topics for a producer, which makes this distinct from
 * {@link KafkaTopicProducerMetrics}.
 */
public final class KafkaProducerMetrics {
    /** Maps Kafka-reported metric names to the camelCase names Data Prepper exposes. */
    static final Map<String, String> METRICS_NAME_MAP = Map.of(
            "record-queue-time-avg", "recordQueueTimeAvg",
            "record-queue-time-max", "recordQueueTimeMax",
            "buffer-exhausted-rate", "bufferExhaustedRate",
            "buffer-available-bytes", "bufferAvailableBytes"
    );

    // Utility class; not instantiable.
    private KafkaProducerMetrics() { }

    /**
     * Registers a gauge with {@code pluginMetrics} for each metric of
     * {@code kafkaProducer} whose Kafka name appears in {@link #METRICS_NAME_MAP}.
     * All other Kafka metrics are ignored.
     *
     * @param pluginMetrics the Data Prepper metrics facade to register gauges against
     * @param kafkaProducer the Kafka producer whose metrics are exposed
     */
    public static void registerProducer(final PluginMetrics pluginMetrics, final KafkaProducer<?, ?> kafkaProducer) {
        final Map<MetricName, ? extends Metric> kafkaProducerMetrics = kafkaProducer.metrics();
        for (final Map.Entry<MetricName, ? extends Metric> metricNameEntry : kafkaProducerMetrics.entrySet()) {
            final String dataPrepperMetricName = METRICS_NAME_MAP.get(metricNameEntry.getKey().name());
            if (dataPrepperMetricName == null) {
                // Not one of the whitelisted producer metrics; skip it.
                continue;
            }

            final Metric metric = metricNameEntry.getValue();
            // metricValue() returns Object; the selected metrics are numeric, so the
            // gauge function casts to double at read time.
            pluginMetrics.gauge(dataPrepperMetricName, metric, m -> (double) m.metricValue());
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.function.ToDoubleFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class KafkaProducerMetricsTest {
    @Mock
    private PluginMetrics pluginMetrics;
    @Mock
    private KafkaProducer<?, ?> kafkaProducer;

    // All metrics the mocked producer reports, keyed by Kafka MetricName.
    private Map<MetricName, Metric> kafkaMetricsMap;
    // Only the metrics whose names appear in METRICS_NAME_MAP, keyed by Kafka name.
    private Map<String, Metric> knownMetricsMap;

    @BeforeEach
    void setUp() {
        kafkaMetricsMap = new HashMap<>();
        knownMetricsMap = new HashMap<>();
        // Raw cast is needed because KafkaProducer.metrics() returns
        // Map<MetricName, ? extends Metric>, which mockito cannot stub directly.
        when(kafkaProducer.metrics()).thenReturn((Map) kafkaMetricsMap);

        // One mock Metric per whitelisted Kafka metric name.
        KafkaProducerMetrics.METRICS_NAME_MAP.keySet()
                .stream()
                .map(KafkaProducerMetricsTest::createKafkaMetric)
                .forEach(metricName -> {
                    final Metric metric = mock(Metric.class);
                    knownMetricsMap.put(metricName.name(), metric);
                    kafkaMetricsMap.put(metricName, metric);
                });
        // Plus some random metrics that registerProducer must ignore.
        IntStream.range(0, 5)
                .mapToObj(ignored -> UUID.randomUUID().toString())
                .map(KafkaProducerMetricsTest::createKafkaMetric)
                .forEach(metricName -> kafkaMetricsMap.put(metricName, mock(Metric.class)));
    }

    @Test
    void registerProducer_creates_gauges_for_each_metric_from_the_map() {
        KafkaProducerMetrics.registerProducer(pluginMetrics, kafkaProducer);

        // Exactly one gauge per whitelisted metric; the 5 random metrics are ignored.
        verify(pluginMetrics, times(KafkaProducerMetrics.METRICS_NAME_MAP.size())).gauge(anyString(), any(), any());
    }

    @ParameterizedTest
    @ArgumentsSource(RegisteredMetricsArgumentsProvider.class)
    void registerProducer_creates_expected_gauge(final String kafkaName, final String expectedDataPrepperName) {
        KafkaProducerMetrics.registerProducer(pluginMetrics, kafkaProducer);

        final Metric metric = knownMetricsMap.get(kafkaName);
        // Raw ArgumentCaptor: ToDoubleFunction's type argument is erased at the
        // verify call site, so a parameterized captor cannot be created directly.
        final ArgumentCaptor<ToDoubleFunction> metricFunctionArgumentCaptor = ArgumentCaptor.forClass(ToDoubleFunction.class);

        verify(pluginMetrics).gauge(eq(expectedDataPrepperName), eq(metric), metricFunctionArgumentCaptor.capture());

        final ToDoubleFunction actualMetricDoubleFunction = metricFunctionArgumentCaptor.getValue();

        // The captured gauge function must read metricValue() and cast it to double.
        final Random random = new Random();
        final double metricValue = random.nextDouble();
        when(metric.metricValue()).thenReturn(metricValue);
        assertThat(actualMetricDoubleFunction.applyAsDouble(metric), equalTo(metricValue));
    }

    // Creates a mock MetricName reporting the given name.
    private static MetricName createKafkaMetric(final String name) {
        final MetricName metricName = mock(MetricName.class);
        when(metricName.name()).thenReturn(name);
        return metricName;
    }

    /** Supplies each (kafkaName, dataPrepperName) pair from METRICS_NAME_MAP. */
    static class RegisteredMetricsArgumentsProvider implements ArgumentsProvider {
        @Override
        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception {
            return KafkaProducerMetrics.METRICS_NAME_MAP.entrySet()
                    .stream()
                    .map(entry -> arguments(entry.getKey(), entry.getValue()));
        }
    }
}

0 comments on commit 845569c

Please sign in to comment.