Skip to content

Commit

Permalink
Client bindings and request flows for controller APIs createStore, ge…
Browse files Browse the repository at this point in the history
…tStoresInCluster and QueryJobStatus
  • Loading branch information
Bharath Kumarasubramanian committed Oct 17, 2024
1 parent 14d05d4 commit bd647b0
Show file tree
Hide file tree
Showing 8 changed files with 657 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceHttpException;
import com.linkedin.venice.grpc.ControllerGrpcTransport;
import com.linkedin.venice.grpc.GrpcControllerRoute;
import com.linkedin.venice.grpc.GrpcConverters;
import com.linkedin.venice.helix.VeniceJsonSerializer;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
Expand All @@ -98,6 +101,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -121,6 +125,8 @@ public class ControllerClient implements Closeable {
private String leaderControllerUrl;
private final List<String> controllerDiscoveryUrls;

private final boolean useGrpc;

public ControllerClient(String clusterName, String discoveryUrls) {
this(clusterName, discoveryUrls, Optional.empty());
}
Expand All @@ -140,6 +146,7 @@ public ControllerClient(String clusterName, String discoveryUrls, Optional<SSLFa
if (this.controllerDiscoveryUrls.isEmpty()) {
throw new VeniceException("Controller discovery url list is empty");
}
this.useGrpc = false;
}

/**
Expand Down Expand Up @@ -1421,6 +1428,41 @@ private <T extends ControllerResponse> T request(
int timeoutMs,
int maxAttempts,
byte[] data) {
if (useGrpc) {
return fireRequestUsingGrpc(route, params, responseType, timeoutMs, maxAttempts, data);
} else {
return fireRequestUsingHttp(route, params, responseType, timeoutMs, maxAttempts, data);
}
}

private <T extends ControllerResponse> T fireRequestUsingGrpc(
ControllerRoute route,
QueryParams params,
Class<T> responseType,
int timeoutMs,
int maxAttempts,
byte[] data) {
GrpcControllerRoute grpcControllerRoute = GrpcConverters.mapControllerRouteToGrpcControllerRoute(route);
try (ControllerGrpcTransport transport = new ControllerGrpcTransport(sslFactory)) {
try {
CompletionStage<T> responseFuture =
transport.request(getLeaderControllerUrl(), params, responseType, grpcControllerRoute);
return responseFuture.toCompletableFuture().join();
} catch (Exception e) {
throw new VeniceException("Encountered error when getting response from server", e);
}
} catch (Exception e) {
throw new VeniceException("Encountered error when fetching response from grpc server", e);
}
}

private <T extends ControllerResponse> T fireRequestUsingHttp(
ControllerRoute route,
QueryParams params,
Class<T> responseType,
int timeoutMs,
int maxAttempts,
byte[] data) {
Exception lastException = null;
boolean logErrorMessage = true;
try (ControllerTransport transport = new ControllerTransport(sslFactory)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package com.linkedin.venice.grpc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controllerapi.QueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.protocols.CreateStoreGrpcRequest;
import com.linkedin.venice.protocols.GetStoresInClusterGrpcRequest;
import com.linkedin.venice.protocols.QueryJobStatusGrpcRequest;
import com.linkedin.venice.protocols.VeniceControllerGrpcServiceGrpc;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;


public class ControllerGrpcTransport implements AutoCloseable {
private static final int PORT = 1234;
private static final String GRPC_ADDRESS_FORMAT = "%s:%s";
private final VeniceConcurrentHashMap<String, ManagedChannel> serverGrpcChannels;
private final ChannelCredentials channelCredentials;
private final VeniceConcurrentHashMap<ManagedChannel, VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceStub> stubCache;

public ControllerGrpcTransport(Optional<SSLFactory> sslFactory) {
this.stubCache = new VeniceConcurrentHashMap<>();
this.serverGrpcChannels = new VeniceConcurrentHashMap<>();
this.channelCredentials = buildChannelCredentials(sslFactory);
}

public <ResT> CompletionStage<ResT> request(
String serverUrl,
QueryParams params,
Class<ResT> responseType,
GrpcControllerRoute route) {

VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceStub stub = getOrCreateStub(serverUrl);
CompletableFuture<ResT> valueFuture = new CompletableFuture<>();

if (GrpcControllerRoute.CREATE_STORE.equals(route)) {
stub.createStore(
(CreateStoreGrpcRequest) GrpcConverters.getRequestConverter(route.getRequestType()).convert(params),
buildStreamObserver(valueFuture, responseType, route));
} else if (GrpcControllerRoute.GET_STORES_IN_CLUSTER.equals(route)) {
stub.getStoresInCluster(
(GetStoresInClusterGrpcRequest) GrpcConverters.getRequestConverter(route.getRequestType()).convert(params),
buildStreamObserver(valueFuture, responseType, route));
} else if (GrpcControllerRoute.QUERY_JOB_STATUS.equals(route)) {
stub.getJobStatus(
(QueryJobStatusGrpcRequest) GrpcConverters.getRequestConverter(route.getRequestType()).convert(params),
buildStreamObserver(valueFuture, responseType, route));
} else {
throw new VeniceException("Unknown gRPC route; Failing the request");
}

return valueFuture;
}

@VisibleForTesting
<T, ResT> ControllerGrpcObserver<T, ResT> buildStreamObserver(
CompletableFuture<ResT> future,
Class<ResT> httpResponseType,
GrpcControllerRoute route) {
return new ControllerGrpcObserver<>(future, httpResponseType, route);
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ManagedChannel> entry: serverGrpcChannels.entrySet()) {
entry.getValue().shutdown();
}
}

@VisibleForTesting
ChannelCredentials buildChannelCredentials(Optional<SSLFactory> sslFactory) {
SSLFactory factory = sslFactory.orElse(null);

if (factory == null) {
return InsecureChannelCredentials.create();
}

try {
TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder()
.keyManager(GrpcUtils.getKeyManagers(factory))
.trustManager(GrpcUtils.getTrustManagers(factory));
return tlsBuilder.build();
} catch (Exception e) {
throw new VeniceClientException(
"Failed to initialize SSL channel credentials for Venice gRPC Transport Client",
e);
}
}

VeniceControllerGrpcServiceGrpc.VeniceControllerGrpcServiceStub getOrCreateStub(String serverAddress) {
String grpcAddress = getGrpcAddressFromServerAddress(serverAddress);

ManagedChannel channel = serverGrpcChannels
.computeIfAbsent(serverAddress, k -> Grpc.newChannelBuilder(grpcAddress, channelCredentials).build());

return stubCache.computeIfAbsent(channel, VeniceControllerGrpcServiceGrpc::newStub);
}

@VisibleForTesting
String getGrpcAddressFromServerAddress(String serverAddress) {
String[] serverAddressParts = serverAddress.split(":");
Preconditions.checkState(serverAddressParts.length == 2, "Invalid server address");

return String.format(GRPC_ADDRESS_FORMAT, serverAddressParts[0], PORT);
}

static class ControllerGrpcObserver<ResT, HttpResT> implements StreamObserver<ResT> {
private final CompletableFuture<HttpResT> responseFuture;
private final Class<HttpResT> httpResponseType;

private final GrpcControllerRoute route;

public ControllerGrpcObserver(
CompletableFuture<HttpResT> future,
Class<HttpResT> httpResponseType,
GrpcControllerRoute route) {
this.httpResponseType = httpResponseType;
this.responseFuture = future;
this.route = route;
}

@Override
public void onNext(ResT value) {
if (!responseFuture.isDone()) {

@SuppressWarnings("Unchecked")
HttpResT result = ((GrpcToHttpResponseConverter<ResT, HttpResT>) GrpcConverters
.getResponseConverter(route.getResponseType(), httpResponseType)).convert(value);

responseFuture.complete(result);
}
}

@Override
public void onError(Throwable t) {
responseFuture.completeExceptionally(t);
}

@Override
public void onCompleted() {
// do nothing
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.venice.grpc;

import com.google.protobuf.GeneratedMessageV3;
import com.linkedin.venice.protocols.CreateStoreGrpcRequest;
import com.linkedin.venice.protocols.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.GetStoresInClusterGrpcRequest;
import com.linkedin.venice.protocols.GetStoresInClusterGrpcResponse;
import com.linkedin.venice.protocols.QueryJobStatusGrpcRequest;
import com.linkedin.venice.protocols.QueryJobStatusGrpcResponse;


public enum GrpcControllerRoute {
CREATE_STORE(CreateStoreGrpcRequest.class, CreateStoreGrpcResponse.class),
GET_STORES_IN_CLUSTER(GetStoresInClusterGrpcRequest.class, GetStoresInClusterGrpcResponse.class),
QUERY_JOB_STATUS(QueryJobStatusGrpcRequest.class, QueryJobStatusGrpcResponse.class);

private final Class<? extends GeneratedMessageV3> requestType;
private final Class<? extends GeneratedMessageV3> responseType;

GrpcControllerRoute(Class<? extends GeneratedMessageV3> reqT, Class<? extends GeneratedMessageV3> resT) {
this.requestType = reqT;
this.responseType = resT;
}

public Class<? extends GeneratedMessageV3> getRequestType() {
return this.requestType;
}

public Class<? extends GeneratedMessageV3> getResponseType() {
return this.responseType;
}
}
Loading

0 comments on commit bd647b0

Please sign in to comment.