[server] CV state cleanup during standby -> offline and pre-start (#668)
* [server] CV state cleanup during standby -> offline and pre-start

In the past, the CV state would remain 'Completed' while a node was going through a
rebalance/restart/ungraceful shutdown, which would lead the Venice Router to continue
sending traffic to unavailable replicas. So far this has not caused unhealthy requests,
thanks to the error-retry mechanism in the Router.

To fix this issue cleanly, this code change introduces the following strategies:
1. Remove CV states when a replica transitions from standby to offline.
2. Remove CV states for all hosted partitions when stopping a Venice Server instance.
3. Remove CV states for all hosted partitions before starting a Venice Server instance,
   to handle ungraceful shutdowns.

This code change also adds handling in the main process for notification types that
were previously not handled when II is enabled.

* Fixed integration test failure

* Addressed review comments

* Addressed comments
gaojieliu authored Oct 5, 2023
1 parent 6d6a661 commit 47721fa
Showing 20 changed files with 327 additions and 46 deletions.
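
For orientation, here is a minimal, self-contained sketch of how the three cleanup strategies fit together. The types and names below (IngestionBackend, CvStateAccessor, HostedPartitions, CvStateCleanupSketch) are simplified stand-ins modeled on the Venice classes touched in this diff, not the actual APIs.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Simplified stand-ins for the real Venice interfaces touched by this commit. */
interface IngestionBackend {
  CompletableFuture<Void> stopConsumption(String storeVersion, int partition);
}

interface CvStateAccessor {
  // Mirrors HelixPartitionStatusAccessor#deleteReplicaStatus in the diff below.
  void deleteReplicaStatus(String storeVersion, int partition);
}

interface HostedPartitions {
  // Mirrors StorageService#getStoreAndUserPartitionsMapping in the diff below.
  Map<String, Set<Integer>> getStoreAndUserPartitionsMapping();
}

final class CvStateCleanupSketch {
  private final IngestionBackend backend;
  private final CvStateAccessor accessor;

  CvStateCleanupSketch(IngestionBackend backend, CvStateAccessor accessor) {
    this.backend = backend;
    this.accessor = accessor;
  }

  /** Strategy 1: on standby -> offline, drop the replica's CV state once consumption has stopped. */
  void onBecomeOfflineFromStandby(String storeVersion, int partition) {
    backend.stopConsumption(storeVersion, partition).whenComplete((ignored, throwable) -> {
      // Best effort: remove the CV state even if stopping consumption failed.
      accessor.deleteReplicaStatus(storeVersion, partition);
    });
  }

  /** Strategies 2 and 3: wipe CV states for every hosted partition at shutdown and before (re)start. */
  void resetAllInstanceCvStates(HostedPartitions storageService) {
    storageService.getStoreAndUserPartitionsMapping()
        .forEach((storeVersion, partitions) -> partitions
            .forEach(partition -> accessor.deleteReplicaStatus(storeVersion, partition)));
  }
}

In the actual change, strategy 1 becomes the stopConsumption(true) call in the Helix state model's onBecomeOfflineFromStandby transition, while strategies 2 and 3 both go through resetAllInstanceCVStates: once in stopInner() of the Helix participation service and once via a pre-connect callback registered before helixManager.connect().
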
@@ -143,8 +143,10 @@ public void rollbackOnError(Message message, NotificationContext context, StateT
/**
* When state transition fails, we shouldn't remove the corresponding database here since the database could
* be either recovered by bounce/Helix Reset or completely dropped after going through 'ERROR' to 'DROPPED' state transition.
*
* Also, the CV state will be updated to the `ERROR` state, so there is no need to remove it here.
*/
stopConsumption();
stopConsumption(false);
}, true);
}

@@ -168,7 +170,7 @@ public void onBecomeDroppedFromError(Message message, NotificationContext contex
@Override
public void reset() {
try {
stopConsumption();
stopConsumption(false);
} catch (Exception e) {
logger.error(
"Error when trying to stop any ongoing consumption during reset for: {}",
@@ -352,8 +354,22 @@ private void initializePartitionPushStatus() {
}
}

protected void stopConsumption() {
ingestionBackend.stopConsumption(storeAndServerConfigs, partition);
protected void stopConsumption(boolean dropCVState) {
CompletableFuture<Void> future = ingestionBackend.stopConsumption(storeAndServerConfigs, partition);
if (dropCVState) {
future.whenComplete((ignored, throwable) -> {
if (throwable != null) {
logger.warn(
"Failed to stop consumption for resource: {}, partition: {}",
storeAndServerConfigs.getStoreVersionName(),
partition);
}
/**
* Drop the CV state anyway; it may not work well under error conditions, but that is acceptable since this is best-effort.
*/
removeCustomizedState();
});
}
}

protected VeniceIngestionBackend getIngestionBackend() {
@@ -41,6 +41,8 @@
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -228,6 +230,11 @@ public boolean startInner() {
public void stopInner() throws IOException {
LOGGER.info("Attempting to stop HelixParticipation service.");
ingestionBackend.prepareForShutdown();
if (helixManager != null) {
resetAllInstanceCVStates(partitionPushStatusAccessor, storageService, logger);
} else {
logger.error("Can't reset instance CV states since HelixManager is null");
}
if (helixManager != null) {
try {
helixManager.disconnect();
@@ -320,6 +327,13 @@ private void asyncStart() {
instance.getNodeId());

ingestionBackend.addPushStatusNotifier(pushMonitorNotifier);
/**
* The accessor can only get created successfully after helix manager is created.
*/
partitionPushStatusAccessor = new HelixPartitionStatusAccessor(
helixManager.getOriginalManager(),
instance.getNodeId(),
veniceConfigLoader.getVeniceServerConfig().isHelixHybridStoreQuotaEnabled());

CompletableFuture.runAsync(() -> {
try {
@@ -329,6 +343,8 @@ private void asyncStart() {
// our
// TODO checking, so we could use HelixManager to get some metadata instead of creating a new zk connection.
checkBeforeJoinInCluster();
helixManager
.addPreConnectCallback(() -> resetAllInstanceCVStates(partitionPushStatusAccessor, storageService, logger));
helixManager.connect();
managerFuture.complete(helixManager);
} catch (Exception e) {
@@ -337,13 +353,6 @@ private void asyncStart() {
Utils.exit("Failed to start HelixParticipationService");
}

/**
* The accessor can only get created successfully after helix manager is created.
*/
partitionPushStatusAccessor = new HelixPartitionStatusAccessor(
helixManager.getOriginalManager(),
instance.getNodeId(),
veniceConfigLoader.getVeniceServerConfig().isHelixHybridStoreQuotaEnabled());
PartitionPushStatusNotifier partitionPushStatusNotifier =
new PartitionPushStatusNotifier(partitionPushStatusAccessor);
ingestionBackend.addPushStatusNotifier(partitionPushStatusNotifier);
@@ -360,6 +369,26 @@ private void asyncStart() {
});
}

static void resetAllInstanceCVStates(
HelixPartitionStatusAccessor accessor,
StorageService storageService,
Logger currentLogger) {
// Get all hosted stores
currentLogger.info("Started resetting all instance CV states");
Map<String, Set<Integer>> storePartitionMapping = storageService.getStoreAndUserPartitionsMapping();
storePartitionMapping.forEach((storeName, partitionIds) -> {
partitionIds.forEach(partitionId -> {
try {
accessor.deleteReplicaStatus(storeName, partitionId);
} catch (Exception e) {
currentLogger
.error("Failed to delete CV state for resource: {} and partition id: {}", storeName, partitionId, e);
}
});
});
currentLogger.info("Finished resetting all instance CV states");
}

// test only
public void replaceAndAddTestIngestionNotifier(VeniceNotifier notifier) {
ingestionBackend.replaceAndAddTestPushStatusNotifier(notifier);
@@ -135,7 +135,7 @@ public void onBecomeStandbyFromLeader(Message message, NotificationContext conte

@Transition(to = HelixState.OFFLINE_STATE, from = HelixState.STANDBY_STATE)
public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
executeStateTransition(message, context, this::stopConsumption);
executeStateTransition(message, context, () -> stopConsumption(true));
}

@Transition(to = HelixState.DROPPED_STATE, from = HelixState.OFFLINE_STATE)
@@ -13,6 +13,7 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
@@ -65,8 +66,8 @@ public void startConsumption(
}

@Override
public void stopConsumption(VeniceStoreVersionConfig storeConfig, int partition) {
getStoreIngestionService().stopConsumption(storeConfig, partition);
public CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig storeConfig, int partition) {
return getStoreIngestionService().stopConsumption(storeConfig, partition);
}

@Override
@@ -8,6 +8,7 @@
import com.linkedin.davinci.storage.StorageService;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;


public interface IngestionBackendBase extends Closeable {
@@ -20,7 +21,7 @@ void startConsumption(
int partition,
Optional<LeaderFollowerStateType> leaderState);

void stopConsumption(VeniceStoreVersionConfig storeConfig, int partition);
CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig storeConfig, int partition);

void killConsumptionTask(String topicName);

@@ -29,6 +29,7 @@
import com.linkedin.venice.utils.locks.AutoCloseableSingleLock;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,14 +110,26 @@ public void startConsumption(
}

@Override
public void stopConsumption(VeniceStoreVersionConfig storeConfig, int partition) {
public CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig storeConfig, int partition) {
final CompletableFuture<Void> future = new CompletableFuture<>();
String topicName = storeConfig.getStoreVersionName();
executeCommandWithRetry(
topicName,
partition,
STOP_CONSUMPTION,
() -> getMainIngestionRequestClient().stopConsumption(storeConfig.getStoreVersionName(), partition),
() -> super.stopConsumption(storeConfig, partition));
executeCommandWithRetry(topicName, partition, STOP_CONSUMPTION, () -> {
/**
* When stopping consumption in the II process, it is not easy to tell when the action has finished,
* so we mark the future as completed right away.
*/
boolean res = getMainIngestionRequestClient().stopConsumption(storeConfig.getStoreVersionName(), partition);
future.complete(null);
return res;
}, () -> super.stopConsumption(storeConfig, partition).whenComplete((ignored, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
future.complete(null);
}
}));

return future;
}

@Override
@@ -107,4 +107,11 @@ public void progress(String kafkaTopic, int partitionId, long offset, String mes
isolatedIngestionServer.reportIngestionStatus(report);
}

@Override
public void stopped(String kafkaTopic, int partitionId, long offset) {
IngestionTaskReport report =
createIngestionTaskReport(IngestionReportType.STOPPED, kafkaTopic, partitionId, offset, "");
isolatedIngestionServer.reportIngestionStatus(report);
}

}
@@ -101,6 +101,7 @@ void handleIngestionReport(IngestionTaskReport report) {
String topicName = report.topicName.toString();
int partitionId = report.partitionId;
long offset = report.offset;
String message = report.message.toString();
LOGGER.info(
"Received ingestion report {} for topic: {}, partition: {} from ingestion service. ",
reportType.name(),
@@ -114,7 +115,7 @@ void handleIngestionReport(IngestionTaskReport report) {
LeaderFollowerStateType leaderFollowerStateType = LeaderFollowerStateType.valueOf(report.leaderFollowerState);
notifierHelper(
notifier -> notifier
.completed(topicName, partitionId, report.offset, "", Optional.of(leaderFollowerStateType)));
.completed(topicName, partitionId, report.offset, message, Optional.of(leaderFollowerStateType)));
break;
case ERROR:
mainIngestionMonitorService.setVersionPartitionToLocalIngestion(topicName, partitionId);
@@ -150,6 +151,12 @@ void handleIngestionReport(IngestionTaskReport report) {
case TOPIC_SWITCH_RECEIVED:
notifierHelper(notifier -> notifier.topicSwitchReceived(topicName, partitionId, offset));
break;
case DATA_RECOVERY_COMPLETED:
notifierHelper(notifier -> notifier.dataRecoveryCompleted(topicName, partitionId, offset, message));
break;
case STOPPED:
notifierHelper(notifier -> notifier.stopped(topicName, partitionId, offset));
break;
default:
LOGGER.warn("Received unsupported ingestion report: {} it will be ignored for now.", report);
}
@@ -6,6 +6,7 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import io.tehuti.utils.Utils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;


/**
@@ -25,6 +26,8 @@ public class ConsumerAction implements Comparable<ConsumerAction> {

private long createTimestampInMs = System.currentTimeMillis();

private CompletableFuture<Void> future = new CompletableFuture<>();

public ConsumerAction(ConsumerActionType type, PubSubTopicPartition topicPartition, int sequenceNumber) {
this(type, topicPartition, sequenceNumber, null, Optional.empty());
}
@@ -98,6 +101,10 @@ public long getCreateTimestampInMs() {
return createTimestampInMs;
}

public CompletableFuture<Void> getFuture() {
return future;
}

@Override
public String toString() {
return "KafkaTaskMessage{" + "type=" + type + ", topic='" + getTopic() + '\'' + ", partition=" + getPartition()
@@ -899,16 +899,17 @@ private int getStoreMaximumVersionNumber(String storeName) {
* @param partitionId Venice partition's id.
*/
@Override
public void stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId) {
public CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId) {
final String topic = veniceStore.getStoreVersionName();

try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) {
StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic);
if (ingestionTask != null && ingestionTask.isRunning()) {
ingestionTask
return ingestionTask
.unSubscribePartition(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId));
} else {
LOGGER.warn("Ignoring stop consumption message for Topic {} Partition {}", topic, partitionId);
return CompletableFuture.completedFuture(null);
}
}
}
@@ -9,6 +9,7 @@
import com.linkedin.davinci.storage.MetadataRetriever;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;


@@ -36,7 +37,7 @@ default void startConsumption(VeniceStoreVersionConfig veniceStore, int partitio
* @param veniceStore Venice Store for the partition.
* @param partitionId Venice partition's id.
*/
void stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId);
CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId);

/**
* Stops consuming messages from Kafka Partition corresponding to Venice Partition and wait up to
@@ -110,6 +110,7 @@
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -529,17 +530,22 @@ public synchronized void subscribePartition(
/**
* Adds an asynchronous partition unsubscription request for the task.
*/
public synchronized void unSubscribePartition(PubSubTopicPartition topicPartition) {
public synchronized CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition) {
throwIfNotRunning();
List<CompletableFuture<Void>> futures = new ArrayList<>();
amplificationFactorAdapter.execute(topicPartition.getPartitionNumber(), subPartition -> {
partitionToPendingConsumerActionCountMap.computeIfAbsent(subPartition, x -> new AtomicInteger(0))
.incrementAndGet();
consumerActionsQueue.add(
new ConsumerAction(
UNSUBSCRIBE,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum()));
ConsumerAction consumerAction = new ConsumerAction(
UNSUBSCRIBE,
new PubSubTopicPartitionImpl(topicPartition.getPubSubTopic(), subPartition),
nextSeqNum());

consumerActionsQueue.add(consumerAction);
futures.add(consumerAction.getFuture());
});

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

public boolean hasAnySubscription() {
@@ -1479,6 +1485,7 @@ private void processConsumerActions(Store store) throws InterruptedException {
LatencyUtils.getElapsedTimeInMs(action.getCreateTimestampInMs()));
action.incrementAttempt();
processConsumerAction(action, store);
action.getFuture().complete(null);
// Remove the action that is processed recently (not necessarily the head of consumerActionsQueue).
if (consumerActionsQueue.remove(action)) {
partitionToPendingConsumerActionCountMap.get(action.getPartition()).decrementAndGet();
Expand All @@ -1488,6 +1495,7 @@ private void processConsumerActions(Store store) throws InterruptedException {
action,
LatencyUtils.getElapsedTimeInMs(actionProcessStartTimeInMs));
} catch (VeniceIngestionTaskKilledException | InterruptedException e) {
action.getFuture().completeExceptionally(e);
throw e;
} catch (Throwable e) {
if (action.getAttemptsCount() <= MAX_CONSUMER_ACTION_ATTEMPTS) {
@@ -1500,6 +1508,8 @@
action.getAttemptsCount(),
LatencyUtils.getElapsedTimeInMs(actionProcessStartTimeInMs),
e);
// Mark action as failed since it has exhausted all the retries.
action.getFuture().completeExceptionally(e);
// After MAX_CONSUMER_ACTION_ATTEMPTS retries we should give up and error the ingestion task.
PartitionConsumptionState state = partitionConsumptionStateMap.get(action.getPartition());
