Query blob discovery bootstrap (linkedin#1046)
* commit bootstrap changes

---------

Co-authored-by: “ishwarya-personal” <“[email protected]”>
ishwarya-citro and “ishwarya-personal” authored Jul 11, 2024
1 parent 4ba73c9 commit 3bb03f5
Showing 30 changed files with 587 additions and 56 deletions.
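For orientation, this commit wires an optional peer-to-peer blob bootstrap path into the Da Vinci backend: it is gated on the store-level blob transfer flag and on a new backend port setting, and it reuses the RocksDB data directory as the transfer root. Below is a minimal, hypothetical sketch of backend properties that would exercise the new code paths, assuming only the config keys this commit touches (DAVINCI_P2P_BLOB_TRANSFER_PORT, DATA_BASE_PATH); the port number and path are illustrative values, not defaults, and the target store must additionally have blob transfer enabled.

import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;

public class BlobTransferBootstrapConfigSketch {
  public static VeniceProperties backendProperties() {
    Properties props = new Properties();
    // Base data path; RocksDB files live under <DATA_BASE_PATH>/rocksdb, which is what the new
    // VeniceServerConfig#getRocksDBPath() returns and what the blob transfer service/client use.
    props.put(DATA_BASE_PATH, "/tmp/davinci-data");
    // Port shared by the P2P blob transfer server and the Netty file transfer client.
    // VeniceServerConfig defaults this to -1 when the key is absent.
    props.put(DAVINCI_P2P_BLOB_TRANSFER_PORT, "24747");
    return new VeniceProperties(props);
  }
}

With properties like these, DaVinciBackend (first hunk below) builds a NettyP2PBlobTransferManager from a P2PBlobTransferService, a NettyFileTransferClient, and a DvcBlobFinder, and DefaultIngestionBackend attempts the blob bootstrap before starting Kafka consumption.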
@@ -30,6 +30,11 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
import com.linkedin.venice.blobtransfer.BlobTransferManager;
import com.linkedin.venice.blobtransfer.DvcBlobFinder;
import com.linkedin.venice.blobtransfer.NettyP2PBlobTransferManager;
import com.linkedin.venice.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.venice.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.venice.client.schema.StoreSchemaFetcher;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
@@ -86,7 +91,7 @@

public class DaVinciBackend implements Closeable {
private static final Logger LOGGER = LogManager.getLogger(DaVinciBackend.class);

private final BlobTransferManager blobTransferManager;
private final VeniceConfigLoader configLoader;
private final SubscriptionBasedReadOnlyStoreRepository storeRepository;
private final ReadOnlySchemaRepository schemaRepository;
@@ -272,6 +277,19 @@ public DaVinciBackend(
ingestionService.start();
ingestionService.addIngestionNotifier(ingestionListener);

boolean isBlobTransferEnabled = configLoader.getStoreConfig(clientConfig.getStoreName()).isBlobTransferEnabled();
if (isBlobTransferEnabled) {
int blobTransferPort = backendConfig.getDvcP2pBlobTransferPort();
String rocksDBPath = backendConfig.getRocksDBPath();
this.blobTransferManager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(blobTransferPort, rocksDBPath),
new NettyFileTransferClient(blobTransferPort, rocksDBPath),
new DvcBlobFinder(ClientFactory.getTransportClient(clientConfig)));
blobTransferManager.start();
} else {
blobTransferManager = null;
}

if (isIsolatedIngestion() && cacheConfig.isPresent()) {
// TODO: There are 'some' cases where this mix might be ok, (like a batch only store, or with certain TTL
// settings),
@@ -427,8 +445,14 @@ private synchronized void bootstrap() {
metricsRepository,
storageMetadataService,
ingestionService,
storageService)
: new DefaultIngestionBackend(storageMetadataService, ingestionService, storageService);
storageService,
blobTransferManager)
: new DefaultIngestionBackend(
storageMetadataService,
ingestionService,
storageService,
blobTransferManager,
configLoader.getVeniceServerConfig());
ingestionBackend.addIngestionNotifier(ingestionListener);

// Subscribe all bootstrap version partitions.
@@ -495,6 +519,11 @@ public synchronized void close() {
storeRepository.clear();
schemaRepository.clear();
pushStatusStoreWriter.close();

if (blobTransferManager != null) {
blobTransferManager.close();
}

LOGGER.info("Da Vinci backend is closed successfully");
} catch (Throwable e) {
String msg = "Unable to stop Da Vinci backend";
@@ -133,7 +133,7 @@ synchronized void delete() {
* The following function is used to forcibly clean up any leaking data partitions, which are not
* visible to the corresponding {@link AbstractStorageEngine} since some data partitions can fail
* to open because of DaVinci memory limiter.
*/
*/
backend.getStorageService().forceStorageEngineCleanup(topicName);
backend.getCompressorFactory().removeVersionSpecificCompressor(topicName);
} catch (VeniceException e) {
@@ -4,6 +4,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES;
import static com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT;
import static com.linkedin.venice.ConfigKeys.DIV_PRODUCER_STATE_MAX_AGE_MS;
import static com.linkedin.venice.ConfigKeys.ENABLE_GRPC_READ_SERVER;
import static com.linkedin.venice.ConfigKeys.ENABLE_SERVER_ALLOW_LIST;
@@ -149,6 +150,7 @@
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.BlockingQueueType;
import java.io.File;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
@@ -465,6 +467,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean useDaVinciSpecificExecutionStatusForError;
private final boolean recordLevelMetricWhenBootstrappingCurrentVersionEnabled;
private final String identityParserClassName;
private final int dvcP2pBlobTransferPort;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
@@ -487,6 +490,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
maxLeaderFollowerStateTransitionThreadNumber =
serverProperties.getInt(MAX_LEADER_FOLLOWER_STATE_TRANSITION_THREAD_NUMBER, 20);

dvcP2pBlobTransferPort = serverProperties.getInt(DAVINCI_P2P_BLOB_TRANSFER_PORT, -1);

String lfThreadPoolStrategyStr = serverProperties.getString(
LEADER_FOLLOWER_STATE_TRANSITION_THREAD_POOL_STRATEGY,
LeaderFollowerPartitionStateModelFactory.LeaderFollowerThreadPoolStrategy.SINGLE_POOL_STRATEGY.name());
@@ -855,6 +860,10 @@ public String getListenerHostname() {
return listenerHostname;
}

public int getDvcP2pBlobTransferPort() {
return dvcP2pBlobTransferPort;
}

/**
* Get base path of Venice storage data.
*
@@ -1363,4 +1372,8 @@ public boolean isRecordLevelMetricWhenBootstrappingCurrentVersionEnabled() {
public String getIdentityParserClassName() {
return identityParserClassName;
}

public String getRocksDBPath() {
return getDataBasePath() + File.separator + "rocksdb";
}
}
@@ -129,10 +129,13 @@ public HelixParticipationService(
if (!(storeIngestionService instanceof KafkaStoreIngestionService)) {
throw new VeniceException("Expecting " + KafkaStoreIngestionService.class.getName() + " for ingestion backend!");
}

this.ingestionBackend = new DefaultIngestionBackend(
storageMetadataService,
(KafkaStoreIngestionService) storeIngestionService,
storageService);
storageService,
null,
null);
}

// Set corePoolSize and maxPoolSize as the same value, but enable allowCoreThreadTimeOut. So the expected
@@ -1,16 +1,25 @@
package com.linkedin.davinci.ingestion;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.blobtransfer.BlobTransferManager;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
@@ -25,23 +34,30 @@ public class DefaultIngestionBackend implements IngestionBackend {
private final StorageMetadataService storageMetadataService;
private final StorageService storageService;
private final KafkaStoreIngestionService storeIngestionService;
private final VeniceServerConfig serverConfig;
private final Map<String, AtomicReference<AbstractStorageEngine>> topicStorageEngineReferenceMap =
new VeniceConcurrentHashMap<>();
private final BlobTransferManager blobTransferManager;

public DefaultIngestionBackend(
StorageMetadataService storageMetadataService,
KafkaStoreIngestionService storeIngestionService,
StorageService storageService) {
StorageService storageService,
BlobTransferManager blobTransferManager,
VeniceServerConfig serverConfig) {
this.storageMetadataService = storageMetadataService;
this.storeIngestionService = storeIngestionService;
this.storageService = storageService;
this.blobTransferManager = blobTransferManager;
this.serverConfig = serverConfig;
}

@Override
public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition) {
String storeVersion = storeConfig.getStoreVersionName();
LOGGER.info("Retrieving storage engine for store {} partition {}", storeVersion, partition);
Utils.waitStoreVersionOrThrow(storeVersion, getStoreIngestionService().getMetadataRepo());
Pair<Store, Version> storeAndVersion =
Utils.waitStoreVersionOrThrow(storeVersion, getStoreIngestionService().getMetadataRepo());
Supplier<StoreVersionState> svsSupplier = () -> storageMetadataService.getStoreVersionState(storeVersion);
AbstractStorageEngine storageEngine = storageService.openStoreForNewPartition(storeConfig, partition, svsSupplier);
topicStorageEngineReferenceMap.compute(storeVersion, (key, storageEngineAtomicReference) -> {
@@ -50,13 +66,52 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition
}
return storageEngineAtomicReference;
});
LOGGER.info(
"Retrieved storage engine for store {} partition {}. Starting consumption in ingestion service",
storeVersion,
partition);
getStoreIngestionService().startConsumption(storeConfig, partition);

CompletionStage<Void> bootstrapFuture =
bootstrapFromBlobs(storeAndVersion.getFirst(), storeAndVersion.getSecond().getNumber(), partition);

bootstrapFuture.whenComplete((result, throwable) -> {
LOGGER.info(
"Retrieved storage engine for store {} partition {}. Starting consumption in ingestion service",
storeVersion,
partition);
getStoreIngestionService().startConsumption(storeConfig, partition);
});
}

/**
* Bootstrap from blobs provided by another source (such as a peer). If the transfer fails (due to the 30-minute
* timeout or any exception), the partially downloaded blobs are deleted and ingestion eventually falls back to
* bootstrapping from Kafka. Blob transfer must be enabled to bootstrap from blobs, and it currently only supports batch stores.
*/
private CompletionStage<Void> bootstrapFromBlobs(Store store, int versionNumber, int partitionId) {
if (!store.isBlobTransferEnabled() || store.isHybrid() || blobTransferManager == null) {
return CompletableFuture.completedFuture(null);
}

String storeName = store.getName();
String baseDir = serverConfig.getRocksDBPath();
CompletableFuture<InputStream> p2pFuture =
blobTransferManager.get(storeName, versionNumber, partitionId).toCompletableFuture();

LOGGER
.info("Completed starting consumption in ingestion service for store {} partition {}", storeVersion, partition);
.info("Bootstrapping from blobs for store {}, version {}, partition {}", storeName, versionNumber, partitionId);

return CompletableFuture.runAsync(() -> {
try {
p2pFuture.get(30, TimeUnit.MINUTES);
} catch (Exception e) {
LOGGER.warn(
"Failed bootstrapping from blobs for store {}, version {}, partition {}",
storeName,
versionNumber,
partitionId,
e);
RocksDBUtils.deletePartitionDir(baseDir, storeName, versionNumber, partitionId);
p2pFuture.cancel(true);
// todo: close channels
}
});
}

@Override
@@ -15,6 +15,7 @@
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.blobtransfer.BlobTransferManager;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
@@ -56,8 +57,14 @@ public IsolatedIngestionBackend(
MetricsRepository metricsRepository,
StorageMetadataService storageMetadataService,
KafkaStoreIngestionService storeIngestionService,
StorageService storageService) {
super(storageMetadataService, storeIngestionService, storageService);
StorageService storageService,
BlobTransferManager blobTransferManager) {
super(
storageMetadataService,
storeIngestionService,
storageService,
blobTransferManager,
configLoader.getVeniceServerConfig());
int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort();
int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort();
this.configLoader = configLoader;
@@ -719,7 +719,13 @@ private void initializeIsolatedIngestionServer() {
null);
storeIngestionService.start();
storeIngestionService.addIngestionNotifier(new IsolatedIngestionNotifier(this));
ingestionBackend = new DefaultIngestionBackend(storageMetadataService, storeIngestionService, storageService);

ingestionBackend = new DefaultIngestionBackend(
storageMetadataService,
storeIngestionService,
storageService,
null,
configLoader.getVeniceServerConfig());

if (serverConfig.isLeakedResourceCleanupEnabled()) {
this.leakedResourceCleaner = new LeakedResourceCleaner(
@@ -123,7 +123,7 @@ public RocksDBStorageEngineFactory(
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer) {
this.serverConfig = serverConfig;
this.rocksDBServerConfig = serverConfig.getRocksDBServerConfig();
this.rocksDBPath = serverConfig.getDataBasePath() + File.separator + "rocksdb";
this.rocksDBPath = serverConfig.getRocksDBPath();
this.rocksDBMemoryStats = rocksDBMemoryStats;
this.storeVersionStateSerializer = storeVersionStateSerializer;
this.partitionStateSerializer = partitionStateSerializer;
@@ -1,9 +1,10 @@
package com.linkedin.davinci.config;

import static com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils.*;
import static com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils.INGESTION_ISOLATION_CONFIG_PREFIX;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
@@ -46,6 +47,17 @@ public void testForkedJVMParams() {
assertEquals(jvmArgs.get(1), "-Xmx256G");
}

@Test
public void testRocksDBPath() {
Properties props = populatedBasicProperties();
props.put(DATA_BASE_PATH, "db/path");

VeniceServerConfig config = new VeniceServerConfig(new VeniceProperties(props));

String path = config.getRocksDBPath();
assertEquals(path, "db/path/rocksdb");
}

@Test
public void testMemoryLimitConfigWithoutIngestionIsolation() {
Properties propsForNonDaVinci = populatedBasicProperties();