diff --git a/README.md b/README.md index 75f21ccc..f51cff8c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 common library (Java) (5.6.0) +# th2 common library (Java) (5.7.0) ## Usage @@ -192,12 +192,22 @@ The `CommonFactory` reads a gRPC router configuration from the `grpc_router.json * maxMessageSize - this option enables endpoint message filtering based on message size (message with size larger than option value will be skipped). By default, it has a value of `4 MB`. The unit of measurement of the value is number of bytes. +* retryConfiguration - this settings aria is responsible for how a component executes gRPC retries before gives up with exception. + Component executes request attempts with growing timeout between them until success or attempts over + * maxAttempts - number of attempts before give up + * minMethodRetriesTimeout - minimal timeout between retry in milliseconds + * maxMethodRetriesTimeout - maximum timeout between retry in milliseconds ```json { "enableSizeMeasuring": false, "keepAliveInterval": 60, - "maxMessageSize": 4194304 + "maxMessageSize": 4194304, + "retryConfiguration": { + "maxAttempts": 60, + "minMethodRetriesTimeout": 100, + "maxMethodRetriesTimeout": 120000 + } } ``` @@ -491,6 +501,15 @@ dependencies { ## Release notes +### 5.7.0-dev + +#### Fix: ++ gRPC `retryConfiguration` has been moved from grpc.json to grpc_router.json ++ the whole default gRPC retry interval is about 1 minute + +#### Updated: ++ grpc-service-generator: `3.5.0` + ### 5.6.0-dev #### Added: diff --git a/build.gradle b/build.gradle index 80299138..035a84b6 100644 --- a/build.gradle +++ b/build.gradle @@ -23,7 +23,6 @@ plugins { id 'maven-publish' id "io.github.gradle-nexus.publish-plugin" version "1.0.0" id 'signing' - id 'com.google.protobuf' version '0.8.8' apply false id 'org.jetbrains.kotlin.jvm' version "${kotlin_version}" id 'org.jetbrains.kotlin.kapt' version "${kotlin_version}" id "org.owasp.dependencycheck" version "8.3.1" @@ -31,14 +30,21 @@ plugins { id "com.gorylenko.gradle-git-properties" version "2.4.1" id 'com.github.jk1.dependency-license-report' version '2.5' id "de.undercouch.download" version "5.4.0" + id "com.google.protobuf" version "0.9.4" } group = 'com.exactpro.th2' version = release_version ext { + grpcVersion = '1.56.0' + protobufVersion = '3.23.2' // The protoc:3.23.3 https://github.com/protocolbuffers/protobuf/issues/13070 + serviceGeneratorVersion = '3.5.0' + cradleVersion = '5.1.1-dev' junitVersion = '5.10.0' + + genBaseDir = file("${buildDir}/generated/source/proto") } repositories { @@ -93,7 +99,7 @@ tasks.withType(Sign).configureEach { } // disable running task 'initializeSonatypeStagingRepository' on a gitlab tasks.configureEach { task -> - if (task.name.equals('initializeSonatypeStagingRepository') && + if (task.name == 'initializeSonatypeStagingRepository' && !(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword')) ) { task.enabled = false @@ -189,7 +195,7 @@ dependencies { jmh 'org.openjdk.jmh:jmh-generator-annprocess:0.9' implementation 'com.google.protobuf:protobuf-java-util' - implementation 'com.exactpro.th2:grpc-service-generator:3.4.0' + implementation "com.exactpro.th2:grpc-service-generator:${serviceGeneratorVersion}" implementation "com.exactpro.th2:cradle-cassandra:${cradleVersion}" def autoValueVersion = '1.10.1' @@ -253,6 +259,7 @@ dependencies { implementation 'io.github.microutils:kotlin-logging:3.0.0' // The last version bases on kotlin 1.6.0 + testImplementation 'javax.annotation:javax.annotation-api:1.3.2' testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" testImplementation 'org.mockito.kotlin:mockito-kotlin:4.0.0' testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5' @@ -282,8 +289,37 @@ jar { sourceSets { main.kotlin.srcDirs += "src/main/kotlin" + test.resources.srcDirs += "$genBaseDir/test/services/java/resources" +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:${protobufVersion}" + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" + } + services { + artifact = "com.exactpro.th2:grpc-service-generator:${serviceGeneratorVersion}:all@jar" + } + } + generateProtoTasks { + all()*.plugins { + grpc {} + services { + option 'javaInterfacesPath=./java/src' + option 'javaInterfacesImplPath=./java/src' + option 'javaMetaInfPath=./java/resources' + } + } + ofSourceSet('test') + } } +compileTestJava.dependsOn.add('generateTestProto') +processTestResources.dependsOn.add('generateTestProto') + tasks.withType(KotlinCompile).configureEach { kotlinOptions.jvmTarget = "11" kotlinOptions.freeCompilerArgs += "-Xjvm-default=all" diff --git a/gradle.properties b/gradle.properties index 5962a2f7..1b8f5934 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -release_version=5.6.0 +release_version=5.7.0 description='th2 common library (Java)' vcs_url=https://github.com/th2-net/th2-common-j kapt.include.compile.classpath=false diff --git a/src/main/java/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouter.java b/src/main/java/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouter.java index 2e7df256..628e9677 100644 --- a/src/main/java/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouter.java @@ -83,7 +83,7 @@ public T getService(@NotNull Class cls) { try { return th2ImplClass.getConstructor(RetryPolicy.class, StubStorage.class) .newInstance( - configuration.getRetryConfiguration(), + routerConfiguration.getRetryConfiguration(), stubsStorages.computeIfAbsent(cls, key -> new DefaultStubStorage<>(getServiceConfig(key), createGetMetric(GRPC_INVOKE_CALL_TOTAL, GRPC_INVOKE_CALL_MAP), diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcConfiguration.kt index 958edc75..9ea216ae 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcConfiguration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -18,7 +18,6 @@ package com.exactpro.th2.common.schema.grpc.configuration import com.exactpro.th2.common.schema.configuration.Configuration import com.exactpro.th2.common.schema.message.configuration.FieldFilterConfiguration import com.exactpro.th2.common.schema.strategy.route.RoutingStrategy -import com.exactpro.th2.service.RetryPolicy import com.fasterxml.jackson.annotation.JsonProperty import io.grpc.internal.GrpcUtil @@ -26,7 +25,6 @@ data class GrpcConfiguration( @JsonProperty var services: Map = emptyMap(), @JsonProperty(value = "server") var serverConfiguration: GrpcServerConfiguration = GrpcServerConfiguration(), @JsonProperty var maxMessageSize: Int = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE, - @JsonProperty var retryConfiguration: GrpcRetryConfiguration = GrpcRetryConfiguration(), ) : Configuration() data class GrpcServiceConfiguration( @@ -46,20 +44,6 @@ data class GrpcEndpointConfiguration( var attributes: List = emptyList(), ) : Configuration() -data class GrpcRetryConfiguration( - private var maxAttempts: Int = 5, - var minMethodRetriesTimeout: Long = 100, - var maxMethodRetriesTimeout: Long = 2000 -) : Configuration(), RetryPolicy { - override fun getDelay(index: Int): Long = - (minMethodRetriesTimeout + if (maxAttempts > 1) (maxMethodRetriesTimeout - minMethodRetriesTimeout) / (maxAttempts - 1) * index else 0) - - override fun getMaxAttempts(): Int = maxAttempts - fun setMaxAttempts(maxAttempts: Int) { - this.maxAttempts = maxAttempts - } -} - data class GrpcServerConfiguration( var host: String? = "localhost", @JsonProperty(required = true) var port: Int = 8080, diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt new file mode 100644 index 00000000..49bd1349 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt @@ -0,0 +1,57 @@ +/* + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.grpc.configuration + +import com.exactpro.th2.common.schema.configuration.Configuration +import com.exactpro.th2.service.RetryPolicy + +data class GrpcRetryConfiguration( + private var maxAttempts: Int = 15, + var minMethodRetriesTimeout: Long = 100, + var maxMethodRetriesTimeout: Long = 7000, + var retryInterruptedTransaction: Boolean = false, +) : Configuration(), RetryPolicy { + + init { + require(maxAttempts >= 0) { + "'max attempts' of ${javaClass.simpleName} class must be 0 or positive" + } + require(minMethodRetriesTimeout >= 0) { + "'min method retries timeout' of ${javaClass.simpleName} class must be 0 or positive" + } + require(maxMethodRetriesTimeout >= 0) { + "'max method retries timeout' of ${javaClass.simpleName} class must be 0 or positive" + } + require(maxMethodRetriesTimeout >= minMethodRetriesTimeout) { + "'max method retries timeout' of ${javaClass.simpleName} class must be greater of equal 'min method retries timeout'" + } + } + + override fun getDelay(index: Int): Long { + val attempt = if (index > 0) { + if (index > maxAttempts) maxAttempts else index + } else { 0 } + var increment = 0L + if (maxAttempts > 1) { + increment = (maxMethodRetriesTimeout - minMethodRetriesTimeout) / (maxAttempts - 1) * attempt + } + + return minMethodRetriesTimeout + increment + } + + override fun getMaxAttempts(): Int = maxAttempts + + override fun retryInterruptedTransaction(): Boolean = retryInterruptedTransaction +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRouterConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRouterConfiguration.kt index 51554c3b..8e1e2bd4 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRouterConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRouterConfiguration.kt @@ -22,4 +22,5 @@ data class GrpcRouterConfiguration( var enableSizeMeasuring: Boolean = false, var keepAliveInterval: Long = 60L, var maxMessageSize: Int = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE, + var retryConfiguration: GrpcRetryConfiguration = GrpcRetryConfiguration(), ) : Configuration() \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/TestJsonConfiguration.kt b/src/test/kotlin/com/exactpro/th2/common/schema/TestJsonConfiguration.kt index cfa9efb5..97b4da2d 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/TestJsonConfiguration.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/TestJsonConfiguration.kt @@ -38,6 +38,11 @@ class TestJsonConfiguration { testDeserialize(GRPC_CONF_JSON, GRPC_CONF) } + @Test + fun `test grpc router json configuration deserialize`() { + testDeserialize(GRPC_ROUTER_CONF_JSON, GRPC_ROUTER_CONF) + } + @Test fun `test grpc json configuration serialize and deserialize`() { testSerializeAndDeserialize(GRPC_CONF) @@ -143,6 +148,18 @@ class TestJsonConfiguration { GrpcServerConfiguration("host123", 1234, 58), ) + private val GRPC_ROUTER_CONF_JSON = loadConfJson("grpc_router") + private val GRPC_ROUTER_CONF = GrpcRouterConfiguration( + true, + 61, + 4194305, + GrpcRetryConfiguration( + 61, + 101, + 120001 + ) + ) + private val RABBITMQ_CONF_JSON = loadConfJson("rabbitMq") private val RABBITMQ_CONF = RabbitMQConfiguration( "host", diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfigurationTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfigurationTest.kt new file mode 100644 index 00000000..ffce0e18 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfigurationTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.grpc.configuration + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource + +class GrpcRetryConfigurationTest { + + @ParameterizedTest + @CsvSource( + "0,0,0", + "0,0,100", + "0,100,100", + "0,100,2000", + "1,0,0", + "1,0,100", + "1,100,100", + "1,100,2000", + "100,0,0", + "100,0,100", + "100,100,100", + "100,100,2000", + ) + fun `get delay test`(maxAttempts: String, minTimeout: String, maxTimeout: String) { + val retryPolicy = GrpcRetryConfiguration(maxAttempts.toInt(), minTimeout.toLong(), maxTimeout.toLong()) + + for (attempt in -1..retryPolicy.getMaxAttempts() + 1) { + val delay = retryPolicy.getDelay(attempt) + when { + attempt <= 0 -> { + assertEquals(retryPolicy.minMethodRetriesTimeout, delay) { + "Check minimum equality, delay: $delay, attempt: $attempt" + } + } + attempt >= retryPolicy.maxMethodRetriesTimeout -> { + assertEquals(retryPolicy.maxMethodRetriesTimeout, delay) { + "Check maximum equality, delay: $delay, attempt: $attempt" + } + } + else -> { + assertTrue(delay >= retryPolicy.minMethodRetriesTimeout) { + "Check minimum limit, delay: $delay, attempt: $attempt" + } + assertTrue(delay <= retryPolicy.maxMethodRetriesTimeout) { + "Check maximum limit, delay: $delay, attempt: $attempt" + } + } + } + } + } + + @ParameterizedTest + @CsvSource( + "-1,0,0", + "0,-1,0", + "0,0,-1", + "0,1,0", + ) + fun `negative test`(maxAttempts: String, minTimeout: String, maxTimeout: String) { + assertThrows(IllegalArgumentException::class.java) { + GrpcRetryConfiguration(maxAttempts.toInt(), minTimeout.toLong(), maxTimeout.toLong()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt new file mode 100644 index 00000000..9f4c8eeb --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt @@ -0,0 +1,791 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.grpc.router.impl + +import com.exactpro.th2.common.annotations.IntegrationTest +import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration +import com.exactpro.th2.common.schema.grpc.configuration.GrpcEndpointConfiguration +import com.exactpro.th2.common.schema.grpc.configuration.GrpcRawRobinStrategy +import com.exactpro.th2.common.schema.grpc.configuration.GrpcRetryConfiguration +import com.exactpro.th2.common.schema.grpc.configuration.GrpcRouterConfiguration +import com.exactpro.th2.common.schema.grpc.configuration.GrpcServerConfiguration +import com.exactpro.th2.common.schema.grpc.configuration.GrpcServiceConfiguration +import com.exactpro.th2.common.schema.strategy.route.impl.RobinRoutingStrategy +import com.exactpro.th2.common.test.grpc.AsyncTestService +import com.exactpro.th2.common.test.grpc.Request +import com.exactpro.th2.common.test.grpc.Response +import com.exactpro.th2.common.test.grpc.TestGrpc.TestImplBase +import com.exactpro.th2.common.test.grpc.TestService +import com.exactpro.th2.service.AbstractGrpcService.STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST +import io.grpc.Context +import io.grpc.Deadline +import io.grpc.Server +import io.grpc.StatusRuntimeException +import io.grpc.stub.StreamObserver +import mu.KotlinLogging +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.mock +import org.mockito.kotlin.timeout +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoMoreInteractions +import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder +import java.time.Duration +import java.time.Instant +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.ExecutionException +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +@IntegrationTest +internal class DefaultGrpcRouterTest { + @IntegrationTest + abstract inner class AbstractGrpcRouterTest { + private val grpcRouterClient = DefaultGrpcRouter() + private val grpcRouterServer = DefaultGrpcRouter() + protected val executor: ExecutorService = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder().setNameFormat("test-%d").build() + ) + protected val deadlineExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + ThreadFactoryBuilder().setNameFormat("test-deadline-%d").build() + ) + + @AfterEach + fun afterEach() { + grpcRouterServer.close() + grpcRouterClient.close() + executor.shutdownGracefully() + deadlineExecutor.shutdownGracefully() + } + abstract fun general() + abstract fun `delayed server start`() + abstract fun `cancel retry request`() + abstract fun `deadline retry request`() + abstract fun `interrupt thread during retry request`() + abstract fun `server terminated intermediate session (retry false)`() + abstract fun `server terminated intermediate session (retry true)`() + + protected fun createClientSync( + configuration: GrpcServiceConfiguration = GrpcServiceConfiguration( + RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, + TestService::class.java, + mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT)) + ), + retryInterruptedTransaction: Boolean = false + ): TestService { + grpcRouterClient.init( + GrpcConfiguration(services = mapOf("test" to configuration)), + GrpcRouterConfiguration( + retryConfiguration = GrpcRetryConfiguration( + Int.MAX_VALUE, + RETRY_TIMEOUT, + RETRY_TIMEOUT, + retryInterruptedTransaction + ) + ) + ) + return grpcRouterClient.getService(TestService::class.java) + } + + protected fun createClientAsync( + configuration: GrpcServiceConfiguration = GrpcServiceConfiguration( + RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, + TestService::class.java, + mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT)) + ), + retryInterruptedTransaction: Boolean = false + ): AsyncTestService { + grpcRouterClient.init( + GrpcConfiguration(services = mapOf("test" to configuration)), + GrpcRouterConfiguration( + retryConfiguration = GrpcRetryConfiguration( + Int.MAX_VALUE, + RETRY_TIMEOUT, + RETRY_TIMEOUT, + retryInterruptedTransaction + ) + ) + ) + return grpcRouterClient.getService(AsyncTestService::class.java) + } + + protected fun createServer( + configuration: GrpcServerConfiguration = GrpcServerConfiguration( + null, + SERVER_PORT + ), + grpcRouter: DefaultGrpcRouter = grpcRouterServer, + completeResponse: Boolean = true, + handlerBaton: Baton? = null, + ): Server { + grpcRouter.init(GrpcConfiguration(serverConfiguration = configuration), GrpcRouterConfiguration()) + return grpcRouter.startServer(TestServiceHandler(completeResponse, handlerBaton)).apply(Server::start) + } + } + + @Nested + inner class SingleRequestSingleResponseSyncTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + createServer().execAndClose(true) { + val response = executor.submit { + return@submit createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + }.get(1, TimeUnit.MINUTES) + + assertEquals(1, response.seq) + assertEquals(1, response.origSeq) + } + } + @Test + override fun `delayed server start`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.give("client thread started") + return@submit createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) + + createServer().execAndClose { + val response = future.get(1, TimeUnit.MINUTES) + assertEquals(1, response.seq) + assertEquals(1, response.origSeq) + } + } + @Test + override fun `cancel retry request`() { + val clientServerBaton = Baton("client-server") + val grpcContext = AtomicReference() + + val future = executor.submit { + grpcContext.set( + Context.current() + .withCancellation() + ) + + clientServerBaton.give("client thread started") + grpcContext.get().call { + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + } + + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) + val cancelExceptionMessage = "test request is canceled" + assertTrue(grpcContext.get().cancel(RuntimeException(cancelExceptionMessage))) + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "CANCELLED: Context cancelled", + ExceptionMetadata( + cancelExceptionMessage, + ) + ), + ) + ) + ) + ) + } + @Test + override fun `deadline retry request`() { + val clientServerBaton = Baton("client-server") + + val future = executor.submit { + clientServerBaton.give("client thread started") + Context.current() + .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) + .call { + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + } + + clientServerBaton.get("wait client thread start") + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "DEADLINE_EXCEEDED: context timed out", + ExceptionMetadata( + "context timed out", + ) + ), + ) + ) + ) + ) + } + @Test + override fun `interrupt thread during retry request`() { + val clientServerBaton = Baton("client-server") + + val future = executor.submit { + clientServerBaton.give("client thread started") + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) + + assertEquals(0, executor.shutdownNow().size) + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "sleep interrupted" + ), + ) + ) + ) + ) + } + @Test + override fun `server terminated intermediate session (retry false)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ), + ) + ), + ) + ) + } + @Test + override fun `server terminated intermediate session (retry true)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClientSync(retryInterruptedTransaction = true).singleRequestSingleResponse( + Request.newBuilder().setSeq(1).build() + ) + } + + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + assertFalse(future.isDone) + + DefaultGrpcRouter().use { grpcRouter -> + createServer(grpcRouter = grpcRouter).execAndClose { + val response = future.get(1, TimeUnit.MINUTES) + assertEquals(1, response.seq) + assertEquals(1, response.origSeq) + } + } + } + } + + @Nested + inner class SingleRequestSingleResponseAsyncTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + val streamObserver = mock> { } + createServer().execAndClose(true) { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + assertEquals(1, captor.firstValue.seq) + assertEquals(1, captor.firstValue.origSeq) + } + } + @Test + override fun `delayed server start`() { + val streamObserver = mock> { } + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + + Thread.sleep(RETRY_TIMEOUT / 2) + + createServer().execAndClose { + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + assertEquals(1, captor.firstValue.seq) + assertEquals(1, captor.firstValue.origSeq) + } + } + @Test + override fun `cancel retry request`() { + val grpcContext = Context.current().withCancellation() + + val streamObserver = mock> { } + grpcContext.call { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + } + + Thread.sleep(RETRY_TIMEOUT / 2) + + val cancelExceptionMessage = "test request is canceled" + assertTrue(grpcContext.cancel(RuntimeException(cancelExceptionMessage))) + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "CANCELLED: Context cancelled", + ExceptionMetadata( + cancelExceptionMessage, + ) + ) + ) + } + @Test + override fun `deadline retry request`() { + val grpcContext = Context.current() + .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) + + val streamObserver = mock> { } + grpcContext.call { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + } + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "DEADLINE_EXCEEDED: context timed out", + ExceptionMetadata( + "context timed out", + ) + ) + ) + } + @Test + override fun `interrupt thread during retry request`() { + // this test isn't relevant for async request + } + @Test + override fun `server terminated intermediate session (retry false)`() { + val streamObserver = mock> { } + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + } + @Test + override fun `server terminated intermediate session (retry true)`() { + val streamObserver = mock> { } + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + createClientAsync(retryInterruptedTransaction = true).singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + verifyNoMoreInteractions(streamObserver) + + DefaultGrpcRouter().use { grpcRouter -> + createServer(grpcRouter = grpcRouter).execAndClose { + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + assertEquals(1, captor.firstValue.seq) + assertEquals(1, captor.firstValue.origSeq) + } + } + } + } + + @Nested + inner class SingleRequestMultipleResponseSyncTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + createServer().execAndClose { + val responses = createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + .asSequence().toList() + + assertEquals(2, responses.size) + responses.forEachIndexed { index, response -> + assertEquals(index + 1, response.seq) + assertEquals(1, response.origSeq) + } + } + } + @Test + override fun `delayed server start`() { + val iterator = createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + val exception = assertThrows { + // FIXME: gRPC router should retry to resend request when server isn't available. + // iterator can throw exception when server disappears in the intermediate of response retransmission + // and retryInterruptedTransaction false + iterator.hasNext() + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "UNAVAILABLE: io exception", + cause = ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + cause = ExceptionMetadata( + "Connection refused" + ) + ) + ) + ) + } + @Test + override fun `cancel retry request`() { + // FIXME: implement after retry implementing in the `delayed server start` case + } + @Test + override fun `deadline retry request`() { + // FIXME: implement after retry implementing in the `delayed server start` case + } + @Test + override fun `interrupt thread during retry request`() { + // FIXME: implement after retry implementing in the `delayed server start` case + } + @Test + override fun `server terminated intermediate session (retry false)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit> { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + .asSequence().toList() + } + + val handlerBaton = Baton("handler") + + createServer(completeResponse = true, handlerBaton = handlerBaton).execAndClose(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + ) + } + @Test + override fun `server terminated intermediate session (retry true)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit> { + clientServerBaton.giveAndGet("client thread started", "wait server start") + // FIXME: gRPC router should retry to resend request when server is terminated intermediate handling. + // iterator can throw exception when server disappears in the intermediate of response retransmission + // and retryInterruptedTransaction false + createClientSync(retryInterruptedTransaction = true).singleRequestMultipleResponse( + Request.newBuilder().setSeq(1).build() + ).asSequence().toList() + } + + val handlerBaton = Baton("handler") + + createServer(completeResponse = true, handlerBaton = handlerBaton).execAndClose(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + ) + } + } + + companion object { + private val K_LOGGER = KotlinLogging.logger { } + + private const val SERVER_PORT = 8080 + private const val RETRY_TIMEOUT = 1_000L + private inline fun Server.execAndClose(force: Boolean = false, func: Server.() -> Unit = { }) { + try { + val startTime = Instant.now() + func() + K_LOGGER.info { "Function duration: ${Duration.between(startTime, Instant.now()).toMillis()} ms" } + } finally { + if (force) { + shutdownNow() + } else { + shutdown() + if (!awaitTermination(5, TimeUnit.SECONDS)) { + shutdownNow() + error("'Server' can't be closed") + } + } + } + } + + private fun assertException( + exception: Throwable, + exceptionMetadata: ExceptionMetadata, + path: List = emptyList() + ) { + assertEquals( + exceptionMetadata.message, + exception.message, + "Message for exception: $exception, path: ${path.printAsStackTrace()}" + ) + exceptionMetadata.suspended?.let { suspendMetadataList -> + assertEquals( + suspendMetadataList.size, + exception.suppressed.size, + "Suspended for exception: $exception, path: ${path.printAsStackTrace()}" + ) + suspendMetadataList.forEachIndexed { index, suspendMetadata -> + assertException( + exception.suppressed[index], + suspendMetadata, + path.plus(listOf(exception.message, "[$index]")) + ) + } + } ?: assertEquals( + 0, + exception.suppressed.size, + "Suspended for exception: $exception, path: ${path.printAsStackTrace()}" + ) + exceptionMetadata.cause?.let { causeMetadata -> + assertNotNull(exception.cause, "Cause for exception: $exception, path: ${path.printAsStackTrace()}") + .also { assertException(it, causeMetadata, path.plus(exception.message)) } + } ?: assertNull(exception.cause, "Cause for exception: $exception, path: ${path.printAsStackTrace()}") + } + + private fun List.printAsStackTrace() = asSequence() + .joinToString(separator = "\n -> ", prefix = "\n -> ") + + private fun ExecutorService.shutdownGracefully() { + shutdown() + if (!awaitTermination(1, TimeUnit.SECONDS)) { + shutdownNow() + error("'Executor' can't be stopped") + } + } + + internal class ExceptionMetadata( + val message: String? = null, + val cause: ExceptionMetadata? = null, + val suspended: List? = null + ) + + internal class Baton( + private val name: String + ) { + private val queue = ArrayBlockingQueue(1).apply { put(Any()) } + + fun giveAndGet(giveComment: String = "", getComment: String = "") { + give(giveComment) + get(getComment) + } + + fun give(comment: String = "") { + K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" } + queue.put(Any()) + K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" } + } + + fun get(comment: String = "") { + K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" } + queue.poll() + K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" } + } + } + + private class TestServiceHandler( + private val complete: Boolean = true, + private val responseBaton: Baton? = null, + ) : TestImplBase() { + private val sequence = AtomicInteger() + override fun singleRequestSingleResponse(request: Request, responseObserver: StreamObserver) { + responseObserver.onNext(Response.newBuilder().apply { + origSeq = request.seq + seq = sequence.incrementAndGet() + }.build()) + + responseBaton?.let { + Thread.sleep(1_000) + it.give("response sent") + } + + if (complete) { + responseObserver.onCompleted() + } + } + + override fun singleRequestMultipleResponse(request: Request, responseObserver: StreamObserver) { + repeat(2) { + responseObserver.onNext(Response.newBuilder().apply { + origSeq = request.seq + seq = sequence.incrementAndGet() + }.build()) + } + + responseBaton?.let { + Thread.sleep(1_000) + it.give("response sent") + } + + if (complete) { + responseObserver.onCompleted() + } + } + } + } +} \ No newline at end of file diff --git a/src/test/proto/th2_grpc_test/test.proto b/src/test/proto/th2_grpc_test/test.proto new file mode 100644 index 00000000..a4e93fbe --- /dev/null +++ b/src/test/proto/th2_grpc_test/test.proto @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.exactpro.th2.common.test.grpc"; + + +service Test { + rpc singleRequestSingleResponse (Request) returns (Response) {} + rpc singleRequestMultipleResponse (Request) returns (stream Response) {} +} + +message Request { + uint32 seq = 1; +} + +message Response { + uint32 orig_seq = 1; + uint32 seq = 2; +} \ No newline at end of file diff --git a/src/test/resources/test_json_configurations/grpc.json b/src/test/resources/test_json_configurations/grpc.json index 62e30d1a..2b4ad5a2 100644 --- a/src/test/resources/test_json_configurations/grpc.json +++ b/src/test/resources/test_json_configurations/grpc.json @@ -24,5 +24,10 @@ "port": 1234, "workers": 58 }, - "keepAliveInterval": 400 + "keepAliveInterval": 400, + "retryConfiguration": { + "maxAttempts": 5, + "minMethodRetriesTimeout": 100, + "maxMethodRetriesTimeout": 2000 + } } \ No newline at end of file diff --git a/src/test/resources/test_json_configurations/grpc_router.json b/src/test/resources/test_json_configurations/grpc_router.json new file mode 100644 index 00000000..d179862f --- /dev/null +++ b/src/test/resources/test_json_configurations/grpc_router.json @@ -0,0 +1,10 @@ +{ + "enableSizeMeasuring": true, + "keepAliveInterval": 61, + "maxMessageSize": 4194305, + "retryConfiguration": { + "maxAttempts": 61, + "minMethodRetriesTimeout": 101, + "maxMethodRetriesTimeout": 120001 + } +} \ No newline at end of file