From 0b54a21a3f96c1d3191dc0aa8e6643aee73e365a Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Wed, 25 Oct 2023 10:25:03 +0400 Subject: [PATCH] [TH2-5107] added integration test for gRPCRouter --- .../grpc/router/impl/DefaultGrpcRouterTest.kt | 235 ++++++++++++++++-- 1 file changed, 211 insertions(+), 24 deletions(-) diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt index 285af41b..70ad415b 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt @@ -23,6 +23,7 @@ import com.exactpro.th2.common.schema.grpc.configuration.GrpcRouterConfiguration import com.exactpro.th2.common.schema.grpc.configuration.GrpcServerConfiguration import com.exactpro.th2.common.schema.grpc.configuration.GrpcServiceConfiguration import com.exactpro.th2.common.schema.strategy.route.impl.RobinRoutingStrategy +import com.exactpro.th2.common.test.grpc.AsyncTestService import com.exactpro.th2.common.test.grpc.Request import com.exactpro.th2.common.test.grpc.Response import com.exactpro.th2.common.test.grpc.TestGrpc.TestImplBase @@ -39,6 +40,11 @@ import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.mock +import org.mockito.kotlin.timeout +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoMoreInteractions import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder import java.time.Duration import java.time.Instant @@ -83,7 +89,7 @@ internal open class DefaultGrpcRouterTest { abstract fun `server terminated intermediate session (retry false)`() abstract fun `server terminated intermediate session (retry true)`() - protected fun createClient( + protected fun createClientSync( configuration: GrpcServiceConfiguration = GrpcServiceConfiguration( RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, TestService::class.java, @@ -105,6 +111,28 @@ internal open class DefaultGrpcRouterTest { return grpcRouterClient.getService(TestService::class.java) } + protected fun createClientAsync( + configuration: GrpcServiceConfiguration = GrpcServiceConfiguration( + RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, + TestService::class.java, + mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT)) + ), + retryInterruptedTransaction: Boolean = false + ): AsyncTestService { + grpcRouterClient.init( + GrpcConfiguration(services = mapOf("test" to configuration)), + GrpcRouterConfiguration( + retryConfiguration = GrpcRetryConfiguration( + Int.MAX_VALUE, + RETRY_TIMEOUT, + RETRY_TIMEOUT, + retryInterruptedTransaction + ) + ) + ) + return grpcRouterClient.getService(AsyncTestService::class.java) + } + protected fun createServer( configuration: GrpcServerConfiguration = GrpcServerConfiguration( null, @@ -120,12 +148,12 @@ internal open class DefaultGrpcRouterTest { } @Nested - inner class SingleRequestSingleResponseTest : AbstractGrpcRouterTest() { + inner class SingleRequestSingleResponseSyncTest : AbstractGrpcRouterTest() { @Test override fun general() { - createServer().use(true) { + createServer().execAndClose(true) { val response = executor.submit { - return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + return@submit createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) }.get(1, TimeUnit.MINUTES) assertEquals(1, response.seq) @@ -137,13 +165,13 @@ internal open class DefaultGrpcRouterTest { val clientServerBaton = Baton("client-server") val future = executor.submit { clientServerBaton.give("client thread started") - return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + return@submit createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } clientServerBaton.get("wait client thread start") Thread.sleep(RETRY_TIMEOUT * 2) - createServer().use { + createServer().execAndClose { val response = future.get(1, TimeUnit.MINUTES) assertEquals(1, response.seq) assertEquals(1, response.origSeq) @@ -162,7 +190,7 @@ internal open class DefaultGrpcRouterTest { clientServerBaton.give("client thread started") grpcContext.get().call { - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } } @@ -211,7 +239,7 @@ internal open class DefaultGrpcRouterTest { Context.current() .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) .call { - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } } @@ -254,7 +282,7 @@ internal open class DefaultGrpcRouterTest { val future = executor.submit { clientServerBaton.give("client thread started") - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } clientServerBaton.get("wait client thread start") @@ -295,12 +323,12 @@ internal open class DefaultGrpcRouterTest { val clientServerBaton = Baton("client-server") val future = executor.submit { clientServerBaton.giveAndGet("client thread started", "wait server start") - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } val handlerBaton = Baton("handler") - createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { clientServerBaton.get("wait client thread start") clientServerBaton.give("server started") handlerBaton.get("wait response sent") @@ -331,14 +359,14 @@ internal open class DefaultGrpcRouterTest { val clientServerBaton = Baton("client-server") val future = executor.submit { clientServerBaton.giveAndGet("client thread started", "wait server start") - createClient(retryInterruptedTransaction = true).singleRequestSingleResponse( + createClientSync(retryInterruptedTransaction = true).singleRequestSingleResponse( Request.newBuilder().setSeq(1).build() ) } val handlerBaton = Baton("handler") - createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { clientServerBaton.get("wait client thread start") clientServerBaton.give("server started") handlerBaton.get("wait response sent") @@ -348,7 +376,7 @@ internal open class DefaultGrpcRouterTest { assertFalse(future.isDone) DefaultGrpcRouter().use { grpcRouter -> - createServer(grpcRouter = grpcRouter).use { + createServer(grpcRouter = grpcRouter).execAndClose { val response = future.get(1, TimeUnit.MINUTES) assertEquals(1, response.seq) assertEquals(1, response.origSeq) @@ -358,12 +386,171 @@ internal open class DefaultGrpcRouterTest { } @Nested - @IntegrationTest - inner class SingleRequestMultipleResponseTest : AbstractGrpcRouterTest() { + inner class SingleRequestSingleResponseAsyncTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + val streamObserver = mock> { } + createServer().execAndClose(true) { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + assertEquals(1, captor.firstValue.seq) + assertEquals(1, captor.firstValue.origSeq) + } + } + @Test + override fun `delayed server start`() { + val streamObserver = mock> { } + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + + Thread.sleep(RETRY_TIMEOUT * 2) + + createServer().execAndClose { + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + assertEquals(1, captor.firstValue.seq) + assertEquals(1, captor.firstValue.origSeq) + } + } + @Test + override fun `cancel retry request`() { + val grpcContext = Context.current().withCancellation() + + val streamObserver = mock> { } + grpcContext.call { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + } + + Thread.sleep(RETRY_TIMEOUT / 2) + + val cancelExceptionMessage = "test request is canceled" + assertTrue(grpcContext.cancel(RuntimeException(cancelExceptionMessage))) + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "CANCELLED: Context cancelled", + ExceptionMetadata( + cancelExceptionMessage, + ) + ) + ) + } + @Test + override fun `deadline retry request`() { + val grpcContext = Context.current() + .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) + + val streamObserver = mock> { } + grpcContext.call { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + } + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "DEADLINE_EXCEEDED: context timed out", + ExceptionMetadata( + "context timed out", + ) + ) + ) + } + @Test + override fun `interrupt thread during retry request`() { + // this test isn't relevant for async request + } + @Test + override fun `server terminated intermediate session (retry false)`() { + val streamObserver = mock> { } + val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + K_LOGGER.error(captor.firstValue) { "Handle exception" } + + assertException( + captor.firstValue, ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) + ) + } + @Test + override fun `server terminated intermediate session (retry true)`() { + val streamObserver = mock> { } + val handlerBaton = Baton("handler") + +// val clientServerBaton = Baton("client-server") +// val future = executor.submit { +// clientServerBaton.giveAndGet("client thread started", "wait server start") +// createClientSync(retryInterruptedTransaction = true).singleRequestSingleResponse( +// Request.newBuilder().setSeq(1).build() +// ) +// } +// +// val handlerBaton = Baton("handler") + + createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) { + createClientAsync(retryInterruptedTransaction = true).singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver) + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + verifyNoMoreInteractions(streamObserver) +// assertFalse(future.isDone) + + DefaultGrpcRouter().use { grpcRouter -> + createServer(grpcRouter = grpcRouter).execAndClose { + val captor = argumentCaptor { } + verify(streamObserver, timeout(60 * 1_000)).onCompleted() + verify(streamObserver).onNext(captor.capture()) + verifyNoMoreInteractions(streamObserver) + + assertEquals(1, captor.allValues.size) + assertEquals(1, captor.firstValue.seq) + assertEquals(1, captor.firstValue.origSeq) + } + } + } + } + + @Nested + inner class SingleRequestMultipleResponseSyncTest : AbstractGrpcRouterTest() { @Test override fun general() { - createServer().use { - val responses = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + createServer().execAndClose { + val responses = createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) .asSequence().toList() assertEquals(2, responses.size) @@ -375,7 +562,7 @@ internal open class DefaultGrpcRouterTest { } @Test override fun `delayed server start`() { - val iterator = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + val iterator = createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) val exception = assertThrows { // FIXME: gRPC router should retry to resend request when server isn't available. // iterator can throw exception when server disappears in the intermediate of response retransmission @@ -412,13 +599,13 @@ internal open class DefaultGrpcRouterTest { val clientServerBaton = Baton("client-server") val future = executor.submit> { clientServerBaton.giveAndGet("client thread started", "wait server start") - createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) .asSequence().toList() } val handlerBaton = Baton("handler") - createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { + createServer(completeResponse = true, handlerBaton = handlerBaton).execAndClose(true) { clientServerBaton.get("wait client thread start") clientServerBaton.give("server started") handlerBaton.get("wait response sent") @@ -446,14 +633,14 @@ internal open class DefaultGrpcRouterTest { // FIXME: gRPC router should retry to resend request when server is terminated intermediate handling. // iterator can throw exception when server disappears in the intermediate of response retransmission // and retryInterruptedTransaction false - createClient(retryInterruptedTransaction = true).singleRequestMultipleResponse( + createClientSync(retryInterruptedTransaction = true).singleRequestMultipleResponse( Request.newBuilder().setSeq(1).build() ).asSequence().toList() } val handlerBaton = Baton("handler") - createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { + createServer(completeResponse = true, handlerBaton = handlerBaton).execAndClose(true) { clientServerBaton.get("wait client thread start") clientServerBaton.give("server started") handlerBaton.get("wait response sent") @@ -480,7 +667,7 @@ internal open class DefaultGrpcRouterTest { protected const val SERVER_PORT = 8080 protected const val RETRY_TIMEOUT = 1_000L - protected inline fun Server.use(force: Boolean = false, func: Server.() -> Unit) { + protected inline fun Server.execAndClose(force: Boolean = false, func: Server.() -> Unit = { }) { try { val startTime = Instant.now() func()