Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[router] remove store level metrics for non-streaming multiget #1306

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;


public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats<RouterHttpRequestStats> {
private final Map<String, ScatterGatherStats> scatterGatherStatsMap = new VeniceConcurrentHashMap<>();
private final boolean isStoreStatsEnabled;
private final ScatterGatherStats totalScatterGatherStats;

public AggRouterHttpRequestStats(
MetricsRepository metricsRepository,
Expand All @@ -29,14 +32,18 @@ public AggRouterHttpRequestStats(
ReadOnlyStoreRepository metadataRepository,
boolean isUnregisterMetricForDeletedStoreEnabled) {
super(metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled);
// Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still
// report to the total stats for visibility of potential old clients.
isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType);
totalScatterGatherStats = new AggScatterGatherStats();
/**
* Use a setter function to bypass the restriction that the supertype constructor could not
* touch member fields of current object.
*/
setStatsSupplier((metricsRepo, storeName) -> {
ScatterGatherStats stats;
if (storeName.equals(AbstractVeniceAggStats.STORE_NAME_FOR_TOTAL_STAT)) {
stats = new AggScatterGatherStats();
stats = totalScatterGatherStats;
} else {
stats = scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats());
}
Expand All @@ -46,35 +53,45 @@ public AggRouterHttpRequestStats(
}

public ScatterGatherStats getScatterGatherStatsForStore(String storeName) {
return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats());
if (isStoreStatsEnabled) {
return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats());
} else {
return totalScatterGatherStats;
}
}

private void recordStoreStats(String storeName, Consumer<RouterHttpRequestStats> statsConsumer) {
if (isStoreStatsEnabled) {
statsConsumer.accept(getStoreStats(storeName));
}
}

public void recordRequest(String storeName) {
totalStats.recordRequest();
getStoreStats(storeName).recordRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordRequest);
}

public void recordHealthyRequest(String storeName, double latency) {
totalStats.recordHealthyRequest(latency);
getStoreStats(storeName).recordHealthyRequest(latency);
recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency));
}

public void recordUnhealthyRequest(String storeName) {
totalStats.recordUnhealthyRequest();
if (storeName != null) {
getStoreStats(storeName).recordUnhealthyRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordUnhealthyRequest);
}
}

public void recordUnavailableReplicaStreamingRequest(String storeName) {
totalStats.recordUnavailableReplicaStreamingRequest();
getStoreStats(storeName).recordUnavailableReplicaStreamingRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest);
}

public void recordUnhealthyRequest(String storeName, double latency) {
totalStats.recordUnhealthyRequest(latency);
if (storeName != null) {
getStoreStats(storeName).recordUnhealthyRequest(latency);
recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency));
}
}

Expand All @@ -86,12 +103,12 @@ public void recordUnhealthyRequest(String storeName, double latency) {
*/
public void recordReadQuotaUsage(String storeName, int quotaUsage) {
totalStats.recordReadQuotaUsage(quotaUsage);
getStoreStats(storeName).recordReadQuotaUsage(quotaUsage);
recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage));
}

public void recordTardyRequest(String storeName, double latency) {
totalStats.recordTardyRequest(latency);
getStoreStats(storeName).recordTardyRequest(latency);
recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency));
}

/**
Expand All @@ -103,79 +120,79 @@ public void recordTardyRequest(String storeName, double latency) {
*/
public void recordThrottledRequest(String storeName) {
totalStats.recordThrottledRequest();
getStoreStats(storeName).recordThrottledRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordThrottledRequest);
}

public void recordThrottledRequest(String storeName, double latency) {
totalStats.recordThrottledRequest(latency);
getStoreStats(storeName).recordThrottledRequest(latency);
recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency));
}

public void recordBadRequest(String storeName) {
totalStats.recordBadRequest();
if (storeName != null) {
getStoreStats(storeName).recordBadRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordBadRequest);
}
}

