Skip to content

Commit

Permalink
[da-vinci][server] Alert on failed ingestions for both incomplete and…
Browse files Browse the repository at this point in the history
… completed partitions (#727)

Problem:
The "ingestion_task_errored_gauge" metric checks ingestion errors for
partitions which explicitly reports ERROR to ZK. However, for COMPLETED
partition, ERROR will not be reported in order to protect read path,
but ingestion will stop without any alerts until we see a high lag
after rolling bounce or users notice data staleness/inconsistency.

Fix:
Added a new set to track the partition IDs whose ingestion has failed
and stopped. Life cycles of the partitions IDs in the set:
1. When a partition encounters exceptions, update both the
   partition-to-exception list and failed-partitions set;
2. When an unsubscribe action is triggered by Helix for the failed
   partition, remove it from both the partition-to-exception list and
   failed-partitions set;
3. When an unsubscribe action is triggered by internal logic, remove
   the partition from the partition-to-exception list (stop logging)
   but keep it in the failed-partitions set (keep alerting);
4. When an subscribe action happens for the failed partition, regardless
   of who triggers it, remove it from both trackings, since ingestion
   will start over again.
  • Loading branch information
huangminchn authored Nov 3, 2023
1 parent 0a04e37 commit f27bcc1
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,48 @@ public class ConsumerAction implements Comparable<ConsumerAction> {

private long createTimestampInMs = System.currentTimeMillis();

private boolean isHelixTriggeredAction = true;
private CompletableFuture<Void> future = new CompletableFuture<>();

public ConsumerAction(ConsumerActionType type, PubSubTopicPartition topicPartition, int sequenceNumber) {
this(type, topicPartition, sequenceNumber, null, Optional.empty());
public ConsumerAction(
ConsumerActionType type,
PubSubTopicPartition topicPartition,
int sequenceNumber,
boolean isHelixTriggeredAction) {
this(type, topicPartition, sequenceNumber, null, Optional.empty(), isHelixTriggeredAction);
}

public ConsumerAction(
ConsumerActionType type,
PubSubTopicPartition topicPartition,
int sequenceNumber,
LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) {
this(type, topicPartition, sequenceNumber, checker, Optional.empty());
LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker,
boolean isHelixTriggeredAction) {
this(type, topicPartition, sequenceNumber, checker, Optional.empty(), isHelixTriggeredAction);
}

public ConsumerAction(
ConsumerActionType type,
PubSubTopicPartition topicPartition,
int sequenceNumber,
Optional<LeaderFollowerStateType> leaderState) {
this(type, topicPartition, sequenceNumber, null, leaderState);
Optional<LeaderFollowerStateType> leaderState,
boolean isHelixTriggeredAction) {
this(type, topicPartition, sequenceNumber, null, leaderState, isHelixTriggeredAction);
}

private ConsumerAction(
ConsumerActionType type,
PubSubTopicPartition topicPartition,
int sequenceNumber,
LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker,
Optional<LeaderFollowerStateType> leaderState) {
Optional<LeaderFollowerStateType> leaderState,
boolean isHelixTriggeredAction) {
this.type = type;
this.topicPartition = Utils.notNull(topicPartition);
this.sequenceNumber = sequenceNumber;
this.checker = checker;
this.leaderState = leaderState.orElse(LeaderFollowerStateType.STANDBY);
this.isHelixTriggeredAction = isHelixTriggeredAction;
}

public ConsumerActionType getType() {
Expand Down Expand Up @@ -105,6 +114,10 @@ public CompletableFuture<Void> getFuture() {
return future;
}

public boolean isHelixTriggeredAction() {
return isHelixTriggeredAction;
}

@Override
public String toString() {
return "KafkaTaskMessage{" + "type=" + type + ", topic='" + getTopic() + '\'' + ", partition=" + getPartition()
Expand Down Expand Up @@ -161,6 +174,12 @@ public int hashCode() {
* value for no meaning.
*/
public static ConsumerAction createKillAction(PubSubTopic topic, int sequenceNumber) {
return new ConsumerAction(ConsumerActionType.KILL, new PubSubTopicPartitionImpl(topic, 0), sequenceNumber);
return new ConsumerAction(
ConsumerActionType.KILL,
new PubSubTopicPartitionImpl(topic, 0),
sequenceNumber,
null,
Optional.empty(),
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ public synchronized void promoteToLeader(
STANDBY_TO_LEADER,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum(),
checker));
checker,
true));
});
}

Expand All @@ -333,7 +334,8 @@ public synchronized void demoteToStandby(
LEADER_TO_STANDBY,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum(),
checker));
checker,
true));
});
}

