Skip to content

Commit

Permalink
fix dimension names
Browse files Browse the repository at this point in the history
  • Loading branch information
m-nagarajan committed Oct 17, 2024
1 parent 801478b commit a636f3b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@

public class VeniceOpenTelemetryMetricsRepository {
private static final Logger LOGGER = LogManager.getLogger(VeniceOpenTelemetryMetricsRepository.class);

private static final String FLUENTBIT_TAG_PATTERN =
"serviceName:%s,instanceId:%s,containerName:%s,mdmAccount:%s,mdmNamespace:%s";
private static final String FLUENTBIT_ENDPOINT = "http://[::1]:22784/v1/metrics";
private static final String FLUENTBIT_HEADER_NAME = "X-LI-Fluentbit-Tag";

private static final String NOOP_ACCOUNT = "noopAccount";
private static final String NOOP_NAMESPACE = "noopNamespace";

private OpenTelemetry openTelemetry = null;
private SdkMeterProvider sdkMeterProvider = null;
Meter meter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.linkedin.venice.stats;

/**
 * Dimension (attribute key) names that can be attached to Venice metrics.
 *
 * <p>Every constant wraps the dotted key string that is emitted with a metric; callers read it through
 * {@link #getDimensionName()} rather than relying on the enum constant's own name.
 */
public enum VeniceMetricsDimensions {
  /** Key for the name of the Venice store a metric belongs to. */
  VENICE_STORE_NAME("Venice.Store.Name"),

  /** Key for the request method/type; values are expected to come from {@link com.linkedin.venice.read.RequestType}. */
  VENICE_REQUEST_METHOD("Venice.Request.Method"),

  /** Key for the HTTP status code of the response; see {@link io.netty.handler.codec.http.HttpResponseStatus}. */
  HTTP_RESPONSE_STATUS_CODE("Http.Response.StatusCode"),

  /** Key for the class of the HTTP status code; see {@link io.netty.handler.codec.http.HttpStatusClass}. */
  HTTP_RESPONSE_STATUS_CODE_CATEGORY("Http.Response.StatusCodeCategory"),

  /** Key for the Venice-level classification of the response; see {@link VeniceResponseStatus}. */
  VENICE_RESPONSE_STATUS_CODE("Venice.Response.StatusCode");

  /** The key string emitted with the metric for this dimension. */
  private final String attributeKey;

  /**
   * @param attributeKey the key string to associate with this dimension
   */
  VeniceMetricsDimensions(String attributeKey) {
    this.attributeKey = attributeKey;
  }

