Skip to content

Commit

Permalink
follow up for agent-free metrics (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Dec 4, 2023
1 parent 3cb8cd7 commit 6139222
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 56 deletions.
1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ tasks.publish {
dependencies {
api(libs.kafka.streams)

implementation(project(":controller-api"))
implementation(libs.bundles.scylla)
implementation(libs.mongodb.driver.sync)
implementation(libs.bundles.otel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static dev.responsive.kafka.api.config.ResponsiveConfig.CLIENT_ID_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.CLIENT_SECRET_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_HOSTNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE;
import static dev.responsive.kafka.internal.metrics.ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE;
Expand All @@ -36,10 +37,12 @@
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.OtelMetricsService;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import dev.responsive.kafka.internal.metrics.ResponsiveStateListener;
import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService;
import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService;
import dev.responsive.kafka.internal.metrics.exporter.otel.OtelMetricsService;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
Expand Down Expand Up @@ -221,13 +224,11 @@ private static ResponsiveMetrics createMetrics(
final StreamsConfig streamsConfig,
final ResponsiveConfig responsiveConfig
) {
final OtelMetricsService otel;
if (responsiveConfig.getBoolean(ResponsiveConfig.METRICS_ENABLED_CONFIG)) {
otel = OtelMetricsService.create(streamsConfig, responsiveConfig);
} else {
otel = OtelMetricsService.noop();
}
otel.start();
final boolean metricsEnabled = responsiveConfig.getBoolean(METRICS_ENABLED_CONFIG);
final MetricsExportService exportService = metricsEnabled
? OtelMetricsService.create(streamsConfig, responsiveConfig)
: new NoopMetricsExporterService();
exportService.start();

final MetricConfig metricConfig = new MetricConfig()
.samples(streamsConfig.getInt(METRICS_NUM_SAMPLES_CONFIG))
Expand All @@ -245,7 +246,7 @@ private static ResponsiveMetrics createMetrics(
RESPONSIVE_METRICS_NAMESPACE,
streamsConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)
)
), otel);
), exportService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package dev.responsive.kafka.internal.metrics;

import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService;
import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService;
import java.io.Closeable;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -48,12 +50,16 @@ public class ResponsiveMetrics implements Closeable {
private static final Pattern GLOBAL_THREAD_REGEX = Pattern.compile(".*-(GlobalStreamThread+)");

private OrderedTagsSupplier orderedTagsSupplier;
private final OtelMetricsService otelService;
private final MetricsExportService exportService;
private final Metrics metrics;

public ResponsiveMetrics(final Metrics metrics, final OtelMetricsService otelService) {
public ResponsiveMetrics(final Metrics metrics) {
this(metrics, new NoopMetricsExporterService());
}

public ResponsiveMetrics(final Metrics metrics, final MetricsExportService exportService) {
this.metrics = metrics;
this.otelService = otelService;
this.exportService = exportService;
}

/**
Expand Down Expand Up @@ -178,6 +184,6 @@ public void close() {
LOG.warn("Not all metrics were cleaned up before close: {}", metrics().keySet());
}
metrics.close();
otelService.close();
exportService.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.metrics.exporter;

import java.io.Closeable;

public interface MetricsExportService extends Closeable {

void start();

@Override
void close();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.metrics.exporter;

public class NoopMetricsExporterService implements MetricsExportService {

@Override
public void start() {

}

@Override
public void close() {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,68 @@
* limitations under the License.
*/

package dev.responsive.kafka.internal.metrics;
package dev.responsive.kafka.internal.metrics.exporter.otel;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.config.ConfigUtils;
import io.opentelemetry.api.OpenTelemetry;
import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.instrumentation.jmx.engine.JmxMetricInsight;
import io.opentelemetry.instrumentation.jmx.engine.MetricConfiguration;
import io.opentelemetry.instrumentation.jmx.yaml.JmxConfig;
import io.opentelemetry.instrumentation.jmx.yaml.JmxRule;
import io.opentelemetry.instrumentation.jmx.yaml.RuleParser;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.io.Closeable;
import java.time.Duration;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import responsive.platform.auth.ApiKeyHeaders;

public class OtelMetricsService implements Closeable {
public class OtelMetricsService implements MetricsExportService {

private static final Logger LOG = LoggerFactory.getLogger(OtelMetricsService.class);

private static final String SERVICE_NAME_ATTR = "service.name";
private static final String RESPONSIVE_APPLICATION_ID_ATTR = "responsiveApplicationId";

private final JmxMetricInsight metricInsight;
private final Runnable onClose;
private final OpenTelemetrySdk otel;

public static OtelMetricsService create(
final StreamsConfig streamsConfig,
final ResponsiveConfig config
) {
final var exporter = OtlpGrpcMetricExporter
.builder()
.addHeader("api-key", config.getString(ResponsiveConfig.METRICS_API_KEY_CONFIG))
.addHeader("secret", config.getString(ResponsiveConfig.METRICS_API_KEY_CONFIG))
.setEndpoint(config.getString(ResponsiveConfig.CONTROLLER_ENDPOINT_CONFIG))
.build();
final OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder();

final String apiKey = config.getString(ResponsiveConfig.METRICS_API_KEY_CONFIG);
final Password secret = config.getPassword(ResponsiveConfig.METRICS_SECRET_CONFIG);
if (secret == null ^ apiKey == null) {
throw new IllegalArgumentException(String.format(
"Invalid configuration, if configured to report metrics using %s, "
+ "then values for both %s and %s must be provided.",
ResponsiveConfig.METRICS_ENABLED_CONFIG,
ResponsiveConfig.METRICS_API_KEY_CONFIG,
ResponsiveConfig.METRICS_SECRET_CONFIG
));
} else if (secret != null) {
builder.addHeader(ApiKeyHeaders.API_KEY_METADATA_KEY, apiKey);
builder.addHeader(ApiKeyHeaders.SECRET_METADATA_KEY, secret.value());
}

builder.setEndpoint(config.getString(ResponsiveConfig.CONTROLLER_ENDPOINT_CONFIG));

final var exporter = builder.build();

final var metricReader = PeriodicMetricReader
.builder(exporter)
Expand All @@ -60,8 +85,8 @@ public static OtelMetricsService create(
final var appId = ConfigUtils.responsiveAppId(streamsConfig, config);
final var attributes = Attributes
.builder()
.put("service.name", appId + "-otel")
.put("responsiveApplicationId", appId)
.put(SERVICE_NAME_ATTR, appId + "-otel")
.put(RESPONSIVE_APPLICATION_ID_ATTR, appId)
.build();

final var meterProvider = SdkMeterProvider
Expand All @@ -79,45 +104,46 @@ public static OtelMetricsService create(
)))
.build();

return new OtelMetricsService(otel, otel::close);
return new OtelMetricsService(otel);
}

public static OtelMetricsService noop() {
return new OtelMetricsService(OpenTelemetry.noop(), () -> {});
}

private OtelMetricsService(final OpenTelemetry otel, final Runnable onClose) {
this.onClose = onClose;
private OtelMetricsService(final OpenTelemetrySdk otel) {
this.otel = otel;
this.metricInsight = JmxMetricInsight.createService(otel, 0);
}

@Override
public void start() {
this.metricInsight.start(buildMetricConfiguration());
}

@Override
public void close() {
onClose.run();
otel.close();
}

private static void buildFromUserRules(MetricConfiguration conf) {
RuleParser parserInstance = RuleParser.get();
private static MetricConfiguration buildMetricConfiguration() {
MetricConfiguration metricConfiguration = new MetricConfiguration();
addRulesFromJmxConfig(metricConfiguration);
return metricConfiguration;
}

private static void addRulesFromJmxConfig(MetricConfiguration conf) {
final RuleParser parserInstance = RuleParser.get();
final ClassLoader loader = OtelMetricsService.class.getClassLoader();

// TODO(agavra): instead of including otel-jmx.config.yaml as a resource we should
// fetch it from the Responsive controller on start-up
try (final var is = loader.getResourceAsStream("otel-jmx.config.yaml")) {
parserInstance.addMetricDefsTo(conf, is, "responsive builtin config");
} catch (Exception e) {
// yaml parsing errors are caught and logged inside addMetricDefsTo
// only file access related exceptions are expected here
JmxMetricInsight.getLogger().warning(e.toString());
try (final var inputStream = loader.getResourceAsStream("otel-jmx.config.yaml")) {
final JmxConfig jmxConfig = parserInstance.loadConfig(inputStream);
LOG.info("Found {} metric rules", jmxConfig.getRules().size());

for (final JmxRule rule : jmxConfig.getRules()) {
conf.addMetricDef(rule.buildMetricDef());
}
} catch (final Exception e) {
LOG.error("Unable to load rules from otel-jmx.config.yaml!", e);
throw new IllegalStateException("Unable to load rules from otel-jmx.config.yaml.", e);
}
}

private static MetricConfiguration buildMetricConfiguration() {
MetricConfiguration metricConfiguration = new MetricConfiguration();
buildFromUserRules(metricConfiguration);
return metricConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void setup() {
lenient().when(executor.scheduleAtFixedRate(any(), anyLong(), anyLong(), any()))
.thenReturn((ScheduledFuture) pollFuture);

final var responsiveMetrics = new ResponsiveMetrics(metrics, OtelMetricsService.noop());
final var responsiveMetrics = new ResponsiveMetrics(metrics);
responsiveMetrics.initializeTags(
GROUP, CLIENT, new ClientVersionMetadata("1", "abc", "2", "dfe"), Collections.emptyMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class MetricPublishingCommitListenerTest {

@BeforeEach
public void setup() {
final ResponsiveMetrics responsiveMetrics
= new ResponsiveMetrics(metrics, OtelMetricsService.noop());
final ResponsiveMetrics responsiveMetrics = new ResponsiveMetrics(metrics);
responsiveMetrics.initializeTags(
GROUP, CLIENT, new ClientVersionMetadata("1", "abc", "2", "dfe"), Collections.emptyMap());
listener = new MetricPublishingCommitListener(responsiveMetrics, THREAD_ID, offsetRecorder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ResponsiveRestoreListenerTest {

@BeforeEach
public void setup() {
responsiveMetrics = new ResponsiveMetrics(metrics, OtelMetricsService.noop());
responsiveMetrics = new ResponsiveMetrics(metrics);
responsiveMetrics.initializeTags(
APP_ID,
CLIENT_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.spec.BaseTableSpec;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.OtelMetricsService;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.utils.ExceptionSupplier;
import dev.responsive.kafka.internal.utils.SessionClients;
Expand Down Expand Up @@ -174,7 +173,7 @@ public void before(
Optional.of(client),
admin
);
final var responsiveMetrics = new ResponsiveMetrics(metrics, OtelMetricsService.noop());
final var responsiveMetrics = new ResponsiveMetrics(metrics);
responsiveMetrics.initializeTags(
"commit-buffer-test-app",
"commit-buffer-test-app-node-1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package dev.responsive.kafka.internal.stores;

import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.OtelMetricsService;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import java.util.Collections;
Expand All @@ -36,7 +35,7 @@ public TTDRestoreListener(final ResponsiveMetrics metrics) {

public static TTDRestoreListener mockRestoreListener(final Properties props) {
final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
final var metrics = new ResponsiveMetrics(new Metrics(), OtelMetricsService.noop());
final var metrics = new ResponsiveMetrics(new Metrics());
metrics.initializeTags(
appId,
appId + "-client",
Expand Down

0 comments on commit 6139222

Please sign in to comment.