Expand Down Expand Up @@ -461,7 +463,7 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
endSegment(partition);
break;
default:
processCommonConsumerAction(operation, message.getTopicPartition(), message.getLeaderState());
processCommonConsumerAction(message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
Expand Down Expand Up @@ -196,6 +197,11 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
* or re-subscribing the partition.
*/
private final List<PartitionExceptionInfo> partitionIngestionExceptionList = new SparseConcurrentList<>();
/**
* Do not remove partition from this set unless Helix explicitly sends an unsubscribe action for the partition to
* remove it from the node, or Helix sends a subscribe action for the partition to re-subscribe it to the node.
*/
private final Set<Integer> failedPartitions = new ConcurrentSkipListSet<>();

/** Persists the exception thrown by {@link KafkaConsumerService}. */
private Exception lastConsumerException = null;
Expand Down Expand Up @@ -514,12 +520,19 @@ private void waitForStateVersion(String kafkaTopic) {
}
}

public synchronized void subscribePartition(
PubSubTopicPartition topicPartition,
Optional<LeaderFollowerStateType> leaderState) {
subscribePartition(topicPartition, leaderState, true);
}

/**
* Adds an asynchronous partition subscription request for the task.
*/
public synchronized void subscribePartition(
PubSubTopicPartition topicPartition,
Optional<LeaderFollowerStateType> leaderState) {
Optional<LeaderFollowerStateType> leaderState,
boolean isHelixTriggeredAction) {
throwIfNotRunning();
statusReportAdapter.initializePartitionReportStatus(topicPartition.getPartitionNumber());
amplificationFactorAdapter.execute(topicPartition.getPartitionNumber(), subPartition -> {
Expand All @@ -530,14 +543,21 @@ public synchronized void subscribePartition(
SUBSCRIBE,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum(),
amplificationFactorAdapter.isLeaderSubPartition(subPartition) ? leaderState : Optional.empty()));
amplificationFactorAdapter.isLeaderSubPartition(subPartition) ? leaderState : Optional.empty(),
isHelixTriggeredAction));
});
}

public synchronized CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition) {
return unSubscribePartition(topicPartition, true);
}

/**
* Adds an asynchronous partition unsubscription request for the task.
*/
public synchronized CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition) {
public synchronized CompletableFuture<Void> unSubscribePartition(
PubSubTopicPartition topicPartition,
boolean isHelixTriggeredAction) {
throwIfNotRunning();
List<CompletableFuture<Void>> futures = new ArrayList<>();
amplificationFactorAdapter.execute(topicPartition.getPartitionNumber(), subPartition -> {
Expand All @@ -546,7 +566,8 @@ public synchronized CompletableFuture<Void> unSubscribePartition(PubSubTopicPart
ConsumerAction consumerAction = new ConsumerAction(
UNSUBSCRIBE,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum());
nextSeqNum(),
isHelixTriggeredAction);

consumerActionsQueue.add(consumerAction);
futures.add(consumerAction.getFuture());
Expand All @@ -571,7 +592,8 @@ public synchronized void resetPartitionConsumptionOffset(PubSubTopicPartition to
new ConsumerAction(
RESET_OFFSET,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum()));
nextSeqNum(),
false));
});
}

Expand Down Expand Up @@ -1107,6 +1129,11 @@ List<PartitionExceptionInfo> getPartitionIngestionExceptionList() {
return this.partitionIngestionExceptionList;
}

