From 06aeffc3894a60d2eb0534d7d294c4198d00bd48 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Wed, 8 Nov 2023 18:24:03 +0400 Subject: [PATCH] [TH2-5122] added SingleRequestMultipleResponseAsyncTest --- build.gradle | 2 +- .../grpc/router/impl/DefaultGrpcRouterTest.kt | 181 +++++++++++++++++- 2 files changed, 174 insertions(+), 9 deletions(-) diff --git a/build.gradle b/build.gradle index 777ab7eb2..ebe4b8f03 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,7 @@ version = release_version ext { grpcVersion = '1.56.0' protobufVersion = '3.23.2' // The protoc:3.23.3 https://github.com/protocolbuffers/protobuf/issues/13070 - serviceGeneratorVersion = '3.5.1' + serviceGeneratorVersion = '3.5.1-th2-5122-+' cradleVersion = '5.1.1-dev' junitVersion = '5.10.0' diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt index 8a674c014..943d7aa9b 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt @@ -28,6 +28,8 @@ import com.exactpro.th2.common.test.grpc.Request import com.exactpro.th2.common.test.grpc.Response import com.exactpro.th2.common.test.grpc.TestGrpc.TestImplBase import com.exactpro.th2.common.test.grpc.TestService +import com.exactpro.th2.service.AbstractGrpcService.MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE +import com.exactpro.th2.service.AbstractGrpcService.ROOT_RETRY_SYNC_EXCEPTION_MESSAGE import com.exactpro.th2.service.AbstractGrpcService.STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST import io.grpc.Context import io.grpc.Deadline @@ -42,6 +44,7 @@ import org.junit.jupiter.api.assertThrows import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.mock import org.mockito.kotlin.timeout +import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.verifyNoMoreInteractions import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder @@ -275,7 +278,7 @@ internal class DefaultGrpcRouterTest { exception, ExceptionMetadata( "java.lang.RuntimeException: Can not execute GRPC blocking request", ExceptionMetadata( - "Can not execute GRPC blocking request", + ROOT_RETRY_SYNC_EXCEPTION_MESSAGE, suspended = listOf( ExceptionMetadata( "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", @@ -601,9 +604,9 @@ internal class DefaultGrpcRouterTest { K_LOGGER.error(exception) { "Handle exception" } assertException( exception, ExceptionMetadata( - "java.lang.IllegalStateException: Request failures mid-transfer", + "java.lang.IllegalStateException: $MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE", ExceptionMetadata( - "Request failures mid-transfer", + MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE, ExceptionMetadata( "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST" ) @@ -637,9 +640,9 @@ internal class DefaultGrpcRouterTest { K_LOGGER.error(exception) { "Handle exception" } assertException( exception, ExceptionMetadata( - "java.lang.IllegalStateException: Request failures mid-transfer", + "java.lang.IllegalStateException: $MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE", ExceptionMetadata( - "Request failures mid-transfer", + MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE, ExceptionMetadata( "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST" ) @@ -649,13 +652,175 @@ internal class DefaultGrpcRouterTest { } } + @Nested + inner class SingleRequestMultipleResponseAsyncTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + val streamObserver = mock> { } + createServer().execAndClose(true) { + createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver) + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver, times(2)).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(2, captor.allValues.size) + captor.allValues.forEachIndexed { index, response -> + assertEquals(index + 1, response.seq) + assertEquals(1, response.origSeq) + } + } + } + + @Test + override fun `delayed server start`() { + val streamObserver = mock> { } + createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver) + + Thread.sleep(RETRY_TIMEOUT / 2) + + createServer().execAndClose { + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver, times(2)).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(2, captor.allValues.size) + captor.allValues.forEachIndexed { index, response -> + assertEquals(index + 1, response.seq) + assertEquals(1, response.origSeq) + } + } + } + + @Test + override fun `cancel retry request`() { + val grpcContext = Context.current().withCancellation() + + val streamObserver = mock> { } + grpcContext.call { + createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver) + } + + Thread.sleep(RETRY_TIMEOUT / 2) + + assertTrue(grpcContext.cancel(RuntimeException(CANCEL_REASON))) + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "CANCELLED: Context cancelled", + ExceptionMetadata( + CANCEL_REASON, + ) + ) + ) + } + + @Test + override fun `deadline retry request`() { + val grpcContext = Context.current() + .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) + + val streamObserver = mock> { } + grpcContext.call { + createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver) + } + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "DEADLINE_EXCEEDED: context timed out", + ExceptionMetadata( + "context timed out", + ) + ) + ) + } + + @Test + override fun `interrupt thread during retry request`() { + // this test isn't relevant for async request + } + + @Test + override fun `server terminated intermediate session (retry false)`() { + val exception = `server terminated intermediate session`(false) + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + } + + @Test + override fun `server terminated intermediate session (retry true)`() { + val exception = `server terminated intermediate session`(true) + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE, + suspended = listOf( + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + ) + ) + } + + private fun `server terminated intermediate session`(retryInterruptedTransaction: Boolean): Throwable { + val streamObserver = mock> { } + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + createClientAsync(retryInterruptedTransaction = retryInterruptedTransaction) + .singleRequestMultipleResponse(createRequest(), streamObserver) + + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val onErrorCaptor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(onErrorCaptor.capture()) + val onNextCaptor = argumentCaptor { } + verify(streamObserver, times(2)).onNext(onNextCaptor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, onErrorCaptor.allValues.size) + assertEquals(2, onNextCaptor.allValues.size) + onNextCaptor.allValues.forEachIndexed { index, response -> + assertEquals(index + 1, response.seq) + assertEquals(1, response.origSeq) + } + + return onErrorCaptor.firstValue + } + } + private fun assertInterruptedSync(exception: ExecutionException) { K_LOGGER.error(exception) { "Handle exception" } assertException( exception, ExceptionMetadata( "java.lang.RuntimeException: Can not execute GRPC blocking request", ExceptionMetadata( - "Can not execute GRPC blocking request", + ROOT_RETRY_SYNC_EXCEPTION_MESSAGE, suspended = listOf( ExceptionMetadata( "UNAVAILABLE: io exception", @@ -681,7 +846,7 @@ internal class DefaultGrpcRouterTest { exception, ExceptionMetadata( "java.lang.RuntimeException: Can not execute GRPC blocking request", ExceptionMetadata( - "Can not execute GRPC blocking request", + ROOT_RETRY_SYNC_EXCEPTION_MESSAGE, suspended = listOf( ExceptionMetadata( "UNAVAILABLE: io exception", @@ -710,7 +875,7 @@ internal class DefaultGrpcRouterTest { exception, ExceptionMetadata( "java.lang.RuntimeException: Can not execute GRPC blocking request", ExceptionMetadata( - "Can not execute GRPC blocking request", + ROOT_RETRY_SYNC_EXCEPTION_MESSAGE, suspended = listOf( ExceptionMetadata( "UNAVAILABLE: io exception",