Skip to content

Commit

Permalink
otel integration to venice-router
Browse files Browse the repository at this point in the history
  • Loading branch information
m-nagarajan committed Nov 14, 2024
1 parent 06bf58c commit ca65467
Show file tree
Hide file tree
Showing 70 changed files with 2,525 additions and 217 deletions.
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ ext.libraries = [
zkclient: 'com.101tec:zkclient:0.7', // For Kafka AdminUtils
zookeeper: 'org.apache.zookeeper:zookeeper:3.6.3',
zstd: 'com.github.luben:zstd-jni:1.5.2-3',
opentelemetryApi: "io.opentelemetry:opentelemetry-api:1.43.0",
opentelemetrySdk: "io.opentelemetry:opentelemetry-sdk:1.43.0",
opentelemetryExporterLogging: "io.opentelemetry:opentelemetry-exporter-logging:1.43.0",
opentelemetryExporterOtlp: "io.opentelemetry:opentelemetry-exporter-otlp:1.43.0",
opentelemetryExporterCommon: "io.opentelemetry:opentelemetry-exporter-common:1.43.0"
]

group = 'com.linkedin.venice'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.stats.StatsSupplier;
import com.linkedin.venice.stats.StatsSupplierMetricsRepository;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
Expand All @@ -25,7 +25,7 @@ public abstract class AbstractVeniceAggVersionedStats<STATS, STATS_REPORTER exte
private static final Logger LOGGER = LogManager.getLogger(AbstractVeniceAggVersionedStats.class);

private final Supplier<STATS> statsInitiator;
private final StatsSupplier<STATS_REPORTER> reporterSupplier;
private final StatsSupplierMetricsRepository<STATS_REPORTER> reporterSupplier;

protected final ReadOnlyStoreRepository metadataRepository;
private final MetricsRepository metricsRepository;
Expand All @@ -37,7 +37,7 @@ public AbstractVeniceAggVersionedStats(
MetricsRepository metricsRepository,
ReadOnlyStoreRepository metadataRepository,
Supplier<STATS> statsInitiator,
StatsSupplier<STATS_REPORTER> reporterSupplier,
StatsSupplierMetricsRepository<STATS_REPORTER> reporterSupplier,
boolean unregisterMetricForDeletedStoreEnabled) {
this.metadataRepository = metadataRepository;
this.metricsRepository = metricsRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.stats.AbstractVeniceAggStoreStats;
import com.linkedin.venice.stats.StatsSupplier;
import com.linkedin.venice.stats.StatsSupplierMetricsRepository;
import com.linkedin.venice.utils.Time;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
Expand All @@ -29,7 +29,7 @@ public AggHostLevelIngestionStats(
unregisterMetricForDeletedStoreEnabled);
}

