Skip to content

Commit

Permalink
[server] Reuse resource names and URI object during server read reque…
Browse files Browse the repository at this point in the history
…st processing. (linkedin#1129)

During read request processing server calls String::split and creates URI object multiple times in the hot path. This PR tries to reuse those objects as much as possible.
  • Loading branch information
majisourav99 authored Aug 21, 2024
1 parent ed851aa commit d818b57
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
Expand All @@ -30,6 +29,7 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -83,7 +83,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque
final BlobTransferPayload blobTransferRequest;
final File snapshotDir;
try {
blobTransferRequest = parseBlobTransferPayload(httpRequest);
blobTransferRequest = parseBlobTransferPayload(URI.create(httpRequest.uri()));
snapshotDir = new File(blobTransferRequest.getSnapshotDir());
if (!snapshotDir.exists() || !snapshotDir.isDirectory()) {
byte[] errBody = ("Snapshot for " + blobTransferRequest.getFullResourceName() + " doesn't exist").getBytes();
Expand Down Expand Up @@ -192,9 +192,8 @@ private void sendFile(File file, ChannelHandlerContext ctx) throws IOException {
* @param request
* @return
*/
private BlobTransferPayload parseBlobTransferPayload(HttpRequest request) throws IllegalArgumentException {
private BlobTransferPayload parseBlobTransferPayload(URI uri) throws IllegalArgumentException {
// Parse the request uri to obtain the storeName and partition
String uri = request.uri();
String[] requestParts = RequestHelper.getRequestParts(uri);
if (requestParts.length == 4) {
// [0]""/[1]"store"/[2]"version"/[3]"partition"
Expand All @@ -204,7 +203,7 @@ private BlobTransferPayload parseBlobTransferPayload(HttpRequest request) throws
Integer.parseInt(requestParts[2]),
Integer.parseInt(requestParts[3]));
} else {
throw new IllegalArgumentException("Invalid request for fetching blob at " + uri);
throw new IllegalArgumentException("Invalid request for fetching blob at " + uri.getPath());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ public class RequestHelper {
* @param uri
* @return String array of the request parts
*/
public static String[] getRequestParts(String uri) {
public static String[] getRequestParts(URI fullUri) {
/**
* Sometimes req.uri() gives a full uri (e.g. https://host:port/path) and sometimes it only gives a path.
* Generating a URI lets us always take just the path, but we need to add on the query string.
*/
URI fullUri = URI.create(uri);
String path = fullUri.getRawPath();
if (fullUri.getRawQuery() != null) {
path += "?" + fullUri.getRawQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ private HttpResponse sendHeartbeatRequest(CloseableHttpAsyncClient client, Strin
StringBuilder sb = new StringBuilder().append("http://")
.append(serverAddress)
.append("/")
.append(QueryAction.HEALTH.toString().toLowerCase())
.append("?f=b64");
.append(QueryAction.HEALTH.toString().toLowerCase());
HttpGet getReq = new HttpGet(sb.toString());
Future<HttpResponse> future = client.execute(getReq, null);
return future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.venice.listener.response.HttpShortcutResponse;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.request.RequestHelper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
Expand Down Expand Up @@ -73,20 +74,23 @@ private void setupRequestTimeout(RouterRequest routerRequest) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
try {
QueryAction action = getQueryActionFromRequest(req);
URI uri = URI.create(req.uri());
String[] requestParts = RequestHelper.getRequestParts(uri);
QueryAction action = getQueryActionFromRequest(req, requestParts);
statsHandler.setRequestSize(req.content().readableBytes());
switch (action) {
case STORAGE: // GET /storage/store/partition/key
HttpMethod requestMethod = req.method();
if (requestMethod.equals(HttpMethod.GET)) {
// TODO: evaluate whether we can replace single-get by multi-get
GetRouterRequest getRouterRequest = GetRouterRequest.parseGetHttpRequest(req);
GetRouterRequest getRouterRequest = GetRouterRequest.parseGetHttpRequest(req, requestParts);
setupRequestTimeout(getRouterRequest);
statsHandler.setRequestInfo(getRouterRequest);
ctx.fireChannelRead(getRouterRequest);
} else if (requestMethod.equals(HttpMethod.POST)) {
// Multi-get
MultiGetRouterRequestWrapper multiGetRouterReq = MultiGetRouterRequestWrapper.parseMultiGetHttpRequest(req);
MultiGetRouterRequestWrapper multiGetRouterReq =
MultiGetRouterRequestWrapper.parseMultiGetHttpRequest(req, requestParts);
setupRequestTimeout(multiGetRouterReq);
statsHandler.setRequestInfo(multiGetRouterReq);
ctx.fireChannelRead(multiGetRouterReq);
Expand All @@ -96,7 +100,8 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) thro
break;
case COMPUTE: // compute request
if (req.method().equals(HttpMethod.POST)) {
ComputeRouterRequestWrapper computeRouterReq = ComputeRouterRequestWrapper.parseComputeRequest(req);
ComputeRouterRequestWrapper computeRouterReq =
ComputeRouterRequestWrapper.parseComputeRequest(req, requestParts);
setupRequestTimeout(computeRouterReq);
statsHandler.setRequestInfo(computeRouterReq);
ctx.fireChannelRead(computeRouterReq);
Expand All @@ -110,30 +115,31 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) thro
ctx.fireChannelRead(healthCheckRequest);
break;
case DICTIONARY:
DictionaryFetchRequest dictionaryFetchRequest = DictionaryFetchRequest.parseGetHttpRequest(req);
DictionaryFetchRequest dictionaryFetchRequest = DictionaryFetchRequest.parseGetHttpRequest(uri, requestParts);
statsHandler.setStoreName(dictionaryFetchRequest.getStoreName());
ctx.fireChannelRead(dictionaryFetchRequest);
break;
case ADMIN:
AdminRequest adminRequest = AdminRequest.parseAdminHttpRequest(req);
AdminRequest adminRequest = AdminRequest.parseAdminHttpRequest(req, uri);
statsHandler.setStoreName(adminRequest.getStoreName());
ctx.fireChannelRead(adminRequest);
break;
case METADATA:
statsHandler.setMetadataRequest(true);
MetadataFetchRequest metadataFetchRequest = MetadataFetchRequest.parseGetHttpRequest(req);
MetadataFetchRequest metadataFetchRequest =
MetadataFetchRequest.parseGetHttpRequest(uri.getPath(), requestParts);
statsHandler.setStoreName(metadataFetchRequest.getStoreName());
ctx.fireChannelRead(metadataFetchRequest);
break;
case CURRENT_VERSION:
statsHandler.setMetadataRequest(true);
CurrentVersionRequest currentVersionRequest = CurrentVersionRequest.parseGetHttpRequest(req);
CurrentVersionRequest currentVersionRequest = CurrentVersionRequest.parseGetHttpRequest(uri, requestParts);
statsHandler.setStoreName(currentVersionRequest.getStoreName());
ctx.fireChannelRead(currentVersionRequest);
break;
case TOPIC_PARTITION_INGESTION_CONTEXT:
TopicPartitionIngestionContextRequest topicPartitionIngestionContextRequest =
TopicPartitionIngestionContextRequest.parseGetHttpRequest(req);
TopicPartitionIngestionContextRequest.parseGetHttpRequest(uri.getPath(), requestParts);
statsHandler.setStoreName(
Version.parseStoreFromVersionTopic(topicPartitionIngestionContextRequest.getVersionTopic()));
ctx.fireChannelRead(topicPartitionIngestionContextRequest);
Expand Down Expand Up @@ -167,10 +173,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
super.userEventTriggered(ctx, evt);
}

static QueryAction getQueryActionFromRequest(HttpRequest req) {
static QueryAction getQueryActionFromRequest(HttpRequest req, String[] requestParts) {
// Sometimes req.uri() gives a full uri (eg https://host:port/path) and sometimes it only gives a path
// Generating a URI lets us always take just the path.
String[] requestParts = URI.create(req.uri()).getPath().split("/");
HttpMethod reqMethod = req.method();
if ((!reqMethod.equals(HttpMethod.GET) && !reqMethod.equals(HttpMethod.POST)) || requestParts.length < 2) {
String actions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ private AdminRequest(String storeVersion, ServerAdminAction action, Integer part
this.partition = partition;
}

public static AdminRequest parseAdminHttpRequest(HttpRequest request) {
URI fullUri = URI.create(request.uri());
public static AdminRequest parseAdminHttpRequest(HttpRequest request, URI fullUri) {
String[] requestParts = fullUri.getRawPath().split("/");
// [0]""/[1]"action"/[2]"store_version"/[3]"admin_action"/[4](optional)"partition_id"
if (requestParts.length >= 4 && requestParts.length <= 5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.linkedin.venice.serializer.RecordDeserializer;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import java.net.URI;
import java.util.List;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.OptimizedBinaryDecoderFactory;
Expand Down Expand Up @@ -41,13 +40,10 @@ private ComputeRouterRequestWrapper(
}
}

public static ComputeRouterRequestWrapper parseComputeRequest(FullHttpRequest httpRequest) {
URI fullUri = URI.create(httpRequest.uri());
String path = fullUri.getRawPath();
String[] requestParts = path.split("/");
public static ComputeRouterRequestWrapper parseComputeRequest(FullHttpRequest httpRequest, String[] requestParts) {
if (requestParts.length != 3) {
// [0]""/[1]"compute"/[2]{$resourceName}
throw new VeniceException("Invalid request: " + path);
throw new VeniceException("Invalid request: " + httpRequest.uri());
}
String resourceName = requestParts[2];

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.linkedin.venice.listener.request;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.request.RequestHelper;
import io.netty.handler.codec.http.HttpRequest;
import java.net.URI;


public class CurrentVersionRequest {
Expand All @@ -12,16 +11,13 @@ private CurrentVersionRequest(String storeName) {
this.storeName = storeName;
}

public static CurrentVersionRequest parseGetHttpRequest(HttpRequest request) {
String uri = request.uri();
String[] requestParts = RequestHelper.getRequestParts(uri);

public static CurrentVersionRequest parseGetHttpRequest(URI uri, String[] requestParts) {
if (requestParts.length == 3) {
// [0]""/[1]"action"/[2]"store"
String storeName = requestParts[2];
return new CurrentVersionRequest(storeName);
} else {
throw new VeniceException("not a valid request for a METADATA action: " + uri);
throw new VeniceException("not a valid request for a METADATA action: " + uri.getPath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.request.RequestHelper;
import io.netty.handler.codec.http.HttpRequest;
import java.net.URI;


/**
Expand All @@ -19,18 +18,15 @@ private DictionaryFetchRequest(String storeName, String resourceName) {
this.resourceName = resourceName;
}

public static DictionaryFetchRequest parseGetHttpRequest(HttpRequest request) {
String uri = request.uri();
String[] requestParts = RequestHelper.getRequestParts(uri);

public static DictionaryFetchRequest parseGetHttpRequest(URI uri, String[] requestParts) {
if (requestParts.length == 4) {
// [0]""/[1]"action"/[2]"store"/[3]"version"
String storeName = requestParts[2];
int storeVersion = Integer.parseInt(requestParts[3]);
String topicName = Version.composeKafkaTopic(storeName, storeVersion);
return new DictionaryFetchRequest(storeName, topicName);
} else {
throw new VeniceException("Not a valid request for a DICTIONARY action: " + uri);
throw new VeniceException("Not a valid request for a DICTIONARY action: " + uri.getPath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.protocols.VeniceClientRequest;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.request.RequestHelper;
import com.linkedin.venice.utils.EncodingUtils;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -52,17 +51,15 @@ public int getKeyCount() {
return 1;
}

public static GetRouterRequest parseGetHttpRequest(HttpRequest request) {
String uri = request.uri();
String[] requestParts = RequestHelper.getRequestParts(uri);
public static GetRouterRequest parseGetHttpRequest(HttpRequest request, String[] requestParts) {
if (requestParts.length == 5) {
// [0]""/[1]"action"/[2]"store"/[3]"partition"/[4]"key"
String topicName = requestParts[2];
int partition = Integer.parseInt(requestParts[3]);
byte[] keyBytes = getKeyBytesFromUrlKeyString(requestParts[4]);
return new GetRouterRequest(topicName, partition, keyBytes, request);
} else {
throw new VeniceException("Not a valid request for a STORAGE action: " + uri);
throw new VeniceException("Not a valid request for a STORAGE action: " + request.uri());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.linkedin.venice.listener.request;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.request.RequestHelper;
import io.netty.handler.codec.http.HttpRequest;


/**
Expand All @@ -16,10 +14,7 @@ private MetadataFetchRequest(String storeName) {
this.storeName = storeName;
}

public static MetadataFetchRequest parseGetHttpRequest(HttpRequest request) {
String uri = request.uri();
String[] requestParts = RequestHelper.getRequestParts(uri);

public static MetadataFetchRequest parseGetHttpRequest(String uri, String[] requestParts) {
if (requestParts.length == 3) {
// [0]""/[1]"action"/[2]"store"
String storeName = requestParts[2];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.linkedin.venice.serializer.RecordDeserializer;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import java.net.URI;
import java.util.List;
import org.apache.avro.io.OptimizedBinaryDecoderFactory;

Expand All @@ -37,16 +36,13 @@ private MultiGetRouterRequestWrapper(
super(resourceName, keys, isRetryRequest, isStreamingRequest);
}

public static MultiGetRouterRequestWrapper parseMultiGetHttpRequest(FullHttpRequest httpRequest) {
URI fullUri = URI.create(httpRequest.uri());
String path = fullUri.getRawPath();
String[] requestParts = path.split("/");
public static MultiGetRouterRequestWrapper parseMultiGetHttpRequest(
FullHttpRequest httpRequest,
String[] requestParts) {
if (requestParts.length != 3) {
// [0]""/[1]"storage"/[2]{$resourceName}
throw new VeniceException("Invalid request: " + path);
throw new VeniceException("Invalid request: " + httpRequest.uri());
}
String resourceName = requestParts[2];

// Validate API version
String apiVersion = httpRequest.headers().get(HttpConstants.VENICE_API_VERSION);
if (apiVersion == null) {
Expand All @@ -62,7 +58,7 @@ public static MultiGetRouterRequestWrapper parseMultiGetHttpRequest(FullHttpRequ
httpRequest.content().readBytes(content);
keys = parseKeys(content);

return new MultiGetRouterRequestWrapper(resourceName, keys, httpRequest);
return new MultiGetRouterRequestWrapper(requestParts[2], keys, httpRequest);
}

public static MultiGetRouterRequestWrapper parseMultiGetGrpcRequest(VeniceClientRequest grpcRequest) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.linkedin.venice.listener.request;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.request.RequestHelper;
import io.netty.handler.codec.http.HttpRequest;


public class TopicPartitionIngestionContextRequest {
Expand All @@ -16,10 +14,7 @@ private TopicPartitionIngestionContextRequest(String versionTopic, String topic,
this.partition = partition;
}

public static TopicPartitionIngestionContextRequest parseGetHttpRequest(HttpRequest request) {
String uri = request.uri();
String[] requestParts = RequestHelper.getRequestParts(uri);

public static TopicPartitionIngestionContextRequest parseGetHttpRequest(String uri, String[] requestParts) {
if (requestParts.length == 5) {
// [0]""/[1]"action"/[2]"version topic"/[3]"topic name"/[4]"partition number"
String versionTopic = requestParts[2];
Expand Down
Loading

0 comments on commit d818b57

Please sign in to comment.