Skip to content

Commit

Permalink
[fc][test] Honor max batch get limit and return proper response for n…
Browse files Browse the repository at this point in the history
…onExistingKeys (#648)

* Streaming batchGet was not honoring the max key count limit for a batch-get request; it now throws VeniceKeyCountLimitException when the limit is exceeded.
* Set the default max number of keys in a batch-get request in Fast-Client to 150 (previously 2).
* When a batch-get request contained a non-existing key, an exception ("Response was not complete") was returned; non-existing keys are now counted as part of the response key count.
* Added unit tests/integration tests for the above cases.
  • Loading branch information
m-nagarajan authored Sep 26, 2023
1 parent 6913113 commit 6440600
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.linkedin.venice.fastclient.stats.ClusterStats;
import com.linkedin.venice.fastclient.stats.FastClientStats;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.api.VenicePathParser;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
Expand Down Expand Up @@ -51,13 +50,9 @@ public class ClientConfig<K, V, T extends SpecificRecord> {
private final long routingUnavailableRequestCounterResetDelayMS;
private final int routingPendingRequestCounterInstanceBlockThreshold;

/**
* The max allowed key count in batch-get request.
* Right now, the batch-get implementation will leverage single-get, which is inefficient, when there
* are many keys, since the requests to the same storage node won't be reused.
* But to temporarily unblock the first customer, we will only allow at most two keys in a batch-get request.
*/
// Max allowed key count in batch-get request
private final int maxAllowedKeyCntInBatchGetReq;
protected static final int MAX_ALLOWED_KEY_COUNT_IN_BATCHGET = 150;
private final DaVinciClient<StoreMetaKey, StoreMetaValue> daVinciClientForMetaStore;
private final AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> thinClientForMetaStore;
private final long metadataRefreshIntervalInSeconds;
Expand Down Expand Up @@ -387,16 +382,10 @@ public static class ClientConfigBuilder<K, V, T extends SpecificRecord> {
private long routingUnavailableRequestCounterResetDelayMS = -1;
private int routingPendingRequestCounterInstanceBlockThreshold = -1;
/**
* TODO:
* maxAllowedKeyCntInBatchGetReq was set to 2 initially for singleGet based multiGet
* for a specific customer ask. This needs to be reevaluated for streamingBatchGet().
* Today, the batch-get size for thinclient is enforced in Venice Router via
* {@link VenicePathParser#getBatchGetLimit} and it is configurable in store-level.
* In Fast-Client, it is still an open question about how to setup the batch-get limit
* or whether we need any limit at all. To start with, this can be set similar to routers
* global config and evaluate from there.
* maxAllowedKeyCntInBatchGetReq is set to {@link #MAX_ALLOWED_KEY_COUNT_IN_BATCHGET}
* for fast-client and can be overridden by client config.
*/
private int maxAllowedKeyCntInBatchGetReq = 2;
private int maxAllowedKeyCntInBatchGetReq = MAX_ALLOWED_KEY_COUNT_IN_BATCHGET;

private DaVinciClient<StoreMetaKey, StoreMetaValue> daVinciClientForMetaStore;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.linkedin.venice.fastclient.transport.GrpcTransportClient;
import com.linkedin.venice.fastclient.transport.R2TransportClient;
import com.linkedin.venice.fastclient.transport.TransportClientResponseForRoute;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.read.protocol.request.router.MultiGetRouterRequestKeyV1;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
Expand Down Expand Up @@ -416,6 +418,16 @@ private void streamingBatchGetInternal(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys,
BiConsumer<TransportClientResponseForRoute, Throwable> transportClientResponseCompletionHandler) {

int keyCnt = keys.size();
if (keyCnt > this.config.getMaxAllowedKeyCntInBatchGetReq()) {
throw new VeniceKeyCountLimitException(
getStoreName(),
RequestType.MULTI_GET,
keyCnt,
this.config.getMaxAllowedKeyCntInBatchGetReq());
}

/* Prepare each of the routes needed to query the keys */
requestContext.instanceHealthMonitor = metadata.getInstanceHealthMonitor();
String uriForBatchGetRequest = composeURIForBatchGetRequest(requestContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ public void onCompletion(Optional<Exception> exception) {
if (exception.isPresent()) {
streamingResponseFuture.completeExceptionally(exception.get());
} else {
streamingResponseFuture
.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, valueMap.size() == keys.size()));
boolean isFullResponse = (valueMap.size() + nonExistingKeys.size()) == keys.size();
streamingResponseFuture.complete(new VeniceResponseMapImpl<>(valueMap, nonExistingKeys, isFullResponse));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.venice.fastclient.stats.ClusterStats;
import com.linkedin.venice.fastclient.stats.FastClientStats;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Time;
import java.util.Collections;
Expand Down Expand Up @@ -80,9 +81,11 @@ protected CompletableFuture<Map<K, V>> batchGetUsingSingleGet(Set<K> keys) throw
}
int keyCnt = keys.size();
if (keyCnt > maxAllowedKeyCntInBatchGetReq) {
throw new VeniceClientException(
"Currently, the max allowed key count in a batch-get request: " + maxAllowedKeyCntInBatchGetReq
+ ", but received: " + keyCnt);
throw new VeniceKeyCountLimitException(
getStoreName(),
RequestType.MULTI_GET,
keyCnt,
maxAllowedKeyCntInBatchGetReq);
}
CompletableFuture<Map<K, V>> resultFuture = new CompletableFuture<>();
Map<K, CompletableFuture<V>> valueFutures = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.linkedin.venice.fastclient.transport.TransportClientResponseForRoute;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
Expand Down Expand Up @@ -466,6 +467,22 @@ public void testBatchGet(boolean useStreamingBatchGetAsDefault)
}
}

@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT, expectedExceptions = VeniceKeyCountLimitException.class)
public void testBatchGetWithMoreKeysThanMaxSize(boolean useStreamingBatchGetAsDefault)
    throws ExecutionException, InterruptedException, IOException {
  try {
    setUpClient(useStreamingBatchGetAsDefault);
    batchGetRequestContext = new BatchGetRequestContext<>();
    // Request one key more than the configured limit so the client must reject the batch-get
    // with VeniceKeyCountLimitException (verified via expectedExceptions above).
    int keyCount = ClientConfig.MAX_ALLOWED_KEY_COUNT_IN_BATCHGET + 1;
    Set<String> keys = new HashSet<>(keyCount);
    int keyIndex = 0;
    while (keyIndex < keyCount) {
      keys.add("testKey" + keyIndex);
      keyIndex++;
    }
    statsAvroGenericStoreClient.batchGet(batchGetRequestContext, keys).get();
  } finally {
    // Always release client resources, whether or not the expected exception fired.
    tearDown();
  }
}

@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT)
public void testBatchGetWithExceptionFromTransportLayer(boolean useStreamingBatchGetAsDefault) throws IOException {
try {
Expand Down
Loading

0 comments on commit 6440600

Please sign in to comment.