diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java index 319c411b55..9373fea712 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.java @@ -345,7 +345,8 @@ public void onCompletion(Optional exception) { if (exception.isPresent()) { streamingResponseFuture.completeExceptionally(exception.get()); } else { - streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, true)); + streamingResponseFuture + .complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, valueMap.size() == keys.size())); } } }); diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java index d9a2f913d7..234a871b45 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java @@ -199,7 +199,8 @@ public void onCompletion(Optional exception) { if (exception.isPresent()) { streamingResponseFuture.completeExceptionally(exception.get()); } else { - streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, true)); + streamingResponseFuture + .complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, valueMap.size() == keys.size())); } } }); diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java index 7516fa8066..de5ab9e699 100644 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java +++ b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClientTest.java @@ -99,7 +99,8 @@ private void setUpClient( .setR2Client(mock(Client.class)) .setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault) .setMetadataRefreshIntervalInSeconds(1L) - .setRoutingLeakedRequestCleanupThresholdMS(routingLeakedRequestCleanupThresholdMS); + .setRoutingLeakedRequestCleanupThresholdMS(routingLeakedRequestCleanupThresholdMS) + .setRoutingPendingRequestCounterInstanceBlockThreshold(1); clientConfigBuilder.setMetricsRepository(new MetricsRepository()); clientConfig = clientConfigBuilder.build(); @@ -239,21 +240,23 @@ private void validateRetryMetrics(boolean batchGet, String metricPrefix, boolean } private void validateSingleGetMetrics(boolean healthyRequest) { - validateMetrics(healthyRequest, false, false, false); + validateMetrics(healthyRequest, false, false, false, false); } private void validateMultiGetMetrics( boolean healthyRequest, boolean partialHealthyRequest, - boolean useStreamingBatchGetAsDefault) { - validateMetrics(healthyRequest, partialHealthyRequest, true, useStreamingBatchGetAsDefault); + boolean useStreamingBatchGetAsDefault, + boolean noAvailableReplicas) { + validateMetrics(healthyRequest, partialHealthyRequest, true, useStreamingBatchGetAsDefault, noAvailableReplicas); } private void validateMetrics( boolean healthyRequest, boolean partialHealthyRequest, boolean batchGet, - boolean useStreamingBatchGetAsDefault) { + boolean useStreamingBatchGetAsDefault, + boolean noAvailableReplicas) { metrics = getStats(clientConfig); String metricPrefix = "." + STORE_NAME + ((batchGet && useStreamingBatchGetAsDefault) ? "--multiget_" : "--"); double requestKeyCount; @@ -315,13 +318,25 @@ private void validateMetrics( assertEquals(getRequestContext.successRequestKeyCount.get(), 0); } } + + // the below counter will always fail as we never increment them assertFalse(metrics.get(metricPrefix + "no_available_replica_request_count.OccurrenceRate").value() > 0); - if (batchGet) { - if (useStreamingBatchGetAsDefault) { - assertFalse(batchGetRequestContext.noAvailableReplica); - } // else: locally created single get context will be used internally and not batchGetRequestContext + if (noAvailableReplicas) { + if (batchGet) { + if (useStreamingBatchGetAsDefault) { + assertTrue(batchGetRequestContext.noAvailableReplica); + } // else: locally created single get context will be used internally and not batchGetRequestContext + } else { + assertTrue(getRequestContext.noAvailableReplica); + } } else { - assertFalse(getRequestContext.noAvailableReplica); + if (batchGet) { + if (useStreamingBatchGetAsDefault) { + assertFalse(batchGetRequestContext.noAvailableReplica); + } // else: locally created single get context will be used internally and not batchGetRequestContext + } else { + assertFalse(getRequestContext.noAvailableReplica); + } } validateRetryMetrics(batchGet, metricPrefix, useStreamingBatchGetAsDefault); @@ -407,7 +422,7 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault) }); } metrics = getStats(clientConfig, RequestType.MULTI_GET); - validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault); + validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false); } finally { tearDown(); } @@ -426,7 +441,7 @@ public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatc } else { assertTrue(e.getMessage().endsWith("Exception for client to return 503")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault); + validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false); } finally { tearDown(); } @@ -450,7 +465,7 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt BATCH_GET_KEYS.stream().forEach(key -> { assertTrue(SINGLE_GET_VALUE_RESPONSE.contentEquals(value.get(key))); }); - validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault); + validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false); } } catch (Exception e) { if (useStreamingBatchGetAsDefault) { @@ -458,7 +473,7 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt } else { fail(); } - validateMultiGetMetrics(false, true, useStreamingBatchGetAsDefault); + validateMultiGetMetrics(false, true, useStreamingBatchGetAsDefault, false); } finally { tearDown(); } @@ -467,17 +482,35 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT) public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefault) throws IOException { try { - setUpClient(useStreamingBatchGetAsDefault, false, false, false, 2 * Time.MS_PER_SECOND); + setUpClient(useStreamingBatchGetAsDefault, false, false, false, 7 * Time.MS_PER_SECOND); batchGetRequestContext = new BatchGetRequestContext<>(); statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); fail(); } catch (Exception e) { + // First batchGet fails with unreachable host after wait time and this add these hosts + // as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1) if (useStreamingBatchGetAsDefault) { assertTrue(e.getMessage().endsWith("At least one route did not complete")); } else { assertTrue(e.getMessage().endsWith("http status: 410, Request timed out")); } - validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault); + validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false); + + try { + // the second batchGet is not going to find any routes (as the instances + // are blocked) and fail due to that + batchGetRequestContext = new BatchGetRequestContext<>(); + statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get(); + fail(); + } catch (Exception e1) { + System.out.println("Exception is: " + e1.getMessage()); + if (useStreamingBatchGetAsDefault) { + assertTrue(e1.getMessage().endsWith("Response was not complete")); + } else { + assertTrue(e1.getMessage().matches(".*No available route for store.*")); + } + validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, true); + } } finally { tearDown(); }