From ad8bd7d36654c1755ccf77b4f4791c97d96e747c Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Thu, 25 Jan 2024 15:23:32 -0800 Subject: [PATCH] emit metrics for sys store since it reflects the correct total --- .../kafka/consumer/ConsumptionTask.java | 10 +--- .../stats/AggKafkaConsumerServiceStats.java | 8 --- .../venice/stats/AbstractVeniceStatsTest.java | 52 +++++++++++++++++++ 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java index 09fd52ddf8..6c8ce12fc7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java @@ -2,7 +2,6 @@ import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver; import com.linkedin.davinci.stats.AggKafkaConsumerServiceStats; -import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Version; @@ -123,7 +122,6 @@ public void run() { polledPubSubMessages = pollFunction.get(); lastSuccessfulPollTimestamp = System.currentTimeMillis(); aggStats.recordTotalPollRequestLatency(lastSuccessfulPollTimestamp - beforePollingTimeStamp); - aggStats.recordTotalPollResultNum(polledPubSubMessagesCount); if (!polledPubSubMessages.isEmpty()) { payloadBytesConsumedInOnePoll = 0; polledPubSubMessagesCount = 0; @@ -158,13 +156,9 @@ public void run() { aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency( LatencyUtils.getElapsedTimeInMs(beforeProducingToWriteBufferTimestamp)); aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount); - aggStats.recordTotalBytesPerPoll(payloadBytesConsumedInOnePoll); storePollCounterMap.forEach((storeName, counter) -> { - // do not emit stats for system stores. - if (!VeniceSystemStoreUtils.isSystemStore(storeName)) { - aggStats.getStoreStats(storeName).recordPollResultNum(counter.msgCount); - aggStats.getStoreStats(storeName).recordByteSizePerPoll(counter.byteSize); - } + aggStats.getStoreStats(storeName).recordPollResultNum(counter.msgCount); + aggStats.getStoreStats(storeName).recordByteSizePerPoll(counter.byteSize); }); bandwidthThrottler.accept(payloadBytesConsumedInOnePoll); recordsThrottler.accept(polledPubSubMessagesCount); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java index 3fac4c8393..02b881ab07 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java @@ -28,10 +28,6 @@ public AggKafkaConsumerServiceStats( isUnregisterMetricForDeletedStoreEnabled); } - public void recordTotalBytesPerPoll(double count) { - totalStats.recordByteSizePerPoll(count); - } - public void recordTotalConsumerIdleTime(double idleTime) { totalStats.recordConsumerIdleTime(idleTime); } @@ -40,10 +36,6 @@ public void recordTotalPollRequestLatency(double latency) { totalStats.recordPollRequestLatency(latency); } - public void recordTotalPollResultNum(int count) { - totalStats.recordPollResultNum(count); - } - public void recordTotalNonZeroPollResultNum(int count) { totalStats.recordNonZeroPollResultNum(count); } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java index 11dc74a83c..5b75b72271 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/AbstractVeniceStatsTest.java @@ -12,6 +12,7 @@ import com.linkedin.venice.client.stats.ClientStats; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.read.RequestType; +import com.linkedin.venice.utils.SystemTime; import io.tehuti.Metric; import io.tehuti.metrics.MeasurableStat; import io.tehuti.metrics.MetricConfig; @@ -174,4 +175,55 @@ public void testParentStats() { assertTrue(childOccurrenceRate2.measure(metricConfig, now) > 0.0); assertTrue(parentOccurrenceRate.measure(metricConfig, now) > 0.0); } + + @Test + public void testRegisterPerStoreAndTotalSensor() { + MetricsRepository metricsRepository = new MetricsRepository(); + MetricConfig metricConfig = new MetricConfig(); + + AbstractVeniceStats stats = new AbstractVeniceStats(metricsRepository, "testStore"); + AbstractVeniceStats totalStats = new AbstractVeniceStats(metricsRepository, "total"); + Gauge parentCount = new Gauge(), childCount1 = new Gauge(), childCount2 = new Gauge(); + Sensor parent = totalStats.registerSensor("parent", parentCount); + // 1) total stats is not null, the parent is the total stats + // Being a parent means, when the sensor is recorded, the parent sensor will also be recorded + Sensor sensor = stats.registerPerStoreAndTotalSensor("testSensor1", totalStats, () -> parent, childCount1); + sensor.record(10.0); + long now = System.currentTimeMillis(); + + // verify both store-level sensor and total-stat sensor are recorded + double total_value = parentCount.measure(metricConfig, now); + double value = childCount1.measure(metricConfig, now); + Assert.assertEquals(total_value, 10.0); + Assert.assertEquals(value, 10.0); + + // 2) total stats is null, the parent is also null + Sensor another = stats.registerPerStoreAndTotalSensor("testSensor2", null, () -> null, childCount2); + // let another sensor records a different value to verify the sensor is recorded and total stats is not recorded + another.record(20.0); + now = System.currentTimeMillis(); + total_value = parentCount.measure(metricConfig, now); + value = childCount1.measure(metricConfig, now); + double value2 = childCount2.measure(metricConfig, now); + Assert.assertEquals(total_value, 10.0); + Assert.assertEquals(value, 10.0); + Assert.assertEquals(value2, 20.0); + } + + @Test + public void testRegisterOnlyTotalRate() { + MetricsRepository metricsRepository = new MetricsRepository(); + + AbstractVeniceStats stats = new AbstractVeniceStats(metricsRepository, "testStore"); + AbstractVeniceStats totalStats = new AbstractVeniceStats(metricsRepository, "total"); + LongAdderRateGauge parentCount = new LongAdderRateGauge(); + // 1) total stats is not null so use ths supplier + LongAdderRateGauge sensor = + stats.registerOnlyTotalRate("testSensor", totalStats, () -> parentCount, SystemTime.INSTANCE); + Assert.assertEquals(sensor, parentCount); + + // 2) total stats is null, so created a new one + sensor = stats.registerOnlyTotalRate("testSensor", null, () -> parentCount, SystemTime.INSTANCE); + Assert.assertNotEquals(sensor, parentCount); + } }