Skip to content

Commit

Permalink
[fc][test] Make retry work and add proper stats prefix for streamingB…
Browse files Browse the repository at this point in the history
…atchGet (#671)

* Retry was not working for streamingBatchGet(keys) as the proper delegation was not happening in RetriableAvroGenericStoreClient.java. This change fixes this problem.
* make streamingBatchGet use "--multiget_streaming_" rather than "--multiget_" as prefix for metrics: batchGet() using single get uses single get metrics, batchGet() using streaming implementation uses "--multiget_",
streamingBatchGet uses "--multiget_streaming_"
* return an exception if every key hits no replica found issue. Before this change, even though this case didn't increment successful key count metric and increment no replica found metric, it was still considering the request to healthy. After this change, this is considered unhealthy metric. Even after this change, if at least one of the routes find a replica and is successful, streamingBatchGet considers this a healthy request as this is a partial API.
* Add additional unit and integration tests for streamingBatchGet and retry cases.
  • Loading branch information
m-nagarajan authored Oct 2, 2023
1 parent a8af3a9 commit d884953
Show file tree
Hide file tree
Showing 11 changed files with 553 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,10 @@ protected CompletableFuture<Map<K, V>> batchGet(BatchGetRequestContext<K, V> req
if (throwable != null) {
responseFuture.completeExceptionally(throwable);
} else if (!response.isFullResponse()) {
if (requestContext.getPartialResponseException().isPresent()) {
responseFuture.completeExceptionally(
new VeniceClientException(
"Response was not complete",
requestContext.getPartialResponseException().get()));
} else {
responseFuture.completeExceptionally(new VeniceClientException("Response was not complete"));
}
responseFuture.completeExceptionally(
new VeniceClientException(
"Response was not complete",
requestContext.getPartialResponseException().orElse(null)));
} else {
responseFuture.complete(response);
}
Expand All @@ -334,6 +330,7 @@ protected CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGet(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys) throws VeniceClientException {
verifyMetadataInitialized();
int keySize = keys.size();
// keys that do not exist in the storage nodes
Queue<K> nonExistingKeys = new ConcurrentLinkedQueue<>();
VeniceConcurrentHashMap<K, V> valueMap = new VeniceConcurrentHashMap<>();
Expand All @@ -358,7 +355,7 @@ public void onCompletion(Optional<Exception> exception) {
if (exception.isPresent()) {
streamingResponseFuture.completeExceptionally(exception.get());
} else {
boolean isFullResponse = ((valueMap.size() + nonExistingKeys.size()) == keys.size());
boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keySize;
streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse));
}
}
Expand Down Expand Up @@ -393,16 +390,17 @@ protected void streamingBatchGet(
* that exception will be passed to the aggregate future's next stages. */
CompletableFuture.allOf(requestContext.getAllRouteFutures().toArray(new CompletableFuture[0]))
.whenComplete((response, throwable) -> {
if (throwable == null) {
callback.onCompletion(Optional.empty());
} else {
if (throwable != null || (!keys.isEmpty() && requestContext.getAllRouteFutures().isEmpty())) {
// If there is an exception or if no partition has a healthy replica.
// The exception to send to the client might be different. Get from the requestContext
Throwable clientException = throwable;
if (requestContext.getPartialResponseException().isPresent()) {
clientException = requestContext.getPartialResponseException().get();
}
callback.onCompletion(
Optional.of(new VeniceClientException("At least one route did not complete", clientException)));
} else {
callback.onCompletion(Optional.empty());
}
});
}
Expand Down Expand Up @@ -452,10 +450,10 @@ private void streamingBatchGetInternal(
* an error */
requestContext.noAvailableReplica = true;
String errorMessage = String.format(
"No route found for partitionId: %s, store: %s, version: %s",
partitionId,
"No available route for store: %s, version: %s, partitionId: %s",
getStoreName(),
currentVersion);
currentVersion,
partitionId);
LOGGER.error(errorMessage);
requestContext.setPartialResponseException(new VeniceClientException(errorMessage));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,28 @@ protected CompletableFuture<V> get(GetRequestContext requestContext, K key) thro
*/
protected CompletableFuture<Map<K, V>> batchGet(BatchGetRequestContext<K, V> requestContext, Set<K> keys)
throws VeniceClientException {
CompletableFuture<Map<K, V>> responseFuture = new CompletableFuture<>();
CompletableFuture<VeniceResponseMap<K, V>> streamingResponseFuture = streamingBatchGet(requestContext, keys);
streamingResponseFuture.whenComplete((response, throwable) -> {
if (throwable != null) {
responseFuture.completeExceptionally(throwable);
} else if (!response.isFullResponse()) {
responseFuture.completeExceptionally(
new VeniceClientException(
"Response was not complete",
requestContext.getPartialResponseException().orElse(null)));
} else {
responseFuture.complete(response);
}
});
return responseFuture;
}

protected CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGet(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys) throws VeniceClientException {
// keys that do not exist in the storage nodes
int keysSize = keys.size();
Queue<K> nonExistingKeys = new ConcurrentLinkedQueue<>();
VeniceConcurrentHashMap<K, V> valueMap = new VeniceConcurrentHashMap<>();
CompletableFuture<VeniceResponseMap<K, V>> streamingResponseFuture = new VeniceResponseCompletableFuture<>(
Expand All @@ -199,29 +220,12 @@ public void onCompletion(Optional<Exception> exception) {
if (exception.isPresent()) {
streamingResponseFuture.completeExceptionally(exception.get());
} else {
boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keys.size();
boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keysSize;
streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse));
}
}
});
CompletableFuture<Map<K, V>> responseFuture = new CompletableFuture<>();
streamingResponseFuture.whenComplete((response, throwable) -> {
if (throwable != null) {
responseFuture.completeExceptionally(throwable);
} else if (!response.isFullResponse()) {
if (requestContext.getPartialResponseException().isPresent()) {
responseFuture.completeExceptionally(
new VeniceClientException(
"Response was not complete",
requestContext.getPartialResponseException().get()));
} else {
responseFuture.completeExceptionally(new VeniceClientException("Response was not complete"));
}
} else {
responseFuture.complete(response);
}
});
return responseFuture;
return streamingResponseFuture;
}

