Skip to content

Commit

Permalink
[da-vinci][controller] Filter out status reporting from the nodes, wh…
Browse files Browse the repository at this point in the history
…ere the DaVinci app is still bootstrapping (linkedin#1203)

* [da-vinci] Disabled status reporting when the DaVinci app is still bootstrapping

In Prod, we are seeing the bootstrapping nodes are blocking the
new push, and this PR disables the status reporting if the app
is still bootstrapping.
This feature is especially important for DaVinci apps, which are
using multiple large stores and bootstrapping from an empty state
can take a long time and it can delay the new push significantly.

* Added the following enhancements:
1. Subscription won't return if there are still active current version bootstrapping
   to avoid serving stale current version.
2. Delete the heartbeat entry for the current instance if the instance is bootstrapping
   to avoid Controller mark the bootstrapping node as crashed, which can fail the
   new push jobs.
  • Loading branch information
gaojieliu authored Oct 2, 2024
1 parent 06cc1fc commit b9685bb
Show file tree
Hide file tree
Showing 22 changed files with 551 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -446,7 +447,8 @@ private synchronized void bootstrap() {
storageMetadataService,
ingestionService,
storageService,
blobTransferManager)
blobTransferManager,
this::getVeniceCurrentVersionNumber)
: new DefaultIngestionBackend(
storageMetadataService,
ingestionService,
Expand Down Expand Up @@ -656,6 +658,11 @@ Version getVeniceCurrentVersion(String storeName) {
}
}

int getVeniceCurrentVersionNumber(String storeName) {
Version currentVersion = getVeniceCurrentVersion(storeName);
return currentVersion == null ? -1 : currentVersion.getNumber();
}

private Version getVeniceLatestNonFaultyVersion(Store store, Set<Integer> faultyVersions) {
Version latestNonFaultyVersion = null;
for (Version version: store.getVersions()) {
Expand Down Expand Up @@ -818,4 +825,30 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec
}
return status;
}

public boolean hasCurrentVersionBootstrapping() {
return ingestionBackend.hasCurrentVersionBootstrapping();
}

static class BootstrappingAwareCompletableFuture {
private ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("DaVinci_Bootstrapping_Check_Executor"));
public final CompletableFuture<Void> bootstrappingFuture = new CompletableFuture<>();

public BootstrappingAwareCompletableFuture(DaVinciBackend backend) {
scheduledExecutor.scheduleAtFixedRate(() -> {
if (bootstrappingFuture.isDone()) {
return;
}
if (!backend.hasCurrentVersionBootstrapping()) {
bootstrappingFuture.complete(null);
}
}, 0, 3, TimeUnit.SECONDS);
bootstrappingFuture.whenComplete((ignored1, ignored2) -> scheduledExecutor.shutdown());
}

public CompletableFuture<Void> getBootstrappingFuture() {
return bootstrappingFuture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,30 @@ synchronized void tryStartHeartbeat() {
if (isReportingPushStatus() && heartbeat == null) {
heartbeat = backend.getExecutor().scheduleAtFixedRate(() -> {
try {
backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
sendOutHeartbeat(backend, version);
} catch (Throwable t) {
LOGGER.error("Unable to send heartbeat for {}", this);
}
}, 0, heartbeatInterval, TimeUnit.SECONDS);
}
}

protected static void sendOutHeartbeat(DaVinciBackend backend, Version version) {
if (backend.hasCurrentVersionBootstrapping()) {
LOGGER.info(
"DaVinci still is still bootstrapping, so it will send heart-beat message with a special timestamp"
+ " for store: {} to avoid delaying the new push job",
version.getStoreName());
/**
* Tell backend that the report from the bootstrapping instance doesn't count to avoid
* delaying new pushes.
*/
backend.getPushStatusStoreWriter().writeHeartbeatForBootstrappingInstance(version.getStoreName());
} else {
backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
}
}

synchronized void tryStopHeartbeat() {
if (heartbeat != null && partitionFutures.values().stream().allMatch(CompletableFuture::isDone)) {
heartbeat.cancel(true);
Expand Down Expand Up @@ -359,9 +375,40 @@ synchronized CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions
futures.add(partitionFutures.get(partition));
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
CompletableFuture<Void> bootstrappingAwareSubscriptionFuture = new CompletableFuture<>();

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
storeBackendStats.recordSubscribeDuration(Duration.between(startTime, Instant.now()));
if (e != null) {
bootstrappingAwareSubscriptionFuture.completeExceptionally(e);
LOGGER.warn("Bootstrapping store: {}, version: {} failed", version.getStoreName(), version.getNumber(), e);
} else {
LOGGER.info("Bootstrapping store: {}, version: {} is completed", version.getStoreName(), version.getNumber());
/**
* It is important to start polling the bootstrapping status after the version ingestion is completed to
* make sure the bootstrapping status polling is valid (not doing polling without any past/active ingestion tasks).
*/
new DaVinciBackend.BootstrappingAwareCompletableFuture(backend).getBootstrappingFuture()
.whenComplete((ignored, ee) -> {
if (ee != null) {
bootstrappingAwareSubscriptionFuture.completeExceptionally(ee);
LOGGER.warn(
"Bootstrapping aware subscription to store: {}, version: {} failed",
version.getStoreName(),
version.getNumber(),
ee);
} else {
bootstrappingAwareSubscriptionFuture.complete(null);
LOGGER.info(
"Bootstrapping aware subscription to store: {}, version: {} is completed",
version.getStoreName(),
version.getNumber());
}
});
}
});

return bootstrappingAwareSubscriptionFuture;
}

