Skip to content

Commit

Permalink
add tests for unreachable clients
Browse files Browse the repository at this point in the history
  • Loading branch information
m-nagarajan committed Jul 25, 2023
1 parent acc42fe commit 01d7fe6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ public void onCompletion(Optional<Exception> exception) {
if (exception.isPresent()) {
streamingResponseFuture.completeExceptionally(exception.get());
} else {
streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, true));
streamingResponseFuture
.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, valueMap.size() == keys.size()));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public void onCompletion(Optional<Exception> exception) {
if (exception.isPresent()) {
streamingResponseFuture.completeExceptionally(exception.get());
} else {
streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, true));
streamingResponseFuture
.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, valueMap.size() == keys.size()));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private void setUpClient(
.setR2Client(mock(Client.class))
.setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault)
.setMetadataRefreshIntervalInSeconds(1L)
.setRoutingLeakedRequestCleanupThresholdMS(routingLeakedRequestCleanupThresholdMS);
.setRoutingLeakedRequestCleanupThresholdMS(routingLeakedRequestCleanupThresholdMS)
.setRoutingPendingRequestCounterInstanceBlockThreshold(1);

clientConfigBuilder.setMetricsRepository(new MetricsRepository());
clientConfig = clientConfigBuilder.build();
Expand Down Expand Up @@ -239,21 +240,23 @@ private void validateRetryMetrics(boolean batchGet, String metricPrefix, boolean
}

private void validateSingleGetMetrics(boolean healthyRequest) {
validateMetrics(healthyRequest, false, false, false);
validateMetrics(healthyRequest, false, false, false, false);
}

private void validateMultiGetMetrics(
boolean healthyRequest,
boolean partialHealthyRequest,
boolean useStreamingBatchGetAsDefault) {
validateMetrics(healthyRequest, partialHealthyRequest, true, useStreamingBatchGetAsDefault);
boolean useStreamingBatchGetAsDefault,
boolean noAvailableReplicas) {
validateMetrics(healthyRequest, partialHealthyRequest, true, useStreamingBatchGetAsDefault, noAvailableReplicas);
}

private void validateMetrics(
boolean healthyRequest,
boolean partialHealthyRequest,
boolean batchGet,
boolean useStreamingBatchGetAsDefault) {
boolean useStreamingBatchGetAsDefault,
boolean noAvailableReplicas) {
metrics = getStats(clientConfig);
String metricPrefix = "." + STORE_NAME + ((batchGet && useStreamingBatchGetAsDefault) ? "--multiget_" : "--");
double requestKeyCount;
Expand Down Expand Up @@ -315,13 +318,25 @@ private void validateMetrics(
assertEquals(getRequestContext.successRequestKeyCount.get(), 0);
}
}

// the below counter will always fail as we never increment them
assertFalse(metrics.get(metricPrefix + "no_available_replica_request_count.OccurrenceRate").value() > 0);
if (batchGet) {
if (useStreamingBatchGetAsDefault) {
assertFalse(batchGetRequestContext.noAvailableReplica);
} // else: locally created single get context will be used internally and not batchGetRequestContext
if (noAvailableReplicas) {
if (batchGet) {
if (useStreamingBatchGetAsDefault) {
assertTrue(batchGetRequestContext.noAvailableReplica);
} // else: locally created single get context will be used internally and not batchGetRequestContext
} else {
assertTrue(getRequestContext.noAvailableReplica);
}
} else {
assertFalse(getRequestContext.noAvailableReplica);
if (batchGet) {
if (useStreamingBatchGetAsDefault) {
assertFalse(batchGetRequestContext.noAvailableReplica);
} // else: locally created single get context will be used internally and not batchGetRequestContext
} else {
assertFalse(getRequestContext.noAvailableReplica);
}
}

validateRetryMetrics(batchGet, metricPrefix, useStreamingBatchGetAsDefault);
Expand Down Expand Up @@ -407,7 +422,7 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault)
});
}
metrics = getStats(clientConfig, RequestType.MULTI_GET);
validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault);
validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false);
} finally {
tearDown();
}
Expand All @@ -426,7 +441,7 @@ public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatc
} else {
assertTrue(e.getMessage().endsWith("Exception for client to return 503"));
}
validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault);
validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false);
} finally {
tearDown();
}
Expand All @@ -450,15 +465,15 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt
BATCH_GET_KEYS.stream().forEach(key -> {
assertTrue(SINGLE_GET_VALUE_RESPONSE.contentEquals(value.get(key)));
});
validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault);
validateMultiGetMetrics(true, false, useStreamingBatchGetAsDefault, false);
}
} catch (Exception e) {
if (useStreamingBatchGetAsDefault) {
assertTrue(e.getMessage().endsWith("At least one route did not complete"));
} else {
fail();
}
validateMultiGetMetrics(false, true, useStreamingBatchGetAsDefault);
validateMultiGetMetrics(false, true, useStreamingBatchGetAsDefault, false);
} finally {
tearDown();
}
Expand All @@ -467,17 +482,35 @@ public void testBatchGetWithExceptionFromTransportLayerForOneRoute(boolean useSt
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT)
public void testBatchGetToUnreachableClient(boolean useStreamingBatchGetAsDefault) throws IOException {
try {
setUpClient(useStreamingBatchGetAsDefault, false, false, false, 2 * Time.MS_PER_SECOND);
setUpClient(useStreamingBatchGetAsDefault, false, false, false, 7 * Time.MS_PER_SECOND);
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get();
fail();
} catch (Exception e) {
// The first batchGet fails with an unreachable host after the wait time, and this marks these hosts
// as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1)
if (useStreamingBatchGetAsDefault) {
assertTrue(e.getMessage().endsWith("At least one route did not complete"));
} else {
assertTrue(e.getMessage().endsWith("http status: 410, Request timed out"));
}
validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault);
validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, false);

try {
// the second batchGet is not going to find any routes (as the instances
// are blocked) and fail due to that
batchGetRequestContext = new BatchGetRequestContext<>();
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_KEYS).get();
fail();
} catch (Exception e1) {
System.out.println("Exception is: " + e1.getMessage());
if (useStreamingBatchGetAsDefault) {
assertTrue(e1.getMessage().endsWith("Response was not complete"));
} else {
assertTrue(e1.getMessage().matches(".*No available route for store.*"));
}
validateMultiGetMetrics(false, false, useStreamingBatchGetAsDefault, true);
}
} finally {
tearDown();
}
Expand Down

0 comments on commit 01d7fe6

Please sign in to comment.