Skip to content

Commit

Permalink
[controller][dvc] Do not count COMPLETED Da Vinci replicas as offline…
Browse files Browse the repository at this point in the history
… replicas (#837)

Some Venice Push Job failed with the error stating more than 2k offline replicas in the push. After checking the logic we believe the live replica count check is not accurate. Completed DVC replica will not emit heartbeat to push status store, and will not be counted as live replicas if the ingestion of certain partitions staggers or data skew happens among different partitions.
This PR fix the counting to make sure completed replicas are also considered when calculating offline replicas.
  • Loading branch information
sixpluszero authored Feb 1, 2024
1 parent 6131ed6 commit 17ff202
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1710,8 +1710,13 @@ private ConfigKeys() {
public static final String DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS =
"davinci.push.status.scan.no.report.retry.max.attempts";

public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE =
"davinci.push.status.scan.max.offline.instance";
// Config to control how many DVC replica instances are allowed to be offline before failing VPJ push.
public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT =
"davinci.push.status.scan.max.offline.instance.count";

// Config to control how much percentage of DVC replica instances are allowed to be offline before failing VPJ push.
public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO =
"davinci.push.status.scan.max.offline.instance.ratio";

public static final String CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED =
"controller.zk.shared.davinci.push.status.system.schema.store.auto.creation.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_ENABLED;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_THREAD_NUMBER;
import static com.linkedin.venice.ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS;
Expand Down Expand Up @@ -229,7 +230,9 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig {

private final int daVinciPushStatusScanNoReportRetryMaxAttempt;

private final int daVinciPushStatusScanMaxOfflineInstance;
private final int daVinciPushStatusScanMaxOfflineInstanceCount;

private final double daVinciPushStatusScanMaxOfflineInstanceRatio;

private final boolean zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled;

Expand Down Expand Up @@ -486,7 +489,10 @@ public VeniceControllerConfig(VeniceProperties props) {
this.daVinciPushStatusScanThreadNumber = props.getInt(DAVINCI_PUSH_STATUS_SCAN_THREAD_NUMBER, 4);
this.daVinciPushStatusScanNoReportRetryMaxAttempt =
props.getInt(DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS, 6);
this.daVinciPushStatusScanMaxOfflineInstance = props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE, 10);
this.daVinciPushStatusScanMaxOfflineInstanceCount =
props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_COUNT, 10);
this.daVinciPushStatusScanMaxOfflineInstanceRatio =
props.getDouble(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO, 0.05d);

this.zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled =
props.getBoolean(CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED, true);
Expand Down Expand Up @@ -639,8 +645,12 @@ public long getDisabledReplicaEnablerServiceIntervalMs() {
return disabledReplicaEnablerServiceIntervalMs;
}

public int getDaVinciPushStatusScanMaxOfflineInstance() {
return daVinciPushStatusScanMaxOfflineInstance;
public int getDaVinciPushStatusScanMaxOfflineInstanceCount() {
return daVinciPushStatusScanMaxOfflineInstanceCount;
}

public double getDaVinciPushStatusScanMaxOfflineInstanceRatio() {
return daVinciPushStatusScanMaxOfflineInstanceRatio;
}

public int getTopicCleanupDelayFactor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5634,7 +5634,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo(
version.kafkaTopicName(),
version.getPartitionCount(),
incrementalPushVersion,
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstance());
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceCount(),
multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio());
ExecutionStatus daVinciStatus = daVinciStatusAndDetails.getStatus();
String daVinciDetails = daVinciStatusAndDetails.getDetails();
executionStatus = getOverallPushStatus(executionStatus, daVinciStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public AbstractPushMonitor(
controllerConfig.getDaVinciPushStatusScanIntervalInSeconds(),
controllerConfig.getDaVinciPushStatusScanThreadNumber(),
controllerConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(),
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstance());
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(),
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio());
this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled();
pushStatusCollector.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
String topicName,
int partitionCount,
Optional<String> incrementalPushVersion,
int maxOfflineInstance) {
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio) {
if (reader == null) {
throw new VeniceException("PushStatusStoreReader is null");
}
Expand All @@ -51,6 +52,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
int completedPartitions = 0;
int totalReplicaCount = 0;
int liveReplicaCount = 0;
int completedReplicaCount = 0;
Set<String> offlineInstanceList = new HashSet<>();
Set<Integer> incompletePartition = new HashSet<>();
for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
Map<CharSequence, Integer> instances =
Expand All @@ -59,15 +62,22 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
totalReplicaCount += instances.size();
for (Map.Entry<CharSequence, Integer> entry: instances.entrySet()) {
ExecutionStatus status = ExecutionStatus.fromInt(entry.getValue());
// We will skip completed replicas, as they have stopped emitting heartbeats and will not be counted as live
// replicas.
if (status == completeStatus) {
completedReplicaCount++;
continue;
}
boolean isInstanceAlive = reader.isInstanceAlive(storeName, entry.getKey().toString());
if (!isInstanceAlive) {
// Keep at most 5 offline instances for logging purpose.
if (offlineInstanceList.size() < 5) {
offlineInstanceList.add(entry.getKey().toString());
}
continue;
}
// We only compute status based on live instances.
// Derive the overall partition ingestion status based on all live replica ingestion status.
liveReplicaCount++;
if (status == completeStatus) {
continue;
}
if (status == middleStatus) {
allInstancesCompleted = false;
continue;
Expand All @@ -87,17 +97,19 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
}
}
boolean noDaVinciStatusReported = totalReplicaCount == 0;

// Report error if too many davinci instances are not alive for over 5 mins
if (totalReplicaCount - liveReplicaCount > maxOfflineInstance) {
int offlineReplicaCount = totalReplicaCount - liveReplicaCount - completedReplicaCount;
// Report error if too many Da Vinci instances are not alive for over 5 minutes.
int maxOfflineInstanceAllowed =
Math.max(maxOfflineInstanceCount, (int) (maxOfflineInstanceRatio * totalReplicaCount));
if (offlineReplicaCount > maxOfflineInstanceAllowed) {
Long lastUpdateTime = storeVersionToDVCDeadInstanceTimeMap.get(topicName);
if (lastUpdateTime != null) {
if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) {
storeVersionToDVCDeadInstanceTimeMap.remove(topicName);
return new ExecutionStatusWithDetails(
ExecutionStatus.ERROR,
" Too many dead instances: " + (totalReplicaCount - liveReplicaCount) + ", total instances: "
+ totalReplicaCount,
"Too many dead instances: " + offlineReplicaCount + ", total instances: " + totalReplicaCount
+ ", example offline instances: " + offlineInstanceList,
noDaVinciStatusReported);
}
} else {
Expand All @@ -124,6 +136,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
.append(erroredReplica.get())
.append(". Live replica count: ")
.append(liveReplicaCount)
.append(", completed replica count: ")
.append(completedReplicaCount)
.append(", total replica count: ")
.append(totalReplicaCount);
}
Expand All @@ -133,6 +147,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails(
.append(incompletePartition)
.append(". Live replica count: ")
.append(liveReplicaCount)
.append(", completed replica count: ")
.append(completedReplicaCount)
.append(", total replica count: ")
.append(totalReplicaCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class PushStatusCollector {
private final int daVinciPushStatusScanThreadNumber;
private final boolean daVinciPushStatusScanEnabled;
private final int daVinciPushStatusNoReportRetryMaxAttempts;
private final int daVinciPushStatusScanMaxOfflineInstance;
private final int daVinciPushStatusScanMaxOfflineInstanceCount;
private final double daVinciPushStatusScanMaxOfflineInstanceRatio;
private ScheduledExecutorService offlinePushCheckScheduler;
private ExecutorService pushStatusStoreScanExecutor;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
Expand All @@ -58,7 +59,8 @@ public PushStatusCollector(
int daVinciPushStatusScanIntervalInSeconds,
int daVinciPushStatusScanThreadNumber,
int daVinciPushStatusNoReportRetryMaxAttempts,
int daVinciPushStatusScanMaxOfflineInstance) {
int daVinciPushStatusScanMaxOfflineInstanceCount,
double daVinciPushStatusScanMaxOfflineInstanceRatio) {
this.storeRepository = storeRepository;
this.pushStatusStoreReader = pushStatusStoreReader;
this.pushCompletedHandler = pushCompletedHandler;
Expand All @@ -67,7 +69,8 @@ public PushStatusCollector(
this.daVinciPushStatusScanPeriodInSeconds = daVinciPushStatusScanIntervalInSeconds;
this.daVinciPushStatusScanThreadNumber = daVinciPushStatusScanThreadNumber;
this.daVinciPushStatusNoReportRetryMaxAttempts = daVinciPushStatusNoReportRetryMaxAttempts;
this.daVinciPushStatusScanMaxOfflineInstance = daVinciPushStatusScanMaxOfflineInstance;
this.daVinciPushStatusScanMaxOfflineInstanceCount = daVinciPushStatusScanMaxOfflineInstanceCount;
this.daVinciPushStatusScanMaxOfflineInstanceRatio = daVinciPushStatusScanMaxOfflineInstanceRatio;
}

public void start() {
Expand Down Expand Up @@ -131,7 +134,8 @@ private void scanDaVinciPushStatus() {
topicName,
pushStatus.getPartitionCount(),
Optional.empty(),
daVinciPushStatusScanMaxOfflineInstance);
daVinciPushStatusScanMaxOfflineInstanceCount,
daVinciPushStatusScanMaxOfflineInstanceRatio);
pushStatus.setDaVinciStatus(statusWithDetails);
return pushStatus;
}, pushStatusStoreScanExecutor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,97 @@
import static org.mockito.Mockito.mock;

import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.Test;


public class PushMonitorUtilsTest {
@Test
public void testDaVinciPushStatusScan() {
String topicName = "store_v1";
PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0);
PushStatusStoreReader reader = mock(PushStatusStoreReader.class);
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d"));

Map<CharSequence, Integer> map = new HashMap<>();
map.put("a", 3);
map.put("b", 3);
map.put("c", 3);
map.put("d", 10);
doReturn(map).when(reader).getPartitionStatus("store", 1, 0, Optional.empty());
doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c"));
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d"));
doReturn(map).when(reader).getPartitionStatus("store", 2, 0, Optional.empty());

Set<String> offlineInstances = new HashSet<>();
offlineInstances.add("b");
offlineInstances.add("c");
/**
* Testing count-based threshold.
*/
// Valid, because we have 4 replicas, 1 completed, 2 offline, 1 online, threshold number is max(2, 0.25*4) = 2.
validateOfflineReplicaInPushStatus(reader, "store_v1", 2, 0.25, ExecutionStatus.STARTED, null);
// Expected to fail.
validateOfflineReplicaInPushStatus(
reader,
"store_v1",
1,
0.25,
ExecutionStatus.ERROR,
"Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances);

/**
* Testing ratio-based threshold.
*/
// Valid, because we have 4 replicas, 1 completed, 2 offline, 1 online, threshold number is max(1, 0.5*4) = 2.
validateOfflineReplicaInPushStatus(reader, "store_v2", 1, 0.5, ExecutionStatus.STARTED, null);
// Expected to fail.
validateOfflineReplicaInPushStatus(
reader,
"store_v2",
1,
0.25,
ExecutionStatus.ERROR,
"Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances);
}

ExecutionStatusWithDetails executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 2);
private void validateOfflineReplicaInPushStatus(
PushStatusStoreReader reader,
String topicName,
int maxOfflineInstanceCount,
double maxOfflineInstanceRatio,
ExecutionStatus expectedStatus,
String expectedErrorDetails) {
/**
* Even if offline instances number exceed the max offline threshold count it will remain STARTED for the first check,
* as we need to wait until daVinciErrorInstanceWaitTime has passed since it first occurs.
*/
ExecutionStatusWithDetails executionStatusWithDetails = PushMonitorUtils.getDaVinciPushStatusAndDetails(
reader,
topicName,
1,
Optional.empty(),
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED);
executionStatusWithDetails =
PushMonitorUtils.getDaVinciPushStatusAndDetails(reader, topicName, 1, Optional.empty(), 2);
Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.ERROR);
Assert.assertEquals(executionStatusWithDetails.getDetails(), " Too many dead instances: 3, total instances: 4");
// Sleep 1ms and try again.
Utils.sleep(1);
executionStatusWithDetails = PushMonitorUtils.getDaVinciPushStatusAndDetails(
reader,
topicName,
1,
Optional.empty(),
maxOfflineInstanceCount,
maxOfflineInstanceRatio);
Assert.assertEquals(executionStatusWithDetails.getStatus(), expectedStatus);
if (expectedStatus.equals(ExecutionStatus.ERROR)) {
Assert.assertEquals(executionStatusWithDetails.getDetails(), expectedErrorDetails);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void testPushStatusCollector() {
1,
4,
1,
20);
20,
1);
pushStatusCollector.start();

pushStatusCollector.subscribeTopic(regularStoreTopicV1, 10);
Expand Down Expand Up @@ -190,7 +191,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() {
1,
4,
1,
20);
20,
1);
pushStatusCollector.start();

pushCompletedCount.set(0);
Expand Down Expand Up @@ -272,7 +274,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil
1,
4,
0,
20);
20,
1);
pushStatusCollector.start();

pushCompletedCount.set(0);
Expand Down

0 comments on commit 17ff202

Please sign in to comment.