From 97df1e70e9eba07d8987099ba3007206004219a0 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Tue, 3 Oct 2023 11:25:44 -0700 Subject: [PATCH] [controller] Handle NPE for incremental push reporting (#677) * throw exception with partition id when hit NPE --- .../linkedin/venice/pushmonitor/OfflinePushStatus.java | 4 ++++ .../venice/pushmonitor/OfflinePushStatusTest.java | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java index b203ecad5c..2d0bbd308b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/OfflinePushStatus.java @@ -208,6 +208,10 @@ public Map> getIncrementalPushStatus( for (PartitionStatus partitionStatus: getPartitionStatuses()) { Map partitionPushStatus = new HashMap<>(); Partition partition = partitionAssignment.getPartition(partitionStatus.getPartitionId()); + if (partition == null) { + throw new VeniceException( + "Partition " + partitionStatus.getPartitionId() + " not found in partition assignment"); + } Set workingInstances = partition.getWorkingInstances().stream().map(Instance::getNodeId).collect(Collectors.toSet()); for (ReplicaStatus replicaStatus: partitionStatus.getReplicaStatuses()) { diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java index 633850c139..222c5c4673 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/OfflinePushStatusTest.java @@ -9,6 +9,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.OfflinePushStrategy; +import com.linkedin.venice.meta.PartitionAssignment; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -163,6 +164,14 @@ public void testCloneOfflinePushStatus() { Assert.assertNotEquals(clonedPush, offlinePushStatus); } + @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Partition 0 not found in partition assignment") + public void testNPECaughtWhenPollingIncPushStatus() { + OfflinePushStatus offlinePushStatus = + new OfflinePushStatus(kafkaTopic, numberOfPartition, replicationFactor, strategy); + PartitionAssignment partitionAssignment = new PartitionAssignment("test-topic", 10); + offlinePushStatus.getIncrementalPushStatus(partitionAssignment, "ignore"); + } + private void testValidTargetStatuses(ExecutionStatus from, ExecutionStatus... statuses) { for (ExecutionStatus status: statuses) { OfflinePushStatus offlinePushStatus =