public void recordBadRequestKeyCount(String storeName, int keyCount) {
totalStats.recordBadRequestKeyCount(keyCount);
if (storeName != null) {
getStoreStats(storeName).recordBadRequestKeyCount(keyCount);
recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount));
}
}

public void recordRequestThrottledByRouterCapacity(String storeName) {
totalStats.recordRequestThrottledByRouterCapacity();
if (storeName != null) {
getStoreStats(storeName).recordRequestThrottledByRouterCapacity();
recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity);
}
}

public void recordErrorRetryCount(String storeName) {
totalStats.recordErrorRetryCount();
if (storeName != null) {
getStoreStats(storeName).recordErrorRetryCount();
recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount);
}
}

public void recordFanoutRequestCount(String storeName, int count) {
totalStats.recordFanoutRequestCount(count);
getStoreStats(storeName).recordFanoutRequestCount(count);
recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count));
}

public void recordLatency(String storeName, double latency) {
totalStats.recordLatency(latency);
getStoreStats(storeName).recordLatency(latency);
recordStoreStats(storeName, stats -> stats.recordLatency(latency));
}

public void recordResponseWaitingTime(String storeName, double waitingTime) {
totalStats.recordResponseWaitingTime(waitingTime);
getStoreStats(storeName).recordResponseWaitingTime(waitingTime);
recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime));
}

public void recordRequestSize(String storeName, double keySize) {
totalStats.recordRequestSize(keySize);
getStoreStats(storeName).recordRequestSize(keySize);
recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize));
}

public void recordCompressedResponseSize(String storeName, double compressedResponseSize) {
totalStats.recordCompressedResponseSize(compressedResponseSize);
getStoreStats(storeName).recordCompressedResponseSize(compressedResponseSize);
recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize));
}

public void recordResponseSize(String storeName, double valueSize) {
totalStats.recordResponseSize(valueSize);
getStoreStats(storeName).recordResponseSize(valueSize);
recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize));
}

public void recordDecompressionTime(String storeName, double decompressionTime) {
totalStats.recordDecompressionTime(decompressionTime);
getStoreStats(storeName).recordDecompressionTime(decompressionTime);
recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime));
}

public void recordQuota(String storeName, double quota) {
getStoreStats(storeName).recordQuota(quota);
recordStoreStats(storeName, stats -> stats.recordQuota(quota));
}

public void recordTotalQuota(double totalQuota) {
Expand All @@ -184,17 +201,17 @@ public void recordTotalQuota(double totalQuota) {

public void recordFindUnhealthyHostRequest(String storeName) {
totalStats.recordFindUnhealthyHostRequest();
getStoreStats(storeName).recordFindUnhealthyHostRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest);
}

public void recordResponse(String storeName) {
totalStats.recordResponse();
getStoreStats(storeName).recordResponse();
recordStoreStats(storeName, RouterHttpRequestStats::recordResponse);
}

public void recordMetaStoreShadowRead(String storeName) {
totalStats.recordMetaStoreShadowRead();
getStoreStats(storeName).recordMetaStoreShadowRead();
recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead);
}

private class AggScatterGatherStats extends ScatterGatherStats {
Expand Down Expand Up @@ -234,47 +251,47 @@ public long getTotalRetriesError() {

public void recordKeyNum(String storeName, int keyNum) {
totalStats.recordKeyNum(keyNum);
getStoreStats(storeName).recordKeyNum(keyNum);
recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum));
}

public void recordRequestUsage(String storeName, int usage) {
totalStats.recordRequestUsage(usage);
getStoreStats(storeName).recordRequestUsage(usage);
recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage));
}

public void recordMultiGetFallback(String storeName, int keyCount) {
totalStats.recordMultiGetFallback(keyCount);
getStoreStats(storeName).recordMultiGetFallback(keyCount);
recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount));
}

public void recordRequestParsingLatency(String storeName, double latency) {
totalStats.recordRequestParsingLatency(latency);
getStoreStats(storeName).recordRequestParsingLatency(latency);
recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency));
}

