From 11aa6f89a61f14ec7ccbc96e2b5de9629c03c183 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Sun, 5 Nov 2023 20:08:31 -0800 Subject: [PATCH 1/2] added stat --- .../venice/controller/HelixVeniceClusterResources.java | 3 ++- .../venice/controller/stats/ErrorPartitionStats.java | 6 ++++++ .../linkedin/venice/pushmonitor/AbstractPushMonitor.java | 9 ++++++++- .../pushmonitor/PartitionStatusBasedPushMonitor.java | 7 +++++-- .../venice/pushmonitor/PushMonitorDelegator.java | 7 +++++-- .../pushmonitor/PartitionStatusBasedPushMonitorTest.java | 5 ++++- 6 files changed, 30 insertions(+), 7 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index f856c2ad75..bda7689993 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -166,7 +166,8 @@ public HelixVeniceClusterResources( getActiveActiveRealTimeSourceKafkaURLs(config), helixAdminClient, config, - admin.getPushStatusStoreReader().orElse(null)); + admin.getPushStatusStoreReader().orElse(null), + metricsRepository); this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService( clusterName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java index f2dfcd50c4..fe052f641b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java @@ -15,6 +15,7 @@ public class ErrorPartitionStats extends AbstractVeniceStats { private final Sensor currentVersionErrorPartitionRecoveredFromReset; private final Sensor currentVersionErrorPartitionUnrecoverableFromReset; private final Sensor errorPartitionProcessingTime; + private final Sensor disabledPartitionCount; public ErrorPartitionStats(MetricsRepository metricsRepository, String name) { super(metricsRepository, name); @@ -28,6 +29,7 @@ public ErrorPartitionStats(MetricsRepository metricsRepository, String name) { currentVersionErrorPartitionUnrecoverableFromReset = registerSensorIfAbsent("current_version_error_partition_unrecoverable_from_reset", new Total()); errorPartitionProcessingTime = registerSensorIfAbsent("error_partition_processing_time", new Avg(), new Max()); + disabledPartitionCount = registerSensorIfAbsent("disabled_partition_count", new Count()); } public void recordErrorPartitionResetAttempt(double value) { @@ -42,6 +44,10 @@ public void recordErrorPartitionRecoveredFromReset() { currentVersionErrorPartitionRecoveredFromReset.record(); } + public void recordDisabledPartition() { + disabledPartitionCount.record(); + } + public void recordErrorPartitionUnrecoverableFromReset() { currentVersionErrorPartitionUnrecoverableFromReset.record(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 4fa04737ed..ceb1a5a324 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -10,6 +10,7 @@ import com.linkedin.venice.controller.HelixAdminClient; import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.stats.ErrorPartitionStats; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; @@ -33,6 +34,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import io.tehuti.metrics.MetricsRepository; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -82,6 +84,7 @@ public abstract class AbstractPushMonitor private final long offlineJobResourceAssignmentWaitTimeInMilliseconds; private final PushStatusCollector pushStatusCollector; + private final ErrorPartitionStats errorPartitionStats; private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; @@ -98,7 +101,8 @@ public AbstractPushMonitor( List activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerConfig controllerConfig, - PushStatusStoreReader pushStatusStoreReader) { + PushStatusStoreReader pushStatusStoreReader, + MetricsRepository metricsRepository) { this.clusterName = clusterName; this.offlinePushAccessor = offlinePushAccessor; this.storeCleaner = storeCleaner; @@ -110,6 +114,8 @@ public AbstractPushMonitor( this.aggregateRealTimeSourceKafkaUrl = aggregateRealTimeSourceKafkaUrl; this.activeActiveRealTimeSourceKafkaURLs = activeActiveRealTimeSourceKafkaURLs; this.helixAdminClient = helixAdminClient; + this.errorPartitionStats = new ErrorPartitionStats(metricsRepository, clusterName); + this.disableErrorLeaderReplica = controllerConfig.isErrorLeaderReplicaFailOverEnabled(); this.helixClientThrottler = new EventThrottler(10, "push_monitor_helix_client_throttler", false, EventThrottler.BLOCK_STRATEGY); @@ -755,6 +761,7 @@ public void disableReplica(String instance, int partitionId) { kafkaTopic, Collections.singletonList(HelixUtils.getPartitionName(kafkaTopic, partitionId))); disabledReplicaMap.computeIfAbsent(instance, k -> new HashSet<>()).add(partitionId); + errorPartitionStats.recordDisabledPartition(); } @Override diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java index 16ebe946bb..1b5fc346a1 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java @@ -10,6 +10,7 @@ import com.linkedin.venice.meta.StoreCleaner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.locks.ClusterLockManager; +import io.tehuti.metrics.MetricsRepository; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,7 +37,8 @@ public PartitionStatusBasedPushMonitor( List childDataCenterKafkaUrls, HelixAdminClient helixAdminClient, VeniceControllerConfig controllerConfig, - PushStatusStoreReader pushStatusStoreReader) { + PushStatusStoreReader pushStatusStoreReader, + MetricsRepository metricsRepository) { super( clusterName, offlinePushAccessor, @@ -50,7 +52,8 @@ public PartitionStatusBasedPushMonitor( childDataCenterKafkaUrls, helixAdminClient, controllerConfig, - pushStatusStoreReader); + pushStatusStoreReader, + metricsRepository); } @Override diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java index 4139e5495c..72e4b05fe6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java @@ -18,6 +18,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import io.tehuti.metrics.MetricsRepository; import java.util.List; import java.util.Map; import java.util.Optional; @@ -55,7 +56,8 @@ public PushMonitorDelegator( List activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerConfig controllerConfig, - PushStatusStoreReader pushStatusStoreReader) { + PushStatusStoreReader pushStatusStoreReader, + MetricsRepository metricsRepository) { this.clusterName = clusterName; this.metadataRepository = metadataRepository; @@ -72,7 +74,8 @@ public PushMonitorDelegator( activeActiveRealTimeSourceKafkaURLs, helixAdminClient, controllerConfig, - pushStatusStoreReader); + pushStatusStoreReader, + metricsRepository); this.clusterLockManager = clusterLockManager; this.topicToPushMonitorMap = new VeniceConcurrentHashMap<>(); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java index 9b46f34d83..b6e8512680 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java @@ -30,6 +30,7 @@ import com.linkedin.venice.utils.HelixUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; +import io.tehuti.metrics.MetricsRepository; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -61,6 +62,7 @@ protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) { Collections.emptyList(), helixAdminClient, getMockControllerConfig(), + null, null); } @@ -79,7 +81,8 @@ protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher mockRealTimeT Collections.emptyList(), mock(HelixAdminClient.class), getMockControllerConfig(), - null); + null, + mock(MetricsRepository.class)); } @Test From 283cd0f77a8b2b154194153cbb08d99c1500e9d8 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Mon, 6 Nov 2023 10:24:30 -0800 Subject: [PATCH 2/2] disabled replica metric --- .../stats/DisabledPartitionStats.java | 21 +++++++++++++++++++ .../controller/stats/ErrorPartitionStats.java | 6 ------ .../pushmonitor/AbstractPushMonitor.java | 8 +++---- .../pushmonitor/AbstractPushMonitorTest.java | 6 ++++++ .../PartitionStatusBasedPushMonitorTest.java | 5 ++--- 5 files changed, 33 insertions(+), 13 deletions(-) create mode 100644 services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java new file mode 100644 index 0000000000..9bf9ac0c33 --- /dev/null +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java @@ -0,0 +1,21 @@ +package com.linkedin.venice.controller.stats; + +import com.linkedin.venice.stats.AbstractVeniceStats; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; +import io.tehuti.metrics.stats.Count; + + +public class DisabledPartitionStats extends AbstractVeniceStats { + private final Sensor disabledPartitionCount; + + public DisabledPartitionStats(MetricsRepository metricsRepository, String name) { + super(metricsRepository, name); + disabledPartitionCount = registerSensorIfAbsent("disabled_partition_count", new Count()); + } + + public void recordDisabledPartition() { + disabledPartitionCount.record(); + } + +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java index fe052f641b..f2dfcd50c4 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/ErrorPartitionStats.java @@ -15,7 +15,6 @@ public class ErrorPartitionStats extends AbstractVeniceStats { private final Sensor currentVersionErrorPartitionRecoveredFromReset; private final Sensor currentVersionErrorPartitionUnrecoverableFromReset; private final Sensor errorPartitionProcessingTime; - private final Sensor disabledPartitionCount; public ErrorPartitionStats(MetricsRepository metricsRepository, String name) { super(metricsRepository, name); @@ -29,7 +28,6 @@ public ErrorPartitionStats(MetricsRepository metricsRepository, String name) { currentVersionErrorPartitionUnrecoverableFromReset = registerSensorIfAbsent("current_version_error_partition_unrecoverable_from_reset", new Total()); errorPartitionProcessingTime = registerSensorIfAbsent("error_partition_processing_time", new Avg(), new Max()); - disabledPartitionCount = registerSensorIfAbsent("disabled_partition_count", new Count()); } public void recordErrorPartitionResetAttempt(double value) { @@ -44,10 +42,6 @@ public void recordErrorPartitionRecoveredFromReset() { currentVersionErrorPartitionRecoveredFromReset.record(); } - public void recordDisabledPartition() { - disabledPartitionCount.record(); - } - public void recordErrorPartitionUnrecoverableFromReset() { currentVersionErrorPartitionUnrecoverableFromReset.record(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index ceb1a5a324..fbad2e1a97 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -10,7 +10,7 @@ import com.linkedin.venice.controller.HelixAdminClient; import com.linkedin.venice.controller.VeniceControllerConfig; -import com.linkedin.venice.controller.stats.ErrorPartitionStats; +import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; @@ -84,7 +84,7 @@ public abstract class AbstractPushMonitor private final long offlineJobResourceAssignmentWaitTimeInMilliseconds; private final PushStatusCollector pushStatusCollector; - private final ErrorPartitionStats errorPartitionStats; + private final DisabledPartitionStats disabledPartitionStats; private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; @@ -114,7 +114,7 @@ public AbstractPushMonitor( this.aggregateRealTimeSourceKafkaUrl = aggregateRealTimeSourceKafkaUrl; this.activeActiveRealTimeSourceKafkaURLs = activeActiveRealTimeSourceKafkaURLs; this.helixAdminClient = helixAdminClient; - this.errorPartitionStats = new ErrorPartitionStats(metricsRepository, clusterName); + this.disabledPartitionStats = new DisabledPartitionStats(metricsRepository, clusterName); this.disableErrorLeaderReplica = controllerConfig.isErrorLeaderReplicaFailOverEnabled(); this.helixClientThrottler = @@ -761,7 +761,7 @@ public void disableReplica(String instance, int partitionId) { kafkaTopic, Collections.singletonList(HelixUtils.getPartitionName(kafkaTopic, partitionId))); disabledReplicaMap.computeIfAbsent(instance, k -> new HashSet<>()).add(partitionId); - errorPartitionStats.recordDisabledPartition(); + disabledPartitionStats.recordDisabledPartition(); } @Override diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java index 1f6552a5e9..7a146011e6 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.java @@ -43,6 +43,8 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; import java.util.ArrayList; import java.util.Collections; import java.util.EnumMap; @@ -70,6 +72,8 @@ public abstract class AbstractPushMonitorTest { protected VeniceControllerConfig mockControllerConfig; + protected MetricsRepository mockMetricRepo; + private final static String clusterName = Utils.getUniqueString("test_cluster"); private final static String aggregateRealTimeSourceKafkaUrl = "aggregate-real-time-source-kafka-url"; private String storeName; @@ -94,10 +98,12 @@ public void setUp() { mockAccessor = mock(OfflinePushAccessor.class); mockStoreCleaner = mock(StoreCleaner.class); mockStoreRepo = mock(ReadWriteStoreRepository.class); + mockMetricRepo = mock(MetricsRepository.class); mockRoutingDataRepo = mock(RoutingDataRepository.class); mockPushHealthStats = mock(AggPushHealthStats.class); clusterLockManager = new ClusterLockManager(clusterName); mockControllerConfig = mock(VeniceControllerConfig.class); + when(mockMetricRepo.sensor(anyString(), any())).thenReturn(mock(Sensor.class)); when(mockControllerConfig.isErrorLeaderReplicaFailOverEnabled()).thenReturn(true); when(mockControllerConfig.isDaVinciPushStatusEnabled()).thenReturn(true); when(mockControllerConfig.getDaVinciPushStatusScanIntervalInSeconds()).thenReturn(5); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java index b6e8512680..4ac7907483 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java @@ -30,7 +30,6 @@ import com.linkedin.venice.utils.HelixUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; -import io.tehuti.metrics.MetricsRepository; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -63,7 +62,7 @@ protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) { helixAdminClient, getMockControllerConfig(), null, - null); + mockMetricRepo); } @Override @@ -82,7 +81,7 @@ protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher mockRealTimeT mock(HelixAdminClient.class), getMockControllerConfig(), null, - mock(MetricsRepository.class)); + mockMetricRepo); } @Test