From d884953d3402a4b4d9486cae07edc02784c3f95e Mon Sep 17 00:00:00 2001 From: Manoj Nagarajan Date: Mon, 2 Oct 2023 10:07:46 -0700 Subject: [PATCH] [fc][test] Make retry work and add proper stats prefix for streamingBatchGet (#671) * Retry was not working for streamingBatchGet(keys) as the proper delegation was not happening in RetriableAvroGenericStoreClient.java. This change fixes this problem. * make streamingBatchGet use "--multiget_streaming_" rather than "--multiget_" as prefix for metrics: batchGet() using single get uses single get metrics, batchGet() using streaming implementation uses "--multiget_", streamingBatchGet uses "--multiget_streaming_" * return an exception if every key hits no replica found issue. Before this change, even though this case didn't increment successful key count metric and increment no replica found metric, it was still considering the request to healthy. After this change, this is considered unhealthy metric. Even after this change, if at least one of the routes find a replica and is successful, streamingBatchGet considers this a healthy request as this is a partial API. * Add additional unit and integration tests for streamingBatchGet and retry cases. --- .../DispatchingAvroGenericStoreClient.java | 28 +- .../RetriableAvroGenericStoreClient.java | 44 ++- .../StatsAvroGenericStoreClient.java | 14 +- .../BatchGetAvroStoreClientUnitTest.java | 2 +- ...DispatchingAvroGenericStoreClientTest.java | 347 +++++++++++++++--- .../RetriableAvroGenericStoreClientTest.java | 121 ++++-- .../AvroStoreClientEndToEndTest.java | 17 +- .../AvroStoreClientGzipEndToEndTest.java | 20 +- .../AvroStoreClientZstdEndToEndTest.java | 19 +- .../BatchGetAvroStoreClientTest.java | 46 ++- .../utils/AbstractClientEndToEndSetup.java | 31 +- 11 files changed, 553 insertions(+), 136 deletions(-) diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java index 9ec8fae459..75bc934a5c 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java @@ -314,14 +314,10 @@ protected CompletableFuture> batchGet(BatchGetRequestContext req if (throwable != null) { responseFuture.completeExceptionally(throwable); } else if (!response.isFullResponse()) { - if (requestContext.getPartialResponseException().isPresent()) { - responseFuture.completeExceptionally( - new VeniceClientException( - "Response was not complete", - requestContext.getPartialResponseException().get())); - } else { - responseFuture.completeExceptionally(new VeniceClientException("Response was not complete")); - } + responseFuture.completeExceptionally( + new VeniceClientException( + "Response was not complete", + requestContext.getPartialResponseException().orElse(null))); } else { responseFuture.complete(response); } @@ -334,6 +330,7 @@ protected CompletableFuture> streamingBatchGet( BatchGetRequestContext requestContext, Set keys) throws VeniceClientException { verifyMetadataInitialized(); + int keySize = keys.size(); // keys that do not exist in the storage nodes Queue nonExistingKeys = new ConcurrentLinkedQueue<>(); VeniceConcurrentHashMap valueMap = new VeniceConcurrentHashMap<>(); @@ -358,7 +355,7 @@ public void onCompletion(Optional exception) { if (exception.isPresent()) { streamingResponseFuture.completeExceptionally(exception.get()); } else { - boolean isFullResponse = ((valueMap.size() + nonExistingKeys.size()) == keys.size()); + boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keySize; streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse)); } } @@ -393,9 +390,8 @@ protected void streamingBatchGet( * that exception will be passed to the aggregate future's next stages. */ CompletableFuture.allOf(requestContext.getAllRouteFutures().toArray(new CompletableFuture[0])) .whenComplete((response, throwable) -> { - if (throwable == null) { - callback.onCompletion(Optional.empty()); - } else { + if (throwable != null || (!keys.isEmpty() && requestContext.getAllRouteFutures().isEmpty())) { + // If there is an exception or if no partition has a healthy replica. // The exception to send to the client might be different. Get from the requestContext Throwable clientException = throwable; if (requestContext.getPartialResponseException().isPresent()) { @@ -403,6 +399,8 @@ protected void streamingBatchGet( } callback.onCompletion( Optional.of(new VeniceClientException("At least one route did not complete", clientException))); + } else { + callback.onCompletion(Optional.empty()); } }); } @@ -452,10 +450,10 @@ private void streamingBatchGetInternal( * an error */ requestContext.noAvailableReplica = true; String errorMessage = String.format( - "No route found for partitionId: %s, store: %s, version: %s", - partitionId, + "No available route for store: %s, version: %s, partitionId: %s", getStoreName(), - currentVersion); + currentVersion, + partitionId); LOGGER.error(errorMessage); requestContext.setPartialResponseException(new VeniceClientException(errorMessage)); } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java index dc58aa3240..f07c839b6d 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java @@ -175,7 +175,28 @@ protected CompletableFuture get(GetRequestContext requestContext, K key) thro */ protected CompletableFuture> batchGet(BatchGetRequestContext requestContext, Set keys) throws VeniceClientException { + CompletableFuture> responseFuture = new CompletableFuture<>(); + CompletableFuture> streamingResponseFuture = streamingBatchGet(requestContext, keys); + streamingResponseFuture.whenComplete((response, throwable) -> { + if (throwable != null) { + responseFuture.completeExceptionally(throwable); + } else if (!response.isFullResponse()) { + responseFuture.completeExceptionally( + new VeniceClientException( + "Response was not complete", + requestContext.getPartialResponseException().orElse(null))); + } else { + responseFuture.complete(response); + } + }); + return responseFuture; + } + + protected CompletableFuture> streamingBatchGet( + BatchGetRequestContext requestContext, + Set keys) throws VeniceClientException { // keys that do not exist in the storage nodes + int keysSize = keys.size(); Queue nonExistingKeys = new ConcurrentLinkedQueue<>(); VeniceConcurrentHashMap valueMap = new VeniceConcurrentHashMap<>(); CompletableFuture> streamingResponseFuture = new VeniceResponseCompletableFuture<>( @@ -199,29 +220,12 @@ public void onCompletion(Optional exception) { if (exception.isPresent()) { streamingResponseFuture.completeExceptionally(exception.get()); } else { - boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keys.size(); + boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keysSize; streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse)); } } }); - CompletableFuture> responseFuture = new CompletableFuture<>(); - streamingResponseFuture.whenComplete((response, throwable) -> { - if (throwable != null) { - responseFuture.completeExceptionally(throwable); - } else if (!response.isFullResponse()) { - if (requestContext.getPartialResponseException().isPresent()) { - responseFuture.completeExceptionally( - new VeniceClientException( - "Response was not complete", - requestContext.getPartialResponseException().get())); - } else { - responseFuture.completeExceptionally(new VeniceClientException("Response was not complete")); - } - } else { - responseFuture.complete(response); - } - }); - return responseFuture; + return streamingResponseFuture; } @Override @@ -301,7 +305,7 @@ public void streamingBatchGet( if (finalException == null) { callback.onCompletion(Optional.empty()); } else { - callback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception ", finalException))); + callback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception", finalException))); } }); } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java index c7591004a1..c04ac16844 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/StatsAvroGenericStoreClient.java @@ -31,6 +31,7 @@ public class StatsAvroGenericStoreClient extends DelegatingAvroStoreClient private final FastClientStats clientStatsForSingleGet; private final FastClientStats clientStatsForBatchGet; + private final FastClientStats clientStatsForStreamingBatchGet; private final ClusterStats clusterStats; private final int maxAllowedKeyCntInBatchGetReq; @@ -40,6 +41,7 @@ public StatsAvroGenericStoreClient(InternalAvroStoreClient delegate, Clien super(delegate); this.clientStatsForSingleGet = clientConfig.getStats(RequestType.SINGLE_GET); this.clientStatsForBatchGet = clientConfig.getStats(RequestType.MULTI_GET); + this.clientStatsForStreamingBatchGet = clientConfig.getStats(RequestType.MULTI_GET_STREAMING); this.clusterStats = clientConfig.getClusterStats(); this.maxAllowedKeyCntInBatchGetReq = clientConfig.getMaxAllowedKeyCntInBatchGetReq(); this.useStreamingBatchGetAsDefault = clientConfig.useStreamingBatchGetAsDefault(); @@ -121,7 +123,7 @@ protected void streamingBatchGet( requestContext, keys, new StatTrackingStreamingCallBack<>(callback, statFuture, requestContext)); - recordMetrics(requestContext, keys.size(), statFuture, startTimeInNS, clientStatsForBatchGet); + recordMetrics(requestContext, keys.size(), statFuture, startTimeInNS, clientStatsForStreamingBatchGet); } @Override @@ -130,7 +132,7 @@ protected CompletableFuture> streamingBatchGet( Set keys) { long startTimeInNS = System.nanoTime(); CompletableFuture> streamingBatchGetFuture = super.streamingBatchGet(requestContext, keys); - recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForBatchGet); + recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForStreamingBatchGet); return streamingBatchGetFuture; } @@ -148,6 +150,14 @@ private CompletableFuture recordMetrics( return AppTimeOutTrackingCompletableFuture.track(statFuture, clientStats); } + /** + * Metrics are incremented after one of the below cases + * 1. request is complete or + * 2. exception is thrown or + * 3. routingLeakedRequestCleanupThresholdMS is elapsed: In case of streamingBatchGet.get(timeout) returning + * partial response and this timeout happens after than and before the full response is returned, + * it will still raise a silent exception leading to the request being considered an unhealthy request. + */ private CompletableFuture recordRequestMetrics( RequestContext requestContext, int numberOfKeys, diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java index 2a4534ec02..682dcc66ee 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientUnitTest.java @@ -709,7 +709,7 @@ private void validateMetrics( double expectedNumberOfKeysToBeRetried, double expectedNumberOfKeysToBeRetriedSuccessfully) { Map metrics = getStats(client.getClientConfig()); - String metricPrefix = "." + client.UNIT_TEST_STORE_NAME + "--multiget_"; + String metricPrefix = "." + client.UNIT_TEST_STORE_NAME + "--multiget_streaming_"; if (totalNumberOfKeys > 0) { assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0); assertEquals(metrics.get(metricPrefix + "request_key_count.Max").value(), totalNumberOfKeys); diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java index cb13707861..e14880de78 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java @@ -266,42 +266,68 @@ private void validateRetryMetrics(boolean batchGet, String metricPrefix, boolean } private void validateSingleGetMetrics(boolean healthyRequest) { - validateMetrics(healthyRequest, false, false, false, false); + validateMetrics(healthyRequest, false, false, false, false, false, 1, 0); } private void validateMultiGetMetrics( boolean healthyRequest, boolean partialHealthyRequest, boolean useStreamingBatchGetAsDefault, - boolean noAvailableReplicas) { - validateMetrics(healthyRequest, partialHealthyRequest, true, useStreamingBatchGetAsDefault, noAvailableReplicas); + boolean streamingBatchGetApi, + boolean noAvailableReplicas, + int numKeys) { + validateMetrics( + healthyRequest, + partialHealthyRequest, + true, + useStreamingBatchGetAsDefault, + streamingBatchGetApi, + noAvailableReplicas, + numKeys, + 2); + } + + private void validateMultiGetMetrics( + boolean healthyRequest, + boolean partialHealthyRequest, + boolean useStreamingBatchGetAsDefault, + boolean streamingBatchGetApi, + boolean noAvailableReplicas, + int numKeys, + double numBlockedReplicas) { + validateMetrics( + healthyRequest, + partialHealthyRequest, + true, + useStreamingBatchGetAsDefault, + streamingBatchGetApi, + noAvailableReplicas, + numKeys, + numBlockedReplicas); } private void validateMetrics( boolean healthyRequest, boolean partialHealthyRequest, boolean batchGet, - boolean useStreamingBatchGetAsDefault, - boolean noAvailableReplicas) { + boolean useStreamingBatchGetAsDefault, // use streaming implementation for batchGet + boolean streamingBatchGetApi, // calling streamingBatchGet() API to test + boolean noAvailableReplicas, + int numKeys, + double numBlockedReplicas) { metrics = getStats(clientConfig); - String metricPrefix = "." + STORE_NAME + ((batchGet && useStreamingBatchGetAsDefault) ? "--multiget_" : "--"); + String metricPrefix = "." + STORE_NAME + + (streamingBatchGetApi + ? "--multiget_streaming_" + : ((batchGet && useStreamingBatchGetAsDefault) ? "--multiget_" : "--")); String routeMetricsPrefix = "." + STORE_NAME; - double requestKeyCount; double successKeyCount; - if (useStreamingBatchGetAsDefault) { - if (partialHealthyRequest) { - // batchGet and partialHealthyRequest: 2 request tried but only 1 request is successful - requestKeyCount = 2.0; - successKeyCount = 1.0; - } else { - requestKeyCount = 2.0; - successKeyCount = 2.0; - } + double requestKeyCount = numKeys; + if (partialHealthyRequest) { + // batchGet and partialHealthyRequest: 1 request is unsuccessful + successKeyCount = numKeys - 1; } else { - // single get: 1 - // batchGet using single get: 1 will be recorded twice rather than 2 - requestKeyCount = 1.0; - successKeyCount = 1.0; + successKeyCount = numKeys; } assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0); assertEquals(metrics.get(metricPrefix + "request_key_count.Max").value(), requestKeyCount); @@ -350,16 +376,19 @@ private void validateMetrics( if (noAvailableReplicas) { assertTrue(metrics.get(metricPrefix + "no_available_replica_request_count.OccurrenceRate").value() > 0); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - assertNotNull(metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max")); - assertEquals( - metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max").value(), - 1.0); + if (numBlockedReplicas == 2) { + // some test cases only have 1 replica having pending and some have 2. + assertNotNull(metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max")); + assertEquals( + metrics.get(routeMetricsPrefix + "_" + REPLICA1_NAME + "--pending_request_count.Max").value(), + 1.0); + } assertNotNull(metrics.get(routeMetricsPrefix + "_" + REPLICA2_NAME + "--pending_request_count.Max")); assertEquals( metrics.get(routeMetricsPrefix + "_" + REPLICA2_NAME + "--pending_request_count.Max").value(), 1.0); }); - assertEquals(metrics.get(routeMetricsPrefix + "--blocked_instance_count.Max").value(), 2.0); + assertEquals(metrics.get(routeMetricsPrefix + "--blocked_instance_count.Max").value(), numBlockedReplicas); if (batchGet) { if (useStreamingBatchGetAsDefault) { assertTrue(batchGetRequestContext.noAvailableReplica); @@ -461,7 +490,42 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault) }); } metrics = getStats(clientConfig, RequestType.MULTI_GET); - validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + true, + false, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); + } finally { + tearDown(); + } + } + + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) + public void testBatchGetWithEmptyKeys(boolean streamingBatchGet) + throws ExecutionException, InterruptedException, IOException { + + try { + setUpClient(true); + batchGetRequestContext = new BatchGetRequestContext<>(); + Map value; + if (streamingBatchGet) { + value = + (Map) statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, new HashSet()) + .get(); + } else { + value = (Map) statsAvroGenericStoreClient.batchGet(batchGetRequestContext, new HashSet()).get(); + } + assertTrue(value.isEmpty()); + metrics = getStats(clientConfig, RequestType.MULTI_GET); + String metricPrefix = "." + STORE_NAME + (streamingBatchGet ? "--multiget_streaming_" : "--multiget_"); + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0); + assertTrue(metrics.get(metricPrefix + "healthy_request.OccurrenceRate").value() > 0); + assertFalse(metrics.get(metricPrefix + "request_key_count.Max").value() > 0); + assertFalse(metrics.get(metricPrefix + "unhealthy_request.OccurrenceRate").value() > 0); + }); } finally { tearDown(); } @@ -496,7 +560,13 @@ public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatc } else { assertTrue(e.getMessage().endsWith("Exception for client to return 503")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + false, + false, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); } finally { tearDown(); } @@ -518,7 +588,7 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt BATCH_GET_KEYS.stream().forEach(key -> { assertTrue(SINGLE_GET_VALUE_RESPONSE.contentEquals(value.get(key))); }); - validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false, false, 1); } } catch (Exception e) { if (useStreamingBatchGetAsDefault) { @@ -526,7 +596,13 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt } else { fail(); } - validateMultiGetMetrics(false, true, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + false, + true, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); } finally { tearDown(); } @@ -553,7 +629,7 @@ public void testBatchGetWithTimeoutV1() throws IOException, ExecutionException, TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, false, false, 2); tearDown(); } } @@ -580,7 +656,7 @@ public void testBatchGetWithTimeoutV2() TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, false, false, 2); tearDown(); } } @@ -607,7 +683,41 @@ public void testBatchGetWithTimeoutV3() TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, false, false, 2); + tearDown(); + } + } + + /** + * 1st batchGet(1 key) blocks one replica and 2nd batchGet(2 keys) returns value for only 1 key as the other + * route is blocked, so batchGet() return an exception. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testBatchGetWithTimeoutV4() throws IOException { + long routingLeakedRequestCleanupThresholdMS = TimeUnit.SECONDS.toMillis(1); + try { + setUpClient(true, false, false, true, true, routingLeakedRequestCleanupThresholdMS); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_2).get(); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + + try { + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + assertTrue(e1.getMessage().endsWith("Response was not complete")); + assertTrue(e1.getCause().getCause().getMessage().matches(".*No available route for.*")); + // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased + metrics = getStats(clientConfig); + TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { + assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + }); + validateMultiGetMetrics(false, true, true, false, true, 2, 1); + } + } finally { tearDown(); } } @@ -632,9 +742,9 @@ public void testStreamingBatchGetWithTimeoutV1() throws IOException, ExecutionEx // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased metrics = getStats(clientConfig); TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { - assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, true, false, 2); tearDown(); } } @@ -660,9 +770,9 @@ public void testStreamingBatchGetWithTimeoutV2() // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased metrics = getStats(clientConfig); TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { - assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, true, false, 2); tearDown(); } } @@ -689,9 +799,9 @@ public void testStreamingBatchGetWithTimeoutV3() // wait for routingLeakedRequestCleanupThresholdMS for the metrics to be increased metrics = getStats(clientConfig); TestUtils.waitForNonDeterministicAssertion(routingLeakedRequestCleanupThresholdMS + 1, TimeUnit.SECONDS, () -> { - assertTrue(metrics.get("." + STORE_NAME + "--multiget_request.OccurrenceRate").value() > 0); + assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0); }); - validateMultiGetMetrics(false, true, true, false); + validateMultiGetMetrics(false, true, true, true, false, 2); } finally { tearDown(); } @@ -712,7 +822,13 @@ public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefaul } else { assertTrue(e.getMessage().endsWith("http status: 410, Request timed out")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false); + validateMultiGetMetrics( + false, + false, + useStreamingBatchGetAsDefault, + false, + false, + useStreamingBatchGetAsDefault ? 2 : 1); try { // the second batchGet is not going to find any routes (as the instances @@ -721,16 +837,163 @@ public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefaul statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); fail(); } catch (Exception e1) { - System.out.println("Exception is: " + e1.getMessage()); if (useStreamingBatchGetAsDefault) { - assertTrue(e1.getMessage().endsWith("Response was not complete")); + assertTrue(e1.getMessage().endsWith("At least one route did not complete")); + assertTrue(e1.getCause().getCause().getMessage().matches(".*No available route for store.*")); } else { assertTrue(e1.getMessage().matches(".*No available route for store.*")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, true); + validateMultiGetMetrics( + false, + false, + useStreamingBatchGetAsDefault, + false, + true, + useStreamingBatchGetAsDefault ? 2 : 1); } } finally { tearDown(); } } + + /** + * 1. 1st streamingBatchGet(2 keys) will result in both the routes marked as + * blocked(routingPendingRequestCounterInstanceBlockThreshold is 1). + * 2. 2nd streamingBatchGet(2 keys) will result in both route getting no replica + * found leading to exception. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClient() throws IOException { + try { + setUpClient(true, false, false, false, false, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 2); + + try { + // the second batchGet is not going to find any routes (as the instances + // are blocked) and fail instantly + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + assertTrue(e1.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, true, 2); + } + } finally { + tearDown(); + } + } + + /** + * 1. first streamingBatchGet() with 1 key getting timed out leading to the route getting + * blocked (routingPendingRequestCounterInstanceBlockThreshold = 1) + * 2. second streamingBatchGet() with 2 keys (one key (same as above) failing with no available + * replica and one key getting timed out) => exception + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV1() throws IOException { + try { + setUpClient(true, false, false, false, false, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_1).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 1); + + try { + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + assertTrue(e1.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, true, 2); + } + } finally { + tearDown(); + } + } + + /** + * 1. first streamingBatchGet() with 1 key getting timed out leading to the route getting + * blocked (routingPendingRequestCounterInstanceBlockThreshold = 1) + * 2. second streamingBatchGet() with 2 keys (one key (same as above) failing with no available + * replica and one key returning value) => no exception + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV2() + throws IOException, ExecutionException, InterruptedException { + try { + setUpClient(true, false, false, true, true, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_2).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 1); + batchGetRequestContext = new BatchGetRequestContext<>(); + + CompletableFuture> future = + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS); + VeniceResponseMap value = future.get(); + assertFalse(value.isFullResponse()); + // TODO metric validation: 1st get increments unhealthy metrics and the second get increments healthy + // metrics + // validateMultiGetMetrics(true, true, true, true, 2); + } finally { + tearDown(); + } + } + + /** + * same as testStreamingBatchGetToUnreachableClientV2: transportClientThrowsPartialException instead of + * transportClientPartialIncomplete. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV3() throws IOException { + try { + setUpClient(true, false, true, false, true, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, true, true, true, false, 2); + } finally { + tearDown(); + } + } + + /** + * same as testStreamingBatchGetToUnreachableClientV3 but the transport mock throws exception + * for both the routes. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testStreamingBatchGetToUnreachableClientV4() throws IOException { + try { + setUpClient(true, true, false, false, true, TimeUnit.SECONDS.toMillis(1)); + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e) { + // First batchGet fails with unreachable host after timeout and this adds the hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) + assertTrue(e.getMessage().endsWith("At least one route did not complete")); + validateMultiGetMetrics(false, false, true, true, false, 2); + } finally { + tearDown(); + } + } } diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java index 985454c2e0..1cd7786180 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -59,6 +60,22 @@ public class RetriableAvroGenericStoreClientTest { private StatsAvroGenericStoreClient statsAvroGenericStoreClient; private Map metrics; + @DataProvider(name = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet") + public Object[][] twoBoolean() { + return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean batchGet = (boolean) permutation[0]; + boolean streamingBatchGet = (boolean) permutation[1]; + if (!batchGet) { + if (streamingBatchGet) { + return false; + } + } + return true; + }, + DataProviderUtils.BOOLEAN, // batchGet + DataProviderUtils.BOOLEAN); // streamingBatchGet + } + @BeforeClass public void setUp() { timeoutProcessor = new TimeoutProcessor(null, true, 1); @@ -227,10 +244,10 @@ private void testSingleGetAndValidateMetrics( } } - validateMetrics(false, errorRetry, longTailRetry, retryWin); + validateMetrics(false, false, errorRetry, longTailRetry, retryWin); } - private void testBatchGetAndvalidateMetrics( + private void testBatchGetAndValidateMetrics( boolean bothOriginalAndRetryFails, boolean longTailRetry, boolean retryWin, @@ -255,7 +272,36 @@ private void testBatchGetAndvalidateMetrics( } } - validateMetrics(true, false, longTailRetry, retryWin); + validateMetrics(true, false, false, longTailRetry, retryWin); + } + + private void testStreamingBatchGetAndValidateMetrics( + boolean bothOriginalAndRetryFails, + boolean longTailRetry, + boolean retryWin, + boolean keyNotFound) throws ExecutionException, InterruptedException { + batchGetRequestContext = new BatchGetRequestContext<>(); + try { + Map value = + (Map) statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext, BATCH_GET_KEYS) + .get(); + + if (bothOriginalAndRetryFails) { + fail("An ExecutionException should be thrown here"); + } + + if (keyNotFound) { + assertEquals(value, BATCH_GET_VALUE_RESPONSE_KEY_NOT_FOUND_CASE); + } else { + assertEquals(value, BATCH_GET_VALUE_RESPONSE); + } + } catch (ExecutionException e) { + if (!bothOriginalAndRetryFails) { + throw e; + } + } + + validateMetrics(true, true, false, longTailRetry, retryWin); } /** @@ -265,11 +311,20 @@ private void testBatchGetAndvalidateMetrics( * @param longTailRetry request is retried because the original request is taking more time * @param retryWin retry request wins */ - private void validateMetrics(boolean batchGet, boolean errorRetry, boolean longTailRetry, boolean retryWin) { + private void validateMetrics( + boolean batchGet, + boolean streamingBatchGet, + boolean errorRetry, + boolean longTailRetry, + boolean retryWin) { metrics = getStats(clientConfig); - String metricsPrefix = "." + STORE_NAME + (batchGet ? "--multiget_" : "--"); + String metricsPrefix = + "." + STORE_NAME + (batchGet ? (streamingBatchGet ? "--multiget_streaming_" : "--multiget_") : "--"); double expectedKeyCount = batchGet ? 2.0 : 1.0; - assertTrue(metrics.get(metricsPrefix + "request.OccurrenceRate").value() > 0); + + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + assertTrue(metrics.get(metricsPrefix + "request.OccurrenceRate").value() > 0); + }); assertEquals(metrics.get(metricsPrefix + "request_key_count.Max").value(), expectedKeyCount); if (errorRetry || longTailRetry) { @@ -350,15 +405,15 @@ public void testGetWithoutTriggeringLongTailRetry(boolean batchGet, boolean keyN if (!batchGet) { testSingleGetAndValidateMetrics(false, false, false, false, keyNotFound); } else { - testBatchGetAndvalidateMetrics(false, false, false, keyNotFound); + testBatchGetAndValidateMetrics(false, false, false, keyNotFound); } } /** * Original request latency is higher than retry threshold, but still faster than retry request */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndOriginalWins(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndOriginalWins(boolean batchGet, boolean streamingBatchGet) throws ExecutionException, InterruptedException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -375,7 +430,11 @@ public void testGetWithTriggeringLongTailRetryAndOriginalWins(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(false, false, true, false, false); } else { - testBatchGetAndvalidateMetrics(false, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(false, true, false, false); + } else { + testBatchGetAndValidateMetrics(false, true, false, false); + } } } @@ -400,15 +459,15 @@ public void testGetWithTriggeringLongTailRetryAndRetryWins(boolean batchGet, boo if (!batchGet) { testSingleGetAndValidateMetrics(false, false, true, true, keyNotFound); } else { - testBatchGetAndvalidateMetrics(false, true, true, keyNotFound); + testBatchGetAndValidateMetrics(false, true, true, keyNotFound); } } /** * Original request fails and retry succeeds. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringErrorRetryAndRetryWins(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringErrorRetryAndRetryWins(boolean batchGet, boolean streamingBatchGet) throws ExecutionException, InterruptedException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -419,15 +478,19 @@ public void testGetWithTriggeringErrorRetryAndRetryWins(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(false, true, false, true, false); } else { - testBatchGetAndvalidateMetrics(false, true, true, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(false, true, true, false); + } else { + testBatchGetAndValidateMetrics(false, true, true, false); + } } } /** * Original request latency exceeds the retry threshold but succeeds and the retry fails. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndRetryFails(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndRetryFails(boolean batchGet, boolean streamingBatchGet) throws ExecutionException, InterruptedException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -438,15 +501,19 @@ public void testGetWithTriggeringLongTailRetryAndRetryFails(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(false, false, true, false, false); } else { - testBatchGetAndvalidateMetrics(false, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(false, true, false, false); + } else { + testBatchGetAndValidateMetrics(false, true, false, false); + } } } /** * Original request latency exceeds the retry threshold, and both the original request and the retry fails. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndBothFailsV1(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndBothFailsV1(boolean batchGet, boolean streamingBatchGet) throws InterruptedException, ExecutionException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -463,15 +530,19 @@ public void testGetWithTriggeringLongTailRetryAndBothFailsV1(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(true, false, true, false, false); } else { - testBatchGetAndvalidateMetrics(true, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(true, true, false, false); + } else { + testBatchGetAndValidateMetrics(true, true, false, false); + } } } /** * Original request latency is lower than the retry threshold, and both the original request and the retry fails. */ - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) - public void testGetWithTriggeringLongTailRetryAndBothFailsV2(boolean batchGet) + @Test(dataProvider = "FastClient-Single-Get-MultiGet-And-Streaming-MultiGet", timeOut = TEST_TIMEOUT) + public void testGetWithTriggeringLongTailRetryAndBothFailsV2(boolean batchGet, boolean streamingBatchGet) throws InterruptedException, ExecutionException { clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -488,7 +559,11 @@ public void testGetWithTriggeringLongTailRetryAndBothFailsV2(boolean batchGet) if (!batchGet) { testSingleGetAndValidateMetrics(true, true, false, false, false); } else { - testBatchGetAndvalidateMetrics(true, true, false, false); + if (streamingBatchGet) { + testStreamingBatchGetAndValidateMetrics(true, true, false, false); + } else { + testBatchGetAndValidateMetrics(true, true, false, false); + } } } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java index 37c8c23546..56023ca447 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java @@ -1,5 +1,6 @@ package com.linkedin.venice.fastclient; +import static com.linkedin.venice.utils.Time.MS_PER_SECOND; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -228,13 +229,14 @@ protected void runTest( } } - @Test(dataProvider = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode", timeOut = TIME_OUT) + @Test(dataProvider = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode", timeOut = TIME_OUT) public void testFastClientGet( boolean dualRead, boolean speculativeQueryEnabled, boolean batchGet, boolean useStreamingBatchGetAsDefault, boolean enableGrpc, + boolean retryEnabled, int batchGetKeySize, StoreMetadataFetchMode storeMetadataFetchMode) throws Exception { ClientConfig.ClientConfigBuilder clientConfigBuilder = @@ -256,6 +258,15 @@ public void testFastClientGet( setUpGrpcFastClient(clientConfigBuilder); } + if (retryEnabled) { + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry + clientConfigBuilder.setLongTailRetryEnabledForSingleGet(true) + .setLongTailRetryThresholdForSingleGetInMicroSeconds(TIME_OUT * MS_PER_SECOND) + .setLongTailRetryEnabledForBatchGet(true) + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); + } + // dualRead needs thinClient AvroGenericStoreClient genericThinClient = null; AvroSpecificStoreClient specificThinClient = null; @@ -295,6 +306,7 @@ public void testFastClientGet( fastClientStatsValidation = metricsRepository -> validateBatchGetMetrics( metricsRepository, useStreamingBatchGetAsDefault, + false, batchGetKeySize, batchGetKeySize, false); @@ -407,6 +419,7 @@ public void testFastClientGetWithDifferentHTTPVariants( fastClientStatsValidation = metricsRepository -> validateBatchGetMetrics( metricsRepository, true, // testing batch get with useStreamingBatchGetAsDefault as true + false, recordCnt, recordCnt, false); @@ -438,7 +451,7 @@ public void testFastClientWithLongTailRetry(boolean batchGet) throws Exception { clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) .setLongTailRetryThresholdForBatchGetInMicroSeconds(1); fastClientStatsValidation = - metricsRepository -> validateBatchGetMetrics(metricsRepository, true, recordCnt, recordCnt, true); + metricsRepository -> validateBatchGetMetrics(metricsRepository, true, false, recordCnt, recordCnt, true); } else { clientConfigBuilder.setLongTailRetryEnabledForSingleGet(true) .setLongTailRetryThresholdForSingleGetInMicroSeconds(1); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java index 6835edbfb1..160b8acfa7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientGzipEndToEndTest.java @@ -18,13 +18,21 @@ public class AvroStoreClientGzipEndToEndTest extends AvroStoreClientEndToEndTest { @Override - @DataProvider(name = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode") - public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { + @DataProvider(name = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode") + public Object[][] sixBooleanANumberStoreMetadataFetchMode() { return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean speculativeQueryEnabled = (boolean) permutation[1]; + if (speculativeQueryEnabled) { + return false; + } boolean batchGet = (boolean) permutation[2]; boolean useStreamingBatchGetAsDefault = (boolean) permutation[3]; - int batchGetKeySize = (int) permutation[5]; - StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[6]; + boolean retryEnabled = (boolean) permutation[5]; + if (retryEnabled) { + return false; + } + int batchGetKeySize = (int) permutation[6]; + StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[7]; if (!batchGet) { if (useStreamingBatchGetAsDefault || batchGetKeySize != (int) BATCH_GET_KEY_SIZE.get(0)) { // these parameters are related only to batchGet, so just allowing 1 set @@ -32,7 +40,8 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { return false; } } - if (storeMetadataFetchMode == StoreMetadataFetchMode.DA_VINCI_CLIENT_BASED_METADATA) { + + if (storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) { return false; } return true; @@ -42,6 +51,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { DataProviderUtils.BOOLEAN, // batchGet DataProviderUtils.BOOLEAN_TRUE, // useStreamingBatchGetAsDefault DataProviderUtils.BOOLEAN, // enableGrpc + DataProviderUtils.BOOLEAN, // retryEnabled BATCH_GET_KEY_SIZE.toArray(), // batchGetKeySize STORE_METADATA_FETCH_MODES); // storeMetadataFetchMode } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java index 95667931c4..41f23ea7da 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientZstdEndToEndTest.java @@ -22,13 +22,21 @@ public class AvroStoreClientZstdEndToEndTest extends AvroStoreClientEndToEndTest { @Override - @DataProvider(name = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode") - public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { + @DataProvider(name = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode") + public Object[][] sixBooleanANumberStoreMetadataFetchMode() { return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean speculativeQueryEnabled = (boolean) permutation[1]; + if (speculativeQueryEnabled) { + return false; + } boolean batchGet = (boolean) permutation[2]; boolean useStreamingBatchGetAsDefault = (boolean) permutation[3]; - int batchGetKeySize = (int) permutation[5]; - StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[6]; + boolean retryEnabled = (boolean) permutation[5]; + if (retryEnabled) { + return false; + } + int batchGetKeySize = (int) permutation[6]; + StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[7]; if (!batchGet) { if (useStreamingBatchGetAsDefault || batchGetKeySize != (int) BATCH_GET_KEY_SIZE.get(0)) { // these parameters are related only to batchGet, so just allowing 1 set @@ -36,7 +44,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { return false; } } - if (storeMetadataFetchMode == StoreMetadataFetchMode.DA_VINCI_CLIENT_BASED_METADATA) { + if (storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) { return false; } return true; @@ -46,6 +54,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { DataProviderUtils.BOOLEAN, // batchGet DataProviderUtils.BOOLEAN_TRUE, // useStreamingBatchGetAsDefault DataProviderUtils.BOOLEAN, // enableGrpc + DataProviderUtils.BOOLEAN, // retryEnabled BATCH_GET_KEY_SIZE.toArray(), // batchGetKeySize STORE_METADATA_FETCH_MODES); // storeMetadataFetchMode } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java index e3b0a9b99e..8e8d8a7cc5 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/BatchGetAvroStoreClientTest.java @@ -1,6 +1,6 @@ package com.linkedin.venice.fastclient; -import static com.linkedin.venice.utils.Time.US_PER_SECOND; +import static com.linkedin.venice.utils.Time.MS_PER_SECOND; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -135,10 +135,10 @@ public void testBatchGetGenericClient( .setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault); if (retryEnabled) { - // enable retry to test the code path: mimic retry in integration tests - // be non-deterministic, so setting big retry threshold to not actually retry + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) - .setLongTailRetryThresholdForBatchGetInMicroSeconds(10 * US_PER_SECOND); + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); } MetricsRepository metricsRepository = new MetricsRepository(); @@ -158,7 +158,7 @@ public void testBatchGetGenericClient( assertEquals(value.get(VALUE_FIELD_NAME), i); } - validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, recordCnt + 1, recordCnt, false); + validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, false, recordCnt + 1, recordCnt, false); FastClientStats stats = clientConfig.getStats(RequestType.MULTI_GET); LOGGER.info("STATS: {}", stats.buildSensorStatSummary("multiget_healthy_request_latency")); @@ -195,20 +195,28 @@ public void testBatchGetSpecificClient( assertEquals(value.get(VALUE_FIELD_NAME), i); } - validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, recordCnt, recordCnt, false); + validateBatchGetMetrics(metricsRepository, useStreamingBatchGetAsDefault, false, recordCnt, recordCnt, false); specificFastClient.close(); printAllStats(); } - @Test(dataProvider = "StoreMetadataFetchModes", timeOut = TIME_OUT) - public void testStreamingBatchGetGenericClient(StoreMetadataFetchMode storeMetadataFetchMode) throws Exception { + @Test(dataProvider = "Boolean-And-StoreMetadataFetchModes", timeOut = TIME_OUT) + public void testStreamingBatchGetGenericClient(boolean retryEnabled, StoreMetadataFetchMode storeMetadataFetchMode) + throws Exception { ClientConfig.ClientConfigBuilder clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(storeName) .setR2Client(r2Client) - .setSpeculativeQueryEnabled(true) + .setSpeculativeQueryEnabled(false) .setDualReadEnabled(false); + if (retryEnabled) { + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry + clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); + } + MetricsRepository metricsRepository = new MetricsRepository(); AvroGenericStoreClient genericFastClient = getGenericFastClient(clientConfigBuilder, metricsRepository, storeMetadataFetchMode); @@ -247,18 +255,26 @@ public void testStreamingBatchGetGenericClient(StoreMetadataFetchMode storeMetad 1, "Incorrect non existing key size . Expected 1 got " + veniceResponseMap.getNonExistingKeys().size()); - validateBatchGetMetrics(metricsRepository, true, recordCnt + 1, recordCnt, false); + validateBatchGetMetrics(metricsRepository, true, true, recordCnt + 1, recordCnt, false); } - @Test(dataProvider = "StoreMetadataFetchModes", timeOut = TIME_OUT) - public void testStreamingBatchGetWithCallbackGenericClient(StoreMetadataFetchMode storeMetadataFetchMode) - throws Exception { + @Test(dataProvider = "Boolean-And-StoreMetadataFetchModes", timeOut = TIME_OUT) + public void testStreamingBatchGetWithCallbackGenericClient( + boolean retryEnabled, + StoreMetadataFetchMode storeMetadataFetchMode) throws Exception { ClientConfig.ClientConfigBuilder clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(storeName) .setR2Client(r2Client) - .setSpeculativeQueryEnabled(true) + .setSpeculativeQueryEnabled(false) .setDualReadEnabled(false); + if (retryEnabled) { + // enable retry to test the code path: to mimic retry in integration tests + // can be non-deterministic, so setting big retry threshold to not actually retry + clientConfigBuilder.setLongTailRetryEnabledForBatchGet(true) + .setLongTailRetryThresholdForBatchGetInMicroSeconds(TIME_OUT * MS_PER_SECOND); + } + MetricsRepository metricsRepository = new MetricsRepository(); AvroGenericStoreClient genericFastClient = getGenericFastClient(clientConfigBuilder, metricsRepository, storeMetadataFetchMode); @@ -323,7 +339,7 @@ public void onCompletion(Optional exception) { "STATS: latency -> {}", stats.buildSensorStatSummary("multiget_healthy_request_latency", "99thPercentile")); - validateBatchGetMetrics(metricsRepository, true, recordCnt + 1, recordCnt, false); + validateBatchGetMetrics(metricsRepository, true, true, recordCnt + 1, recordCnt, false); printAllStats(); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java index 5df52fa030..03102f0f81 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/utils/AbstractClientEndToEndSetup.java @@ -109,7 +109,7 @@ public abstract class AbstractClientEndToEndSetup { protected ClientConfig clientConfig; - protected static final long TIME_OUT = 60 * Time.MS_PER_SECOND; + protected static final int TIME_OUT = 60 * Time.MS_PER_SECOND; protected static final String KEY_SCHEMA_STR = "\"string\""; protected static final String VALUE_FIELD_NAME = "int_field"; protected static final String VALUE_SCHEMA_STR = "{\n" + "\"type\": \"record\",\n" @@ -133,12 +133,15 @@ public abstract class AbstractClientEndToEndSetup { */ protected static final ImmutableList BATCH_GET_KEY_SIZE = ImmutableList.of(2, recordCnt); - @DataProvider(name = "FastClient-Five-Boolean-A-Number-Store-Metadata-Fetch-Mode") - public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { + @DataProvider(name = "FastClient-Six-Boolean-A-Number-Store-Metadata-Fetch-Mode") + public Object[][] sixBooleanANumberStoreMetadataFetchMode() { return DataProviderUtils.allPermutationGenerator((permutation) -> { + boolean speculativeQueryEnabled = (boolean) permutation[1]; boolean batchGet = (boolean) permutation[2]; boolean useStreamingBatchGetAsDefault = (boolean) permutation[3]; - int batchGetKeySize = (int) permutation[5]; + boolean retryEnabled = (boolean) permutation[5]; + int batchGetKeySize = (int) permutation[6]; + StoreMetadataFetchMode storeMetadataFetchMode = (StoreMetadataFetchMode) permutation[7]; if (!batchGet) { if (useStreamingBatchGetAsDefault || batchGetKeySize != (int) BATCH_GET_KEY_SIZE.get(0)) { // these parameters are related only to batchGet, so just allowing 1 set @@ -146,6 +149,14 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { return false; } } + if (storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) { + if (retryEnabled || speculativeQueryEnabled) { + return false; + } + } + if (retryEnabled && speculativeQueryEnabled) { + return false; + } return true; }, DataProviderUtils.BOOLEAN, // dualRead @@ -153,6 +164,7 @@ public Object[][] fiveBooleanANumberStoreMetadataFetchMode() { DataProviderUtils.BOOLEAN, // batchGet DataProviderUtils.BOOLEAN, // useStreamingBatchGetAsDefault DataProviderUtils.BOOLEAN, // enableGrpc + DataProviderUtils.BOOLEAN, // retryEnabled BATCH_GET_KEY_SIZE.toArray(), // batchGetKeySize STORE_METADATA_FETCH_MODES); // storeMetadataFetchMode } @@ -203,6 +215,11 @@ public static Object[][] storeMetadataFetchModes() { return DataProviderUtils.allPermutationGenerator(STORE_METADATA_FETCH_MODES); } + @DataProvider(name = "Boolean-And-StoreMetadataFetchModes") + public static Object[][] booleanAndstoreMetadataFetchModes() { + return DataProviderUtils.allPermutationGenerator(DataProviderUtils.BOOLEAN, STORE_METADATA_FETCH_MODES); + } + @BeforeClass(alwaysRun = true) public void setUp() throws Exception { Utils.thisIsLocalhost(); @@ -500,16 +517,18 @@ protected AvroSpecificStoreClient getSpecificThinClient } protected void validateSingleGetMetrics(MetricsRepository metricsRepository, boolean retryEnabled) { - validateBatchGetMetrics(metricsRepository, false, 0, 0, retryEnabled); + validateBatchGetMetrics(metricsRepository, false, false, 0, 0, retryEnabled); } protected void validateBatchGetMetrics( MetricsRepository metricsRepository, boolean useStreamingBatchGetAsDefault, + boolean streamingBatchGetApi, int expectedBatchGetKeySizeMetricsCount, int expectedBatchGetKeySizeSuccessMetricsCount, boolean retryEnabled) { - String metricPrefix = "." + storeName + (useStreamingBatchGetAsDefault ? "--multiget_" : "--"); + String metricPrefix = "." + storeName + + (streamingBatchGetApi ? "--multiget_streaming_" : (useStreamingBatchGetAsDefault ? "--multiget_" : "--")); double keyCount = useStreamingBatchGetAsDefault ? expectedBatchGetKeySizeMetricsCount : 1; double successKeyCount = useStreamingBatchGetAsDefault ? expectedBatchGetKeySizeSuccessMetricsCount : 1; Map metrics = metricsRepository.metrics();