diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 81f3284a75..16f95da045 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -1707,8 +1707,13 @@ private ConfigKeys() { public static final String DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS = "davinci.push.status.scan.no.report.retry.max.attempts"; - public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE = - "davinci.push.status.scan.max.offline.instance"; + // Config to control how many DVC replica instances are allowed to be offline before failing VPJ push. + public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT = + "davinci.push.status.scan.max.offline.instance.count"; + + // Config to control how much percentage of DVC replica instances are allowed to be offline before failing VPJ push. + public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO = + "davinci.push.status.scan.max.offline.instance.ratio"; public static final String CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED = "controller.zk.shared.davinci.push.status.system.schema.store.auto.creation.enabled"; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java index 127f480dfd..2dd3d08fca 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java @@ -57,7 +57,8 @@ import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_ENABLED; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; -import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE; +import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT; +import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_THREAD_NUMBER; import static com.linkedin.venice.ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS; @@ -229,7 +230,9 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig { private final int daVinciPushStatusScanNoReportRetryMaxAttempt; - private final int daVinciPushStatusScanMaxOfflineInstance; + private final int daVinciPushStatusScanMaxOfflineInstanceCount; + + private final double daVinciPushStatusScanMaxOfflineInstanceRatio; private final boolean zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled; @@ -486,7 +489,10 @@ public VeniceControllerConfig(VeniceProperties props) { this.daVinciPushStatusScanThreadNumber = props.getInt(DAVINCI_PUSH_STATUS_SCAN_THREAD_NUMBER, 4); this.daVinciPushStatusScanNoReportRetryMaxAttempt = props.getInt(DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS, 6); - this.daVinciPushStatusScanMaxOfflineInstance = props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE, 10); + this.daVinciPushStatusScanMaxOfflineInstanceCount = + props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT, 10); + this.daVinciPushStatusScanMaxOfflineInstanceRatio = + props.getDouble(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO, 0.1d); this.zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled = props.getBoolean(CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED, true); @@ -639,8 +645,12 @@ public long getDisabledReplicaEnablerServiceIntervalMs() { return disabledReplicaEnablerServiceIntervalMs; } - public int getDaVinciPushStatusScanMaxOfflineInstance() { - return daVinciPushStatusScanMaxOfflineInstance; + public int getDaVinciPushStatusScanMaxOfflineInstanceCount() { + return daVinciPushStatusScanMaxOfflineInstanceCount; + } + + public double getDaVinciPushStatusScanMaxOfflineInstanceRatio() { + return daVinciPushStatusScanMaxOfflineInstanceRatio; } public int getTopicCleanupDelayFactor() { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index c5f4b7b386..2cbd45a7b3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5621,7 +5621,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo( version.kafkaTopicName(), version.getPartitionCount(), incrementalPushVersion, - multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstance()); + multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceCount(), + multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio()); ExecutionStatus daVinciStatus = daVinciStatusAndDetails.getStatus(); String daVinciDetails = daVinciStatusAndDetails.getDetails(); executionStatus = getOverallPushStatus(executionStatus, daVinciStatus); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index d92e96fe77..863d839935 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -129,7 +129,8 @@ public AbstractPushMonitor( controllerConfig.getDaVinciPushStatusScanIntervalInSeconds(), controllerConfig.getDaVinciPushStatusScanThreadNumber(), controllerConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(), - controllerConfig.getDaVinciPushStatusScanMaxOfflineInstance()); + controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(), + controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio()); this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); pushStatusCollector.start(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java index 1dcc782b96..7d40c60b9a 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java @@ -32,7 +32,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( String topicName, int partitionCount, Optional incrementalPushVersion, - int maxOfflineInstance) { + int maxOfflineInstanceCount, + double maxOfflineInstanceRatio) { if (reader == null) { throw new VeniceException("PushStatusStoreReader is null"); } @@ -93,7 +94,9 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( boolean noDaVinciStatusReported = totalReplicaCount == 0; int offlineReplicaCount = totalReplicaCount - liveReplicaCount - completedReplicaCount; // Report error if too many Da Vinci instances are not alive for over 5 minutes. - if (offlineReplicaCount > maxOfflineInstance) { + int maxOfflineInstanceAllowed = + Math.min(maxOfflineInstanceCount, (int) (maxOfflineInstanceRatio * totalReplicaCount)); + if (offlineReplicaCount > maxOfflineInstanceAllowed) { Long lastUpdateTime = storeVersionToDVCDeadInstanceTimeMap.get(topicName); if (lastUpdateTime != null) { if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java index 0cf17b0255..9fb31009bd 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java @@ -42,7 +42,8 @@ public class PushStatusCollector { private final int daVinciPushStatusScanThreadNumber; private final boolean daVinciPushStatusScanEnabled; private final int daVinciPushStatusNoReportRetryMaxAttempts; - private final int daVinciPushStatusScanMaxOfflineInstance; + private final int daVinciPushStatusScanMaxOfflineInstanceCount; + private final double daVinciPushStatusScanMaxOfflineInstanceRatio; private ScheduledExecutorService offlinePushCheckScheduler; private ExecutorService pushStatusStoreScanExecutor; private final AtomicBoolean isStarted = new AtomicBoolean(false); @@ -58,7 +59,8 @@ public PushStatusCollector( int daVinciPushStatusScanIntervalInSeconds, int daVinciPushStatusScanThreadNumber, int daVinciPushStatusNoReportRetryMaxAttempts, - int daVinciPushStatusScanMaxOfflineInstance) { + int daVinciPushStatusScanMaxOfflineInstanceCount, + double daVinciPushStatusScanMaxOfflineInstanceRatio) { this.storeRepository = storeRepository; this.pushStatusStoreReader = pushStatusStoreReader; this.pushCompletedHandler = pushCompletedHandler; @@ -67,7 +69,8 @@ public PushStatusCollector( this.daVinciPushStatusScanPeriodInSeconds = daVinciPushStatusScanIntervalInSeconds; this.daVinciPushStatusScanThreadNumber = daVinciPushStatusScanThreadNumber; this.daVinciPushStatusNoReportRetryMaxAttempts = daVinciPushStatusNoReportRetryMaxAttempts; - this.daVinciPushStatusScanMaxOfflineInstance = daVinciPushStatusScanMaxOfflineInstance; + this.daVinciPushStatusScanMaxOfflineInstanceCount = daVinciPushStatusScanMaxOfflineInstanceCount; + this.daVinciPushStatusScanMaxOfflineInstanceRatio = daVinciPushStatusScanMaxOfflineInstanceRatio; } public void start() { @@ -131,7 +134,8 @@ private void scanDaVinciPushStatus() { topicName, pushStatus.getPartitionCount(), Optional.empty(), - daVinciPushStatusScanMaxOfflineInstance); + daVinciPushStatusScanMaxOfflineInstanceCount, + daVinciPushStatusScanMaxOfflineInstanceRatio); pushStatus.setDaVinciStatus(statusWithDetails); return pushStatus; }, pushStatusStoreScanExecutor)); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java index f0e9224dce..4f5a9d8e52 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java @@ -25,16 +25,18 @@ public void testDaVinciPushStatusScan() { map.put("c", 3); map.put("d", 10); doReturn(map).when(reader).getPartitionStatus("store", 1, 0, Optional.empty()); + doReturn(map).when(reader).getPartitionStatus("store", 2, 0, Optional.empty()); doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a")); doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b")); doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c")); doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d")); + // Testing count-based threshold. /** * It is still valid, because we have 4 replicas, 1 completed, 2 offline, 1 online, threshold number is 2. */ ExecutionStatusWithDetails executionStatusWithDetails = - PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 2); + PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 2, 1.0); Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED); /** @@ -43,7 +45,7 @@ public void testDaVinciPushStatusScan() { */ Utils.sleep(1); executionStatusWithDetails = - PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1); + PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1, 1.0); Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED); /** @@ -52,7 +54,35 @@ public void testDaVinciPushStatusScan() { */ Utils.sleep(1); executionStatusWithDetails = - PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1); + PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1, 1.0); + Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.ERROR); + Assert.assertEquals(executionStatusWithDetails.getDetails(), " Too many dead instances: 2, total instances: 4"); + + // Testing ratio-based threshold. + topicName = "store_v2"; + /** + * It is still valid, because we have 4 replicas, 1 completed, 2 offline, 1 online, threshold number is 4 * 0.5 = 2. + */ + executionStatusWithDetails = + PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 100, 0.5); + Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED); + + /** + * The offline instances number exceed the max offline threshold count, but it will remain STARTED, as we need to wait + * until daVinciErrorInstanceWaitTime has passed since it first occurs. + */ + Utils.sleep(1); + executionStatusWithDetails = + PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 100, 0.25); + Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED); + + /** + * This time it should fail, as we override the wait time to 0, and after 1ms, the 2nd query should meet the failure + * condition check. + */ + Utils.sleep(1); + executionStatusWithDetails = + PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 100, 0.25); Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.ERROR); Assert.assertEquals(executionStatusWithDetails.getDetails(), " Too many dead instances: 2, total instances: 4"); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java index a17225546c..9c4c9fde03 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java @@ -58,7 +58,8 @@ public void testPushStatusCollector() { 1, 4, 1, - 20); + 20, + 1); pushStatusCollector.start(); pushStatusCollector.subscribeTopic(regularStoreTopicV1, 10); @@ -190,7 +191,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() { 1, 4, 1, - 20); + 20, + 1); pushStatusCollector.start(); pushCompletedCount.set(0); @@ -272,7 +274,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil 1, 4, 0, - 20); + 20, + 1); pushStatusCollector.start(); pushCompletedCount.set(0);