diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index 134e74b55..56d520bdf 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -67,6 +67,7 @@ tasks.publish { dependencies { api(libs.kafka.streams) + implementation(project(":controller-api")) implementation(libs.bundles.scylla) implementation(libs.mongodb.driver.sync) implementation(libs.bundles.otel) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index a5fcfe6d1..b40e2a03e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -18,6 +18,7 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.CLIENT_ID_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.CLIENT_SECRET_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_HOSTNAME_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE; import static dev.responsive.kafka.internal.metrics.ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE; @@ -36,10 +37,12 @@ import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory; import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; -import dev.responsive.kafka.internal.metrics.OtelMetricsService; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; import dev.responsive.kafka.internal.metrics.ResponsiveStateListener; +import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService; +import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService; +import dev.responsive.kafka.internal.metrics.exporter.otel.OtelMetricsService; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.SessionUtil; @@ -221,13 +224,11 @@ private static ResponsiveMetrics createMetrics( final StreamsConfig streamsConfig, final ResponsiveConfig responsiveConfig ) { - final OtelMetricsService otel; - if (responsiveConfig.getBoolean(ResponsiveConfig.METRICS_ENABLED_CONFIG)) { - otel = OtelMetricsService.create(streamsConfig, responsiveConfig); - } else { - otel = OtelMetricsService.noop(); - } - otel.start(); + final boolean metricsEnabled = responsiveConfig.getBoolean(METRICS_ENABLED_CONFIG); + final MetricsExportService exportService = metricsEnabled + ? OtelMetricsService.create(streamsConfig, responsiveConfig) + : new NoopMetricsExporterService(); + exportService.start(); final MetricConfig metricConfig = new MetricConfig() .samples(streamsConfig.getInt(METRICS_NUM_SAMPLES_CONFIG)) @@ -245,7 +246,7 @@ private static ResponsiveMetrics createMetrics( RESPONSIVE_METRICS_NAMESPACE, streamsConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX) ) - ), otel); + ), exportService); } /** diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/ResponsiveMetrics.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/ResponsiveMetrics.java index 9563c35db..3f9f10ce3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/ResponsiveMetrics.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/ResponsiveMetrics.java @@ -16,6 +16,8 @@ package dev.responsive.kafka.internal.metrics; +import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService; +import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService; import java.io.Closeable; import java.util.LinkedHashMap; import java.util.Map; @@ -48,12 +50,16 @@ public class ResponsiveMetrics implements Closeable { private static final Pattern GLOBAL_THREAD_REGEX = Pattern.compile(".*-(GlobalStreamThread+)"); private OrderedTagsSupplier orderedTagsSupplier; - private final OtelMetricsService otelService; + private final MetricsExportService exportService; private final Metrics metrics; - public ResponsiveMetrics(final Metrics metrics, final OtelMetricsService otelService) { + public ResponsiveMetrics(final Metrics metrics) { + this(metrics, new NoopMetricsExporterService()); + } + + public ResponsiveMetrics(final Metrics metrics, final MetricsExportService exportService) { this.metrics = metrics; - this.otelService = otelService; + this.exportService = exportService; } /** @@ -178,6 +184,6 @@ public void close() { LOG.warn("Not all metrics were cleaned up before close: {}", metrics().keySet()); } metrics.close(); - otelService.close(); + exportService.close(); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/MetricsExportService.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/MetricsExportService.java new file mode 100644 index 000000000..3ed511604 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/MetricsExportService.java @@ -0,0 +1,28 @@ +/* + * Copyright 2023 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.internal.metrics.exporter; + +import java.io.Closeable; + +public interface MetricsExportService extends Closeable { + + void start(); + + @Override + void close(); + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/NoopMetricsExporterService.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/NoopMetricsExporterService.java new file mode 100644 index 000000000..95d5c2974 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/NoopMetricsExporterService.java @@ -0,0 +1,31 @@ +/* + * Copyright 2023 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.internal.metrics.exporter; + +public class NoopMetricsExporterService implements MetricsExportService { + + @Override + public void start() { + + } + + @Override + public void close() { + + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/OtelMetricsService.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/otel/OtelMetricsService.java similarity index 53% rename from kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/OtelMetricsService.java rename to kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/otel/OtelMetricsService.java index c00dd26a3..0861ff840 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/OtelMetricsService.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/metrics/exporter/otel/OtelMetricsService.java @@ -14,43 +14,68 @@ * limitations under the License. */ -package dev.responsive.kafka.internal.metrics; +package dev.responsive.kafka.internal.metrics.exporter.otel; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.config.ConfigUtils; -import io.opentelemetry.api.OpenTelemetry; +import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService; import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; import io.opentelemetry.instrumentation.jmx.engine.JmxMetricInsight; import io.opentelemetry.instrumentation.jmx.engine.MetricConfiguration; +import io.opentelemetry.instrumentation.jmx.yaml.JmxConfig; +import io.opentelemetry.instrumentation.jmx.yaml.JmxRule; import io.opentelemetry.instrumentation.jmx.yaml.RuleParser; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.resources.Resource; -import java.io.Closeable; import java.time.Duration; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import responsive.platform.auth.ApiKeyHeaders; -public class OtelMetricsService implements Closeable { +public class OtelMetricsService implements MetricsExportService { + + private static final Logger LOG = LoggerFactory.getLogger(OtelMetricsService.class); + + private static final String SERVICE_NAME_ATTR = "service.name"; + private static final String RESPONSIVE_APPLICATION_ID_ATTR = "responsiveApplicationId"; private final JmxMetricInsight metricInsight; - private final Runnable onClose; + private final OpenTelemetrySdk otel; public static OtelMetricsService create( final StreamsConfig streamsConfig, final ResponsiveConfig config ) { - final var exporter = OtlpGrpcMetricExporter - .builder() - .addHeader("api-key", config.getString(ResponsiveConfig.METRICS_API_KEY_CONFIG)) - .addHeader("secret", config.getString(ResponsiveConfig.METRICS_API_KEY_CONFIG)) - .setEndpoint(config.getString(ResponsiveConfig.CONTROLLER_ENDPOINT_CONFIG)) - .build(); + final OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder(); + + final String apiKey = config.getString(ResponsiveConfig.METRICS_API_KEY_CONFIG); + final Password secret = config.getPassword(ResponsiveConfig.METRICS_SECRET_CONFIG); + if (secret == null ^ apiKey == null) { + throw new IllegalArgumentException(String.format( + "Invalid configuration, if configured to report metrics using %s, " + + "then values for both %s and %s must be provided.", + ResponsiveConfig.METRICS_ENABLED_CONFIG, + ResponsiveConfig.METRICS_API_KEY_CONFIG, + ResponsiveConfig.METRICS_SECRET_CONFIG + )); + } else if (secret != null) { + builder.addHeader(ApiKeyHeaders.API_KEY_METADATA_KEY, apiKey); + builder.addHeader(ApiKeyHeaders.SECRET_METADATA_KEY, secret.value()); + } + + builder.setEndpoint(config.getString(ResponsiveConfig.CONTROLLER_ENDPOINT_CONFIG)); + + final var exporter = builder.build(); final var metricReader = PeriodicMetricReader .builder(exporter) @@ -60,8 +85,8 @@ public static OtelMetricsService create( final var appId = ConfigUtils.responsiveAppId(streamsConfig, config); final var attributes = Attributes .builder() - .put("service.name", appId + "-otel") - .put("responsiveApplicationId", appId) + .put(SERVICE_NAME_ATTR, appId + "-otel") + .put(RESPONSIVE_APPLICATION_ID_ATTR, appId) .build(); final var meterProvider = SdkMeterProvider @@ -79,45 +104,46 @@ public static OtelMetricsService create( ))) .build(); - return new OtelMetricsService(otel, otel::close); + return new OtelMetricsService(otel); } - public static OtelMetricsService noop() { - return new OtelMetricsService(OpenTelemetry.noop(), () -> {}); - } - - private OtelMetricsService(final OpenTelemetry otel, final Runnable onClose) { - this.onClose = onClose; + private OtelMetricsService(final OpenTelemetrySdk otel) { + this.otel = otel; this.metricInsight = JmxMetricInsight.createService(otel, 0); } + @Override public void start() { this.metricInsight.start(buildMetricConfiguration()); } @Override public void close() { - onClose.run(); + otel.close(); } - private static void buildFromUserRules(MetricConfiguration conf) { - RuleParser parserInstance = RuleParser.get(); + private static MetricConfiguration buildMetricConfiguration() { + MetricConfiguration metricConfiguration = new MetricConfiguration(); + addRulesFromJmxConfig(metricConfiguration); + return metricConfiguration; + } + + private static void addRulesFromJmxConfig(MetricConfiguration conf) { + final RuleParser parserInstance = RuleParser.get(); final ClassLoader loader = OtelMetricsService.class.getClassLoader(); // TODO(agavra): instead of including otel-jmx.config.yaml as a resource we should // fetch it from the Responsive controller on start-up - try (final var is = loader.getResourceAsStream("otel-jmx.config.yaml")) { - parserInstance.addMetricDefsTo(conf, is, "responsive builtin config"); - } catch (Exception e) { - // yaml parsing errors are caught and logged inside addMetricDefsTo - // only file access related exceptions are expected here - JmxMetricInsight.getLogger().warning(e.toString()); + try (final var inputStream = loader.getResourceAsStream("otel-jmx.config.yaml")) { + final JmxConfig jmxConfig = parserInstance.loadConfig(inputStream); + LOG.info("Found {} metric rules", jmxConfig.getRules().size()); + + for (final JmxRule rule : jmxConfig.getRules()) { + conf.addMetricDef(rule.buildMetricDef()); + } + } catch (final Exception e) { + LOG.error("Unable to load rules from otel-jmx.config.yaml!", e); + throw new IllegalStateException("Unable to load rules from otel-jmx.config.yaml.", e); } } - - private static MetricConfiguration buildMetricConfiguration() { - MetricConfiguration metricConfiguration = new MetricConfiguration(); - buildFromUserRules(metricConfiguration); - return metricConfiguration; - } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/EndOffsetsPollerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/EndOffsetsPollerTest.java index e69cd94da..725ed94d4 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/EndOffsetsPollerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/EndOffsetsPollerTest.java @@ -91,7 +91,7 @@ public void setup() { lenient().when(executor.scheduleAtFixedRate(any(), anyLong(), anyLong(), any())) .thenReturn((ScheduledFuture) pollFuture); - final var responsiveMetrics = new ResponsiveMetrics(metrics, OtelMetricsService.noop()); + final var responsiveMetrics = new ResponsiveMetrics(metrics); responsiveMetrics.initializeTags( GROUP, CLIENT, new ClientVersionMetadata("1", "abc", "2", "dfe"), Collections.emptyMap()); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java index fa61359ea..a05d06b8e 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java @@ -64,8 +64,7 @@ class MetricPublishingCommitListenerTest { @BeforeEach public void setup() { - final ResponsiveMetrics responsiveMetrics - = new ResponsiveMetrics(metrics, OtelMetricsService.noop()); + final ResponsiveMetrics responsiveMetrics = new ResponsiveMetrics(metrics); responsiveMetrics.initializeTags( GROUP, CLIENT, new ClientVersionMetadata("1", "abc", "2", "dfe"), Collections.emptyMap()); listener = new MetricPublishingCommitListener(responsiveMetrics, THREAD_ID, offsetRecorder); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/ResponsiveRestoreListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/ResponsiveRestoreListenerTest.java index 5c48f80b2..1f96737f1 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/ResponsiveRestoreListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/ResponsiveRestoreListenerTest.java @@ -68,7 +68,7 @@ public class ResponsiveRestoreListenerTest { @BeforeEach public void setup() { - responsiveMetrics = new ResponsiveMetrics(metrics, OtelMetricsService.noop()); + responsiveMetrics = new ResponsiveMetrics(metrics); responsiveMetrics.initializeTags( APP_ID, CLIENT_ID, diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java index 7b9975bb1..0d85292de 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java @@ -58,7 +58,6 @@ import dev.responsive.kafka.internal.db.partitioning.SubPartitioner; import dev.responsive.kafka.internal.db.spec.BaseTableSpec; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; -import dev.responsive.kafka.internal.metrics.OtelMetricsService; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.utils.ExceptionSupplier; import dev.responsive.kafka.internal.utils.SessionClients; @@ -174,7 +173,7 @@ public void before( Optional.of(client), admin ); - final var responsiveMetrics = new ResponsiveMetrics(metrics, OtelMetricsService.noop()); + final var responsiveMetrics = new ResponsiveMetrics(metrics); responsiveMetrics.initializeTags( "commit-buffer-test-app", "commit-buffer-test-app-node-1", diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDRestoreListener.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDRestoreListener.java index 159eb2083..a9dadfbc9 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDRestoreListener.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDRestoreListener.java @@ -17,7 +17,6 @@ package dev.responsive.kafka.internal.stores; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; -import dev.responsive.kafka.internal.metrics.OtelMetricsService; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; import java.util.Collections; @@ -36,7 +35,7 @@ public TTDRestoreListener(final ResponsiveMetrics metrics) { public static TTDRestoreListener mockRestoreListener(final Properties props) { final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); - final var metrics = new ResponsiveMetrics(new Metrics(), OtelMetricsService.noop()); + final var metrics = new ResponsiveMetrics(new Metrics()); metrics.initializeTags( appId, appId + "-client",