diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java index 9ec8fae459..75bc934a5c 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java @@ -314,14 +314,10 @@ protected CompletableFuture> batchGet(BatchGetRequestContext req if (throwable != null) { responseFuture.completeExceptionally(throwable); } else if (!response.isFullResponse()) { - if (requestContext.getPartialResponseException().isPresent()) { - responseFuture.completeExceptionally( - new VeniceClientException( - "Response was not complete", - requestContext.getPartialResponseException().get())); - } else { - responseFuture.completeExceptionally(new VeniceClientException("Response was not complete")); - } + responseFuture.completeExceptionally( + new VeniceClientException( + "Response was not complete", + requestContext.getPartialResponseException().orElse(null))); } else { responseFuture.complete(response); } @@ -334,6 +330,7 @@ protected CompletableFuture> streamingBatchGet( BatchGetRequestContext requestContext, Set keys) throws VeniceClientException { verifyMetadataInitialized(); + int keySize = keys.size(); // keys that do not exist in the storage nodes Queue nonExistingKeys = new ConcurrentLinkedQueue<>(); VeniceConcurrentHashMap valueMap = new VeniceConcurrentHashMap<>(); @@ -358,7 +355,7 @@ public void onCompletion(Optional exception) { if (exception.isPresent()) { streamingResponseFuture.completeExceptionally(exception.get()); } else { - boolean isFullResponse = ((valueMap.size() + nonExistingKeys.size()) == keys.size()); + boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keySize; streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse)); } } @@ -393,9 +390,8 @@ protected void streamingBatchGet( * that exception will be passed to the aggregate future's next stages. */ CompletableFuture.allOf(requestContext.getAllRouteFutures().toArray(new CompletableFuture[0])) .whenComplete((response, throwable) -> { - if (throwable == null) { - callback.onCompletion(Optional.empty()); - } else { + if (throwable != null || (!keys.isEmpty() && requestContext.getAllRouteFutures().isEmpty())) { + // If there is an exception or if no partition has a healthy replica. // The exception to send to the client might be different. Get from the requestContext Throwable clientException = throwable; if (requestContext.getPartialResponseException().isPresent()) { @@ -403,6 +399,8 @@ protected void streamingBatchGet( } callback.onCompletion( Optional.of(new VeniceClientException("At least one route did not complete", clientException))); + } else { + callback.onCompletion(Optional.empty()); } }); } @@ -452,10 +450,10 @@ private void streamingBatchGetInternal( * an error */ requestContext.noAvailableReplica = true; String errorMessage = String.format( - "No route found for partitionId: %s, store: %s, version: %s", - partitionId, + "No available route for store: %s, version: %s, partitionId: %s", getStoreName(), - currentVersion); + currentVersion, + partitionId); LOGGER.error(errorMessage); requestContext.setPartialResponseException(new VeniceClientException(errorMessage)); } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java index dc58aa3240..f07c839b6d 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java @@ -175,7 +175,28 @@ protected CompletableFuture get(GetRequestContext requestContext, K key) thro */ protected CompletableFuture> batchGet(BatchGetRequestContext requestContext, Set keys) throws VeniceClientException { + CompletableFuture> responseFuture = new CompletableFuture<>(); + CompletableFuture> streamingResponseFuture = streamingBatchGet(requestContext, keys); + streamingResponseFuture.whenComplete((response, throwable) -> { + if (throwable != null) { + responseFuture.completeExceptionally(throwable); + } else if (!response.isFullResponse()) { + responseFuture.completeExceptionally( + new VeniceClientException( + "Response was not complete", + requestContext.getPartialResponseException().orElse(null))); + } else { + responseFuture.complete(response); + } + }); + return responseFuture; + } + + protected CompletableFuture> streamingBatchGet( + BatchGetRequestContext requestContext, + Set keys) throws VeniceClientException { // keys that do not exist in the storage nodes + int keysSize = keys.size(); Queue nonExistingKeys = new ConcurrentLinkedQueue<>(); VeniceConcurrentHashMap valueMap = new VeniceConcurrentHashMap<>(); CompletableFuture> streamingResponseFuture = new VeniceResponseCompletableFuture<>( @@ -199,29 +220,12 @@ public void onCompletion(Optional exception) { if (exception.isPresent()) { streamingResponseFuture.completeExceptionally(exception.get()); } else { - boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keys.size(); + boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keysSize; streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse)); } } }); - CompletableFuture> responseFuture = new CompletableFuture<>(); - streamingResponseFuture.whenComplete((response, throwable) -> { - if (throwable != null) { - responseFuture.completeExceptionally(throwable); - } else if (!response.isFullResponse()) { - if (requestContext.getPartialResponseException().isPresent()) { - responseFuture.completeExceptionally( - new VeniceClientException( - "Response was not complete", - requestContext.getPartialResponseException().get())); - } else { - responseFuture.completeExceptionally(new VeniceClientException("Response was not complete")); - } - } else { - responseFuture.complete(response); - } - }); - return responseFuture; + return streamingResponseFuture; } @Override @@ -301,7 +305,7 @@ public void streamingBatchGet( if (finalException == null) { callback.onCompletion(Optional.empty()); } else { - callback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception ", finalException))); + callback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception", finalException))); } }); } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java index c7591004a1..c04ac16844 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java @@ -31,6 +31,7 @@ public class StatsAvroGenericStoreClient extends DelegatingAvroStoreClient private final FastClientStats clientStatsForSingleGet; private final FastClientStats clientStatsForBatchGet; + private final FastClientStats clientStatsForStreamingBatchGet; private final ClusterStats clusterStats; private final int maxAllowedKeyCntInBatchGetReq; @@ -40,6 +41,7 @@ public StatsAvroGenericStoreClient(InternalAvroStoreClient delegate, Clien super(delegate); this.clientStatsForSingleGet = clientConfig.getStats(RequestType.SINGLE_GET); this.clientStatsForBatchGet = clientConfig.getStats(RequestType.MULTI_GET); + this.clientStatsForStreamingBatchGet = clientConfig.getStats(RequestType.MULTI_GET_STREAMING); this.clusterStats = clientConfig.getClusterStats(); this.maxAllowedKeyCntInBatchGetReq = clientConfig.getMaxAllowedKeyCntInBatchGetReq(); this.useStreamingBatchGetAsDefault = clientConfig.useStreamingBatchGetAsDefault(); @@ -121,7 +123,7 @@ protected void streamingBatchGet( requestContext, keys, new StatTrackingStreamingCallBack<>(callback, statFuture, requestContext)); - recordMetrics(requestContext, keys.size(), statFuture, startTimeInNS, clientStatsForBatchGet); + recordMetrics(requestContext, keys.size(), statFuture, startTimeInNS, clientStatsForStreamingBatchGet); } @Override @@ -130,7 +132,7 @@ protected CompletableFuture> streamingBatchGet( Set keys) { long startTimeInNS = System.nanoTime(); CompletableFuture> streamingBatchGetFuture = super.streamingBatchGet(requestContext, keys); - recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForBatchGet); + recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForStreamingBatchGet); return streamingBatchGetFuture; } @@ -148,6 +150,14 @@ private CompletableFuture recordMetrics( return AppTimeOutTrackingCompletableFuture.track(statFuture, clientStats); } + /** + * Metrics are incremented after one of the below cases + * 1. request is complete or + * 2. exception is thrown or + * 3. routingLeakedRequestCleanupThresholdMS is elapsed: In case of streamingBatchGet.get(timeout) returning + * partial response and this timeout happens after than and before the full response is returned, + * it will still raise a silent exception leading to the request being considered an unhealthy request. + */ private CompletableFuture recordRequestMetrics( RequestContext requestContext, int numberOfKeys, diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java index 2a4534ec02..682dcc66ee 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java @@ -709,7 +709,7 @@ private void validateMetrics( double expectedNumberOfKeysToBeRetried, double expectedNumberOfKeysToBeRetriedSuccessfully) { Map metrics = getStats(client.getClientConfig()); - String metricPrefix = "." + client.UNIT_TEST_STORE_NAME + "--multiget_"; + String metricPrefix = "." + client.UNIT_TEST_STORE_NAME + "--multiget_streaming_"; if (totalNumberOfKeys > 0) { assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0); assertEquals(metrics.get(metricPrefix + "request_key_count.Max").value(), totalNumberOfKeys); diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java index cb13707861..e14880de78 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java @@ -266,42 +266,68 @@ private void validateRetryMetrics(boolean batchGet, String metricPrefix, boolean } private void validateSingleGetMetrics(boolean healthyRequest) { - validateMetrics(healthyRequest, false, false, false, false); + validateMetrics(healthyRequest, false, false, false, false, false, 1, 0); } private void validateMultiGetMetrics( boolean healthyRequest, boolean partialHealthyRequest, boolean useStreamingBatchGetAsDefault, - boolean noAvailableReplicas) { - validateMetrics(healthyRequest, partialHealthyRequest, true, useStreamingBatchGetAsDefault, noAvailableReplicas); + boolean streamingBatchGetApi, + boolean noAvailableReplicas, + int numKeys) { + validateMetrics( + healthyRequest, + partialHealthyRequest, + true, + useStreamingBatchGetAsDefault, + streamingBatchGetApi, + noAvailableReplicas, + numKeys, + 2); + } + + private void validateMultiGetMetrics( + boolean healthyRequest, + boolean partialHealthyRequest, + boolean useStreamingBatchGetAsDefault, + boolean streamingBatchGetApi, + boolean noAvailableReplicas, + int numKeys, + double numBlockedReplicas) { + validateMetrics( + healthyRequest, + partialHealthyRequest, + true, + useStreamingBatchGetAsDefault, + streamingBatchGetApi, + noAvailableReplicas, + numKeys, + numBlockedReplicas); } private void validateMetrics( boolean healthyRequest, boolean partialHealthyRequest, boolean batchGet, - boolean useStreamingBatchGetAsDefault, - boolean noAvailableReplicas) { + boolean useStreamingBatchGetAsDefault, // use streaming implementation for batchGet + boolean streamingBatchGetApi, // calling streamingBatchGet() API to test + boolean noAvailableReplicas, + int numKeys, + double numBlockedReplicas) { metrics = getStats(clientConfig); - String metricPrefix = "." + STORE_NAME + ((batchGet && useStreamingBatchGetAsDefault) ? "--multiget_" : "--"); + String metricPrefix = "." + STORE_NAME + + (streamingBatchGetApi + ? "--multiget_streaming_" + : ((batchGet && useStreamingBatchGetAsDefault) ? "--multiget_" : "--")); String routeMetricsPrefix = "." + STORE_NAME; - double requestKeyCount; double successKeyCount; - if (useStreamingBatchGetAsDefault) { - if (partialHealthyRequest) { - // batchGet and partialHealthyRequest: 2 request tried but only 1 request is successful - requestKeyCount = 2.0; - successKeyCount = 1.0; - } else { - requestKeyCount = 2.0; - successKeyCount = 2.0; - } + double requestKeyCount = numKeys; + if (partialHealthyRequest) { + // batchGet and partialHealthyRequest: 1 request is unsuccessful + successKeyCount = numKeys - 1; } else { - // single get: 1 - // batchGet using single get: 1 will be recorded twice rather than 2 - requestKeyCount = 1.0; - successKeyCount = 1.0; + successKeyCount = numKeys; } assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0); assertEquals(metrics.get(metricPrefix + "request_key_count.Max").value(), requestKeyCount); @@ -350,16 +376,19 @@ private void validateMetrics( if (noAvailableReplicas) { assertTrue(metrics.get(metricPrefix + "no_available_replica_request_count.OccurrenceRate").value() > 0); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - assertNotNull(metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max")); - assertEquals( - metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max").value(), - 1.0); + if (numBlockedReplicas == 2) { + // some test cases only have 1 replica having pending and some have 2. + assertNotNull(metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max")); + assertEquals( + metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max").value(), + 1.0); + } assertNotNull(metrics.get(routeMetricsPrefix + "_" + REPLICA2_NAME + "--pending_request_count.Max")); assertEquals( metrics.get(routeMetricsPrefix + "_" + REPLICA2_NAME + "--pending_request_count.Max").value(), 1.0); }); - assertEquals(metrics.get(routeMetricsPrefix + "--blocked_instance_count.Max").value(), 2.0); + assertEquals(metrics.get(routeMetricsPrefix + "--blocked_instance_count.Max").value(), numBlockedReplicas); if (batchGet) { if (useStreamingBatchGetAsDefault) { assertTrue(batchGetRequestContext.noAvailableReplica); @@ -461,7 +490,42 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault) }); } metrics = getStats(clientConfig, RequestType.MULTI_GET); - validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + true, + false, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); + } finally { + tearDown(); + } + } + + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) + public void testBatchGetWithEmptyKeys(boolean streamingBatchGet) + throws ExecutionException, InterruptedException, IOException { + + try { + setUpClient(true); + batchGetRequestContext = new BatchGetRequestContext<>(); + Map value; + if (streamingBatchGet) { + value = + (Map) statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, new HashSet()) + .get(); + } else { + value = (Map) statsAvroGenericStoreClient.batchGet(batchGetRequestContext, new HashSet()).get(); + } + assertTrue(value.isEmpty()); + metrics = getStats(clientConfig, RequestType.MULTI_GET); + String metricPrefix = "." + STORE_NAME + (streamingBatchGet ? "--multiget_streaming_" : "--multiget_"); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0); + assertTrue(metrics.get(metricPrefix + "healthy_request.OccurrenceRate").value() > 0); + assertFalse(metrics.get(metricPrefix + "request_key_count.Max").value() > 0); + assertFalse(metrics.get(metricPrefix + "unhealthy_request.OccurrenceRate").value() > 0); + }); } finally { tearDown(); } @@ -496,7 +560,13 @@ public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatc } else { assertTrue(e.getMessage().endsWith("Exception for client to return 503")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + false, + false, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); } finally { tearDown(); } @@ -518,7 +588,7 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt BATCH_GET_KEYS.stream().forEach(key -> { assertTrue(SINGLE_GET_VALUE_RESPONSE.contentEquals(value.get(key))); }); - validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false, false, 1); } } catch (Exception e) { if (useStreamingBatchGetAsDefault) { @@ -526,7 +596,13 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt } else { fail(); } - validateMultiGetMetrics(false, true, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + false, + true, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); } finally { tearDown(); } @@ -553,7 +629,7 @@ public void testBatchGetWithTimeoutV1() throws IOException, ExecutionException, TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, false, false, 2); tearDown(); } } @@ -580,7 +656,7 @@ public void testBatchGetWithTimeoutV2() TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, false, false, 2); tearDown(); } } @@ -607,7 +683,41 @@ public void testBatchGetWithTimeoutV3() TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, false, false, 2); + tearDown(); + } + } + + /** + * 1st batchGet(1 key) blocks one replica and 2nd batchGet(2 keys) returns value for only 1 key as the other + * route is blocked, so batchGet() return an exception. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testBatchGetWithTimeoutV4() throws IOException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_2).get(); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + + try { + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + assertTrue(e1.getMessage().endsWith("Response was not complete")); + assertTrue(e1.getCause().getCause().getMessage().matches(".*No available route for.*")); + // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false, true, 2, 1); + } + } finally { tearDown(); } } @@ -632,9 +742,9 @@ public void testStreamingBatchGetWithTimeoutV1() throws IOException, ExecutionEx // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased metrics = getStats(clientConfig); TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { - assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, true, false, 2); tearDown(); } } @@ -660,9 +770,9 @@ public void testStreamingBatchGetWithTimeoutV2() // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased metrics = getStats(clientConfig); TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { - assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, true, false, 2); tearDown(); } } @@ -689,9 +799,9 @@ public void testStreamingBatchGetWithTimeoutV3() // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased metrics = getStats(clientConfig); TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { - assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, true, false, 2); } finally { tearDown(); } @@ -712,7 +822,13 @@ public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefaul } else { assertTrue(e.getMessage().endsWith("http status: 410, Request timed out")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + false, + false, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); try { // the second batchGet is not going to find any routes (as the instances @@ -721,16 +837,163 @@ public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefaul statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); fail(); } catch (Exception e1) { - System.out.println("Exception is: " + e1.getMessage()); if (useStreamingBatchGetAsDefault) { - assertTrue(e1.getMessage().endsWith("Response was not complete")); + assertTrue(e1.getMessage().endsWith("At least one route did not complete")); + assertTrue(e1.getCause().getCause().getMessage().matches(".*No available route for store.*")); } else { assertTrue(e1.getMessage().matches(".*No available route for store.*")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, true); + validateMultiGetMetrics( + false, + false, + useStreamingBatchGetAsDefault, + false, + true, + useStreamingBatchGetAsDefault ? 2 : 1); } } finally { tearDown(); } } + + /** + * 1. 1st streamingBatchGet(2 keys) will result in both the routes marked as + * blocked(routingPendingRequestCounterInstanceBlockThreshold is 1). + * 2. 2nd streamingBatchGet(2 keys) will result in both route getting no replica + * found leading to exception. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClient() throws IOException { + try { + setUpClient(true, false, false, false, false, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 2); + + try { + // the second batchGet is not going to find any routes (as the instances + // are blocked) and fail instantly + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + assertTrue(e1.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, true, 2); + } + } finally { + tearDown(); + } + } + + /** + * 1. first streamingBatchGet() with 1 key getting timed out leading to the route getting + * blocked (routingPendingRequestCounterInstanceBlockThreshold = 1) + * 2. second streamingBatchGet() with 2 keys (one key (same as above) failing with no available + * replica and one key getting timed out) => exception + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV1() throws IOException { + try { + setUpClient(true, false, false, false, false, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_1).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 1); + + try { + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + assertTrue(e1.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, true, 2); + } + } finally { + tearDown(); + } + } + + /** + * 1. first streamingBatchGet() with 1 key getting timed out leading to the route getting + * blocked (routingPendingRequestCounterInstanceBlockThreshold = 1) + * 2. second streamingBatchGet() with 2 keys (one key (same as above) failing with no available + * replica and one key returning value) => no exception + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV2() + throws IOException, ExecutionException, InterruptedException { + try { + setUpClient(true, false, false, true, true, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_2).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 1); + batchGetRequestContext = new BatchGetRequestContext<>(); + + CompletableFuture> future = + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS); + VeniceResponseMap value = future.get(); + assertFalse(value.isFullResponse()); + // TODO metric validation: 1st get increments unhealthy metrics and the second get increments healthy + // metrics + // validateMultiGetMetrics(true, true, true, true, 2); + } finally { + tearDown(); + } + } + + /** + * same as testStreamingBatchGetToUnreachableClientV2: transportClientThrowsPartialException instead of + * transportClientPartialIncomplete. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV3() throws IOException { + try { + setUpClient(true, false, true, false, true, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, true, true, true, false, 2); + } finally { + tearDown(); + } + } + + /** + * same as testStreamingBatchGetToUnreachableClientV3 but the transport mock throws exception + * for both the routes. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV4() throws IOException { + try { + setUpClient(true, true, false, false, true, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 2); + } finally { + tearDown(); + } + } } diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java index 985454c2e0..1cd7786180 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -59,6 +60,22 @@ public class RetriableAvroGenericStoreClientTest { private StatsAvroGenericStoreClient statsAvroGenericStoreClient; private Map metrics; + @DataProvider(name = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet") + public Object[][] twoBoolean() { + return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean batchGet = (boolean) permutation[0]; + boolean streamingBatchGet = (boolean) permutation[1]; + if (!batchGet) { + if (streamingBatchGet) { + return false; + } + } + return true; + }, + DataProviderUtils.BOOLEAN, // batchGet + DataProviderUtils.BOOLEAN); // streamingBatchGet + } + @BeforeClass public void setUp() { timeoutProcessor = new TimeoutProcessor(null, true, 1); @@ -227,10 +244,10 @@ private void testSingleGetAndValidateMetrics( } } - validateMetrics(false, errorRetry, longTailRetry, retryWin); + validateMetrics(false, false, errorRetry, longTailRetry, retryWin); } - private void testBatchGetAndvalidateMetrics( + private void testBatchGetAndValidateMetrics( boolean bothOriginalAndRetryFails, boolean longTailRetry, boolean retryWin, @@ -255,7 +272,36 @@ private void testBatchGetAndvalidateMetrics( } } - validateMetrics(true, false, longTailRetry, retryWin); + validateMetrics(true, false, false, longTailRetry, retryWin); + } + + private void testStreamingBatchGetAndValidateMetrics( + boolean bothOriginalAndRetryFails, + boolean longTailRetry, + boolean retryWin, + boolean keyNotFound) throws ExecutionException, InterruptedException { + batchGetRequestContext = new BatchGetRequestContext<>(); + try { + Map value = + (Map) statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS) + .get(); + + if (bothOriginalAndRetryFails) { + fail("An ExecutionException should be thrown here"); + } + + if (keyNotFound) { + assertEquals(value, BATCH_GET_VALUE_RESPONSE_KEY_NOT_FOUND_CASE); + } else { + assertEquals(value, BATCH_GET_VALUE_RESPONSE); + } + } catch (ExecutionException e) { + if (!bothOriginalAndRetryFails) { + throw e; + } + } + + validateMetrics(true, true, false, longTailRetry, retryWin); } /** @@ -265,11 +311,20 @@ private void testBatchGetAndvalidateMetrics( * @param longTailRetry request is retried because the original request is taking more time * @param retryWin retry request wins */ - private void validateMetrics(boolean batchGet, boolean errorRetry, boolean longTailRetry, boolean retryWin) { + private void validateMetrics( + boolean batchGet, + boolean streamingBatchGet, + boolean errorRetry, + boolean longTailRetry, + boolean retryWin) { metrics = getStats(clientConfig); - String metricsPrefix = "." + STORE_NAME + (batchGet ? "--multiget_" : "--"); + String metricsPrefix = + "." + STORE_NAME + (batchGet ? (streamingBatchGet ? "--multiget_streaming_" : "--multiget_") : "--"); double expectedKeyCount = batchGet ? 2.0 : 1.0; - assertTrue(metrics.get(metricsPrefix + "request.OccurrenceRate").value() > 0); + + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + assertTrue(metrics.get(metricsPrefix + "request.OccurrenceRate").value() > 0); + }); assertEquals(metrics.get(metricsPrefix + "request_key_count.Max").value(), expectedKeyCount); if (errorRetry || longTailRetry) { @@ -350,15 +405,15 @@ public void testGetWithoutTriggeringLongTailRetry(boolean batchGet, boolean keyN if (!batchGet) { testSingleGetAndValidateMetrics(false, false, false, false, keyNotFound); } else { - testBatchGetAndvalidateMetrics(false, false, false, keyNotFound); + testBatchGetAndValidateMetrics(false, false, false, keyNotFound); } } /** * Original request latency is higher than retry threshold, but still faster than retry request */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndOriginalWins(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndOriginalWins(boolean batchGet, boolean streamingBatchGet) throws ExecutionException, InterruptedException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -375,7 +430,11 @@ public void testGetWithTriggeringLongTailRetryAndOriginalWins(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(false, false, true, false, false); } else { - testBatchGetAndvalidateMetrics(false, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(false, true, false, false); + } else { + testBatchGetAndValidateMetrics(false, true, false, false); + } } } @@ -400,15 +459,15 @@ public void testGetWithTriggeringLongTailRetryAndRetryWins(boolean batchGet, boo if (!batchGet) { testSingleGetAndValidateMetrics(false, false, true, true, keyNotFound); } else { - testBatchGetAndvalidateMetrics(false, true, true, keyNotFound); + testBatchGetAndValidateMetrics(false, true, true, keyNotFound); } } /** * Original request fails and retry succeeds. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringErrorRetryAndRetryWins(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringErrorRetryAndRetryWins(boolean batchGet, boolean streamingBatchGet) throws ExecutionException, InterruptedException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -419,15 +478,19 @@ public void testGetWithTriggeringErrorRetryAndRetryWins(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(false, true, false, true, false); } else { - testBatchGetAndvalidateMetrics(false, true, true, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(false, true, true, false); + } else { + testBatchGetAndValidateMetrics(false, true, true, false); + } } } /** * Original request latency exceeds the retry threshold but succeeds and the retry fails. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndRetryFails(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndRetryFails(boolean batchGet, boolean streamingBatchGet) throws ExecutionException, InterruptedException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -438,15 +501,19 @@ public void testGetWithTriggeringLongTailRetryAndRetryFails(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(false, false, true, false, false); } else { - testBatchGetAndvalidateMetrics(false, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(false, true, false, false); + } else { + testBatchGetAndValidateMetrics(false, true, false, false); + } } } /** * Original request latency exceeds the retry threshold, and both the original request and the retry fails. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndBothFailsV1(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndBothFailsV1(boolean batchGet, boolean streamingBatchGet) throws InterruptedException, ExecutionException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -463,15 +530,19 @@ public void testGetWithTriggeringLongTailRetryAndBothFailsV1(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(true, false, true, false, false); } else { - testBatchGetAndvalidateMetrics(true, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(true, true, false, false); + } else { + testBatchGetAndValidateMetrics(true, true, false, false); + } } } /** * Original request latency is lower than the retry threshold, and both the original request and the retry fails. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndBothFailsV2(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndBothFailsV2(boolean batchGet, boolean streamingBatchGet) throws InterruptedException, ExecutionException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -488,7 +559,11 @@ public void testGetWithTriggeringLongTailRetryAndBothFailsV2(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(true, true, false, false, false); } else { - testBatchGetAndvalidateMetrics(true, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(true, true, false, false); + } else { + testBatchGetAndValidateMetrics(true, true, false, false); + } } } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java index 37c8c23546..56023ca447 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java @@ -1,5 +1,6 @@ package com.linkedin.venice.fastclient; +import static com.linkedin.venice.utils.Time.MS_PER_SECOND; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -228,13 +229,14 @@ protected void runTest( } } - @Test(dataProvider = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode", timeOut = TIME_OUT) + @Test(dataProvider = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode", timeOut = TIME_OUT) public void testFastClientGet( boolean dualRead, boolean speculativeQueryEnabled, boolean batchGet, boolean useStreamingBatchGetAsDefault, boolean enableGrpc, + boolean retryEnabled, int batchGetKeySize, StoreMetadataFetchMode storeMetadataFetchMode) throws Exception { ClientConfig.ClientConfigBuilder clientConfigBuilder = @@ -256,6 +258,15 @@ public void testFastClientGet( setUpGrpcFastClient(clientConfigBuilder); } + if (retryEnabled) { + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry + clientConfigBuilder.setLongTailRetryEnabledForSingleGet(true) + .setLongTailRetryThresholdForSingleGetInMicroSeconds(TIME_OUT * MS_PER_SECOND) + .setLongTailRetryEnabledForBatchGet(true) + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); + } + // dualRead needs thinClient AvroGenericStoreClient genericThinClient = null; AvroSpecificStoreClient specificThinClient = null; @@ -295,6 +306,7 @@ public void testFastClientGet( fastClientStatsValidation = metricsRepository -> validateBatchGetMetrics( metricsRepository, useStreamingBatchGetAsDefault, + false, batchGetKeySize, batchGetKeySize, false); @@ -407,6 +419,7 @@ public void testFastClientGetWithDifferentHTTPVariants( fastClientStatsValidation = metricsRepository -> validateBatchGetMetrics( metricsRepository, true, // testing batch get with useStreamingBatchGetAsDefault as true + false, recordCnt, recordCnt, false); @@ -438,7 +451,7 @@ public void testFastClientWithLongTailRetry(boolean batchGet) throws Exception { clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) .setLongTailRetryThresholdForBatchGetInMicroSeconds(1); fastClientStatsValidation = - metricsRepository -> validateBatchGetMetrics(metricsRepository, true, recordCnt, recordCnt, true); + metricsRepository -> validateBatchGetMetrics(metricsRepository, true, false, recordCnt, recordCnt, true); } else { clientConfigBuilder.setLongTailRetryEnabledForSingleGet(true) .setLongTailRetryThresholdForSingleGetInMicroSeconds(1); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java index 6835edbfb1..160b8acfa7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java @@ -18,13 +18,21 @@ public class AvroStoreClientGzipEndToEndTest extends AvroStoreClientEndToEndTest { @Override - @DataProvider(name = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode") - public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { + @DataProvider(name = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode") + public Object[][] sixBooleanANumberStoreMetadataFetchMode() { return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean speculativeQueryEnabled = (boolean) permutation[1]; + if (speculativeQueryEnabled) { + return false; + } boolean batchGet = (boolean) permutation[2]; boolean useStreamingBatchGetAsDefault = (boolean) permutation[3]; - int batchGetKeySize = (int) permutation[5]; - StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[6]; + boolean retryEnabled = (boolean) permutation[5]; + if (retryEnabled) { + return false; + } + int batchGetKeySize = (int) permutation[6]; + StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[7]; if (!batchGet) { if (useStreamingBatchGetAsDefault || batchGetKeySize != (int) BATCH_GET_KEY_SIZE.get(0)) { // these parameters are related only to batchGet, so just allowing 1 set @@ -32,7 +40,8 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { return false; } } - if (storeMetadataFetchMode == StoreMetadataFetchMode.DA_VINCI_CLIENT_BASED_METADATA) { + + if (storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) { return false; } return true; @@ -42,6 +51,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { DataProviderUtils.BOOLEAN, // batchGet DataProviderUtils.BOOLEAN_TRUE, // useStreamingBatchGetAsDefault DataProviderUtils.BOOLEAN, // enableGrpc + DataProviderUtils.BOOLEAN, // retryEnabled BATCH_GET_KEY_SIZE.toArray(), // batchGetKeySize STORE_METADATA_FETCH_MODES); // storeMetadataFetchMode } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java index 95667931c4..41f23ea7da 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java @@ -22,13 +22,21 @@ public class AvroStoreClientZstdEndToEndTest extends AvroStoreClientEndToEndTest { @Override - @DataProvider(name = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode") - public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { + @DataProvider(name = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode") + public Object[][] sixBooleanANumberStoreMetadataFetchMode() { return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean speculativeQueryEnabled = (boolean) permutation[1]; + if (speculativeQueryEnabled) { + return false; + } boolean batchGet = (boolean) permutation[2]; boolean useStreamingBatchGetAsDefault = (boolean) permutation[3]; - int batchGetKeySize = (int) permutation[5]; - StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[6]; + boolean retryEnabled = (boolean) permutation[5]; + if (retryEnabled) { + return false; + } + int batchGetKeySize = (int) permutation[6]; + StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[7]; if (!batchGet) { if (useStreamingBatchGetAsDefault || batchGetKeySize != (int) BATCH_GET_KEY_SIZE.get(0)) { // these parameters are related only to batchGet, so just allowing 1 set @@ -36,7 +44,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { return false; } } - if (storeMetadataFetchMode == StoreMetadataFetchMode.DA_VINCI_CLIENT_BASED_METADATA) { + if (storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) { return false; } return true; @@ -46,6 +54,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { DataProviderUtils.BOOLEAN, // batchGet DataProviderUtils.BOOLEAN_TRUE, // useStreamingBatchGetAsDefault DataProviderUtils.BOOLEAN, // enableGrpc + DataProviderUtils.BOOLEAN, // retryEnabled BATCH_GET_KEY_SIZE.toArray(), // batchGetKeySize STORE_METADATA_FETCH_MODES); // storeMetadataFetchMode } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java index e3b0a9b99e..8e8d8a7cc5 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java @@ -1,6 +1,6 @@ package com.linkedin.venice.fastclient; -import static com.linkedin.venice.utils.Time.US_PER_SECOND; +import static com.linkedin.venice.utils.Time.MS_PER_SECOND; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -135,10 +135,10 @@ public void testBatchGetGenericClient( .setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault); if (retryEnabled) { - // enable retry to test the code path: mimic retry in integration tests - // be non-deterministic, so setting big retry threshold to not actually retry + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) - .setLongTailRetryThresholdForBatchGetInMicroSeconds(10 * US_PER_SECOND); + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); } MetricsRepository metricsRepository = new MetricsRepository(); @@ -158,7 +158,7 @@ public void testBatchGetGenericClient( assertEquals(value.get(VALUE_FIELD_NAME), i); } - validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, recordCnt + 1, recordCnt, false); + validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, false, recordCnt + 1, recordCnt, false); FastClientStats stats = clientConfig.getStats(RequestType.MULTI_GET); LOGGER.info("STATS: {}", stats.buildSensorStatSummary("multiget_healthy_request_latency")); @@ -195,20 +195,28 @@ public void testBatchGetSpecificClient( assertEquals(value.get(VALUE_FIELD_NAME), i); } - validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, recordCnt, recordCnt, false); + validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, false, recordCnt, recordCnt, false); specificFastClient.close(); printAllStats(); } - @Test(dataProvider = "StoreMetadataFetchModes", timeOut = TIME_OUT) - public void testStreamingBatchGetGenericClient(StoreMetadataFetchMode storeMetadataFetchMode) throws Exception { + @Test(dataProvider = "Boolean-And-StoreMetadataFetchModes", timeOut = TIME_OUT) + public void testStreamingBatchGetGenericClient(boolean retryEnabled, StoreMetadataFetchMode storeMetadataFetchMode) + throws Exception { ClientConfig.ClientConfigBuilder clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(storeName) .setR2Client(r2Client) - .setSpeculativeQueryEnabled(true) + .setSpeculativeQueryEnabled(false) .setDualReadEnabled(false); + if (retryEnabled) { + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry + clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); + } + MetricsRepository metricsRepository = new MetricsRepository(); AvroGenericStoreClient genericFastClient = getGenericFastClient(clientConfigBuilder, metricsRepository, storeMetadataFetchMode); @@ -247,18 +255,26 @@ public void testStreamingBatchGetGenericClient(StoreMetadataFetchMode storeMetad 1, "Incorrect non existing key size . Expected 1 got " + veniceResponseMap.getNonExistingKeys().size()); - validateBatchGetMetrics(metricsRepository, true, recordCnt + 1, recordCnt, false); + validateBatchGetMetrics(metricsRepository, true, true, recordCnt + 1, recordCnt, false); } - @Test(dataProvider = "StoreMetadataFetchModes", timeOut = TIME_OUT) - public void testStreamingBatchGetWithCallbackGenericClient(StoreMetadataFetchMode storeMetadataFetchMode) - throws Exception { + @Test(dataProvider = "Boolean-And-StoreMetadataFetchModes", timeOut = TIME_OUT) + public void testStreamingBatchGetWithCallbackGenericClient( + boolean retryEnabled, + StoreMetadataFetchMode storeMetadataFetchMode) throws Exception { ClientConfig.ClientConfigBuilder clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(storeName) .setR2Client(r2Client) - .setSpeculativeQueryEnabled(true) + .setSpeculativeQueryEnabled(false) .setDualReadEnabled(false); + if (retryEnabled) { + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry + clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); + } + MetricsRepository metricsRepository = new MetricsRepository(); AvroGenericStoreClient genericFastClient = getGenericFastClient(clientConfigBuilder, metricsRepository, storeMetadataFetchMode); @@ -323,7 +339,7 @@ public void onCompletion(Optional exception) { "STATS: latency -> {}", stats.buildSensorStatSummary("multiget_healthy_request_latency", "99thPercentile")); - validateBatchGetMetrics(metricsRepository, true, recordCnt + 1, recordCnt, false); + validateBatchGetMetrics(metricsRepository, true, true, recordCnt + 1, recordCnt, false); printAllStats(); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java index 5df52fa030..03102f0f81 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java @@ -109,7 +109,7 @@ public abstract class AbstractClientEndToEndSetup { protected ClientConfig clientConfig; - protected static final long TIME_OUT = 60 * Time.MS_PER_SECOND; + protected static final int TIME_OUT = 60 * Time.MS_PER_SECOND; protected static final String KEY_SCHEMA_STR = "\"string\""; protected static final String VALUE_FIELD_NAME = "int_field"; protected static final String VALUE_SCHEMA_STR = "{\n" + "\"type\": \"record\",\n" @@ -133,12 +133,15 @@ public abstract class AbstractClientEndToEndSetup { */ protected static final ImmutableList BATCH_GET_KEY_SIZE = ImmutableList.of(2, recordCnt); - @DataProvider(name = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode") - public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { + @DataProvider(name = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode") + public Object[][] sixBooleanANumberStoreMetadataFetchMode() { return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean speculativeQueryEnabled = (boolean) permutation[1]; boolean batchGet = (boolean) permutation[2]; boolean useStreamingBatchGetAsDefault = (boolean) permutation[3]; - int batchGetKeySize = (int) permutation[5]; + boolean retryEnabled = (boolean) permutation[5]; + int batchGetKeySize = (int) permutation[6]; + StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[7]; if (!batchGet) { if (useStreamingBatchGetAsDefault || batchGetKeySize != (int) BATCH_GET_KEY_SIZE.get(0)) { // these parameters are related only to batchGet, so just allowing 1 set @@ -146,6 +149,14 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { return false; } } + if (storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) { + if (retryEnabled || speculativeQueryEnabled) { + return false; + } + } + if (retryEnabled && speculativeQueryEnabled) { + return false; + } return true; }, DataProviderUtils.BOOLEAN, // dualRead @@ -153,6 +164,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { DataProviderUtils.BOOLEAN, // batchGet DataProviderUtils.BOOLEAN, // useStreamingBatchGetAsDefault DataProviderUtils.BOOLEAN, // enableGrpc + DataProviderUtils.BOOLEAN, // retryEnabled BATCH_GET_KEY_SIZE.toArray(), // batchGetKeySize STORE_METADATA_FETCH_MODES); // storeMetadataFetchMode } @@ -203,6 +215,11 @@ public static Object[][] storeMetadataFetchModes() { return DataProviderUtils.allPermutationGenerator(STORE_METADATA_FETCH_MODES); } + @DataProvider(name = "Boolean-And-StoreMetadataFetchModes") + public static Object[][] booleanAndstoreMetadataFetchModes() { + return DataProviderUtils.allPermutationGenerator(DataProviderUtils.BOOLEAN, STORE_METADATA_FETCH_MODES); + } + @BeforeClass(alwaysRun = true) public void setUp() throws Exception { Utils.thisIsLocalhost(); @@ -500,16 +517,18 @@ protected AvroSpecificStoreClient getSpecificThinClient } protected void validateSingleGetMetrics(MetricsRepository metricsRepository, boolean retryEnabled) { - validateBatchGetMetrics(metricsRepository, false, 0, 0, retryEnabled); + validateBatchGetMetrics(metricsRepository, false, false, 0, 0, retryEnabled); } protected void validateBatchGetMetrics( MetricsRepository metricsRepository, boolean useStreamingBatchGetAsDefault, + boolean streamingBatchGetApi, int expectedBatchGetKeySizeMetricsCount, int expectedBatchGetKeySizeSuccessMetricsCount, boolean retryEnabled) { - String metricPrefix = "." + storeName + (useStreamingBatchGetAsDefault ? "--multiget_" : "--"); + String metricPrefix = "." + storeName + + (streamingBatchGetApi ? "--multiget_streaming_" : (useStreamingBatchGetAsDefault ? "--multiget_" : "--")); double keyCount = useStreamingBatchGetAsDefault ? expectedBatchGetKeySizeMetricsCount : 1; double successKeyCount = useStreamingBatchGetAsDefault ? expectedBatchGetKeySizeSuccessMetricsCount : 1; Map metrics = metricsRepository.metrics();