Skip to content

Commit

Permalink
Adding another ratio config
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Jan 31, 2024
1 parent 6022f95 commit 64ad823
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1707,8 +1707,13 @@ private ConfigKeys() {
public static final String DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS =
"davinci.push.status.scan.no.report.retry.max.attempts";

public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE =
"davinci.push.status.scan.max.offline.instance";
// Config to control how many DVC replica instances are allowed to be offline before failing VPJ push.
public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT =
"davinci.push.status.scan.max.offline.instance.count";

// Config to control how much percentage of DVC replica instances are allowed to be offline before failing VPJ push.
public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO =
"davinci.push.status.scan.max.offline.instance.ratio";

public static final String CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED =
"controller.zk.shared.davinci.push.status.system.schema.store.auto.creation.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_ENABLED;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_THREAD_NUMBER;
import static com.linkedin.venice.ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS;
Expand Down Expand Up @@ -229,7 +230,9 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig {

private final int daVinciPushStatusScanNoReportRetryMaxAttempt;

private final int daVinciPushStatusScanMaxOfflineInstance;
private final int daVinciPushStatusScanMaxOfflineInstanceCount;

private final double daVinciPushStatusScanMaxOfflineInstanceRatio;

private final boolean zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled;

Expand Down Expand Up @@ -486,7 +489,10 @@ public VeniceControllerConfig(VeniceProperties props) {
this.daVinciPushStatusScanThreadNumber = props.getInt(DAVINCI_PUSH_STATUS_SCAN_THREAD_NUMBER, 4);
this.daVinciPushStatusScanNoReportRetryMaxAttempt =
props.getInt(DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS, 6);
this.daVinciPushStatusScanMaxOfflineInstance = props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE, 10);
this.daVinciPushStatusScanMaxOfflineInstanceCount =
props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT, 10);
this.daVinciPushStatusScanMaxOfflineInstanceRatio =
props.getDouble(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO, 0.1d);

this.zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled =
props.getBoolean(CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED, true);
Expand Down Expand Up @@ -639,8 +645,12 @@ public long getDisabledReplicaEnablerServiceIntervalMs() {
return disabledReplicaEnablerServiceIntervalMs;
}

public int getDaVinciPushStatusScanMaxOfflineInstance() {
return daVinciPushStatusScanMaxOfflineInstance;
public int getDaVinciPushStatusScanMaxOfflineInstanceCount() {
return daVinciPushStatusScanMaxOfflineInstanceCount;
}

public double getDaVinciPushStatusScanMaxOfflineInstanceRatio() {
return daVinciPushStatusScanMaxOfflineInstanceRatio;
}

public int getTopicCleanupDelayFactor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5621,7 +5621,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo(
version.kafkaTopicName(),
version.getPartitionCount(),
incrementalPushVersion,
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstance());
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceCount(),
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio());
ExecutionStatus daVinciStatus = daVinciStatusAndDetails.getStatus();
String daVinciDetails = daVinciStatusAndDetails.getDetails();
executionStatus = getOverallPushStatus(executionStatus, daVinciStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public AbstractPushMonitor(
controllerConfig.getDaVinciPushStatusScanIntervalInSeconds(),
controllerConfig.getDaVinciPushStatusScanThreadNumber(),
controllerConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(),
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstance());
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(),
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio());
this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled();
pushStatusCollector.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
String topicName,
int partitionCount,
Optional<String> incrementalPushVersion,
int maxOfflineInstance) {
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio) {
if (reader == null) {
throw new VeniceException("PushStatusStoreReader is null");
}
Expand Down Expand Up @@ -93,7 +94,9 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
boolean noDaVinciStatusReported = totalReplicaCount == 0;
int offlineReplicaCount = totalReplicaCount - liveReplicaCount - completedReplicaCount;
// Report error if too many Da Vinci instances are not alive for over 5 minutes.
if (offlineReplicaCount > maxOfflineInstance) {
int maxOfflineInstanceAllowed =
Math.min(maxOfflineInstanceCount, (int) (maxOfflineInstanceRatio * totalReplicaCount));
if (offlineReplicaCount > maxOfflineInstanceAllowed) {
Long lastUpdateTime = storeVersionToDVCDeadInstanceTimeMap.get(topicName);
if (lastUpdateTime != null) {
if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class PushStatusCollector {
private final int daVinciPushStatusScanThreadNumber;
private final boolean daVinciPushStatusScanEnabled;
private final int daVinciPushStatusNoReportRetryMaxAttempts;
private final int daVinciPushStatusScanMaxOfflineInstance;
private final int daVinciPushStatusScanMaxOfflineInstanceCount;
private final double daVinciPushStatusScanMaxOfflineInstanceRatio;
private ScheduledExecutorService offlinePushCheckScheduler;
private ExecutorService pushStatusStoreScanExecutor;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
Expand All @@ -58,7 +59,8 @@ public PushStatusCollector(
int daVinciPushStatusScanIntervalInSeconds,
int daVinciPushStatusScanThreadNumber,
int daVinciPushStatusNoReportRetryMaxAttempts,
int daVinciPushStatusScanMaxOfflineInstance) {
int daVinciPushStatusScanMaxOfflineInstanceCount,
double daVinciPushStatusScanMaxOfflineInstanceRatio) {
this.storeRepository = storeRepository;
this.pushStatusStoreReader = pushStatusStoreReader;
this.pushCompletedHandler = pushCompletedHandler;
Expand All @@ -67,7 +69,8 @@ public PushStatusCollector(
this.daVinciPushStatusScanPeriodInSeconds = daVinciPushStatusScanIntervalInSeconds;
this.daVinciPushStatusScanThreadNumber = daVinciPushStatusScanThreadNumber;
this.daVinciPushStatusNoReportRetryMaxAttempts = daVinciPushStatusNoReportRetryMaxAttempts;
this.daVinciPushStatusScanMaxOfflineInstance = daVinciPushStatusScanMaxOfflineInstance;
this.daVinciPushStatusScanMaxOfflineInstanceCount = daVinciPushStatusScanMaxOfflineInstanceCount;
this.daVinciPushStatusScanMaxOfflineInstanceRatio = daVinciPushStatusScanMaxOfflineInstanceRatio;
}

public void start() {
Expand Down Expand Up @@ -131,7 +134,8 @@ private void scanDaVinciPushStatus() {
topicName,
pushStatus.getPartitionCount(),
Optional.empty(),
daVinciPushStatusScanMaxOfflineInstance);
daVinciPushStatusScanMaxOfflineInstanceCount,
daVinciPushStatusScanMaxOfflineInstanceRatio);
pushStatus.setDaVinciStatus(statusWithDetails);
return pushStatus;
}, pushStatusStoreScanExecutor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ public void testDaVinciPushStatusScan() {
map.put("c", 3);
map.put("d", 10);
doReturn(map).when(reader).getPartitionStatus("store", 1, 0, Optional.empty());
doReturn(map).when(reader).getPartitionStatus("store", 2, 0, Optional.empty());
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d"));

// Testing count-based threshold.
/**
* It is still valid, because we have 4 replicas, 1 completed, 2 offline, 1 online, threshold number is 2.
*/
ExecutionStatusWithDetails executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 2);
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 2, 1.0);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED);

/**
Expand All @@ -43,7 +45,7 @@ public void testDaVinciPushStatusScan() {
*/
Utils.sleep(1);
executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1);
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1, 1.0);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED);

