Skip to content

Commit

Permalink
[admin-tool][controller][dvc][samza][test][vpj] Job status reporting …
Browse files Browse the repository at this point in the history
…to include detailed DVC push error (linkedin#919)

* Add new DVC error specific terminal enums in ExecutionStatus and similar ones in PushJobCheckpoints and use it instead of the generic ExecutionStatus#ERROR (which was/will be used for server/other task status): DVC_INGESTION_ERROR_DISK_FULL, DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED, DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES, DVC_INGESTION_ERROR_OTHER.
* Add a new property rootStatus in ExecutionStatus. rootStatus for these 4 new enums will be ERROR and for other existing enums rootStatus will be itself. Idea is to use status.getRootStatus() for most of the cases.
* These new enums will be returned from DVC to the backend / push job polling (via the DVC push status store), and the corresponding new checkpoints will be updated in VPJ. PushJobDetailsStatus#ERROR continues to be set for pushJobDetails.overallStatus.
* Deployment order: These new statuses are emitted by DaVinci hosts. Other components should be deployed before the Davinci release to be able to act on these new statuses accordingly.
  • Loading branch information
m-nagarajan authored Apr 16, 2024
1 parent 7663fde commit 5ffbfb9
Show file tree
Hide file tree
Showing 37 changed files with 1,421 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
Expand Down Expand Up @@ -31,6 +34,8 @@
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
Expand Down Expand Up @@ -659,7 +664,8 @@ public void error(String kafkaTopic, int partitionId, String message, Exception
/**
* Report push status needs to be executed before deleting the {@link VersionBackend}.
*/
reportPushStatus(kafkaTopic, partitionId, ExecutionStatus.ERROR);
ExecutionStatus status = getDaVinciErrorStatus(e);
reportPushStatus(kafkaTopic, partitionId, status);

versionBackend.completePartitionExceptionally(partitionId, e);
versionBackend.tryStopHeartbeat();
Expand Down Expand Up @@ -733,4 +739,18 @@ public void endOfIncrementalPushReceived(
});
}
};

/**
 * Maps an ingestion failure to the most specific DVC terminal {@link ExecutionStatus}.
 *
 * Memory/disk-limit failures are recognized either directly or one level deep via
 * {@link Exception#getCause()} (ingestion code may wrap the specific exception in a
 * generic {@link VeniceException}). Anything else — including non-Venice exceptions —
 * falls back to {@code DVC_INGESTION_ERROR_OTHER}.
 *
 * @param e the exception reported by the ingestion path; may wrap the root cause
 * @return the DVC-specific terminal error status to report to the push status store
 */
