Skip to content

Commit

Permalink
Emit poll metrics for system stores as well, so that the total stat reflects the correct total
Browse files Browse the repository at this point in the history
  • Loading branch information
adamxchen committed Jan 25, 2024
1 parent 5d009e8 commit ad8bd7d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.AggKafkaConsumerServiceStats;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
Expand Down Expand Up @@ -123,7 +122,6 @@ public void run() {
polledPubSubMessages = pollFunction.get();
lastSuccessfulPollTimestamp = System.currentTimeMillis();
aggStats.recordTotalPollRequestLatency(lastSuccessfulPollTimestamp - beforePollingTimeStamp);
aggStats.recordTotalPollResultNum(polledPubSubMessagesCount);
if (!polledPubSubMessages.isEmpty()) {
payloadBytesConsumedInOnePoll = 0;
polledPubSubMessagesCount = 0;
Expand Down Expand Up @@ -158,13 +156,9 @@ public void run() {
aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency(
LatencyUtils.getElapsedTimeInMs(beforeProducingToWriteBufferTimestamp));
aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount);
aggStats.recordTotalBytesPerPoll(payloadBytesConsumedInOnePoll);
storePollCounterMap.forEach((storeName, counter) -> {
// do not emit stats for system stores.
if (!VeniceSystemStoreUtils.isSystemStore(storeName)) {
aggStats.getStoreStats(storeName).recordPollResultNum(counter.msgCount);
aggStats.getStoreStats(storeName).recordByteSizePerPoll(counter.byteSize);
}
aggStats.getStoreStats(storeName).recordPollResultNum(counter.msgCount);
aggStats.getStoreStats(storeName).recordByteSizePerPoll(counter.byteSize);
});
bandwidthThrottler.accept(payloadBytesConsumedInOnePoll);
recordsThrottler.accept(polledPubSubMessagesCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public AggKafkaConsumerServiceStats(
isUnregisterMetricForDeletedStoreEnabled);
}

/**
 * Records a byte-size-per-poll value on the total-level stats by forwarding it unchanged to
 * {@code totalStats.recordByteSizePerPoll}.
 *
 * @param count the value to record; the visible caller passes the payload bytes consumed in one poll
 */
public void recordTotalBytesPerPoll(double count) {
totalStats.recordByteSizePerPoll(count);
}

/**
 * Records a consumer idle time on the total-level stats by forwarding it unchanged to
 * {@code totalStats.recordConsumerIdleTime}.
 *
 * @param idleTime the idle time to record (unit not visible here; presumably milliseconds, confirm with caller)
 */
public void recordTotalConsumerIdleTime(double idleTime) {
totalStats.recordConsumerIdleTime(idleTime);
}
Expand All @@ -40,10 +36,6 @@ public void recordTotalPollRequestLatency(double latency) {
totalStats.recordPollRequestLatency(latency);
}

/**
 * Records a poll result count on the total-level stats by forwarding it unchanged to
 * {@code totalStats.recordPollResultNum}.
 *
 * @param count the value to record; the visible caller passes the number of messages returned by a poll
 */
public void recordTotalPollResultNum(int count) {
totalStats.recordPollResultNum(count);
}

/**
 * Records a non-zero poll result count on the total-level stats by forwarding it unchanged to
 * {@code totalStats.recordNonZeroPollResultNum}.
 *
 * @param count the value to record; this method does not itself check that the value is non-zero
 *        (presumably the caller only invokes it for non-empty polls, confirm against the caller)
 */
public void recordTotalNonZeroPollResultNum(int count) {
totalStats.recordNonZeroPollResultNum(count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.venice.client.stats.ClientStats;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.utils.SystemTime;
import io.tehuti.Metric;
import io.tehuti.metrics.MeasurableStat;
import io.tehuti.metrics.MetricConfig;
Expand Down Expand Up @@ -174,4 +175,55 @@ public void testParentStats() {
assertTrue(childOccurrenceRate2.measure(metricConfig, now) > 0.0);
assertTrue(parentOccurrenceRate.measure(metricConfig, now) > 0.0);
}

@Test
public void testRegisterPerStoreAndTotalSensor() {
  MetricsRepository repository = new MetricsRepository();
  MetricConfig config = new MetricConfig();

  AbstractVeniceStats storeStats = new AbstractVeniceStats(repository, "testStore");
  AbstractVeniceStats totalStats = new AbstractVeniceStats(repository, "total");
  Gauge totalGauge = new Gauge();
  Gauge firstStoreGauge = new Gauge();
  Gauge secondStoreGauge = new Gauge();
  Sensor totalSensor = totalStats.registerSensor("parent", totalGauge);

  // Case 1: total stats are supplied, so the total sensor is registered as the parent of the
  // store-level sensor. Recording on the store-level sensor is expected to record on the parent too.
  Sensor linkedSensor =
      storeStats.registerPerStoreAndTotalSensor("testSensor1", totalStats, () -> totalSensor, firstStoreGauge);
  linkedSensor.record(10.0);
  long firstMeasureTime = System.currentTimeMillis();

  // Both the total gauge and the store-level gauge must have seen the recorded value.
  double totalAfterFirstRecord = totalGauge.measure(config, firstMeasureTime);
  double firstStoreAfterFirstRecord = firstStoreGauge.measure(config, firstMeasureTime);
  Assert.assertEquals(totalAfterFirstRecord, 10.0);
  Assert.assertEquals(firstStoreAfterFirstRecord, 10.0);

  // Case 2: neither total stats nor a parent sensor are supplied.
  Sensor standaloneSensor =
      storeStats.registerPerStoreAndTotalSensor("testSensor2", null, () -> null, secondStoreGauge);
  // Record a different value so we can tell that only the new sensor was updated and that the
  // total gauge was left untouched.
  standaloneSensor.record(20.0);
  long secondMeasureTime = System.currentTimeMillis();
  double totalAfterSecondRecord = totalGauge.measure(config, secondMeasureTime);
  double firstStoreAfterSecondRecord = firstStoreGauge.measure(config, secondMeasureTime);
  double secondStoreAfterSecondRecord = secondStoreGauge.measure(config, secondMeasureTime);
  Assert.assertEquals(totalAfterSecondRecord, 10.0);
  Assert.assertEquals(firstStoreAfterSecondRecord, 10.0);
  Assert.assertEquals(secondStoreAfterSecondRecord, 20.0);
}

@Test
public void testRegisterOnlyTotalRate() {
  MetricsRepository repository = new MetricsRepository();

  AbstractVeniceStats storeStats = new AbstractVeniceStats(repository, "testStore");
  AbstractVeniceStats totalStats = new AbstractVeniceStats(repository, "total");
  LongAdderRateGauge suppliedGauge = new LongAdderRateGauge();

  // Case 1: total stats are supplied, so the gauge handed out by the supplier is returned.
  LongAdderRateGauge withTotalStats =
      storeStats.registerOnlyTotalRate("testSensor", totalStats, () -> suppliedGauge, SystemTime.INSTANCE);
  Assert.assertEquals(withTotalStats, suppliedGauge);

  // Case 2: total stats are null, so the returned gauge is not the supplied one.
  LongAdderRateGauge withoutTotalStats =
      storeStats.registerOnlyTotalRate("testSensor", null, () -> suppliedGauge, SystemTime.INSTANCE);
  Assert.assertNotEquals(withoutTotalStats, suppliedGauge);
}
}

0 comments on commit ad8bd7d

Please sign in to comment.