public void recordRequestRoutingLatency(String storeName, double latency) {
totalStats.recordRequestRoutingLatency(latency);
getStoreStats(storeName).recordRequestRoutingLatency(latency);
recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency));
}

public void recordUnavailableRequest(String storeName) {
totalStats.recordUnavailableRequest();
getStoreStats(storeName).recordUnavailableRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest);
}

public void recordDelayConstraintAbortedRetryRequest(String storeName) {
totalStats.recordDelayConstraintAbortedRetryRequest();
getStoreStats(storeName).recordDelayConstraintAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest);
}

public void recordSlowRouteAbortedRetryRequest(String storeName) {
totalStats.recordSlowRouteAbortedRetryRequest();
getStoreStats(storeName).recordSlowRouteAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest);
}

public void recordRetryRouteLimitAbortedRetryRequest(String storeName) {
totalStats.recordRetryRouteLimitAbortedRetryRequest();
getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest);
}

public void recordKeySize(String storeName, long keySize) {
Expand All @@ -283,26 +300,26 @@ public void recordKeySize(String storeName, long keySize) {

public void recordAllowedRetryRequest(String storeName) {
totalStats.recordAllowedRetryRequest();
getStoreStats(storeName).recordAllowedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest);
}

public void recordDisallowedRetryRequest(String storeName) {
totalStats.recordDisallowedRetryRequest();
getStoreStats(storeName).recordDisallowedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest);
}

public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) {
totalStats.recordNoAvailableReplicaAbortedRetryRequest();
getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest);
}

public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) {
totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck();
getStoreStats(storeName).recordErrorRetryAttemptTriggeredByPendingRequestCheck();
recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck);
}

public void recordRetryDelay(String storeName, double delay) {
totalStats.recordRetryDelay(delay);
getStoreStats(storeName).recordRetryDelay(delay);
recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.tehuti.MockTehutiReporter;
import com.linkedin.venice.utils.Utils;
import io.tehuti.TehutiException;
import io.tehuti.metrics.MetricsRepository;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -76,4 +78,37 @@ public void testProfilingMetrics() {
Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3);
Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4);
}

@Test
public void testDisableMultiGetStoreMetrics() {
AggRouterHttpRequestStats multiGetStats =
new AggRouterHttpRequestStats(metricsRepository, RequestType.MULTI_GET, storeMetadataRepository, true);
AggRouterHttpRequestStats streamingMultiGetStats = new AggRouterHttpRequestStats(
metricsRepository,
RequestType.MULTI_GET_STREAMING,
storeMetadataRepository,
true);
String storeName = Utils.getUniqueString("test-store");
multiGetStats.recordRequest(storeName);
streamingMultiGetStats.recordRequest(storeName);
multiGetStats.recordHealthyRequest(storeName, 10);
streamingMultiGetStats.recordHealthyRequest(storeName, 10);
// Total stats should exist for streaming and non-streaming multi-get
Assert.assertEquals((int) reporter.query(".total--multiget_request.Count").value(), 1);
Assert.assertEquals((int) reporter.query(".total--multiget_streaming_request.Count").value(), 1);
Assert.assertEquals((int) reporter.query(".total--multiget_healthy_request_latency.Max").value(), 10);
Assert.assertEquals((int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value(), 10);
// Store level stats should only exist for streaming multi-get
Assert.assertEquals((int) reporter.query("." + storeName + "--multiget_streaming_request.Count").value(), 1);
Assert.assertEquals(
(int) reporter.query("." + storeName + "--multiget_streaming_healthy_request_latency.Max").value(),
10);
TehutiException exception =
Assert.expectThrows(TehutiException.class, () -> reporter.query("." + storeName + "--multiget_request.Count"));
Assert.assertTrue(exception.getMessage().contains("does not exist"));
exception = Assert.expectThrows(
TehutiException.class,
() -> reporter.query("." + storeName + "--multiget_healthy_request_latency.Max"));
Assert.assertTrue(exception.getMessage().contains("does not exist"));
}
}