synchronized void unsubscribe(ComplementSet<Integer> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public void setStorageEngineReference(
}
}

@Override
public boolean hasCurrentVersionBootstrapping() {
return getStoreIngestionService().hasCurrentVersionBootstrapping();
}

@Override
public KafkaStoreIngestionService getStoreIngestionService() {
return storeIngestionService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ void dropStoragePartitionGracefully(

// setStorageEngineReference is used by Da Vinci exclusively to speed up storage engine retrieval for read path.
void setStorageEngineReference(String topicName, AtomicReference<AbstractStorageEngine> storageEngineReference);

/**
* Check whether there are any current version bootstrapping or not.
*/
boolean hasCurrentVersionBootstrapping();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient;
import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus;
import com.linkedin.davinci.ingestion.main.MainTopicIngestionStatus;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.notifier.RelayNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
Expand All @@ -19,12 +20,15 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -50,6 +54,7 @@ public class IsolatedIngestionBackend extends DefaultIngestionBackend implements
private final MainIngestionMonitorService mainIngestionMonitorService;
private final VeniceConfigLoader configLoader;
private final ExecutorService completionReportHandlingExecutor = Executors.newFixedThreadPool(10);
private final Function<String, Integer> currentVersionSupplier;
private Process isolatedIngestionServiceProcess;

public IsolatedIngestionBackend(
Expand All @@ -58,7 +63,8 @@ public IsolatedIngestionBackend(
StorageMetadataService storageMetadataService,
KafkaStoreIngestionService storeIngestionService,
StorageService storageService,
BlobTransferManager blobTransferManager) {
BlobTransferManager blobTransferManager,
Function<String, Integer> currentVersionSupplier) {
super(
storageMetadataService,
storeIngestionService,
Expand All @@ -68,6 +74,7 @@ public IsolatedIngestionBackend(
int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort();
int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort();
this.configLoader = configLoader;
this.currentVersionSupplier = currentVersionSupplier;
// Create the ingestion request client.
mainIngestionRequestClient = new MainIngestionRequestClient(configLoader);
// Create the forked isolated ingestion process.
Expand Down Expand Up @@ -192,6 +199,10 @@ public MainIngestionMonitorService getMainIngestionMonitorService() {
return mainIngestionMonitorService;
}

Function<String, Integer> getCurrentVersionSupplier() {
return currentVersionSupplier;
}

public MainIngestionRequestClient getMainIngestionRequestClient() {
return mainIngestionRequestClient;
}
Expand All @@ -218,6 +229,31 @@ public void close() {
}
}

public boolean hasCurrentVersionBootstrapping() {
if (super.hasCurrentVersionBootstrapping()) {
return true;
}

Map<String, MainTopicIngestionStatus> topicIngestionStatusMap =
getMainIngestionMonitorService().getTopicIngestionStatusMap();
for (Map.Entry<String, MainTopicIngestionStatus> entry: topicIngestionStatusMap.entrySet()) {
String topicName = entry.getKey();
MainTopicIngestionStatus ingestionStatus = entry.getValue();
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
int version = Version.parseVersionFromKafkaTopicName(topicName);
/**
* If the current version is still being ingested by isolated process, it means the bootstrapping hasn't finished
* yet as the ingestion task should be handled over to main process if all partitions complete ingestion.
*/
if (getCurrentVersionSupplier().apply(storeName) == version
&& ingestionStatus.hasPartitionIngestingInIsolatedProcess()) {
return true;
}
}

return false;
}

boolean isTopicPartitionHostedInMainProcess(String topicName, int partition) {
return getMainIngestionMonitorService().getTopicPartitionIngestionStatus(topicName, partition)
.equals(MainPartitionIngestionStatus.MAIN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,13 @@ public long getIngestingPartitionCount() {
public String getTopicName() {
return topicName;
}

public boolean hasPartitionIngestingInIsolatedProcess() {
for (Map.Entry<Integer, MainPartitionIngestionStatus> entry: ingestionStatusMap.entrySet()) {
if (entry.getValue().equals(MainPartitionIngestionStatus.ISOLATED)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,20 @@ private static void shutdownExecutorService(ExecutorService executor, String nam
}
}

public boolean hasCurrentVersionBootstrapping() {
return hasCurrentVersionBootstrapping(topicNameToIngestionTaskMap);
}

public static boolean hasCurrentVersionBootstrapping(Map<String, StoreIngestionTask> ingestionTaskMap) {
for (Map.Entry<String, StoreIngestionTask> entry: ingestionTaskMap.entrySet()) {
StoreIngestionTask task = entry.getValue();
if (task.isCurrentVersion() && !task.hasAllPartitionReportedCompleted()) {
return true;
}
}
return false;
}

/**
* Stops all the Kafka consumption tasks.
* Closes all the Kafka clients.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR;
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -119,4 +126,17 @@ public void testGetDaVinciErrorStatusWithInvalidCases(
}
}

@Test
public void testBootstrappingAwareCompletableFuture()
throws ExecutionException, InterruptedException, TimeoutException {
DaVinciBackend backend = mock(DaVinciBackend.class);

when(backend.hasCurrentVersionBootstrapping()).thenReturn(true).thenReturn(false);

DaVinciBackend.BootstrappingAwareCompletableFuture future =
new DaVinciBackend.BootstrappingAwareCompletableFuture(backend);
future.getBootstrappingFuture().get(10, TimeUnit.SECONDS);
verify(backend, times(2)).hasCurrentVersionBootstrapping();
}

}
Loading

0 comments on commit b9685bb

Please sign in to comment.