diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 66c41f291a..8d68fdcb4d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -30,6 +30,11 @@ import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig; +import com.linkedin.venice.blobtransfer.BlobTransferManager; +import com.linkedin.venice.blobtransfer.DvcBlobFinder; +import com.linkedin.venice.blobtransfer.NettyP2PBlobTransferManager; +import com.linkedin.venice.blobtransfer.client.NettyFileTransferClient; +import com.linkedin.venice.blobtransfer.server.P2PBlobTransferService; import com.linkedin.venice.client.schema.StoreSchemaFetcher; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; @@ -86,7 +91,7 @@ public class DaVinciBackend implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DaVinciBackend.class); - + private final BlobTransferManager blobTransferManager; private final VeniceConfigLoader configLoader; private final SubscriptionBasedReadOnlyStoreRepository storeRepository; private final ReadOnlySchemaRepository schemaRepository; @@ -272,6 +277,19 @@ public DaVinciBackend( ingestionService.start(); ingestionService.addIngestionNotifier(ingestionListener); + boolean isBlobTransferEnabled = configLoader.getStoreConfig(clientConfig.getStoreName()).isBlobTransferEnabled(); + if (isBlobTransferEnabled) { + int blobTransferPort = backendConfig.getDvcP2pBlobTransferPort(); + String rocksDBPath = backendConfig.getRocksDBPath(); + this.blobTransferManager = new NettyP2PBlobTransferManager( + new P2PBlobTransferService(blobTransferPort, rocksDBPath), + new NettyFileTransferClient(blobTransferPort, rocksDBPath), + new DvcBlobFinder(ClientFactory.getTransportClient(clientConfig))); + blobTransferManager.start(); + } else { + blobTransferManager = null; + } + if (isIsolatedIngestion() && cacheConfig.isPresent()) { // TODO: There are 'some' cases where this mix might be ok, (like a batch only store, or with certain TTL // settings), @@ -427,8 +445,14 @@ private synchronized void bootstrap() { metricsRepository, storageMetadataService, ingestionService, - storageService) - : new DefaultIngestionBackend(storageMetadataService, ingestionService, storageService); + storageService, + blobTransferManager) + : new DefaultIngestionBackend( + storageMetadataService, + ingestionService, + storageService, + blobTransferManager, + configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); // Subscribe all bootstrap version partitions. 
@@ -495,6 +519,11 @@ public synchronized void close() { storeRepository.clear(); schemaRepository.clear(); pushStatusStoreWriter.close(); + + if (blobTransferManager != null) { + blobTransferManager.close(); + } + LOGGER.info("Da Vinci backend is closed successfully"); } catch (Throwable e) { String msg = "Unable to stop Da Vinci backend"; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java index 8ad1bcc038..38f3ad3da0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java @@ -133,7 +133,7 @@ synchronized void delete() { * The following function is used to forcibly clean up any leaking data partitions, which are not * visible to the corresponding {@link AbstractStorageEngine} since some data partitions can fail * to open because of the DaVinci memory limiter. - */ + */ backend.getStorageService().forceStorageEngineCleanup(topicName); backend.getCompressorFactory().removeVersionSpecificCompressor(topicName); } catch (VeniceException e) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index e42a8c9a85..10a17b3d1d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -4,6 +4,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES; import static com.linkedin.venice.ConfigKeys.AUTOCREATE_DATA_PATH; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.DIV_PRODUCER_STATE_MAX_AGE_MS; import static com.linkedin.venice.ConfigKeys.ENABLE_GRPC_READ_SERVER; import static com.linkedin.venice.ConfigKeys.ENABLE_SERVER_ALLOW_LIST; @@ -149,6 +150,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.BlockingQueueType; +import java.io.File; import java.nio.file.Paths; import java.time.Duration; import java.util.Arrays; @@ -465,6 +467,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean useDaVinciSpecificExecutionStatusForError; private final boolean recordLevelMetricWhenBootstrappingCurrentVersionEnabled; private final String identityParserClassName; + private final int dvcP2pBlobTransferPort; public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException { this(serverProperties, Collections.emptyMap()); } @@ -487,6 +490,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<String, String>> kafkaClusterMap) [… elided in source: this hunk reads DAVINCI_P2P_BLOB_TRANSFER_PORT into dvcP2pBlobTransferPort, and later hunks add the getDvcP2pBlobTransferPort() and getRocksDBPath() accessors, getRocksDBPath() returning getDataBasePath() + File.separator + "rocksdb"; the diff then continues in clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java, adding the blob-transfer imports and the fields shown next …] private final Map<String, AtomicReference<AbstractStorageEngine>> topicStorageEngineReferenceMap = new VeniceConcurrentHashMap<>(); + private final BlobTransferManager blobTransferManager; + private final VeniceServerConfig serverConfig; public DefaultIngestionBackend( StorageMetadataService storageMetadataService, KafkaStoreIngestionService storeIngestionService, - StorageService storageService) { + StorageService storageService, + BlobTransferManager blobTransferManager, + VeniceServerConfig serverConfig) { this.storageMetadataService = storageMetadataService; this.storeIngestionService = storeIngestionService; this.storageService = storageService; + this.blobTransferManager
= blobTransferManager; + this.serverConfig = serverConfig; } @Override public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition) { String storeVersion = storeConfig.getStoreVersionName(); LOGGER.info("Retrieving storage engine for store {} partition {}", storeVersion, partition); - Utils.waitStoreVersionOrThrow(storeVersion, getStoreIngestionService().getMetadataRepo()); + Pair<Store, Version> storeAndVersion = + Utils.waitStoreVersionOrThrow(storeVersion, getStoreIngestionService().getMetadataRepo()); Supplier<StoreVersionState> svsSupplier = () -> storageMetadataService.getStoreVersionState(storeVersion); AbstractStorageEngine storageEngine = storageService.openStoreForNewPartition(storeConfig, partition, svsSupplier); topicStorageEngineReferenceMap.compute(storeVersion, (key, storageEngineAtomicReference) -> { @@ -50,13 +66,52 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition } return storageEngineAtomicReference; }); - LOGGER.info( - "Retrieved storage engine for store {} partition {}. Starting consumption in ingestion service", - storeVersion, - partition); - getStoreIngestionService().startConsumption(storeConfig, partition); + + CompletionStage<Void> bootstrapFuture = + bootstrapFromBlobs(storeAndVersion.getFirst(), storeAndVersion.getSecond().getNumber(), partition); + + bootstrapFuture.whenComplete((result, throwable) -> { + LOGGER.info( + "Retrieved storage engine for store {} partition {}. Starting consumption in ingestion service", + storeVersion, + partition); + getStoreIngestionService().startConsumption(storeConfig, partition); + }); + } + + /** + * Bootstrap from blobs fetched from another source (like another peer). If it fails (due to the 30-minute timeout or + * any exception), it deletes the partially downloaded blobs and eventually falls back to bootstrapping from Kafka. + * Blob transfer must be enabled to bootstrap from blobs, and it currently only supports batch stores.
+ */ + private CompletionStage<Void> bootstrapFromBlobs(Store store, int versionNumber, int partitionId) { + if (!store.isBlobTransferEnabled() || store.isHybrid() || blobTransferManager == null) { + return CompletableFuture.completedFuture(null); + } + + String storeName = store.getName(); + String baseDir = serverConfig.getRocksDBPath(); + CompletableFuture<InputStream> p2pFuture = + blobTransferManager.get(storeName, versionNumber, partitionId).toCompletableFuture(); + LOGGER - .info("Completed starting consumption in ingestion service for store {} partition {}", storeVersion, partition); + .info("Bootstrapping from blobs for store {}, version {}, partition {}", storeName, versionNumber, partitionId); + + return CompletableFuture.runAsync(() -> { + try { + p2pFuture.get(30, TimeUnit.MINUTES); + } catch (Exception e) { + LOGGER.warn( + "Failed bootstrapping from blobs for store {}, version {}, partition {}", + storeName, + versionNumber, + partitionId, + e); + RocksDBUtils.deletePartitionDir(baseDir, storeName, versionNumber, partitionId); + p2pFuture.cancel(true); + // todo: close channels + } + }); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java index c0526f5cc6..d5db77ccdf 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java @@ -15,6 +15,7 @@ import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.storage.StorageService; +import com.linkedin.venice.blobtransfer.BlobTransferManager; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType; import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType; @@ -56,8 +57,14 @@ public IsolatedIngestionBackend( MetricsRepository metricsRepository, StorageMetadataService storageMetadataService, KafkaStoreIngestionService storeIngestionService, - StorageService storageService) { - super(storageMetadataService, storeIngestionService, storageService); + StorageService storageService, + BlobTransferManager blobTransferManager) { + super( + storageMetadataService, + storeIngestionService, + storageService, + blobTransferManager, + configLoader.getVeniceServerConfig()); int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort(); int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort(); this.configLoader = configLoader; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java index 24dae61253..2456fcfe31 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java @@ -719,7 +719,13 @@ private void initializeIsolatedIngestionServer() { null); storeIngestionService.start(); storeIngestionService.addIngestionNotifier(new IsolatedIngestionNotifier(this)); - ingestionBackend = new DefaultIngestionBackend(storageMetadataService, storeIngestionService, storageService); + + ingestionBackend
= new DefaultIngestionBackend( + storageMetadataService, + storeIngestionService, + storageService, + null, + configLoader.getVeniceServerConfig()); if (serverConfig.isLeakedResourceCleanupEnabled()) { this.leakedResourceCleaner = new LeakedResourceCleaner( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineFactory.java index 5094f059cf..cc3ae7a4c4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStorageEngineFactory.java @@ -123,7 +123,7 @@ public RocksDBStorageEngineFactory( InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer) { this.serverConfig = serverConfig; this.rocksDBServerConfig = serverConfig.getRocksDBServerConfig(); - this.rocksDBPath = serverConfig.getDataBasePath() + File.separator + "rocksdb"; + this.rocksDBPath = serverConfig.getRocksDBPath(); this.rocksDBMemoryStats = rocksDBMemoryStats; this.storeVersionStateSerializer = storeVersionStateSerializer; this.partitionStateSerializer = partitionStateSerializer; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/VeniceServerConfigTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/VeniceServerConfigTest.java index a34c9de93a..6ec2ca3365 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/VeniceServerConfigTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/VeniceServerConfigTest.java @@ -1,9 +1,10 @@ package com.linkedin.davinci.config; -import static com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils.*; +import static com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils.INGESTION_ISOLATION_CONFIG_PREFIX; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES; import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; +import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; @@ -46,6 +47,17 @@ public void testForkedJVMParams() { assertEquals(jvmArgs.get(1), "-Xmx256G"); } + @Test + public void testRocksDBPath() { + Properties props = populatedBasicProperties(); + props.put(DATA_BASE_PATH, "db/path"); + + VeniceServerConfig config = new VeniceServerConfig(new VeniceProperties(props)); + + String path = config.getRocksDBPath(); + assertEquals(path, "db/path/rocksdb"); + } + @Test public void testMemoryLimitConfigWithoutIngestionIsolation() { Properties propsForNonDaVinci = populatedBasicProperties(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java new file mode 100644 index 0000000000..c3f6409882 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java @@ -0,0 +1,93 @@ +package com.linkedin.davinci.ingestion; + +import static org.mockito.ArgumentMatchers.anyInt; +import static
org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.davinci.config.VeniceStoreVersionConfig; +import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; +import com.linkedin.davinci.storage.StorageMetadataService; +import com.linkedin.davinci.storage.StorageService; +import com.linkedin.davinci.store.AbstractStorageEngine; +import com.linkedin.venice.blobtransfer.BlobTransferManager; +import com.linkedin.venice.kafka.protocol.state.StoreVersionState; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.utils.Pair; +import java.io.InputStream; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class DefaultIngestionBackendTest { + private StorageMetadataService storageMetadataService; + private KafkaStoreIngestionService storeIngestionService; + private StorageService storageService; + private BlobTransferManager blobTransferManager; + private DefaultIngestionBackend ingestionBackend; + private VeniceStoreVersionConfig storeConfig; + private AbstractStorageEngine storageEngine; + private ReadOnlyStoreRepository metadataRepo; + private VeniceServerConfig veniceServerConfig; + + @BeforeMethod + public void setUp() { + storageMetadataService = mock(StorageMetadataService.class); + storeIngestionService = mock(KafkaStoreIngestionService.class); + storageService = mock(StorageService.class); + blobTransferManager = mock(BlobTransferManager.class); + storeConfig = mock(VeniceStoreVersionConfig.class); + storageEngine = mock(AbstractStorageEngine.class); + metadataRepo = mock(ReadOnlyStoreRepository.class); + veniceServerConfig = mock(VeniceServerConfig.class); + + // Create the DefaultIngestionBackend instance with mocked dependencies + ingestionBackend = new DefaultIngestionBackend( + storageMetadataService, + storeIngestionService, + storageService, + blobTransferManager, + veniceServerConfig); + } + + @Test + public void testStartConsumption() { + String storeVersion = "store_v1"; + int partition = 1; + String storeName = "testStore"; + int versionNumber = 1; + Store store = mock(Store.class); + Version version = mock(Version.class); + Pair<Store, Version> storeAndVersion = Pair.create(store, version); + StoreVersionState storeVersionState = mock(StoreVersionState.class); + CompletableFuture<InputStream> bootstrapFuture = CompletableFuture.completedFuture(null); + String baseDir = "mockBaseDir"; + + when(storeConfig.getStoreVersionName()).thenReturn(storeVersion); + when(storeIngestionService.getMetadataRepo()).thenReturn(metadataRepo); + doNothing().when(storeIngestionService).startConsumption(any(VeniceStoreVersionConfig.class), anyInt()); + when(metadataRepo.waitVersion(anyString(), anyInt(), any(Duration.class))).thenReturn(storeAndVersion); + when(storageMetadataService.getStoreVersionState(storeVersion)).thenReturn(storeVersionState); + when(storageService.openStoreForNewPartition(eq(storeConfig), eq(partition), any())).thenReturn(storageEngine); + when(store.getName()).thenReturn(storeName); + when(version.getNumber()).thenReturn(versionNumber); +
when(blobTransferManager.get(eq(storeName), eq(versionNumber), eq(partition))).thenReturn(bootstrapFuture); + when(store.isBlobTransferEnabled()).thenReturn(true); + when(store.isHybrid()).thenReturn(false); + when(veniceServerConfig.getRocksDBPath()).thenReturn(baseDir); + + ingestionBackend.startConsumption(storeConfig, partition); + + // verify that blobTransferManager was called, given the store is batch-only (non-hybrid) with blob transfer enabled + verify(blobTransferManager).get(eq(storeName), eq(versionNumber), eq(partition)); + } +} diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/routerapi/BlobDiscoveryResponse.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/routerapi/BlobDiscoveryResponse.java deleted file mode 100644 index 8e3191807f..0000000000 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/routerapi/BlobDiscoveryResponse.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.linkedin.venice.routerapi; - -import com.linkedin.venice.controllerapi.ControllerResponse; -import java.util.List; - - -public class BlobDiscoveryResponse extends ControllerResponse { - private List<String> liveNodeHostNames; - - public void setLiveNodeNames(List<String> liveNodeHostNames) { - this.liveNodeHostNames = liveNodeHostNames; - } - - public List<String> getLiveNodeHostNames() { - return liveNodeHostNames; - } -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 560220bb93..002a1aafae 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -1739,7 +1739,8 @@ private ConfigKeys() { // Config to control what percentage of DVC replica instances are allowed to be offline before failing VPJ push. public static final String DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE_RATIO = "davinci.push.status.scan.max.offline.instance.ratio"; - + // Port used by the peer-to-peer blob transfer service + public static final String DAVINCI_P2P_BLOB_TRANSFER_PORT = "davinci.p2p.blob.transfer.port"; public static final String CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED = "controller.zk.shared.davinci.push.status.system.schema.store.auto.creation.enabled"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobFinder.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobFinder.java index ffff845b7e..f09318d3e5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobFinder.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobFinder.java @@ -1,6 +1,6 @@ package com.linkedin.venice.blobtransfer; -public interface BlobFinder { +public interface BlobFinder extends AutoCloseable { /** * This method will look through the partitions for the store and version provided until it finds the partition * requested, it will then return the URL of the instances that are ready to serve in the partition.
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DvcBlobFinder.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DvcBlobFinder.java new file mode 100644 index 0000000000..5532d03f91 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DvcBlobFinder.java @@ -0,0 +1,81 @@ +package com.linkedin.venice.blobtransfer; + +import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_PARTITION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VERSION; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.utils.ObjectMapperFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.http.message.BasicNameValuePair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * DvcBlobFinder discovers live DaVinci peer nodes to facilitate blob transfers necessary for bootstrapping the database + */ +public class DvcBlobFinder implements BlobFinder { + private static final Logger LOGGER = LogManager.getLogger(DvcBlobFinder.class); + private static final String TYPE_BLOB_DISCOVERY = "TYPE_BLOB_DISCOVERY"; + private static final String ERROR_DISCOVERY_MESSAGE = + "Error finding DVC peers for blob transfer in store: %s, version: %d, partition: %d"; + private final TransportClient transportClient; + + public DvcBlobFinder(TransportClient transportClient) { + this.transportClient = transportClient; + } + + @Override + public BlobPeersDiscoveryResponse discoverBlobPeers(String storeName, int version, int partition) { + String uri = buildUriForBlobDiscovery(storeName, version, partition); + + CompletableFuture<BlobPeersDiscoveryResponse> futureResponse = transportClient.get(uri).thenApply(response -> { + byte[] responseBody = response.getBody(); + ObjectMapper mapper = ObjectMapperFactory.getInstance(); + try { + return mapper.readValue(responseBody, BlobPeersDiscoveryResponse.class); + } catch (IOException e) { + return handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, e); + } + }).exceptionally(throwable -> handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, throwable)); + + return futureResponse.join(); + } + + private String buildUriForBlobDiscovery(String storeName, int version, int partition) { + List<NameValuePair> queryParams = new ArrayList<>(); + queryParams.add(new BasicNameValuePair(NAME, storeName)); + queryParams.add(new BasicNameValuePair(STORE_VERSION, Integer.toString(version))); + queryParams.add(new BasicNameValuePair(STORE_PARTITION, Integer.toString(partition))); + String queryString = URLEncodedUtils.format(queryParams, StandardCharsets.UTF_8); + + return String.format("/%s?%s", TYPE_BLOB_DISCOVERY, queryString); + } + + private BlobPeersDiscoveryResponse handleError( + String errorMessage, + String storeName, + int version, + int partition, + Throwable throwable) { + BlobPeersDiscoveryResponse errorResponse = new BlobPeersDiscoveryResponse(); + String errorMsg = String.format(errorMessage, storeName, version, partition); + errorResponse.setError(true); + errorResponse.setErrorMessage(errorMsg); + LOGGER.error(errorMsg,
throwable); + return errorResponse; + } + + @Override + public void close() throws IOException { + transportClient.close(); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/NettyP2PBlobTransferManager.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/NettyP2PBlobTransferManager.java index 0f59773590..a3680380ed 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/NettyP2PBlobTransferManager.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/NettyP2PBlobTransferManager.java @@ -66,5 +66,6 @@ public CompletionStage get(String storeName, int version, int parti public void close() throws Exception { blobTransferService.close(); nettyClient.close(); + peerFinder.close(); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/ServerBlobFinder.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/ServerBlobFinder.java index b634352595..f667b69508 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/ServerBlobFinder.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/ServerBlobFinder.java @@ -51,4 +51,9 @@ public BlobPeersDiscoveryResponse discoverBlobPeers(String storeName, int versio return response; } + + @Override + public void close() { + } + } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index c2707b2056..94951636c8 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -231,7 +231,7 @@ public class ControllerApiConstants { public static final String UNUSED_SCHEMA_DELETION_ENABLED = "unused_schema_deletion_enabled"; - public static final String BLOB_TRANSFER_ENABLED = "blob.transfer.enabled"; + public static final String BLOB_TRANSFER_ENABLED = "blob_transfer_enabled"; public static final String HEARTBEAT_TIMESTAMP = "heartbeat_timestamp"; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/store/rocksdb/RocksDBUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/store/rocksdb/RocksDBUtils.java index d0a58d0e84..7e35d7424e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/store/rocksdb/RocksDBUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/store/rocksdb/RocksDBUtils.java @@ -2,6 +2,10 @@ import com.linkedin.venice.exceptions.VeniceException; import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; public class RocksDBUtils { @@ -95,4 +99,27 @@ public static int extractTempRMDSSTFileNo(String fileName) { } return Integer.parseInt(fileName.substring(TEMP_RMD_SST_FILE_PREFIX.length())); } + + /** + * Deletes the files associated with the specified store, version, and partition. 
+ * + * @param baseDir the base directory under which the partition directories live + * @param storeName the name of the store + * @param version the version number of the store + * @param partition the partition ID + */ + public static void deletePartitionDir(String baseDir, String storeName, int version, int partition) { + String topicName = storeName + "_v" + version; + String partitionDir = composePartitionDbDir(baseDir, topicName, partition); + + Path path = null; + try { + path = Paths.get(partitionDir); + if (Files.exists(path)) { + Files.walk(path).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + } catch (Exception e) { + throw new VeniceException(String.format("Error occurred while deleting blobs at path: %s", path), e); + } + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DvcBlobFinderTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DvcBlobFinderTest.java new file mode 100644 index 0000000000..73d86e2d80 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DvcBlobFinderTest.java @@ -0,0 +1,108 @@ +package com.linkedin.venice.blobtransfer; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.utils.ObjectMapperFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class DvcBlobFinderTest { + private TransportClient transportClient; + private DvcBlobFinder dvcBlobFinder; + private static final String storeName = "testStore"; + private static final int version = 1; + private static final int partition = 1; + + @BeforeMethod + public void setUp() { + transportClient = mock(TransportClient.class); + dvcBlobFinder = new DvcBlobFinder(transportClient); + } + + @Test + public void testDiscoverBlobPeers_Success() { + String responseBodyJson = + "{\"error\":false,\"errorMessage\":\"\",\"discoveryResult\":[\"host1\",\"host2\",\"host3\"]}"; + byte[] responseBody = responseBodyJson.getBytes(StandardCharsets.UTF_8); + TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody); + + CompletableFuture<TransportClientResponse> futureResponse = CompletableFuture.completedFuture(mockResponse); + when(transportClient.get(anyString())).thenReturn(futureResponse); + + BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + assertEquals(3, response.getDiscoveryResult().size()); + } + + @Test + public void testDiscoverBlobPeers_CallsTransportClientWithCorrectURI() { + String responseBodyJson = + "{\"error\":false,\"errorMessage\":\"\",\"discoveryResult\":[\"host1\",\"host2\",\"host3\"]}"; + byte[] responseBody = responseBodyJson.getBytes(StandardCharsets.UTF_8); + TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody); + + CompletableFuture<TransportClientResponse> futureResponse = CompletableFuture.completedFuture(mockResponse);
+ when(transportClient.get(anyString())).thenReturn(futureResponse); + + dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + + // Capture the argument passed to transportClient.get + ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); + verify(transportClient).get(argumentCaptor.capture()); + + String expectedUri = String.format( + "/TYPE_BLOB_DISCOVERY?store_name=%s&store_version=%d&store_partition=%d", + storeName, + version, + partition); + assertEquals(expectedUri, argumentCaptor.getValue()); + } + + @Test + public void testDiscoverBlobPeers_IOException() throws Exception { + String responseBodyJson = "{\"error\":true,\"errorMessage\":\"some error\",\"discoveryResult\":[]}"; + byte[] responseBody = responseBodyJson.getBytes(StandardCharsets.UTF_8); + TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody); + + CompletableFuture<TransportClientResponse> futureResponse = CompletableFuture.completedFuture(mockResponse); + when(transportClient.get(anyString())).thenReturn(futureResponse); + + ObjectMapper mapper = ObjectMapperFactory.getInstance(); + ObjectMapper mockMapper = spy(mapper); + doThrow(new IOException("Test Exception")).when(mockMapper) + .readValue(responseBody, BlobPeersDiscoveryResponse.class); + + BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + assertEquals(0, response.getDiscoveryResult().size()); + assertEquals(response.getErrorMessage(), "some error"); + assertTrue(response.isError()); + } + + @Test + public void testDiscoverBlobPeers_Exceptionally() { + CompletableFuture<TransportClientResponse> futureResponse = new CompletableFuture<>(); + futureResponse.completeExceptionally(new RuntimeException("Test Exception")); + when(transportClient.get(anyString())).thenReturn(futureResponse); + + BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + + assertTrue(response.isError()); + assertEquals( + response.getErrorMessage(), + "Error finding DVC peers for blob transfer in store: testStore, version: 1, partition: 1"); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/store/rocksdb/TestRocksDBUtils.java b/internal/venice-common/src/test/java/com/linkedin/venice/store/rocksdb/TestRocksDBUtils.java new file mode 100644 index 0000000000..07cef290a8 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/store/rocksdb/TestRocksDBUtils.java @@ -0,0 +1,85 @@ +package com.linkedin.venice.store.rocksdb; + +import static com.linkedin.venice.store.rocksdb.RocksDBUtils.deletePartitionDir; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestRocksDBUtils { + private Path baseDir; + + @BeforeMethod + public void setUp() throws IOException { + baseDir = Files.createTempDirectory("rocksdb"); + } + + @AfterMethod + public void teardown() throws IOException { + if (baseDir != null && Files.exists(baseDir)) { + Files.walk(baseDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + + // assert the temp base directory does not exist + assertFalse(Files.exists(baseDir)); + } + } + + @Test + public void testDeletePartitionDir() throws IOException { + 
Files.createDirectories(baseDir.resolve("storeName_v1/storeName_v1_1")); + + Files.createFile(baseDir.resolve("storeName_v1/storeName_v1_1/file1.txt")); + Files.createFile(baseDir.resolve("storeName_v1/storeName_v1_1/file2.txt")); + + // assert files exist + assertTrue(Files.exists(baseDir.resolve("storeName_v1/storeName_v1_1/file1.txt"))); + assertTrue(Files.exists(baseDir.resolve("storeName_v1/storeName_v1_1/file2.txt"))); + + deletePartitionDir(baseDir.toString(), "storeName", 1, 1); + + // assert directory does not exist + assertFalse(Files.exists(baseDir.resolve("storeName_v1/storeName_v1_1"))); + } + + @Test + public void testDeletePartitionDir_multiplePaths() throws IOException { + // version 5, partition 5 should exist + // version 5, partition 6 should be deleted + // path1: baseDir/storeName_v5/storeName_v5_5/file1.txt + // path2: baseDir/storeName_v5/storeName_v5_6/file2.txt + + Files.createDirectories(baseDir.resolve("storeName_v5/storeName_v5_5")); + Files.createDirectories(baseDir.resolve("storeName_v5/storeName_v5_6")); + + Files.createFile(baseDir.resolve("storeName_v5/storeName_v5_5/file1.txt")); + Files.createFile(baseDir.resolve("storeName_v5/storeName_v5_6/file2.txt")); + + // assert files exist + assertTrue(Files.exists(baseDir.resolve("storeName_v5/storeName_v5_5/file1.txt"))); + assertTrue(Files.exists(baseDir.resolve("storeName_v5/storeName_v5_6/file2.txt"))); + + deletePartitionDir(baseDir.toString(), "storeName", 5, 6); + + // assert version 5, partition 5 should exist + assertTrue(Files.exists(baseDir.resolve("storeName_v5/storeName_v5_5"))); + + // assert version 5, partition 6 should not exist + assertFalse(Files.exists(baseDir.resolve("storeName_v5/storeName_v5_6"))); + } + + @Test + public void testDeletePartitionDir_EmptyFiles() throws IOException { + Files.createDirectories(baseDir.resolve("storeName_v2/storeName_v2_2")); + // assert the temp base directory does exist + assertTrue(Files.exists(baseDir)); + deletePartitionDir(baseDir.toString(), "storeName", 2, 2); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/ingestion/IsolatedIngestionServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/ingestion/IsolatedIngestionServerTest.java index 6203ebb3ce..abfbaca8ed 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/ingestion/IsolatedIngestionServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/davinci/ingestion/IsolatedIngestionServerTest.java @@ -5,6 +5,7 @@ import static com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils.executeShellCommand; import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_CONNECTION_TIMEOUT_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT; @@ -129,6 +130,7 @@ private VeniceConfigLoader getConfigLoader(int servicePort) { .put(SERVER_INGESTION_ISOLATION_CONNECTION_TIMEOUT_SECONDS, 10) .put(INGESTION_ISOLATION_CONFIG_PREFIX + "."
+ SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS, 10) .put(SERVER_INGESTION_ISOLATION_SERVICE_PORT, servicePort) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); return new VeniceConfigLoader(properties, properties); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java index 5201f366af..a26b31d679 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.ConfigKeys.CLUSTER_DISCOVERY_D2_SERVICE; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; @@ -146,7 +147,8 @@ private VeniceProperties getDaVinciBackendConfig(boolean useDaVinciSpecificExecu .put(D2_ZK_HOSTS_ADDRESS, venice.getZk().getAddress()) .put(CLUSTER_DISCOVERY_D2_SERVICE, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .put(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, useDaVinciSpecificExecutionStatusForError) - .put(SERVER_DISK_FULL_THRESHOLD, getDiskFullThreshold(largePushRecordCount, largePushRecordMinSize)); + .put(SERVER_DISK_FULL_THRESHOLD, getDiskFullThreshold(largePushRecordCount, largePushRecordMinSize)) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()); return venicePropertyBuilder.build(); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java index 1d7f7a0959..f6cc99a733 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java @@ -7,6 +7,7 @@ import static com.linkedin.venice.ConfigKeys.CLUSTER_DISCOVERY_D2_SERVICE; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_NO_REPORT_RETRY_MAX_ATTEMPTS; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT_STORE_LIST; @@ -127,7 +128,8 @@ private VeniceProperties getDaVinciBackendConfig( .put(ROCKSDB_MEMTABLE_SIZE_IN_BYTES, "2MB") .put(ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES, "10MB") .put(INGESTION_MEMORY_LIMIT_STORE_LIST, String.join(",", memoryLimitStores)) - .put(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, useDaVinciSpecificExecutionStatusForError); + .put(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, useDaVinciSpecificExecutionStatusForError) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()); if 
(ingestionIsolationEnabledInDaVinci) { venicePropertyBuilder.put(SERVER_INGESTION_MODE, IngestionMode.ISOLATED); venicePropertyBuilder.put(SERVER_INGESTION_ISOLATION_APPLICATION_PORT, TestUtils.getFreePort()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 9d4a1da96e..2866c7803e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -4,6 +4,7 @@ import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD; @@ -158,6 +159,7 @@ public void testConcurrentGetAndStart() throws Exception { .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) .put(DATA_BASE_PATH, baseDataPath) .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); for (int i = 0; i < 10; ++i) { @@ -205,6 +207,7 @@ public void testBatchStore(DaVinciConfig clientConfig) throws Exception { .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) .put(DATA_BASE_PATH, baseDataPath) .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); MetricsRepository metricsRepository = new MetricsRepository(); @@ -339,6 +342,7 @@ public void testDavinciSubscribeFailureWithFullDisk() throws Exception { backendConfigMap.put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true); backendConfigMap.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 10); backendConfigMap.put(SERVER_DISK_FULL_THRESHOLD, 0.01); // force it to fail + backendConfigMap.put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()); try (DaVinciClient daVinciClient = ServiceFactory.getGenericAvroDaVinciClientWithRetries( storeName, @@ -366,6 +370,7 @@ public void testObjectReuse(DaVinciConfig clientConfig) throws Exception { // .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) .put(DATA_BASE_PATH, baseDataPath) .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); MetricsRepository metricsRepository = new MetricsRepository(); @@ -618,6 +623,7 @@ public void testHybridStoreWithoutIngestionIsolation(DaVinciConfig daVinciConfig .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) .put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); MetricsRepository metricsRepository = new MetricsRepository(); @@ -682,6 +688,7 @@ public void testHybridStore() throws Exception { .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) .put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); MetricsRepository metricsRepository = new MetricsRepository(); @@ 
-848,7 +855,8 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception { String storeName = createStoreWithMetaSystemStore(KEY_COUNT); - VeniceProperties backendConfig = new PropertyBuilder().build(); + VeniceProperties backendConfig = + new PropertyBuilder().put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()).build(); Set<Integer> keySet = new HashSet<>(); for (int i = 0; i < KEY_COUNT; ++i) { @@ -956,6 +964,7 @@ public void testCrashedDaVinciWithIngestionIsolation() throws Exception { .put(DATA_BASE_PATH, baseDataPath) .put(PERSISTENCE_TYPE, ROCKS_DB) .put(D2_ZK_HOSTS_ADDRESS, zkHosts) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); // Re-open the same store's database to verify RocksDB metadata partition's lock has been released. diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientDaVinciClientCompatTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientDaVinciClientCompatTest.java index cb02b9f490..ef879e3749 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientDaVinciClientCompatTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientDaVinciClientCompatTest.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.ConfigKeys.CLIENT_USE_DA_VINCI_BASED_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.meta.PersistenceType.ROCKS_DB; @@ -15,6 +16,7 @@ import com.linkedin.venice.fastclient.utils.AbstractClientEndToEndSetup; import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.utils.PropertyBuilder; +import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.metrics.MetricsRepository; @@ -116,6 +118,7 @@ private DaVinciClient setupDaVinciClient(String storeNa .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) .put(CLIENT_USE_DA_VINCI_BASED_SYSTEM_STORE_REPOSITORY, false) .put(DATA_BASE_PATH, dataPath) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); daVinciClientFactory = new CachingDaVinciClientFactory( d2Client, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java index 26d8d6cf37..89cc845e0c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.ConfigKeys.CLIENT_USE_DA_VINCI_BASED_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; import static
com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_ENABLED; @@ -304,6 +305,7 @@ private void prepareMetaSystemStore() throws Exception { .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) .put(CLIENT_USE_DA_VINCI_BASED_SYSTEM_STORE_REPOSITORY, true) .put(DATA_BASE_PATH, dataPath) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()) .build(); // Verify meta system store received the snapshot writes. diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/DaVinciTestContext.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/DaVinciTestContext.java index d142a57b43..b94f43c895 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/DaVinciTestContext.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/DaVinciTestContext.java @@ -5,6 +5,7 @@ import static com.linkedin.venice.ConfigKeys.CLUSTER_DISCOVERY_D2_SERVICE; import static com.linkedin.venice.ConfigKeys.D2_ZK_HOSTS_ADDRESS; import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_PORT; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT; @@ -132,6 +133,7 @@ public static PropertyBuilder getDaVinciPropertyBuilder(String zkAddress) { .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) .put(D2_ZK_HOSTS_ADDRESS, zkAddress) - .put(CLUSTER_DISCOVERY_D2_SERVICE, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME); + .put(CLUSTER_DISCOVERY_D2_SERVICE, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .put(DAVINCI_P2P_BLOB_TRANSFER_PORT, TestUtils.getFreePort()); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java index bd36762d5c..006837b151 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java @@ -15,6 +15,7 @@ import com.linkedin.davinci.client.DaVinciConfig; import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory; import com.linkedin.venice.D2.D2ClientUtils; +import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.controllerapi.ControllerClient; @@ -32,7 +33,6 @@ import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; -import com.linkedin.venice.routerapi.BlobDiscoveryResponse; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; @@ -215,10 +215,10 @@ 
public void testBlobDiscovery() throws Exception { HttpStatus.SC_OK, "Failed to get resource state for " + storeName + ". Response: " + responseBody); ObjectMapper mapper = ObjectMapperFactory.getInstance(); - BlobDiscoveryResponse blobDiscoveryResponse = - mapper.readValue(responseBody.getBytes(), BlobDiscoveryResponse.class); + BlobPeersDiscoveryResponse blobDiscoveryResponse = + mapper.readValue(responseBody.getBytes(), BlobPeersDiscoveryResponse.class); // TODO: add another testcase to retrieve >= 1 live nodes - Assert.assertEquals(blobDiscoveryResponse.getLiveNodeHostNames().size(), 0); + Assert.assertEquals(blobDiscoveryResponse.getDiscoveryResult().size(), 0); } catch (Exception e) { fail("Unexpected exception", e); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java index e72c246f5f..c0df743b72 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java @@ -29,6 +29,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controllerapi.CurrentVersionResponse; import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; @@ -60,7 +61,6 @@ import com.linkedin.venice.router.api.RouterResourceType; import com.linkedin.venice.router.api.VenicePathParserHelper; import com.linkedin.venice.router.api.VeniceVersionFinder; -import com.linkedin.venice.routerapi.BlobDiscoveryResponse; import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse; import com.linkedin.venice.routerapi.PushStatusResponse; import com.linkedin.venice.routerapi.ReplicaState; @@ -541,7 +541,7 @@ private void handleBlobDiscovery(ChannelHandlerContext ctx, VenicePathParserHelp return; } - BlobDiscoveryResponse response = new BlobDiscoveryResponse(); + BlobPeersDiscoveryResponse response = new BlobPeersDiscoveryResponse(); try { // gets the instances for a FULL_PUSH for the store's version and partitionId // gets the instance's hostnames from its keys & filter to include only live instances @@ -556,7 +556,7 @@ private void handleBlobDiscovery(ChannelHandlerContext ctx, VenicePathParserHelp .map(CharSequence::toString) .filter(instanceHostName -> pushStatusStoreReader.isInstanceAlive(storeName, instanceHostName)) .collect(Collectors.toList()); - response.setLiveNodeNames(liveNodeHostNames); + response.setDiscoveryResult(liveNodeHostNames); } catch (VeniceException e) { byte[] errBody = (String.format(REQUEST_BLOB_DISCOVERY_ERROR_PUSH_STORE, storeName, storeVersion, storePartition)).getBytes(); diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java b/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java index f20ef46050..bcafc4d2a5 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java @@ -22,6 +22,7 @@ import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse; import 
com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; import com.linkedin.venice.controllerapi.LeaderControllerResponse; @@ -57,7 +58,6 @@ import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; -import com.linkedin.venice.routerapi.BlobDiscoveryResponse; import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse; import com.linkedin.venice.routerapi.ReplicaState; import com.linkedin.venice.routerapi.ResourceStateResponse; @@ -1642,9 +1642,9 @@ public void testHandleBlobDiscoveryLiveNodes() throws IOException { storeRepository, pushStatusStoreReader); - BlobDiscoveryResponse blobDiscoveryResponse = - OBJECT_MAPPER.readValue(response.content().array(), BlobDiscoveryResponse.class); - List<String> hostNames = blobDiscoveryResponse.getLiveNodeHostNames(); + BlobPeersDiscoveryResponse blobDiscoveryResponse = + OBJECT_MAPPER.readValue(response.content().array(), BlobPeersDiscoveryResponse.class); + List<String> hostNames = blobDiscoveryResponse.getDiscoveryResult(); Collections.sort(hostNames); Collections.sort(expectedResult); @@ -1716,9 +1716,9 @@ public void testHandleBlobDiscoveryDeadNodes() throws IOException { storeRepository, pushStatusStoreReader); - BlobDiscoveryResponse blobDiscoveryResponse = - OBJECT_MAPPER.readValue(response.content().array(), BlobDiscoveryResponse.class); - List<String> hostNames = blobDiscoveryResponse.getLiveNodeHostNames(); + BlobPeersDiscoveryResponse blobDiscoveryResponse = + OBJECT_MAPPER.readValue(response.content().array(), BlobPeersDiscoveryResponse.class); + List<String> hostNames = blobDiscoveryResponse.getDiscoveryResult(); Assert.assertEquals(hostNames, expectedResult); Assert.assertEquals(response.status(), HttpResponseStatus.OK);