diff --git a/README.md b/README.md index 17f9433e..ad2b9a14 100644 --- a/README.md +++ b/README.md @@ -501,12 +501,15 @@ dependencies { ## Release notes -### 5.6.1-dev +### 5.7.0-dev #### Fix: + gRPC `retryConfiguration` has been moved from grpc.json to grpc_router.json + the whole default gRPC retry interval is about 1 hour +#### Updated: ++ grpc-service-generator: `3.5.0` + ### 5.6.0-dev #### Added: diff --git a/build.gradle b/build.gradle index 305f6790..281a6799 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,7 @@ version = release_version ext { grpcVersion = '1.56.0' protobufVersion = '3.23.2' // The protoc:3.23.3 https://github.com/protocolbuffers/protobuf/issues/13070 - serviceGeneratorVersion = '3.4.0' + serviceGeneratorVersion = '3.5.0-TH2-5107-+' cradleVersion = '5.1.1-dev' junitVersion = '5.10.0' diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt index 9295c4e7..95d93fde 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/grpc/configuration/GrpcRetryConfiguration.kt @@ -20,7 +20,8 @@ import com.exactpro.th2.service.RetryPolicy data class GrpcRetryConfiguration( private var maxAttempts: Int = 60, var minMethodRetriesTimeout: Long = 100, - var maxMethodRetriesTimeout: Long = 120_000 + var maxMethodRetriesTimeout: Long = 120_000, + var retryInterruptedTransaction: Boolean = false, ) : Configuration(), RetryPolicy { init { @@ -51,4 +52,8 @@ data class GrpcRetryConfiguration( } override fun getMaxAttempts(): Int = maxAttempts + + override fun retryInterruptedTransaction(): Boolean { + return retryInterruptedTransaction + } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt index bc3ac84c..329e0497 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt @@ -27,6 +27,9 @@ import com.exactpro.th2.common.test.grpc.Request import com.exactpro.th2.common.test.grpc.Response import com.exactpro.th2.common.test.grpc.TestGrpc.TestImplBase import com.exactpro.th2.common.test.grpc.TestService +import com.exactpro.th2.service.AbstractGrpcService.STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST +import io.grpc.Context +import io.grpc.Deadline import io.grpc.Server import io.grpc.StatusRuntimeException import io.grpc.stub.StreamObserver @@ -39,11 +42,16 @@ import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactory import java.time.Duration import java.time.Instant import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.ExecutionException +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertNotNull +import kotlin.test.assertTrue @IntegrationTest internal class DefaultGrpcRouterTest { @@ -53,16 +61,16 @@ internal class DefaultGrpcRouterTest { private val executor = Executors.newSingleThreadExecutor( ThreadFactoryBuilder().setNameFormat("test-%d").build() ) + private val deadlineExecutor = Executors.newSingleThreadScheduledExecutor( + ThreadFactoryBuilder().setNameFormat("test-deadline-%d").build() + ) @AfterEach fun afterEach() { grpcRouterServer.close() grpcRouterClient.close() - executor.shutdown() - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - executor.shutdownNow() - error("'Executor' can't be stopped") - } + executor.shutdownGracefully() + deadlineExecutor.shutdownGracefully() } @Test @@ -86,7 +94,7 @@ internal class DefaultGrpcRouterTest { } clientServerBaton.get("wait client thread start") - Thread.sleep(1_000) + Thread.sleep(RETRY_TIMEOUT * 2) createServer().use { val response = future.get(1, TimeUnit.MINUTES) @@ -96,28 +104,62 @@ internal class DefaultGrpcRouterTest { } @Test - fun `single request single response - server terminated intermediate session`() { + fun `single request single response - server is terminated in intermediate session (retry false)`() { val clientServerBaton = Baton("client-server") val future = executor.submit { clientServerBaton.giveAndGet("client thread started", "wait server start") -// FIXME: client should differ Interrupted request and Connection refused -// return@submit assertThrows { - return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) -// } + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } - clientServerBaton.get("wait client thread start") + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ), + ) + ), + ) + ) + } + + @Test + fun `single request single response - server is terminated in intermediate session (retry true)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClient(retryInterruptedTransaction = true).singleRequestSingleResponse( + Request.newBuilder().setSeq(1).build() + ) + } val handlerBaton = Baton("handler") createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") clientServerBaton.give("server started") handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) } -// FIXME: client should differ Interrupted request and Connection refused -// val exception = future.get(1, TimeUnit.MINUTES) -// assertEquals("", exception.message) + assertFalse(future.isDone) DefaultGrpcRouter().use { grpcRouter -> createServer(grpcRouter = grpcRouter).use { @@ -128,12 +170,156 @@ internal class DefaultGrpcRouterTest { } } + @Test + fun `single request single response - cancel request`() { + val clientServerBaton = Baton("client-server") + val grpcContext = AtomicReference() + + val future = executor.submit { + grpcContext.set( + Context.current() + .withCancellation() + ) + + clientServerBaton.give("client thread started") + grpcContext.get().call { + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + } + + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) + val cancelExceptionMessage = "test request is canceled" + assertTrue(grpcContext.get().cancel(RuntimeException(cancelExceptionMessage))) + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "CANCELLED: Context cancelled", + ExceptionMetadata( + cancelExceptionMessage, + ) + ), + ) + ) + ) + ) + } + + @Test + fun `single request single response - deadline request`() { + val clientServerBaton = Baton("client-server") + + val future = executor.submit { + clientServerBaton.give("client thread started") + Context.current() + .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) + .call { + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + } + + clientServerBaton.get("wait client thread start") + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "DEADLINE_EXCEEDED: context timed out", + ExceptionMetadata( + "context timed out", + ) + ), + ) + ) + ) + ) + } + + @Test + fun `single request single response - interrupt thread`() { + val clientServerBaton = Baton("client-server") + + val future = executor.submit { + clientServerBaton.give("client thread started") + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) + + assertEquals(0, executor.shutdownNow().size) + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "sleep interrupted" + ), + ) + ) + ) + ) + } + + @Test fun `single request multiple response`() { createServer().use { - val responses = executor.submit> { - createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()).asSequence().toList() - }.get(1, TimeUnit.SECONDS) + val responses = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + .asSequence().toList() assertEquals(2, responses.size) responses.forEachIndexed { index, response -> @@ -149,48 +335,89 @@ internal class DefaultGrpcRouterTest { val exception = assertThrows { // FIXME: gRPC router should retry to resend request when server isn't available. // iterator can throw exception when server disappears in the intermediate of response retransmission + // and retryInterruptedTransaction false iterator.hasNext() } - exception.printStackTrace() - assertEquals("UNAVAILABLE: io exception", exception.message) - var cause = exception.cause - assertNotNull(cause) - assertEquals("Connection refused: localhost/127.0.0.1:8080", cause.message) - cause = cause.cause - assertNotNull(cause) - assertEquals("Connection refused", cause.message) - assertNull(cause.cause) + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "UNAVAILABLE: io exception", + cause = ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + cause = ExceptionMetadata( + "Connection refused" + ) + ) + ) + ) } @Test - fun `single request multiple response - server terminated intermediate session`() { + fun `single request multiple response - server terminated intermediate session (retry false)`() { val clientServerBaton = Baton("client-server") - val future = executor.submit { + val future = executor.submit> { clientServerBaton.giveAndGet("client thread started", "wait server start") - return@submit assertThrows { - createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()).asSequence().toList() - } + createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + .asSequence().toList() } - clientServerBaton.get("wait client thread start") + val handlerBaton = Baton("handler") + + createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + ) + } + + @Test + fun `single request multiple response - server terminated intermediate session (retry true)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit> { + clientServerBaton.giveAndGet("client thread started", "wait server start") + // FIXME: gRPC router should retry to resend request when server is terminated intermediate handling. + // iterator can throw exception when server disappears in the intermediate of response retransmission + // and retryInterruptedTransaction false + createClient(retryInterruptedTransaction = true).singleRequestMultipleResponse( + Request.newBuilder().setSeq(1).build() + ).asSequence().toList() + } val handlerBaton = Baton("handler") createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") clientServerBaton.give("server started") handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) } - val exception = future.get(1, TimeUnit.MINUTES) - exception.printStackTrace() - assertEquals("UNAVAILABLE: io exception", exception.message) - var cause = exception.cause - assertNotNull(cause) - assertEquals("Connection refused: localhost/127.0.0.1:8080", cause.message) - cause = cause.cause - assertNotNull(cause) - assertEquals("Connection refused", cause.message) - assertNull(cause.cause) + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + ) } private fun createClient( @@ -198,20 +425,23 @@ internal class DefaultGrpcRouterTest { RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, TestService::class.java, mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT)) - ) + ), + retryInterruptedTransaction: Boolean = false ): TestService { grpcRouterClient.init( GrpcConfiguration(services = mapOf("test" to configuration)), GrpcRouterConfiguration( retryConfiguration = GrpcRetryConfiguration( Int.MAX_VALUE, - 500, - 500 + RETRY_TIMEOUT, + RETRY_TIMEOUT, + retryInterruptedTransaction ) ) ) return grpcRouterClient.getService(TestService::class.java) } + private fun createServer( configuration: GrpcServerConfiguration = GrpcServerConfiguration( null, @@ -224,10 +454,12 @@ internal class DefaultGrpcRouterTest { grpcRouter.init(GrpcConfiguration(serverConfiguration = configuration), GrpcRouterConfiguration()) return grpcRouter.startServer(TestServiceHandler(completeResponse, handlerBaton)).apply(Server::start) } + companion object { private val K_LOGGER = KotlinLogging.logger { } private const val SERVER_PORT = 8080 + private const val RETRY_TIMEOUT = 1_000L private inline fun Server.use(force: Boolean = false, func: Server.() -> Unit) { try { val startTime = Instant.now() @@ -246,6 +478,57 @@ internal class DefaultGrpcRouterTest { } } + private fun assertException( + exception: Throwable, + exceptionMetadata: ExceptionMetadata, + path: List = emptyList() + ) { + assertEquals( + exceptionMetadata.message, + exception.message, + "Message for exception: $exception, path: ${path.printAsStackTrace()}" + ) + exceptionMetadata.suspended?.let { suspendMetadataList -> + assertEquals( + suspendMetadataList.size, + exception.suppressed.size, + "Suspended for exception: $exception, path: ${path.printAsStackTrace()}" + ) + suspendMetadataList.forEachIndexed { index, suspendMetadata -> + assertException( + exception.suppressed[index], + suspendMetadata, + path.plus(listOf(exception.message, "[$index]")) + ) + } + } ?: assertEquals( + 0, + exception.suppressed.size, + "Suspended for exception: $exception, path: ${path.printAsStackTrace()}" + ) + exceptionMetadata.cause?.let { causeMetadata -> + assertNotNull(exception.cause, "Cause for exception: $exception, path: ${path.printAsStackTrace()}") + .also { assertException(it, causeMetadata, path.plus(exception.message)) } + } ?: assertNull(exception.cause, "Cause for exception: $exception, path: ${path.printAsStackTrace()}") + } + + private fun List.printAsStackTrace() = asSequence() + .joinToString(separator = "\n -> ", prefix = "\n -> ") + + private fun ExecutorService.shutdownGracefully() { + shutdown() + if (!awaitTermination(1, TimeUnit.SECONDS)) { + shutdownNow() + error("'Executor' can't be stopped") + } + } + + private class ExceptionMetadata( + val message: String? = null, + val cause: ExceptionMetadata? = null, + val suspended: List? = null + ) + private class Baton( private val name: String ) { @@ -255,17 +538,20 @@ internal class DefaultGrpcRouterTest { give(giveComment) get(getComment) } + fun give(comment: String = "") { K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" } queue.put(Any()) K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" } } + fun get(comment: String = "") { K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" } queue.poll() K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" } } } + private class TestServiceHandler( private val complete: Boolean = true, private val responseBaton: Baton? = null,