diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java index bca4113a3..9c16d0d0e 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java @@ -346,7 +346,7 @@ public Supplier getServerCapabilities() { SystemInfoInterceptor.getServerCapabilitiesWithRetryOrThrow( serverCapabilitiesFuture, interceptedChannel, - deadlineFrom(options.getHealthCheckAttemptTimeout())); + deadlineFrom(options.getSystemInfoTimeout())); } private static Deadline deadlineFrom(Duration duration) { diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java index 5ec5b9055..dd6a7497e 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java @@ -71,6 +71,12 @@ public class ServiceStubsOptions { */ protected final Duration healthCheckAttemptTimeout; + /** + * SystemInfoTimeout specifies how to long to wait for service response on each health check + * attempt. Default: 5s. + */ + protected final Duration systemInfoTimeout; + /** * HealthCheckTimeout defines how long client should be sending health check requests to the * server before concluding that it is unavailable. Defaults to 10s. @@ -128,6 +134,7 @@ public class ServiceStubsOptions { this.enableHttps = that.enableHttps; this.sslContext = that.sslContext; this.healthCheckAttemptTimeout = that.healthCheckAttemptTimeout; + this.systemInfoTimeout = that.systemInfoTimeout; this.healthCheckTimeout = that.healthCheckTimeout; this.enableKeepAlive = that.enableKeepAlive; this.keepAliveTime = that.keepAliveTime; @@ -150,6 +157,7 @@ public class ServiceStubsOptions { SslContext sslContext, Duration healthCheckAttemptTimeout, Duration healthCheckTimeout, + Duration systemInfoTimeout, boolean enableKeepAlive, Duration keepAliveTime, Duration keepAliveTimeout, @@ -168,6 +176,7 @@ public class ServiceStubsOptions { this.sslContext = sslContext; this.healthCheckAttemptTimeout = healthCheckAttemptTimeout; this.healthCheckTimeout = healthCheckTimeout; + this.systemInfoTimeout = systemInfoTimeout; this.enableKeepAlive = enableKeepAlive; this.keepAliveTime = keepAliveTime; this.keepAliveTimeout = keepAliveTimeout; @@ -233,6 +242,13 @@ public Duration getHealthCheckAttemptTimeout() { return healthCheckAttemptTimeout; } + /** + * @return The timeout for the RPC made by the client to fetch server capabilities. + */ + public Duration getSystemInfoTimeout() { + return systemInfoTimeout; + } + /** * @return duration of time to wait while checking server connection when creating new client */ @@ -337,6 +353,7 @@ public boolean equals(Object o) { && Objects.equals(sslContext, that.sslContext) && Objects.equals(healthCheckAttemptTimeout, that.healthCheckAttemptTimeout) && Objects.equals(healthCheckTimeout, that.healthCheckTimeout) + && Objects.equals(systemInfoTimeout, that.systemInfoTimeout) && Objects.equals(keepAliveTime, that.keepAliveTime) && Objects.equals(keepAliveTimeout, that.keepAliveTimeout) && Objects.equals(rpcTimeout, that.rpcTimeout) @@ -358,6 +375,7 @@ public int hashCode() { sslContext, healthCheckAttemptTimeout, healthCheckTimeout, + systemInfoTimeout, enableKeepAlive, keepAliveTime, keepAliveTimeout, @@ -389,6 +407,8 @@ public String toString() { + healthCheckAttemptTimeout + ", healthCheckTimeout=" + healthCheckTimeout + + ", systemInfoTimeout=" + + systemInfoTimeout + ", enableKeepAlive=" + enableKeepAlive + ", keepAliveTime=" @@ -421,6 +441,7 @@ public static class Builder> { private String target; private Consumer> channelInitializer; private Duration healthCheckAttemptTimeout; + private Duration systemInfoTimeout; private Duration healthCheckTimeout; private boolean enableKeepAlive = true; private Duration keepAliveTime = Duration.ofSeconds(30); @@ -444,6 +465,7 @@ protected Builder(ServiceStubsOptions options) { this.sslContext = options.sslContext; this.healthCheckAttemptTimeout = options.healthCheckAttemptTimeout; this.healthCheckTimeout = options.healthCheckTimeout; + this.systemInfoTimeout = options.systemInfoTimeout; this.enableKeepAlive = options.enableKeepAlive; this.keepAliveTime = options.keepAliveTime; this.keepAliveTimeout = options.keepAliveTimeout; @@ -713,6 +735,17 @@ public T setHealthCheckTimeout(Duration healthCheckTimeout) { return self(); } + /** + * Set a SystemInfoTimeout that specifies how long the client tries to fetch server + * capabilities. + * + * @return {@code this} + */ + public T setSystemInfoTimeout(Duration systemInfoTimeout) { + this.systemInfoTimeout = systemInfoTimeout; + return self(); + } + /** * Enables keep alive ping from client to the server, which can help drop abruptly closed * connections faster. @@ -796,6 +829,7 @@ public ServiceStubsOptions build() { this.sslContext, this.healthCheckAttemptTimeout, this.healthCheckTimeout, + this.systemInfoTimeout, this.enableKeepAlive, this.keepAliveTime, this.keepAliveTimeout, @@ -847,6 +881,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() { Duration healthCheckTimeout = this.healthCheckTimeout != null ? this.healthCheckTimeout : Duration.ofSeconds(10); + Duration systemInfoTimeout = + this.systemInfoTimeout != null ? this.systemInfoTimeout : Duration.ofSeconds(5); return new ServiceStubsOptions( this.channel, target, @@ -855,6 +891,7 @@ public ServiceStubsOptions validateAndBuildWithDefaults() { this.sslContext, healthCheckAttemptTimeout, healthCheckTimeout, + systemInfoTimeout, this.enableKeepAlive, this.keepAliveTime, this.keepAliveTimeout, diff --git a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ChannelManagerTest.java b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ChannelManagerTest.java index 362b1d9e9..1433fab79 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ChannelManagerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ChannelManagerTest.java @@ -134,14 +134,14 @@ public void setUp() throws Exception { } @After - public void tearDown() throws Exception { + public void tearDown() { if (channelManager != null) { channelManager.shutdownNow(); } } @Test - public void testGetServerCapabilities() throws Exception { + public void testGetServerCapabilities() { Capabilities capabilities = channelManager.getServerCapabilities().get(); assertEquals(CAPABILITIES, capabilities); assertEquals(1, getSystemInfoCount.get()); @@ -150,7 +150,7 @@ public void testGetServerCapabilities() throws Exception { } @Test - public void testGetServerCapabilitiesRetry() throws Exception { + public void testGetServerCapabilitiesRetry() { getSystemInfoUnavailable.set(2); Capabilities capabilities = channelManager.getServerCapabilities().get(); assertEquals(CAPABILITIES, capabilities); @@ -160,7 +160,7 @@ public void testGetServerCapabilitiesRetry() throws Exception { } @Test - public void testGetServerCapabilitiesUnavailable() throws Exception { + public void testGetServerCapabilitiesUnavailable() { getSystemInfoUnavailable.set(Integer.MAX_VALUE); try { Capabilities unused = channelManager.getServerCapabilities().get(); @@ -174,7 +174,7 @@ public void testGetServerCapabilitiesUnavailable() throws Exception { } @Test - public void testGetServerCapabilitiesUnimplemented() throws Exception { + public void testGetServerCapabilitiesUnimplemented() { getSystemInfoUnimplemented.set(1); Capabilities capabilities = channelManager.getServerCapabilities().get(); assertEquals(Capabilities.getDefaultInstance(), capabilities); @@ -184,7 +184,7 @@ public void testGetServerCapabilitiesUnimplemented() throws Exception { } @Test - public void testGetServerCapabilitiesWithConnect() throws Exception { + public void testGetServerCapabilitiesWithConnect() { channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100)); Capabilities capabilities = channelManager.getServerCapabilities().get(); assertEquals(CAPABILITIES, capabilities); @@ -194,7 +194,7 @@ public void testGetServerCapabilitiesWithConnect() throws Exception { } @Test - public void testGetServerCapabilitiesRetryWithConnect() throws Exception { + public void testGetServerCapabilitiesRetryWithConnect() { getSystemInfoUnavailable.set(2); channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100)); Capabilities capabilities = channelManager.getServerCapabilities().get(); @@ -205,7 +205,7 @@ public void testGetServerCapabilitiesRetryWithConnect() throws Exception { } @Test - public void testGetServerCapabilitiesUnavailableWithConnect() throws Exception { + public void testGetServerCapabilitiesUnavailableWithConnect() { getSystemInfoUnavailable.set(Integer.MAX_VALUE); try { channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100)); @@ -220,7 +220,7 @@ public void testGetServerCapabilitiesUnavailableWithConnect() throws Exception { } @Test - public void testGetServerCapabilitiesUnimplementedWithConnect() throws Exception { + public void testGetServerCapabilitiesUnimplementedWithConnect() { getSystemInfoUnimplemented.set(1); channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100)); Capabilities capabilities = channelManager.getServerCapabilities().get(); diff --git a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/SystemInfoTimeoutTest.java b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/SystemInfoTimeoutTest.java new file mode 100644 index 000000000..050aa07f8 --- /dev/null +++ b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/SystemInfoTimeoutTest.java @@ -0,0 +1,175 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.serviceclient; + +import static org.junit.Assert.assertEquals; + +import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.temporal.api.workflowservice.v1.GetSystemInfoRequest; +import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.*; + +public class SystemInfoTimeoutTest { + + private static final GetSystemInfoResponse.Capabilities CAPABILITIES = + GetSystemInfoResponse.Capabilities.newBuilder().setInternalErrorDifferentiation(true).build(); + + private static final GetSystemInfoResponse GET_SYSTEM_INFO_RESPONSE = + GetSystemInfoResponse.newBuilder().setCapabilities(CAPABILITIES).build(); + + private static final RpcRetryOptions RPC_RETRY_OPTIONS = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setBackoffCoefficient(1.0) + .setMaximumAttempts(3) + .setExpiration(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + + @Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + private final AtomicInteger getSystemInfoCount = new AtomicInteger(0); + private final AbstractQueue getSystemInfoTimeout = new ArrayBlockingQueue(10); + + private final WorkflowServiceGrpc.WorkflowServiceImplBase workflowImpl = + new WorkflowServiceGrpc.WorkflowServiceImplBase() { + @Override + public void getSystemInfo( + GetSystemInfoRequest request, StreamObserver responseObserver) { + Duration timeout = getSystemInfoTimeout.poll(); + if (timeout != null) { + try { + Thread.sleep(timeout.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + getSystemInfoCount.getAndIncrement(); + responseObserver.onNext(GET_SYSTEM_INFO_RESPONSE); + responseObserver.onCompleted(); + } + }; + + private ManagedChannel managedChannel; + + @Before + public void setUp() throws Exception { + getSystemInfoCount.set(0); + String serverName = InProcessServerBuilder.generateName(); + grpcCleanupRule.register( + InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(workflowImpl) + .build() + .start()); + managedChannel = + grpcCleanupRule.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @Test + public void testGetServerCapabilitiesTimeoutExceeded() { + WorkflowServiceStubsOptions serviceStubsOptions = + WorkflowServiceStubsOptions.newBuilder() + .setChannel(managedChannel) + .setRpcRetryOptions(RPC_RETRY_OPTIONS) + .setSystemInfoTimeout(Duration.ofSeconds(1)) + .validateAndBuildWithDefaults(); + + ClientInterceptor deadlineInterceptor = + new GrpcDeadlineInterceptor( + serviceStubsOptions.getRpcTimeout(), + serviceStubsOptions.getRpcLongPollTimeout(), + serviceStubsOptions.getRpcQueryTimeout()); + + ChannelManager channelManager = + new ChannelManager(serviceStubsOptions, Collections.singletonList(deadlineInterceptor)); + + getSystemInfoTimeout.add(Duration.ofSeconds(2)); + + StatusRuntimeException sre = + Assert.assertThrows( + StatusRuntimeException.class, () -> channelManager.getServerCapabilities().get()); + assertEquals(Status.Code.DEADLINE_EXCEEDED, sre.getStatus().getCode()); + } + + @Test + public void testGetServerCapabilitiesRetry() { + WorkflowServiceStubsOptions serviceStubsOptions = + WorkflowServiceStubsOptions.newBuilder() + .setChannel(managedChannel) + .setRpcRetryOptions(RPC_RETRY_OPTIONS) + .setRpcTimeout(Duration.ofMillis(500)) + .setSystemInfoTimeout(Duration.ofSeconds(5)) + .validateAndBuildWithDefaults(); + + ClientInterceptor deadlineInterceptor = + new GrpcDeadlineInterceptor( + serviceStubsOptions.getRpcTimeout(), + serviceStubsOptions.getRpcLongPollTimeout(), + serviceStubsOptions.getRpcQueryTimeout()); + + ChannelManager channelManager = + new ChannelManager(serviceStubsOptions, Collections.singletonList(deadlineInterceptor)); + + getSystemInfoTimeout.add(Duration.ofSeconds(1)); + getSystemInfoTimeout.add(Duration.ofSeconds(1)); + + GetSystemInfoResponse.Capabilities capabilities = channelManager.getServerCapabilities().get(); + assertEquals(CAPABILITIES, capabilities); + assertEquals(3, getSystemInfoCount.get()); + } + + @Test + public void testGetServerCapabilitiesTimeout() { + WorkflowServiceStubsOptions serviceStubsOptions = + WorkflowServiceStubsOptions.newBuilder() + .setChannel(managedChannel) + .setRpcRetryOptions(RPC_RETRY_OPTIONS) + .setSystemInfoTimeout(Duration.ofSeconds(10)) + .validateAndBuildWithDefaults(); + + ClientInterceptor deadlineInterceptor = + new GrpcDeadlineInterceptor( + serviceStubsOptions.getRpcTimeout(), + serviceStubsOptions.getRpcLongPollTimeout(), + serviceStubsOptions.getRpcQueryTimeout()); + + ChannelManager channelManager = + new ChannelManager(serviceStubsOptions, Collections.singletonList(deadlineInterceptor)); + + getSystemInfoTimeout.add(Duration.ofSeconds(6)); + + GetSystemInfoResponse.Capabilities capabilities = channelManager.getServerCapabilities().get(); + assertEquals(CAPABILITIES, capabilities); + assertEquals(1, getSystemInfoCount.get()); + } +}