From fe0154333240f8245c4d128035ddd53738249789 Mon Sep 17 00:00:00 2001 From: Manoj Nagarajan Date: Tue, 8 Oct 2024 17:57:20 -0700 Subject: [PATCH] [controller] add logs for push job status (#1225) Emit logs for push job status whenever push job metrics are emitted --- .../venice/controller/VeniceHelixAdmin.java | 38 +++++++++++++++---- .../controller/TestPushJobStatusStats.java | 4 +- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 0a823878ac..dd880b168f 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1199,9 +1199,10 @@ static boolean isPushJobFailedDueToUserError( static void emitPushJobStatusMetrics( Map pushJobStatusStatsMap, - PushJobDetails pushJobDetails, + PushJobStatusRecordKey pushJobDetailsKey, + PushJobDetails pushJobDetailsValue, Set pushJobUserErrorCheckpoints) { - List overallStatuses = pushJobDetails.getOverallStatus(); + List overallStatuses = pushJobDetailsValue.getOverallStatus(); if (overallStatuses.isEmpty()) { return; } @@ -1209,21 +1210,31 @@ static void emitPushJobStatusMetrics( PushJobDetailsStatus overallStatus = PushJobDetailsStatus.valueOf(overallStatuses.get(overallStatuses.size() - 1).getStatus()); if (PushJobDetailsStatus.isTerminal(overallStatus.getValue())) { - String cluster = pushJobDetails.getClusterName().toString(); + String cluster = pushJobDetailsValue.getClusterName().toString(); PushJobStatusStats pushJobStatusStats = pushJobStatusStatsMap.get(cluster); Utf8 incPushKey = new Utf8("incremental.push"); boolean isIncrementalPush = false; - if (pushJobDetails.getPushJobConfigs().containsKey(incPushKey)) { - isIncrementalPush = Boolean.parseBoolean(pushJobDetails.getPushJobConfigs().get(incPushKey).toString()); + if (pushJobDetailsValue.getPushJobConfigs().containsKey(incPushKey)) { + isIncrementalPush = Boolean.parseBoolean(pushJobDetailsValue.getPushJobConfigs().get(incPushKey).toString()); } + StringBuilder logMessage = new StringBuilder(); + logMessage.append("Push job status for store name: ") + .append(pushJobDetailsKey.getStoreName()) + .append(", version: ") + .append(pushJobDetailsKey.getVersionNumber()) + .append(", cluster: ") + .append(cluster); if (PushJobDetailsStatus.isFailed(overallStatus)) { - if (isPushJobFailedDueToUserError(overallStatus, pushJobDetails, pushJobUserErrorCheckpoints)) { + logMessage.append(" failed with status: ").append(overallStatus); + if (isPushJobFailedDueToUserError(overallStatus, pushJobDetailsValue, pushJobUserErrorCheckpoints)) { + logMessage.append(" due to user error"); if (isIncrementalPush) { pushJobStatusStats.recordIncrementalPushFailureDueToUserErrorSensor(); } else { pushJobStatusStats.recordBatchPushFailureDueToUserErrorSensor(); } } else { + logMessage.append(" due to non-user error"); if (isIncrementalPush) { pushJobStatusStats.recordIncrementalPushFailureNotDueToUserErrorSensor(); } else { @@ -1231,6 +1242,7 @@ static void emitPushJobStatusMetrics( } } } else if (PushJobDetailsStatus.isSucceeded(overallStatus)) { + logMessage.append(" succeeded with status: ").append(overallStatus); // Emit metrics for successful push jobs if (isIncrementalPush) { pushJobStatusStats.recordIncrementalPushSuccessSensor(); @@ -1238,9 +1250,19 @@ static void emitPushJobStatusMetrics( pushJobStatusStats.recordBatchPushSuccessSensor(); } } + LOGGER.info( + "{}. Incremental push: {}, push job id: {}, checkpoint: {}", + logMessage.toString(), + isIncrementalPush, + pushJobDetailsValue.getPushId(), + PushJobCheckpoints.valueOf(pushJobDetailsValue.getPushJobLatestCheckpoint())); } } catch (Exception e) { - LOGGER.error("Failed to emit push job status metrics with pushJobDetails: {}", pushJobDetails.toString(), e); + LOGGER.error( + "Failed to emit push job status metrics. pushJobDetails key: {}, value: {}", + pushJobDetailsKey.toString(), + pushJobDetailsValue.toString(), + e); } } @@ -1253,7 +1275,7 @@ static void emitPushJobStatusMetrics( @Override public void sendPushJobDetails(PushJobStatusRecordKey key, PushJobDetails value) { // Emit push job status metrics - emitPushJobStatusMetrics(pushJobStatusStatsMap, value, pushJobUserErrorCheckpoints); + emitPushJobStatusMetrics(pushJobStatusStatsMap, key, value, pushJobUserErrorCheckpoints); // Send push job details to the push job status system store if (pushJobStatusStoreClusterName.isEmpty()) { throw new VeniceException( diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestPushJobStatusStats.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestPushJobStatusStats.java index 92b1ae9dbf..2d4a26089e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestPushJobStatusStats.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestPushJobStatusStats.java @@ -18,6 +18,7 @@ import com.linkedin.venice.status.PushJobDetailsStatus; import com.linkedin.venice.status.protocol.PushJobDetails; import com.linkedin.venice.status.protocol.PushJobDetailsStatusTuple; +import com.linkedin.venice.status.protocol.PushJobStatusRecordKey; import com.linkedin.venice.utils.DataProviderUtils; import java.util.ArrayList; import java.util.Collections; @@ -38,6 +39,7 @@ public class TestPushJobStatusStats { public void testEmitPushJobStatusMetrics(boolean isIncrementalPush, boolean useUserProvidedUserErrorCheckpoints) { Set userErrorCheckpoints = useUserProvidedUserErrorCheckpoints ? CUSTOM_USER_ERROR_CHECKPOINTS : DEFAULT_PUSH_JOB_USER_ERROR_CHECKPOINTS; + PushJobStatusRecordKey key = new PushJobStatusRecordKey("test", 1); PushJobDetails pushJobDetails = mock(PushJobDetails.class); Map pushJobConfigs = new HashMap<>(); pushJobConfigs.put(new Utf8("incremental.push"), String.valueOf(isIncrementalPush)); @@ -65,7 +67,7 @@ public void testEmitPushJobStatusMetrics(boolean isIncrementalPush, boolean useU for (PushJobCheckpoints checkpoint: PushJobCheckpoints.values()) { when(pushJobDetails.getPushJobLatestCheckpoint()).thenReturn(checkpoint.getValue()); - emitPushJobStatusMetrics(pushJobStatusStatsMap, pushJobDetails, userErrorCheckpoints); + emitPushJobStatusMetrics(pushJobStatusStatsMap, key, pushJobDetails, userErrorCheckpoints); boolean isUserError = userErrorCheckpoints.contains(checkpoint); if (isUserError) {