  /**
   * @return the key string for this dimension, e.g. {@code "Venice.Store.Name"}
   */
  public String getDimensionName() {
    return attributeKey;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.linkedin.venice.stats;

/**
 * Venice-level classification of how a request was answered, independent of the raw HTTP status code.
 *
 * <p>The per-constant notes describe how the router's response-building code visible alongside this enum
 * assigns each value; other callers may apply their own criteria.
 */
public enum VeniceResponseStatus {
  /** The response carried a healthy HTTP status and was considered a fast request. */
  HEALTHY,

  /** The response was neither healthy nor throttled. */
  UNHEALTHY,

  /** The response carried a healthy HTTP status but was not considered a fast request. */
  TARDY,

  /** The request was rejected with an HTTP "too many requests" status. */
  THROTTLED
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.router.streaming.SuccessfulStreamingResponse;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.stats.VeniceResponseStatus;
import com.linkedin.venice.utils.LatencyUtils;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -230,7 +231,7 @@ public FullHttpResponse buildResponse(
}
}

HttpResponseStatus responseStatus = finalResponse.status();
HttpResponseStatus httpResponseStatus = finalResponse.status();
Map<MetricNames, TimeValue> allMetrics = metrics.getMetrics();
/**
* All the metrics in {@link com.linkedin.ddsstorage.router.api.MetricNames} are supported in {@link Metrics}.
Expand All @@ -243,23 +244,27 @@ public FullHttpResponse buildResponse(
// TODO: When a batch get throws a quota exception, the ROUTER_SERVER_TIME is missing, so we can't record anything
// here...
double latency = LatencyUtils.convertNSToMS(timeValue.getRawValue(TimeUnit.NANOSECONDS));
stats.recordLatency(storeName, latency);
// new metrics can be added here based on responseStatus
if (HEALTHY_STATUSES.contains(responseStatus)) {
VeniceResponseStatus veniceResponseStatus;
if (HEALTHY_STATUSES.contains(httpResponseStatus)) {
routerStats.getStatsByType(RequestType.SINGLE_GET)
.recordReadQuotaUsage(storeName, venicePath.getPartitionKeys().size());
if (isFastRequest(latency, requestType)) {
stats.recordHealthyRequest(storeName, latency);
veniceResponseStatus = VeniceResponseStatus.HEALTHY;
} else {
stats.recordTardyRequest(storeName, latency);
veniceResponseStatus = VeniceResponseStatus.TARDY;
}
} else if (responseStatus.equals(TOO_MANY_REQUESTS)) {
} else if (httpResponseStatus.equals(TOO_MANY_REQUESTS)) {
LOGGER.debug("request is rejected by storage node because quota is exceeded");
stats.recordThrottledRequest(storeName, latency);
veniceResponseStatus = VeniceResponseStatus.THROTTLED;
} else {
LOGGER.debug("Unhealthy request detected, latency: {}ms, response status: {}", latency, responseStatus);
LOGGER.debug("Unhealthy request detected, latency: {}ms, response status: {}", latency, httpResponseStatus);
stats.recordUnhealthyRequest(storeName, latency);
veniceResponseStatus = VeniceResponseStatus.UNHEALTHY;
}
stats.recordLatency(storeName, latency, httpResponseStatus, veniceResponseStatus);
}
timeValue = allMetrics.get(ROUTER_RESPONSE_WAIT_TIME);
if (timeValue != null) {
Expand All @@ -276,7 +281,7 @@ public FullHttpResponse buildResponse(
double routingTime = LatencyUtils.convertNSToMS(timeValue.getRawValue(TimeUnit.NANOSECONDS));
stats.recordRequestRoutingLatency(storeName, routingTime);
}
if (HEALTHY_STATUSES.contains(responseStatus) && !venicePath.isStreamingRequest()) {
if (HEALTHY_STATUSES.contains(httpResponseStatus) && !venicePath.isStreamingRequest()) {
// Only record successful response
stats.recordResponseSize(storeName, finalResponse.content().readableBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.linkedin.venice.stats.AbstractVeniceAggStats;
import com.linkedin.venice.stats.AbstractVeniceAggStoreStats;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.stats.VeniceResponseStatus;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.Map;
import java.util.function.Function;

Expand Down Expand Up @@ -144,9 +146,13 @@ public void recordFanoutRequestCount(String storeName, int count) {
getStoreStats(storeName).recordFanoutRequestCount(count);
}

public void recordLatency(String storeName, double latency) {
totalStats.recordLatency(latency);
getStoreStats(storeName).recordLatency(latency);
public void recordLatency(
String storeName,
double latency,
HttpResponseStatus responseStatus,
VeniceResponseStatus veniceResponseStatus) {
totalStats.recordLatency(latency, responseStatus, veniceResponseStatus);
getStoreStats(storeName).recordLatency(latency, responseStatus, veniceResponseStatus);
}

public void recordResponseWaitingTime(String storeName, double waitingTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import com.linkedin.venice.stats.LambdaStat;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.VeniceMetricsConfig;
import com.linkedin.venice.stats.VeniceMetricsDimensions;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.stats.VeniceResponseStatus;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
Expand Down Expand Up @@ -40,12 +43,8 @@ public class RouterHttpRequestStats extends AbstractVeniceHttpStats {
private final Sensor requestSensor;
private final LongCounter requestSensorOtel;
private final Sensor healthySensor;
private final LongCounter healthySensorOtel;
private final Sensor unhealthySensor;
private final LongCounter unhealthySensorOtel;
private final Sensor tardySensor;
private final LongCounter tardySensorOtel;
// response status based metrics
private final Sensor healthyRequestRateSensor;
private final Sensor tardyRequestRatioSensor;
private final Sensor throttleSensor;
Expand All @@ -54,11 +53,8 @@ public class RouterHttpRequestStats extends AbstractVeniceHttpStats {
private final Sensor latencySensor;
private final DoubleHistogram latencySensorOtel;
private final Sensor healthyRequestLatencySensor;
private final DoubleHistogram healthyRequestLatencySensorOtel;
private final Sensor unhealthyRequestLatencySensor;
private final DoubleHistogram unhealthyRequestLatencySensorOtel;
private final Sensor tardyRequestLatencySensor;
private final DoubleHistogram tardyRequestLatencySensorOtel;
private final Sensor throttledRequestLatencySensor;
private final Sensor requestSizeSensor;
private final Sensor compressedResponseSizeSensor;
Expand Down Expand Up @@ -103,26 +99,22 @@ public RouterHttpRequestStats(
ScatterGatherStats scatterGatherStats,
boolean isKeyValueProfilingEnabled) {
super(metricsRepository, storeName, requestType);
metricDimensions =
Attributes.builder().put("storeName", storeName).put("requestType", requestType.toString()).build();
metricDimensions = Attributes.builder()
.put(VeniceMetricsDimensions.VENICE_STORE_NAME.getDimensionName(), storeName)
.put(VeniceMetricsDimensions.VENICE_REQUEST_METHOD.getDimensionName(), requestType.toString())
.build();

this.systemStoreName = VeniceSystemStoreUtils.extractSystemStoreType(storeName);
Rate requestRate = new OccurrenceRate();
Rate healthyRequestRate = new OccurrenceRate();
Rate tardyRequestRate = new OccurrenceRate();
requestSensor = registerSensor("request", new Count(), requestRate);
requestSensorOtel =
metricsRepository.getOpenTelemetryMetricsRepository().getCounter("CallCount", "NUMBER", "All requests count");
requestSensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getCounter("CallCount", "NUMBER", "Count of all incoming requests");
healthySensor = registerSensor("healthy_request", new Count(), healthyRequestRate);
healthySensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getCounter("SuccessCount", "NUMBER", "All healthy requests count");
unhealthySensor = registerSensor("unhealthy_request", new Count());
unhealthySensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getCounter("ErrorCount", "NUMBER", "All unhealthy requests count");
unavailableReplicaStreamingRequestSensor = registerSensor("unavailable_replica_streaming_request", new Count());
tardySensor = registerSensor("tardy_request", new Count(), tardyRequestRate);
tardySensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getCounter("TardyCount", "NUMBER", "All tardy requests count");
healthyRequestRateSensor =
registerSensor(new TehutiUtils.SimpleRatioStat(healthyRequestRate, requestRate, "healthy_request_ratio"));
tardyRequestRatioSensor =
Expand All @@ -139,15 +131,9 @@ public RouterHttpRequestStats(

healthyRequestLatencySensor =
registerSensorWithDetailedPercentiles("healthy_request_latency", new Avg(), new Max(0));
healthyRequestLatencySensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getHistogram("SuccessTime", TimeUnit.MILLISECONDS.toString(), "Latency of all healthy requests");
unhealthyRequestLatencySensor =
registerSensorWithDetailedPercentiles("unhealthy_request_latency", new Avg(), new Max(0));
unhealthyRequestLatencySensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getHistogram("ErrorTime", TimeUnit.MILLISECONDS.toString(), "Latency of all unhealthy requests");
tardyRequestLatencySensor = registerSensorWithDetailedPercentiles("tardy_request_latency", new Avg(), new Max(0));
tardyRequestLatencySensorOtel = metricsRepository.getOpenTelemetryMetricsRepository()
.getHistogram("TardyTime", TimeUnit.MILLISECONDS.toString(), "Latency of all tardy requests");
throttledRequestLatencySensor =
registerSensorWithDetailedPercentiles("throttled_request_latency", new Avg(), new Max(0));
routerResponseWaitingTimeSensor = registerSensor(
Expand Down Expand Up @@ -247,16 +233,13 @@ public void recordRequest() {

public void recordHealthyRequest(Double latency) {
healthySensor.record();
healthySensorOtel.add(1, this.metricDimensions);
if (latency != null) {
healthyRequestLatencySensor.record(latency);
healthyRequestLatencySensorOtel.record(latency);
}
}

public void recordUnhealthyRequest() {
unhealthySensor.record();
unhealthySensorOtel.add(1, this.metricDimensions);
}

public void recordUnavailableReplicaStreamingRequest() {
Expand All @@ -266,7 +249,6 @@ public void recordUnavailableReplicaStreamingRequest() {
public void recordUnhealthyRequest(double latency) {
recordUnhealthyRequest();
unhealthyRequestLatencySensor.record(latency);
unhealthyRequestLatencySensorOtel.record(latency);
}

/**
Expand All @@ -279,9 +261,7 @@ public void recordReadQuotaUsage(int quotaUsage) {

public void recordTardyRequest(double latency) {
tardySensor.record();
tardySensorOtel.add(1, this.metricDimensions);
tardyRequestLatencySensor.record(latency);
tardyRequestLatencySensorOtel.record(latency);
}

public void recordThrottledRequest(double latency) {
Expand Down Expand Up @@ -322,9 +302,22 @@ public void recordFanoutRequestCount(int count) {
}
}

public void recordLatency(double latency) {
public void recordLatency(
double latency,
HttpResponseStatus responseStatus,
VeniceResponseStatus veniceResponseStatus) {
latencySensor.record(latency);
latencySensorOtel.record(latency, metricDimensions);
Attributes tempMetricDimensions = Attributes.builder()
.putAll(metricDimensions)
.put(
VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE.getDimensionName(),
responseStatus.codeAsText().toString())
.put(
VeniceMetricsDimensions.HTTP_RESPONSE_STATUS_CODE_CATEGORY.getDimensionName(),
responseStatus.codeClass().name())
.put(VeniceMetricsDimensions.VENICE_RESPONSE_STATUS_CODE.getDimensionName(), veniceResponseStatus.toString())
.build();
latencySensorOtel.record(latency, tempMetricDimensions);
}

public void recordResponseWaitingTime(double waitingTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.stats.VeniceResponseStatus;
import com.linkedin.venice.tehuti.MockTehutiReporter;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeSuite;
Expand Down Expand Up @@ -45,7 +47,7 @@ public void testAggRouterMetrics() {
Assert.assertEquals(reporter.query(".store1--error_retry.Count").value(), 1d);

for (int i = 1; i <= 100; i += 1) {
stats.recordLatency("store2", i);
stats.recordLatency("store2", i, HttpResponseStatus.OK, VeniceResponseStatus.HEALTHY);
}

Assert.assertEquals((int) reporter.query(".total--latency.50thPercentile").value(), 50);
Expand Down

0 comments on commit a636f3b

Please sign in to comment.