static ExecutionStatus getDaVinciErrorStatus(Exception e) {
  ExecutionStatus status = DVC_INGESTION_ERROR_OTHER;
  if (e instanceof VeniceException) {
    // Note: `null instanceof X` is false, so no explicit getCause() null check is needed.
    if (e instanceof MemoryLimitExhaustedException || e.getCause() instanceof MemoryLimitExhaustedException) {
      status = DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED;
    } else if (e instanceof DiskLimitExhaustedException || e.getCause() instanceof DiskLimitExhaustedException) {
      status = DVC_INGESTION_ERROR_DISK_FULL;
    }
  }
  return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.PersistenceFailureException;
import com.linkedin.venice.exceptions.VeniceChecksumException;
Expand Down Expand Up @@ -2209,9 +2210,7 @@ public void processConsumerRecord(
}

if (diskUsage.isDiskFull(recordSize)) {
throw new VeniceException(
"Disk is full: throwing exception to error push: " + storeName + " version " + versionNumber + ". "
+ diskUsage.getDiskStatus());
throw new DiskLimitExhaustedException(storeName, versionNumber, diskUsage.getDiskStatus());
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.linkedin.davinci;

import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


/**
 * Unit tests for {@link DaVinciBackend#getDaVinciErrorStatus(Exception)}, covering direct
 * exceptions, exceptions nested one level inside a generic {@link VeniceException}, and
 * generic exceptions that carry no specific signal.
 */
public class DaVinciBackendTest {
  @DataProvider(name = "DvcErrorExecutionStatus")
  public static Object[][] dvcErrorExecutionStatus() {
    // Only exercise the DVC-specific terminal error statuses.
    return allPermutationGenerator((permutation) -> {
      ExecutionStatus status = (ExecutionStatus) permutation[0];
      return status.isDVCIngestionError();
    }, ExecutionStatus.values());
  }

  /**
   * Builds the exception that should map to {@code executionStatus}. When {@code nested} is true,
   * the specific exception is wrapped in a generic {@link VeniceException} to exercise cause
   * unwrapping in {@link DaVinciBackend#getDaVinciErrorStatus(Exception)}.
   */
  private static VeniceException buildExceptionFor(ExecutionStatus executionStatus, boolean nested) {
    switch (executionStatus) {
      case DVC_INGESTION_ERROR_DISK_FULL:
        return nested
            ? new VeniceException(new DiskLimitExhaustedException("test"))
            : new DiskLimitExhaustedException("test");
      case DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED:
        return nested
            ? new VeniceException(new MemoryLimitExhaustedException("test"))
            : new MemoryLimitExhaustedException("test");
      case DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES:
      case DVC_INGESTION_ERROR_OTHER:
        return new VeniceException("test");
      default:
        fail("Unexpected execution status: " + executionStatus);
        return null; // unreachable: fail() always throws
    }
  }

  /**
   * TOO_MANY_DEAD_INSTANCES cannot be derived from an exception type, so the mapping is expected
   * to fall back to OTHER for it; every other DVC error status maps to itself.
   */
  private static ExecutionStatus expectedStatusFor(ExecutionStatus executionStatus) {
    return executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES)
        ? DVC_INGESTION_ERROR_OTHER
        : executionStatus;
  }

  @Test(dataProvider = "DvcErrorExecutionStatus")
  public void testGetDaVinciErrorStatus(ExecutionStatus executionStatus) {
    VeniceException veniceException = buildExceptionFor(executionStatus, false);
    assertEquals(DaVinciBackend.getDaVinciErrorStatus(veniceException), expectedStatusFor(executionStatus));
  }

  @Test(dataProvider = "DvcErrorExecutionStatus")
  public void testGetDaVinciErrorStatusNested(ExecutionStatus executionStatus) {
    VeniceException veniceException = buildExceptionFor(executionStatus, true);
    assertEquals(DaVinciBackend.getDaVinciErrorStatus(veniceException), expectedStatusFor(executionStatus));
  }

  @Test(dataProvider = "DvcErrorExecutionStatus")
  public void testGetDaVinciErrorStatusWithInvalidCases(ExecutionStatus executionStatus) {
    if (!executionStatus.isDVCIngestionError()) {
      fail("Unexpected execution status: " + executionStatus);
      return;
    }
    // A bare VeniceException carries no specific signal, so every status maps to OTHER.
    assertEquals(DaVinciBackend.getDaVinciErrorStatus(new VeniceException("test")), DVC_INGESTION_ERROR_OTHER);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ public void execute() {
JobStatusQueryResponse jobStatusQueryResponse =
parentCtrlCli.queryDetailedJobStatus(result.kafKaTopic, getParams().targetRegion);

if (jobStatusQueryResponse.isError()
|| jobStatusQueryResponse.getStatus().equalsIgnoreCase(ExecutionStatus.ERROR.toString())) {
if (jobStatusQueryResponse.isError() || ExecutionStatus.isError(jobStatusQueryResponse.getStatus())) {
completeCoreWorkWithError(jobStatusQueryResponse.getStatusDetails());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ private JobStatusQueryResponse buildJobStatusQueryResponse(ExecutionStatus statu
jobResponse.setUncompletedPartitions(partitions);
}

if (status == ExecutionStatus.ERROR) {
if (status.isError()) {
jobResponse.setStatusDetails(
"too many ERROR replicas in partition: x for offlinePushStrategy: WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION");
}
Expand Down Expand Up @@ -474,7 +474,7 @@ private void verifyMonitorRecoveryResults(ExecutionStatus status, boolean isCurr
return;
}

if (status == ExecutionStatus.ERROR) {
if (status.isError()) {
// Verify all stores are in error state.
for (int i = 0; i < numOfStores; i++) {
Assert.assertTrue(monitor.getTasks().get(i).getTaskResult().isError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ public enum PushJobCheckpoints {
VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7), QUOTA_EXCEEDED(-1), WRITE_ACL_FAILED(-2),
DUP_KEY_WITH_DIFF_VALUE(-3), INPUT_DATA_SCHEMA_VALIDATION_FAILED(-4),
EXTENDED_INPUT_DATA_SCHEMA_VALIDATION_FAILED(-5), RECORD_TOO_LARGE_FAILED(-6), CONCURRENT_BATCH_PUSH(-7),
DATASET_CHANGED(-8), INVALID_INPUT_FILE(-9), ZSTD_DICTIONARY_CREATION_FAILED(-10);
DATASET_CHANGED(-8), INVALID_INPUT_FILE(-9), ZSTD_DICTIONARY_CREATION_FAILED(-10),
DVC_INGESTION_ERROR_DISK_FULL(-11), DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED(-12),
DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES(-13), DVC_INGESTION_ERROR_OTHER(-14);

private final int value;

Expand Down Expand Up @@ -594,7 +596,7 @@ protected void setVeniceWriter(VeniceWriter<KafkaKey, byte[], byte[]> veniceWrit
}

/**
 * Injects the tracker used to record sent push job details. Visible for testing so tests can
 * substitute their own tracker and inspect what was sent.
 */
public void setSentPushJobDetailsTracker(SentPushJobDetailsTracker sentPushJobDetailsTracker) {
  this.sentPushJobDetailsTracker = sentPushJobDetailsTracker;
}

Expand Down Expand Up @@ -626,7 +628,7 @@ private DataWriterComputeJob constructDataWriterComputeJob() {
return computeJob;
}

private DataWriterComputeJob getDataWriterComputeJob() {
DataWriterComputeJob getDataWriterComputeJob() {
if (dataWriterComputeJob != null) {
return dataWriterComputeJob;
}
Expand Down Expand Up @@ -2338,6 +2340,22 @@ private synchronized void closeVeniceWriter() {
}
}

/**
 * Parses the status string carried by the controller's job status response into an
 * {@link ExecutionStatus}.
 *
 * @param response the controller response whose {@code status} field is parsed
 * @return the parsed {@link ExecutionStatus}
 * @throws VeniceException if the status string does not name a known {@link ExecutionStatus};
 *         the message (also logged) includes the raw status and any extra details from the response
 */
static ExecutionStatus getExecutionStatusFromControllerResponse(JobStatusQueryResponse response) {
  try {
    return ExecutionStatus.valueOf(response.getStatus());
  } catch (IllegalArgumentException e) {
    String message = "Invalid ExecutionStatus returned from backend. status: " + response.getStatus();
    if (response.getOptionalExtraDetails().isPresent()) {
      message += ", extra details: " + response.getOptionalExtraDetails().get();
    }
    LOGGER.error(message);
    throw new VeniceException(message, e);
  }
}

/**
* High level, we want to poll the consumption job status until it errors or is complete. This is more complicated
* because we might be dealing with multiple destination clusters and we might not be able to reach all of them. We
Expand Down Expand Up @@ -2405,17 +2423,16 @@ void pollStatusUntilComplete(
}

previousOverallDetails = printJobStatus(response, previousOverallDetails, previousExtraDetails);
ExecutionStatus overallStatus = ExecutionStatus.valueOf(response.getStatus());
ExecutionStatus overallStatus = getExecutionStatusFromControllerResponse(response);
Map<String, String> regionSpecificInfo = response.getExtraInfo();
// Note that it's intended to update the push job details before updating the completed datacenter set.
updatePushJobDetailsWithColoStatus(regionSpecificInfo, completedDatacenters);
regionSpecificInfo.forEach((region, regionStatus) -> {
ExecutionStatus datacenterStatus = ExecutionStatus.valueOf(regionStatus);
if (datacenterStatus.isTerminal() && !datacenterStatus.equals(ExecutionStatus.ERROR)) {
if (datacenterStatus.isTerminal() && !datacenterStatus.isError()) {
completedDatacenters.add(region);
}
});

if (overallStatus.isTerminal()) {
if (completedDatacenters.size() != regionSpecificInfo.size() || !successfulStatuses.contains(overallStatus)) {
// 1) For regular push, one or more DC could have an UNKNOWN status and never successfully reported a
Expand All @@ -2428,7 +2445,10 @@ void pollStatusUntilComplete(
.append(pushJobSetting.veniceControllerUrl)
.append("\ncontroller response: ")
.append(response);

if (overallStatus.isDVCIngestionError()) {
this.pushJobDetails.pushJobLatestCheckpoint =
PushJobCheckpoints.valueOf(overallStatus.toString()).getValue();
}
throw new VeniceException(errorMsg.toString());
}

Expand Down Expand Up @@ -2886,4 +2906,9 @@ private static void runPushJob(String jobId, Properties props) {
job.run();
}
}

/**
 * Injects the {@link DataWriterComputeJob} for this push job. Used only for testing, so tests
 * can supply a pre-built (e.g. mocked) compute job instead of having one constructed internally.
 */
void setDataWriterComputeJob(DataWriterComputeJob dataWriterComputeJob) {
  this.dataWriterComputeJob = dataWriterComputeJob;
}
}
Loading

0 comments on commit 5ffbfb9

Please sign in to comment.