Skip to content

Commit

Permalink
Fix bug in GrpcRetryer (attempt #3).
Browse files Browse the repository at this point in the history
  • Loading branch information
chronos-tachyon committed Apr 3, 2024
1 parent bc726c9 commit 5d2e9f4
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,45 @@
import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.internal.BackoffThrottler;
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
import io.temporal.serviceclient.RpcRetryOptions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GrpcAsyncRetryer<R> {
private static final Logger log = LoggerFactory.getLogger(GrpcRetryer.class);

/**
 * Builds a one-shot {@link GrpcAsyncRetryer} from the given arguments and starts it.
 *
 * @param asyncThrottlerExecutor executor handed to the retryer instance
 * @param serverCapabilities server capabilities handed to the retryer instance
 * @param options retry options handed to the retryer instance
 * @param function supplier of the asynchronous call to run
 * @return the future returned by the retryer instance's {@code retry()}
 */
static <R> CompletableFuture<R> retry(
    @Nonnull ScheduledExecutorService asyncThrottlerExecutor,
    @Nonnull Capabilities serverCapabilities,
    @Nonnull GrpcRetryerOptions options,
    @Nonnull Supplier<CompletableFuture<R>> function) {
  final GrpcAsyncRetryer<R> retryer =
      new GrpcAsyncRetryer<>(asyncThrottlerExecutor, serverCapabilities, options, function);
  return retryer.retry();
}

private final ScheduledExecutorService executor;
private final GrpcRetryer.GrpcRetryerOptions options;
private final GetSystemInfoResponse.Capabilities serverCapabilities;
private final Capabilities serverCapabilities;
private final GrpcRetryerOptions options;
private final Supplier<CompletableFuture<R>> function;
private final BackoffThrottler throttler;
private final Deadline retriesExpirationDeadline;
private StatusRuntimeException lastMeaningfulException = null;

public GrpcAsyncRetryer(
ScheduledExecutorService asyncThrottlerExecutor,
Supplier<CompletableFuture<R>> function,
GrpcRetryer.GrpcRetryerOptions options,
GetSystemInfoResponse.Capabilities serverCapabilities) {
private GrpcAsyncRetryer(
@Nonnull ScheduledExecutorService asyncThrottlerExecutor,
@Nonnull Capabilities serverCapabilities,
@Nonnull GrpcRetryerOptions options,
@Nonnull Supplier<CompletableFuture<R>> function) {

options.validate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import com.google.common.base.Preconditions;
import io.grpc.Deadline;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.serviceclient.RpcRetryOptions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -32,41 +32,69 @@

public final class GrpcRetryer {

private final Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities;

/**
 * An action that returns no value and may throw {@code E}.
 *
 * @param <E> type of throwable the action may raise
 */
@FunctionalInterface
public interface RetryableProc<E extends Throwable> {
  /** Performs the action once. */
  void apply() throws E;

  /**
   * Adapts this procedure to a {@link RetryableFunc} that invokes {@link #apply()} and then
   * yields {@code null}.
   */
  default RetryableFunc<Void, E> asFunc() {
    final RetryableProc<E> self = this;
    return () -> {
      self.apply();
      return null;
    };
  }
}

/**
 * A value-returning action that may throw {@code E}.
 *
 * @param <R> type of the value produced by the action
 * @param <E> type of throwable the action may raise
 */
@FunctionalInterface
public interface RetryableFunc<R, E extends Throwable> {
/** Performs the action once and returns its result. */
R apply() throws E;
}

public GrpcRetryer(Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
/**
 * Static variant of {@code retry} for procedures that return nothing: the procedure is adapted
 * with {@link RetryableProc#asFunc()} and passed to the static {@code retryWithResult}; the
 * returned value is discarded.
 *
 * @param serverCapabilities server capabilities forwarded to {@code retryWithResult}
 * @param options retry options forwarded to {@code retryWithResult}
 * @param proc the procedure to run
 * @throws T whatever {@code retryWithResult} propagates
 */
public static <T extends Throwable> void retry(
    @Nonnull Capabilities serverCapabilities,
    @Nonnull GrpcRetryerOptions options,
    @Nonnull RetryableProc<T> proc)
    throws T {
  final RetryableFunc<Void, T> adapted = proc.asFunc();
  retryWithResult(serverCapabilities, options, adapted);
}

/**
 * Static variant of {@code retryWithResult}; delegates directly to {@code GrpcSyncRetryer.retry}.
 *
 * @param serverCapabilities server capabilities forwarded to the sync retryer
 * @param options retry options forwarded to the sync retryer
 * @param func the function to run
 * @return the value returned by {@code GrpcSyncRetryer.retry}
 * @throws T whatever {@code GrpcSyncRetryer.retry} propagates
 */
public static <R, T extends Throwable> R retryWithResult(
    @Nonnull Capabilities serverCapabilities,
    @Nonnull GrpcRetryerOptions options,
    @Nonnull RetryableFunc<R, T> func)
    throws T {
  return GrpcSyncRetryer.<R, T>retry(serverCapabilities, options, func);
}

/**
 * Static variant of {@code retryWithResultAsync}; delegates directly to
 * {@code GrpcAsyncRetryer.retry}.
 *
 * @param asyncThrottlerExecutor executor forwarded to the async retryer
 * @param serverCapabilities server capabilities forwarded to the async retryer
 * @param options retry options forwarded to the async retryer
 * @param function supplier of the asynchronous call to run
 * @return the future returned by {@code GrpcAsyncRetryer.retry}
 */
public static <R> CompletableFuture<R> retryWithResultAsync(
    @Nonnull ScheduledExecutorService asyncThrottlerExecutor,
    @Nonnull Capabilities serverCapabilities,
    @Nonnull GrpcRetryerOptions options,
    @Nonnull Supplier<CompletableFuture<R>> function) {
  return GrpcAsyncRetryer.<R>retry(
      asyncThrottlerExecutor, serverCapabilities, options, function);
}

private final Supplier<Capabilities> serverCapabilities;

/**
 * Creates a retryer that keeps the given supplier of server capabilities.
 *
 * @param serverCapabilities supplier stored in this instance; it is not invoked here
 */
public GrpcRetryer(@Nonnull Supplier<Capabilities> serverCapabilities) {
this.serverCapabilities = serverCapabilities;
}

public <T extends Throwable> void retry(RetryableProc<T> r, GrpcRetryerOptions options) throws T {
retryWithResult(
() -> {
r.apply();
return null;
},
options);
public <T extends Throwable> void retry(
@Nonnull RetryableProc<T> proc, @Nonnull GrpcRetryerOptions options) throws T {
retry(serverCapabilities.get(), options, proc);
}

public <R, T extends Throwable> R retryWithResult(
RetryableFunc<R, T> r, GrpcRetryerOptions options) throws T {
return new GrpcSyncRetryer().retry(r, options, serverCapabilities.get());
@Nonnull RetryableFunc<R, T> func, @Nonnull GrpcRetryerOptions options) throws T {
return retryWithResult(serverCapabilities.get(), options, func);
}

public <R> CompletableFuture<R> retryWithResultAsync(
ScheduledExecutorService asyncThrottlerExecutor,
Supplier<CompletableFuture<R>> function,
GrpcRetryerOptions options) {
return new GrpcAsyncRetryer<>(
asyncThrottlerExecutor, function, options, serverCapabilities.get())
.retry();
@Nonnull ScheduledExecutorService asyncThrottlerExecutor,
@Nonnull Supplier<CompletableFuture<R>> function,
@Nonnull GrpcRetryerOptions options) {
return retryWithResultAsync(
asyncThrottlerExecutor, serverCapabilities.get(), options, function);
}

public static class GrpcRetryerOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.StatusUtils;
import java.time.Duration;
Expand All @@ -48,7 +48,7 @@ class GrpcRetryerUtils {
static @Nullable RuntimeException createFinalExceptionIfNotRetryable(
@Nonnull StatusRuntimeException currentException,
@Nonnull RpcRetryOptions options,
GetSystemInfoResponse.Capabilities serverCapabilities) {
@Nonnull Capabilities serverCapabilities) {
Status.Code code = currentException.getStatus().getCode();

switch (code) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,26 @@
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.StatusRuntimeException;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.internal.BackoffThrottler;
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
import io.temporal.internal.retryer.GrpcRetryer.RetryableFunc;
import io.temporal.serviceclient.RpcRetryOptions;
import java.util.concurrent.CancellationException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GrpcSyncRetryer {
private static final Logger log = LoggerFactory.getLogger(GrpcRetryer.class);

public <R, T extends Throwable> R retry(
GrpcRetryer.RetryableFunc<R, T> r,
GrpcRetryer.GrpcRetryerOptions options,
GetSystemInfoResponse.Capabilities serverCapabilities)
// Private constructor: code outside this class cannot instantiate it.
private GrpcSyncRetryer() {}

static <R, T extends Throwable> R retry(
@Nonnull Capabilities serverCapabilities,
@Nonnull GrpcRetryerOptions options,
@Nonnull RetryableFunc<R, T> func)
throws T {
options.validate();
RpcRetryOptions rpcOptions = options.getOptions();
Expand Down Expand Up @@ -66,7 +71,7 @@ public <R, T extends Throwable> R retry(
if (lastMeaningfulException != null) {
log.debug("Retrying after failure", lastMeaningfulException);
}
R result = r.apply();
R result = func.apply();
throttler.success();
return result;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@
import io.grpc.health.v1.HealthGrpc;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,8 +89,7 @@ final class ChannelManager {
private final Channel interceptedChannel;
private final HealthGrpc.HealthBlockingStub healthBlockingStub;

private final CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture =
new CompletableFuture<>();
private final AtomicReference<Capabilities> capabilitiesAtomicReference = new AtomicReference<>();

public ChannelManager(
ServiceStubsOptions options, List<ClientInterceptor> additionalHeadInterceptors) {
Expand Down Expand Up @@ -155,9 +156,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) {
headers.put(CLIENT_NAME_HEADER_KEY, CLIENT_NAME_HEADER_VALUE);

return ClientInterceptors.intercept(
channel,
MetadataUtils.newAttachHeadersInterceptor(headers),
new SystemInfoInterceptor(serverCapabilitiesFuture));
channel, MetadataUtils.newAttachHeadersInterceptor(headers));
}

private Channel applyCustomInterceptors(Channel channel) {
Expand Down Expand Up @@ -289,12 +288,14 @@ public void connect(String healthCheckServiceName, @Nullable Duration timeout) {
if (timeout == null) {
timeout = options.getRpcTimeout();
}
GrpcRetryer.GrpcRetryerOptions grpcRetryerOptions =
new GrpcRetryer.GrpcRetryerOptions(
GrpcRetryerOptions grpcRetryerOptions =
new GrpcRetryerOptions(
RpcRetryOptions.newBuilder().setExpiration(timeout).validateBuildWithDefaults(), null);

new GrpcRetryer(getServerCapabilities())
.retryWithResult(() -> this.healthCheck(healthCheckServiceName, null), grpcRetryerOptions);
GrpcRetryer.retryWithResult(
getServerCapabilities(),
grpcRetryerOptions,
() -> this.healthCheck(healthCheckServiceName, null));
}

/**
Expand Down Expand Up @@ -322,18 +323,43 @@ public HealthCheckResponse healthCheck(
return stub.check(HealthCheckRequest.newBuilder().setService(healthCheckServiceName).build());
}

public Supplier<GetSystemInfoResponse.Capabilities> getServerCapabilities() {
return () -> {
synchronized (serverCapabilitiesFuture) {
GetSystemInfoResponse.Capabilities capabilities = serverCapabilitiesFuture.getNow(null);
public Capabilities getServerCapabilities() {
Capabilities capabilities = capabilitiesAtomicReference.get();
if (capabilities == null) {
synchronized (capabilitiesAtomicReference) {
capabilities = capabilitiesAtomicReference.get();
if (capabilities == null) {
serverCapabilitiesFuture.complete(
SystemInfoInterceptor.getServerCapabilitiesOrThrow(interceptedChannel, null));
capabilities = serverCapabilitiesFuture.getNow(null);
capabilities = getServerCapabilitiesWithRetry(interceptedChannel);
capabilitiesAtomicReference.set(capabilities);
}
return capabilities;
}
};
}
return capabilities;
}

/**
 * Fetches the server capabilities over {@code channel} through {@code GrpcRetryer}.
 *
 * <p>The call uses default {@link RpcRetryOptions} and a single deadline of 20 seconds that is
 * shared by the retryer options and by each underlying request. The retryer itself is given
 * {@code Capabilities.getDefaultInstance()} as its server capabilities.
 *
 * @param channel channel used to issue the request
 * @return the value produced by {@code getServerCapabilitiesRaw}
 */
private static Capabilities getServerCapabilitiesWithRetry(Channel channel) {
  final RpcRetryOptions defaultRetryOptions =
      RpcRetryOptions.newBuilder().validateBuildWithDefaults();
  final Deadline sharedDeadline = Deadline.after(20, TimeUnit.SECONDS);
  final GrpcRetryerOptions retryerOptions =
      new GrpcRetryerOptions(defaultRetryOptions, sharedDeadline);
  final Capabilities capabilitiesForRetryer = Capabilities.getDefaultInstance();
  return GrpcRetryer.retryWithResult(
      capabilitiesForRetryer,
      retryerOptions,
      () -> getServerCapabilitiesRaw(channel, sharedDeadline));
}

/**
 * Issues one blocking {@code GetSystemInfo} request on {@code channel} and returns the
 * capabilities from the response.
 *
 * @param channel channel used to build the blocking stub
 * @param deadline deadline applied to the stub
 * @return the capabilities from the response, or {@code Capabilities.getDefaultInstance()} when
 *     the call fails with status code {@code UNIMPLEMENTED}
 * @throws StatusRuntimeException for any failure whose status code is not {@code UNIMPLEMENTED}
 */
private static Capabilities getServerCapabilitiesRaw(Channel channel, Deadline deadline) {
  final GetSystemInfoRequest request = GetSystemInfoRequest.newBuilder().build();
  try {
    return WorkflowServiceGrpc.newBlockingStub(channel)
        .withDeadline(deadline)
        .getSystemInfo(request)
        .getCapabilities();
  } catch (StatusRuntimeException ex) {
    // Only UNIMPLEMENTED is mapped to the default capabilities; everything else propagates.
    if (ex.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
      throw ex;
    }
    return Capabilities.getDefaultInstance();
  }
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ public void connect(@Nullable Duration timeout) {
@Override
public HealthCheckResponse healthCheck() {
// no need to pass timeout, timeout will be assigned by GrpcDeadlineInterceptor
return this.channelManager.healthCheck(HEALTH_CHECK_SERVICE_NAME, null);
return channelManager.healthCheck(HEALTH_CHECK_SERVICE_NAME, null);
}

@Override
public Supplier<GetSystemInfoResponse.Capabilities> getServerCapabilities() {
return this.channelManager.getServerCapabilities();
return channelManager::getServerCapabilities;
}
}
Loading

0 comments on commit 5d2e9f4

Please sign in to comment.