static class HostLevelStoreIngestionStatsSupplier implements StatsSupplier<HostLevelIngestionStats> {
static class HostLevelStoreIngestionStatsSupplier implements StatsSupplierMetricsRepository<HostLevelIngestionStats> {
private final VeniceServerConfig serverConfig;
private final Map<String, StoreIngestionTask> ingestionTaskMap;
private final Time time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.stats.AbstractVeniceAggStoreStats;
import com.linkedin.venice.stats.StatsSupplier;
import com.linkedin.venice.stats.StatsSupplierMetricsRepository;
import com.linkedin.venice.utils.SystemTime;
import io.tehuti.metrics.MetricsRepository;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void recordTotalLatestOffsetIsPresent() {
totalStats.recordLatestOffsetIsPresent();
}

static class KafkaConsumerServiceStatsSupplier implements StatsSupplier<KafkaConsumerServiceStats> {
static class KafkaConsumerServiceStatsSupplier implements StatsSupplierMetricsRepository<KafkaConsumerServiceStats> {
private final LongSupplier getMaxElapsedTimeSinceLastPollInConsumerPool;

KafkaConsumerServiceStatsSupplier(LongSupplier getMaxElapsedTimeSinceLastPollInConsumerPool) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.davinci.stats;

import com.linkedin.venice.stats.StatsSupplier;
import com.linkedin.venice.stats.StatsSupplierMetricsRepository;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
Expand All @@ -25,7 +25,7 @@ public VeniceVersionedStats(
MetricsRepository metricsRepository,
String storeName,
Supplier<STATS> statsInitiator,
StatsSupplier<STATS_REPORTER> reporterSupplier) {
StatsSupplierMetricsRepository<STATS_REPORTER> reporterSupplier) {
this.storeName = storeName;
this.versionedStats = new Int2ObjectOpenHashMap<>();
this.reporters = new VeniceVersionedStatsReporter<>(metricsRepository, storeName, reporterSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.stats.StatsSupplier;
import com.linkedin.venice.stats.StatsSupplierMetricsRepository;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.stats.AsyncGauge;

Expand All @@ -22,7 +22,7 @@ public class VeniceVersionedStatsReporter<STATS, STATS_REPORTER extends Abstract
public VeniceVersionedStatsReporter(
MetricsRepository metricsRepository,
String storeName,
StatsSupplier<STATS_REPORTER> statsSupplier) {
StatsSupplierMetricsRepository<STATS_REPORTER> statsSupplier) {
super(metricsRepository, storeName);

this.isSystemStore = VeniceSystemStoreUtils.isSystemStore(storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.linkedin.davinci.stats.AbstractVeniceAggVersionedStats;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.stats.StatsSupplier;
import com.linkedin.venice.stats.StatsSupplierMetricsRepository;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -18,7 +18,7 @@ public HeartbeatVersionedStats(
MetricsRepository metricsRepository,
ReadOnlyStoreRepository metadataRepository,
Supplier<HeartbeatStat> statsInitiator,
StatsSupplier<HeartbeatStatReporter> reporterSupplier,
StatsSupplierMetricsRepository<HeartbeatStatReporter> reporterSupplier,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors) {
super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true);
Expand Down
5 changes: 5 additions & 0 deletions internal/venice-client-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ dependencies {
implementation libraries.log4j2api
implementation libraries.zstd
implementation libraries.conscrypt
implementation libraries.opentelemetryApi
implementation libraries.opentelemetrySdk
implementation libraries.opentelemetryExporterLogging
implementation libraries.opentelemetryExporterOtlp
implementation libraries.opentelemetryExporterCommon

testImplementation project(':internal:venice-test-common')
testImplementation project(':clients:venice-thin-client')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package com.linkedin.venice.read;

public enum RequestType {
SINGLE_GET(""), MULTI_GET("multiget_"), MULTI_GET_STREAMING("multiget_streaming_"), COMPUTE("compute_"),
COMPUTE_STREAMING("compute_streaming_");
SINGLE_GET("", "single_get"), MULTI_GET("multiget_", "multi_get"),
MULTI_GET_STREAMING("multiget_streaming_", "multi_get_streaming"), COMPUTE("compute_", "compute"),
COMPUTE_STREAMING("compute_streaming_", "compute_streaming");

private String metricPrefix;
private String requestTypeName;

RequestType(String metricPrefix) {
RequestType(String metricPrefix, String requestTypeName) {
this.metricPrefix = metricPrefix;
this.requestTypeName = requestTypeName;
}

public String getMetricPrefix() {
return this.metricPrefix;
}

public String getRequestTypeName() {
return this.requestTypeName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,81 @@ public abstract class AbstractVeniceAggStats<T extends AbstractVeniceStats> {
protected T totalStats;
protected final Map<String, T> storeStats = new VeniceConcurrentHashMap<>();

private StatsSupplier<T> statsFactory;
private StatsSupplierMetricsRepository<T> statsFactoryMetricsRepository;
private StatsSupplierVeniceMetricsRepository<T> statsFactoryVeniceMetricsRepository;

private final MetricsRepository metricsRepository;
private String clusterName = null;

private AbstractVeniceAggStats(MetricsRepository metricsRepository, StatsSupplier<T> statsSupplier, T totalStats) {
private AbstractVeniceAggStats(
MetricsRepository metricsRepository,
StatsSupplierMetricsRepository<T> statsSupplier,
T totalStats) {
this.metricsRepository = metricsRepository;
this.statsFactoryMetricsRepository = statsSupplier;
this.totalStats = totalStats;
}

private AbstractVeniceAggStats(
VeniceMetricsRepository metricsRepository,
StatsSupplierVeniceMetricsRepository<T> statsSupplier,
String clusterName,
T totalStats) {
this.metricsRepository = metricsRepository;
this.statsFactory = statsSupplier;
this.statsFactoryVeniceMetricsRepository = statsSupplier;
this.clusterName = clusterName;
this.totalStats = totalStats;
}

public AbstractVeniceAggStats(MetricsRepository metricsRepository, StatsSupplier<T> statsSupplier) {
public AbstractVeniceAggStats(MetricsRepository metricsRepository, StatsSupplierMetricsRepository<T> statsSupplier) {
this(metricsRepository, statsSupplier, statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT, null));
}

public AbstractVeniceAggStats(MetricsRepository metricsRepository) {
public AbstractVeniceAggStats(
StatsSupplierVeniceMetricsRepository<T> statsSupplier,
VeniceMetricsRepository metricsRepository,
String clusterName) {
this(
metricsRepository,
statsSupplier,
clusterName,
statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT, clusterName, null));
}

public AbstractVeniceAggStats(MetricsRepository metricsRepository, String clusterName) {
this.metricsRepository = metricsRepository;
this.clusterName = clusterName;
}

public void setStatsSupplier(StatsSupplier<T> statsSupplier) {
this.statsFactory = statsSupplier;
this.totalStats = statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT, null);
public void setStatsSupplier(StatsSupplierVeniceMetricsRepository<T> statsSupplier) {
this.statsFactoryVeniceMetricsRepository = statsSupplier;
if (metricsRepository instanceof VeniceMetricsRepository) {
this.totalStats =
statsSupplier.get((VeniceMetricsRepository) metricsRepository, STORE_NAME_FOR_TOTAL_STAT, clusterName, null);
}
}

public AbstractVeniceAggStats(
String clusterName,
MetricsRepository metricsRepository,
StatsSupplier<T> statsSupplier) {
StatsSupplierMetricsRepository<T> statsSupplier) {
this(
metricsRepository,
statsSupplier,
statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT + "." + clusterName, null));
this.clusterName = clusterName;
}

public T getStoreStats(String storeName) {
return storeStats.computeIfAbsent(storeName, k -> statsFactory.get(metricsRepository, storeName, totalStats));
if (metricsRepository instanceof VeniceMetricsRepository) {
return storeStats.computeIfAbsent(
storeName,
k -> statsFactoryVeniceMetricsRepository
.get((VeniceMetricsRepository) metricsRepository, storeName, clusterName, totalStats));
} else {
return storeStats
.computeIfAbsent(storeName, k -> statsFactoryMetricsRepository.get(metricsRepository, storeName, totalStats));
}
}

public T getNullableStoreStats(String storeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.tehuti.metrics.MetricsRepository;


public interface StatsSupplier<T extends AbstractVeniceStats> {
public interface StatsSupplierMetricsRepository<T extends AbstractVeniceStats> {
/**
* Legacy function, for implementations that do not use total stats in their constructor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linkedin.venice.stats;

/** copy of {@link StatsSupplierMetricsRepository} for {@link VeniceMetricsRepository} */
public interface StatsSupplierVeniceMetricsRepository<T extends AbstractVeniceStats> {
/**
* Legacy function, for implementations that do not use total stats in their constructor.
*
* @see #get(VeniceMetricsRepository, String, String, AbstractVeniceStats) which is the only caller.
*/
T get(VeniceMetricsRepository metricsRepository, String storeName, String clusterName);

/**
* This is the function that gets called by {@link AbstractVeniceAggStats}, and concrete classes can
* optionally implement it in order to be provided with the total stats instance.
*/
default T get(VeniceMetricsRepository metricsRepository, String storeName, String clusterName, T totalStats) {
return get(metricsRepository, storeName, clusterName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.tehuti.metrics.stats.Percentiles;
import io.tehuti.metrics.stats.Rate;
import java.util.Arrays;
import java.util.Map;


/**
Expand Down Expand Up @@ -129,6 +130,19 @@ public static MetricsRepository getMetricsRepository(String serviceName) {
return metricsRepository;
}

public static VeniceMetricsRepository getVeniceMetricsRepository(
String serviceName,
String metricPrefix,
Map<String, String> configs) {
VeniceMetricsRepository metricsRepository = new VeniceMetricsRepository(
new VeniceMetricsConfig.VeniceMetricsConfigBuilder().setServiceName(serviceName)
.setMetricPrefix(metricPrefix)
.extractAndSetOtelConfigs(configs)
.build());
metricsRepository.addReporter(new JmxReporter(serviceName));
return metricsRepository;
}

/**
* A valid metric name needs to pass the test in {@link javax.management.ObjectName}. This helper function will
* try to fix all invalid character mentioned in the above function to avoid MalformedObjectNameException; besides,
Expand Down
Loading

0 comments on commit ca65467

Please sign in to comment.