// For testing purpose
Set<Integer> getFailedPartitions() {
return this.failedPartitions;
}

private void processIngestionException() {
partitionIngestionExceptionList.forEach(partitionExceptionInfo -> {
int exceptionPartition = partitionExceptionInfo.getPartitionId();
Expand Down Expand Up @@ -1146,7 +1173,7 @@ private void processIngestionException() {
pubSubTopicPartition.getPartitionNumber());
runnableForKillIngestionTasksForNonCurrentVersions.run();
if (storageEngine.hasMemorySpaceLeft()) {
unSubscribePartition(pubSubTopicPartition);
unSubscribePartition(pubSubTopicPartition, false);
/**
* DaVinci ingestion hits memory limit and we would like to retry it in the following way:
* 1. Kill the ingestion tasks for non-current versions.
Expand All @@ -1161,7 +1188,7 @@ private void processIngestionException() {
exceptionPartition);
storageEngine.reopenStoragePartition(exceptionPartition);
// DaVinci is always a follower.
subscribePartition(pubSubTopicPartition, Optional.empty());
subscribePartition(pubSubTopicPartition, Optional.empty(), false);
}
} else {
if (!partitionConsumptionState.isCompletionReported()) {
Expand All @@ -1177,7 +1204,8 @@ private void processIngestionException() {
}
// Unsubscribe the partition to avoid more damages.
if (partitionConsumptionStateMap.containsKey(exceptionPartition)) {
unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition));
// This is not an unsubscribe action from Helix
unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false);
}
}
}
Expand Down Expand Up @@ -1688,16 +1716,19 @@ private void checkConsumptionStateWhenStart(
}
}

protected void processCommonConsumerAction(
ConsumerActionType operation,
PubSubTopicPartition topicPartition,
LeaderFollowerStateType leaderState) throws InterruptedException {
protected void processCommonConsumerAction(ConsumerAction consumerAction) throws InterruptedException {
PubSubTopicPartition topicPartition = consumerAction.getTopicPartition();
LeaderFollowerStateType leaderState = consumerAction.getLeaderState();
int partition = topicPartition.getPartitionNumber();
String topic = topicPartition.getPubSubTopic().getName();
ConsumerActionType operation = consumerAction.getType();
switch (operation) {
case SUBSCRIBE:
// Clear the error partition tracking
partitionIngestionExceptionList.set(partition, null);
// Regardless of whether it's Helix action or not, remove the partition from alerts as long as server decides
// to start or retry the ingestion.
failedPartitions.remove(partition);
// Drain the buffered message by last subscription.
storeBufferService.drainBufferedRecordsFromTopicPartition(topicPartition);
subscribedCount++;
Expand Down Expand Up @@ -1809,6 +1840,12 @@ protected void processCommonConsumerAction(
} else {
LOGGER.info("{} Unsubscribed to: {}", consumerTaskId, topicPartition);
}
// Only remove the partition from the maintained set if it's triggered by Helix.
// Otherwise, keep it in the set since it is used for alerts; alerts should be sent out if unsubscription
// happens due to internal errors.
if (consumerAction.isHelixTriggeredAction()) {
failedPartitions.remove(partition);
}
break;
case RESET_OFFSET:
resetOffset(partition, topicPartition, false);
Expand Down Expand Up @@ -1859,6 +1896,7 @@ private void resetOffset(int partition, PubSubTopicPartition topicPartition, boo
storageUtilizationManager.initPartition(partition);
// Reset the error partition tracking
partitionIngestionExceptionList.set(partition, null);
failedPartitions.remove(partition);
} else {
LOGGER.info(
"{} No need to reset offset by Kafka consumer, since the consumer is not subscribing: {}",
Expand Down Expand Up @@ -1949,6 +1987,10 @@ public boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionSta
return false;
}

public int getFailedIngestionPartitionCount() {
return failedPartitions.size();
}

/**
* Common record check for different state models:
* check whether server continues receiving messages after EOP for a batch-only store.
Expand Down Expand Up @@ -2288,6 +2330,7 @@ void setIngestionException(int partitionId, Exception e) {
replicaCompleted = true;
}
partitionIngestionExceptionList.set(partitionId, new PartitionExceptionInfo(e, partitionId, replicaCompleted));
failedPartitions.add(partitionId);
}

public void setLastConsumerException(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,9 @@ public int getIngestionTaskErroredGauge() {
if (!hasActiveIngestionTask()) {
return 0;
}
boolean anyErrorReported =
ingestionTask.hasAnyPartitionConsumptionState(PartitionConsumptionState::isErrorReported);
int totalFailedIngestionPartitions = ingestionTask.getFailedIngestionPartitionCount();
boolean anyCompleted = ingestionTask.hasAnyPartitionConsumptionState(PartitionConsumptionState::isComplete);
return anyCompleted && anyErrorReported ? 1 : 0;
return anyCompleted ? totalFailedIngestionPartitions : 0;
}

public long getBatchReplicationLag() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,33 +81,34 @@ public void testQueueReturnActionsWithDifferentPriorityInCorrectOrder() {
public void testEqualsAndHashCode() {
PubSubTopicPartition pubSubTopicPartition1 =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("blah_rt"), 0);
ConsumerAction ca1 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition1, 0);
ConsumerAction ca1 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition1, 0, false);

PubSubTopicPartition pubSubTopicPartition2 =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("blah_rt"), 1);
ConsumerAction ca2 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition2, 0);
ConsumerAction ca2 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition2, 0, false);

assertNotEquals(ca1, ca2);
assertNotEquals(ca1.hashCode(), ca2.hashCode());
assertFalse(ca1.equals(null));

PubSubTopicPartition pubSubTopicPartition3 =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("blah_rt"), 1);
ConsumerAction ca3 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition3, 0);
ConsumerAction ca3 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition3, 0, false);

assertEquals(ca2, ca3);
assertEquals(ca2.hashCode(), ca3.hashCode());

ConsumerAction ca4 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition3, 1);
ConsumerAction ca4 = new ConsumerAction(ConsumerActionType.RESUME, pubSubTopicPartition3, 1, false);
assertNotEquals(ca2, ca4);
assertNotEquals(ca3, ca4);

ConsumerAction ca5 = new ConsumerAction(ConsumerActionType.KILL, pubSubTopicPartition3, 0);
ConsumerAction ca5 = new ConsumerAction(ConsumerActionType.KILL, pubSubTopicPartition3, 0, false);
assertNotEquals(ca2, ca5);
assertNotEquals(ca3, ca5);
assertNotEquals(ca4, ca5);

ConsumerAction ca6 = new ConsumerAction(ConsumerActionType.KILL, pubSubTopicPartition3, 0, Optional.of(LEADER));
ConsumerAction ca6 =
new ConsumerAction(ConsumerActionType.KILL, pubSubTopicPartition3, 0, Optional.of(LEADER), false);
assertNotEquals(ca2, ca6);
assertNotEquals(ca3, ca6);
assertNotEquals(ca4, ca6);
Expand All @@ -122,7 +123,8 @@ private ConsumerAction getRandomAction(String topic, int partition, int sequence
return new ConsumerAction(
actionTypes[index],
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partition),
sequenceNumber);
sequenceNumber,
false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,7 @@ public void testPartitionExceptionIsolation(boolean isActiveActiveReplicationEna
assertNull(
storeIngestionTaskUnderTest.getPartitionIngestionExceptionList().get(PARTITION_FOO),
"Exception for the errored partition should be cleared after unsubscription");
assertEquals(storeIngestionTaskUnderTest.getFailedPartitions().size(), 1, "Only one partition should be failed");
}, isActiveActiveReplicationEnabled);
for (int i = 0; i < 10000; ++i) {
storeIngestionTaskUnderTest
Expand Down

0 comments on commit f27bcc1

Please sign in to comment.