@Override
Expand Down Expand Up @@ -301,7 +305,7 @@ public void streamingBatchGet(
if (finalException == null) {
callback.onCompletion(Optional.empty());
} else {
callback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception ", finalException)));
callback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception", finalException)));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class StatsAvroGenericStoreClient<K, V> extends DelegatingAvroStoreClient

private final FastClientStats clientStatsForSingleGet;
private final FastClientStats clientStatsForBatchGet;
private final FastClientStats clientStatsForStreamingBatchGet;
private final ClusterStats clusterStats;

private final int maxAllowedKeyCntInBatchGetReq;
Expand All @@ -40,6 +41,7 @@ public StatsAvroGenericStoreClient(InternalAvroStoreClient<K, V> delegate, Clien
super(delegate);
this.clientStatsForSingleGet = clientConfig.getStats(RequestType.SINGLE_GET);
this.clientStatsForBatchGet = clientConfig.getStats(RequestType.MULTI_GET);
this.clientStatsForStreamingBatchGet = clientConfig.getStats(RequestType.MULTI_GET_STREAMING);
this.clusterStats = clientConfig.getClusterStats();
this.maxAllowedKeyCntInBatchGetReq = clientConfig.getMaxAllowedKeyCntInBatchGetReq();
this.useStreamingBatchGetAsDefault = clientConfig.useStreamingBatchGetAsDefault();
Expand Down Expand Up @@ -121,7 +123,7 @@ protected void streamingBatchGet(
requestContext,
keys,
new StatTrackingStreamingCallBack<>(callback, statFuture, requestContext));
recordMetrics(requestContext, keys.size(), statFuture, startTimeInNS, clientStatsForBatchGet);
recordMetrics(requestContext, keys.size(), statFuture, startTimeInNS, clientStatsForStreamingBatchGet);
}

@Override
Expand All @@ -130,7 +132,7 @@ protected CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGet(
Set<K> keys) {
long startTimeInNS = System.nanoTime();
CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGetFuture = super.streamingBatchGet(requestContext, keys);
recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForBatchGet);
recordMetrics(requestContext, keys.size(), streamingBatchGetFuture, startTimeInNS, clientStatsForStreamingBatchGet);
return streamingBatchGetFuture;
}

Expand All @@ -148,6 +150,14 @@ private <R> CompletableFuture<R> recordMetrics(
return AppTimeOutTrackingCompletableFuture.track(statFuture, clientStats);
}

/**
* Metrics are incremented after one of the below cases
* 1. request is complete or
* 2. exception is thrown or
* 3. routingLeakedRequestCleanupThresholdMS is elapsed: In case of streamingBatchGet.get(timeout) returning
* partial response and this timeout happens after than and before the full response is returned,
* it will still raise a silent exception leading to the request being considered an unhealthy request.
*/
private <R> CompletableFuture<R> recordRequestMetrics(
RequestContext requestContext,
int numberOfKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ private void validateMetrics(
double expectedNumberOfKeysToBeRetried,
double expectedNumberOfKeysToBeRetriedSuccessfully) {
Map<String, ? extends Metric> metrics = getStats(client.getClientConfig());
String metricPrefix = "." + client.UNIT_TEST_STORE_NAME + "--multiget_";
String metricPrefix = "." + client.UNIT_TEST_STORE_NAME + "--multiget_streaming_";
if (totalNumberOfKeys > 0) {
assertTrue(metrics.get(metricPrefix + "request.OccurrenceRate").value() > 0);
assertEquals(metrics.get(metricPrefix + "request_key_count.Max").value(), totalNumberOfKeys);
Expand Down
Loading

0 comments on commit d884953

Please sign in to comment.