/**
Expand All @@ -52,7 +54,35 @@ public void testDaVinciPushStatusScan() {
*/
Utils.sleep(1);
executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1);
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 1, 1.0);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.ERROR);
Assert.assertEquals(executionStatusWithDetails.getDetails(), " Too many dead instances: 2, total instances: 4");

// Testing ratio-based threshold.
topicName = "store_v2";
/**
* It is still valid, because we have 4 replicas, 1 completed, 2 offline, 1 online, threshold number is 4 * 0.5 = 2.
*/
executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 100, 0.5);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED);

/**
* The offline instances number exceed the max offline threshold count, but it will remain STARTED, as we need to wait
* until daVinciErrorInstanceWaitTime has passed since it first occurs.
*/
Utils.sleep(1);
executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 100, 0.25);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED);

/**
* This time it should fail, as we override the wait time to 0, and after 1ms, the 2nd query should meet the failure
* condition check.
*/
Utils.sleep(1);
executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 100, 0.25);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.ERROR);
Assert.assertEquals(executionStatusWithDetails.getDetails(), " Too many dead instances: 2, total instances: 4");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void testPushStatusCollector() {
1,
4,
1,
20);
20,
1);
pushStatusCollector.start();

pushStatusCollector.subscribeTopic(regularStoreTopicV1, 10);
Expand Down Expand Up @@ -190,7 +191,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() {
1,
4,
1,
20);
20,
1);
pushStatusCollector.start();

pushCompletedCount.set(0);
Expand Down Expand Up @@ -272,7 +274,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil
1,
4,
0,
20);
20,
1);
pushStatusCollector.start();

pushCompletedCount.set(0);
Expand Down

0 comments on commit 64ad823

Please sign in to comment.