From 9fb6778ce0550ef9f327a4e9d9494bb586e7481c Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Tue, 24 Oct 2023 13:51:02 +0400 Subject: [PATCH] [TH2-5107] refactored DefaultGrpcRouterTest --- .../grpc/router/impl/DefaultGrpcRouterTest.kt | 690 +++++++++--------- 1 file changed, 355 insertions(+), 335 deletions(-) diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt index 329e0497..285af41b 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt @@ -36,6 +36,7 @@ import io.grpc.stub.StreamObserver import mu.KotlinLogging import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder @@ -45,6 +46,7 @@ import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference @@ -54,413 +56,431 @@ import kotlin.test.assertNotNull import kotlin.test.assertTrue @IntegrationTest -internal class DefaultGrpcRouterTest { - - private val grpcRouterClient = DefaultGrpcRouter() - private val grpcRouterServer = DefaultGrpcRouter() - private val executor = Executors.newSingleThreadExecutor( - ThreadFactoryBuilder().setNameFormat("test-%d").build() - ) - private val deadlineExecutor = Executors.newSingleThreadScheduledExecutor( - ThreadFactoryBuilder().setNameFormat("test-deadline-%d").build() - ) - - @AfterEach - fun afterEach() { - grpcRouterServer.close() - grpcRouterClient.close() - executor.shutdownGracefully() - deadlineExecutor.shutdownGracefully() - } - - @Test - fun `single request single response`() { - createServer().use(true) { - val response = executor.submit { - return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) - }.get(1, TimeUnit.MINUTES) +internal open class DefaultGrpcRouterTest { + @IntegrationTest + abstract inner class AbstractGrpcRouterTest { + private val grpcRouterClient = DefaultGrpcRouter() + private val grpcRouterServer = DefaultGrpcRouter() + protected val executor: ExecutorService = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder().setNameFormat("test-%d").build() + ) + protected val deadlineExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + ThreadFactoryBuilder().setNameFormat("test-deadline-%d").build() + ) - assertEquals(1, response.seq) - assertEquals(1, response.origSeq) + @AfterEach + fun afterEach() { + grpcRouterServer.close() + grpcRouterClient.close() + executor.shutdownGracefully() + deadlineExecutor.shutdownGracefully() } - } - - @Test - fun `single request single response - delayed server start`() { - val clientServerBaton = Baton("client-server") - val future = executor.submit { - clientServerBaton.give("client thread started") - return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + abstract fun general() + abstract fun `delayed server start`() + abstract fun `cancel retry request`() + abstract fun `deadline retry request`() + abstract fun `interrupt thread during retry request`() + abstract fun `server terminated intermediate session (retry false)`() + abstract fun `server terminated intermediate session (retry true)`() + + protected fun createClient( + configuration: GrpcServiceConfiguration = GrpcServiceConfiguration( + RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, + TestService::class.java, + mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT)) + ), + retryInterruptedTransaction: Boolean = false + ): TestService { + grpcRouterClient.init( + GrpcConfiguration(services = mapOf("test" to configuration)), + GrpcRouterConfiguration( + retryConfiguration = GrpcRetryConfiguration( + Int.MAX_VALUE, + RETRY_TIMEOUT, + RETRY_TIMEOUT, + retryInterruptedTransaction + ) + ) + ) + return grpcRouterClient.getService(TestService::class.java) } - clientServerBaton.get("wait client thread start") - Thread.sleep(RETRY_TIMEOUT * 2) - - createServer().use { - val response = future.get(1, TimeUnit.MINUTES) - assertEquals(1, response.seq) - assertEquals(1, response.origSeq) + protected fun createServer( + configuration: GrpcServerConfiguration = GrpcServerConfiguration( + null, + SERVER_PORT + ), + grpcRouter: DefaultGrpcRouter = grpcRouterServer, + completeResponse: Boolean = true, + handlerBaton: Baton? = null, + ): Server { + grpcRouter.init(GrpcConfiguration(serverConfiguration = configuration), GrpcRouterConfiguration()) + return grpcRouter.startServer(TestServiceHandler(completeResponse, handlerBaton)).apply(Server::start) } } - @Test - fun `single request single response - server is terminated in intermediate session (retry false)`() { - val clientServerBaton = Baton("client-server") - val future = executor.submit { - clientServerBaton.giveAndGet("client thread started", "wait server start") - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) - } - - val handlerBaton = Baton("handler") - - createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { - clientServerBaton.get("wait client thread start") - clientServerBaton.give("server started") - handlerBaton.get("wait response sent") - Thread.sleep(RETRY_TIMEOUT / 2) - } - - val exception = assertThrows { - future.get(1, TimeUnit.MINUTES) - } - K_LOGGER.error(exception) { "Handle exception" } - - assertException( - exception, ExceptionMetadata( - "java.lang.RuntimeException: Can not execute GRPC blocking request", - ExceptionMetadata( - "Can not execute GRPC blocking request", - suspended = listOf( - ExceptionMetadata( - "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", - ), - ) - ), - ) - ) - } + @Nested + inner class SingleRequestSingleResponseTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + createServer().use(true) { + val response = executor.submit { + return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + }.get(1, TimeUnit.MINUTES) - @Test - fun `single request single response - server is terminated in intermediate session (retry true)`() { - val clientServerBaton = Baton("client-server") - val future = executor.submit { - clientServerBaton.giveAndGet("client thread started", "wait server start") - createClient(retryInterruptedTransaction = true).singleRequestSingleResponse( - Request.newBuilder().setSeq(1).build() - ) + assertEquals(1, response.seq) + assertEquals(1, response.origSeq) + } } + @Test + override fun `delayed server start`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.give("client thread started") + return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } - val handlerBaton = Baton("handler") - - createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { clientServerBaton.get("wait client thread start") - clientServerBaton.give("server started") - handlerBaton.get("wait response sent") - Thread.sleep(RETRY_TIMEOUT / 2) - } + Thread.sleep(RETRY_TIMEOUT * 2) - assertFalse(future.isDone) - - DefaultGrpcRouter().use { grpcRouter -> - createServer(grpcRouter = grpcRouter).use { + createServer().use { val response = future.get(1, TimeUnit.MINUTES) assertEquals(1, response.seq) assertEquals(1, response.origSeq) } } - } + @Test + override fun `cancel retry request`() { + val clientServerBaton = Baton("client-server") + val grpcContext = AtomicReference() + + val future = executor.submit { + grpcContext.set( + Context.current() + .withCancellation() + ) - @Test - fun `single request single response - cancel request`() { - val clientServerBaton = Baton("client-server") - val grpcContext = AtomicReference() + clientServerBaton.give("client thread started") + grpcContext.get().call { + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + } - val future = executor.submit { - grpcContext.set( - Context.current() - .withCancellation() - ) + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) + val cancelExceptionMessage = "test request is canceled" + assertTrue(grpcContext.get().cancel(RuntimeException(cancelExceptionMessage))) - clientServerBaton.give("client thread started") - grpcContext.get().call { - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( + ExceptionMetadata( + "UNAVAILABLE: io exception", + ExceptionMetadata( + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), + ), + ExceptionMetadata( + "CANCELLED: Context cancelled", + ExceptionMetadata( + cancelExceptionMessage, + ) + ), + ) + ) + ) + ) } + @Test + override fun `deadline retry request`() { + val clientServerBaton = Baton("client-server") - clientServerBaton.get("wait client thread start") - Thread.sleep(RETRY_TIMEOUT / 2) - val cancelExceptionMessage = "test request is canceled" - assertTrue(grpcContext.get().cancel(RuntimeException(cancelExceptionMessage))) + val future = executor.submit { + clientServerBaton.give("client thread started") + Context.current() + .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) + .call { + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } + } - val exception = assertThrows { - future.get(1, TimeUnit.MINUTES) - } - K_LOGGER.error(exception) { "Handle exception" } - - assertException( - exception, ExceptionMetadata( - "java.lang.RuntimeException: Can not execute GRPC blocking request", - ExceptionMetadata( - "Can not execute GRPC blocking request", - suspended = listOf( - ExceptionMetadata( - "UNAVAILABLE: io exception", + clientServerBaton.get("wait client thread start") + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( ExceptionMetadata( - "Connection refused: localhost/127.0.0.1:8080", + "UNAVAILABLE: io exception", ExceptionMetadata( - "Connection refused" - ) + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), ), - ), - ExceptionMetadata( - "CANCELLED: Context cancelled", ExceptionMetadata( - cancelExceptionMessage, - ) - ), + "DEADLINE_EXCEEDED: context timed out", + ExceptionMetadata( + "context timed out", + ) + ), + ) ) ) ) - ) - } + } + @Test + override fun `interrupt thread during retry request`() { + val clientServerBaton = Baton("client-server") - @Test - fun `single request single response - deadline request`() { - val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.give("client thread started") + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } - val future = executor.submit { - clientServerBaton.give("client thread started") - Context.current() - .withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor) - .call { - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) - } - } + clientServerBaton.get("wait client thread start") + Thread.sleep(RETRY_TIMEOUT / 2) - clientServerBaton.get("wait client thread start") + assertEquals(0, executor.shutdownNow().size) - val exception = assertThrows { - future.get(1, TimeUnit.MINUTES) - } - K_LOGGER.error(exception) { "Handle exception" } - - assertException( - exception, ExceptionMetadata( - "java.lang.RuntimeException: Can not execute GRPC blocking request", - ExceptionMetadata( - "Can not execute GRPC blocking request", - suspended = listOf( - ExceptionMetadata( - "UNAVAILABLE: io exception", + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( ExceptionMetadata( - "Connection refused: localhost/127.0.0.1:8080", + "UNAVAILABLE: io exception", ExceptionMetadata( - "Connection refused" - ) + "Connection refused: localhost/127.0.0.1:8080", + ExceptionMetadata( + "Connection refused" + ) + ), ), - ), - ExceptionMetadata( - "DEADLINE_EXCEEDED: context timed out", ExceptionMetadata( - "context timed out", - ) - ), + "sleep interrupted" + ), + ) ) ) ) - ) - } - - @Test - fun `single request single response - interrupt thread`() { - val clientServerBaton = Baton("client-server") - - val future = executor.submit { - clientServerBaton.give("client thread started") - createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) } + @Test + override fun `server terminated intermediate session (retry false)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build()) + } - clientServerBaton.get("wait client thread start") - Thread.sleep(RETRY_TIMEOUT / 2) + val handlerBaton = Baton("handler") - assertEquals(0, executor.shutdownNow().size) + createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } - val exception = assertThrows { - future.get(1, TimeUnit.MINUTES) - } - K_LOGGER.error(exception) { "Handle exception" } - - assertException( - exception, ExceptionMetadata( - "java.lang.RuntimeException: Can not execute GRPC blocking request", - ExceptionMetadata( - "Can not execute GRPC blocking request", - suspended = listOf( - ExceptionMetadata( - "UNAVAILABLE: io exception", + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + + assertException( + exception, ExceptionMetadata( + "java.lang.RuntimeException: Can not execute GRPC blocking request", + ExceptionMetadata( + "Can not execute GRPC blocking request", + suspended = listOf( ExceptionMetadata( - "Connection refused: localhost/127.0.0.1:8080", - ExceptionMetadata( - "Connection refused" - ) + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", ), - ), - ExceptionMetadata( - "sleep interrupted" - ), - ) + ) + ), ) ) - ) - } + } + @Test + override fun `server terminated intermediate session (retry true)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClient(retryInterruptedTransaction = true).singleRequestSingleResponse( + Request.newBuilder().setSeq(1).build() + ) + } + + val handlerBaton = Baton("handler") + createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } - @Test - fun `single request multiple response`() { - createServer().use { - val responses = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) - .asSequence().toList() + assertFalse(future.isDone) - assertEquals(2, responses.size) - responses.forEachIndexed { index, response -> - assertEquals(index + 1, response.seq) - assertEquals(1, response.origSeq) + DefaultGrpcRouter().use { grpcRouter -> + createServer(grpcRouter = grpcRouter).use { + val response = future.get(1, TimeUnit.MINUTES) + assertEquals(1, response.seq) + assertEquals(1, response.origSeq) + } } } } - @Test - fun `single request multiple response - delayed server start`() { - val iterator = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) - val exception = assertThrows { - // FIXME: gRPC router should retry to resend request when server isn't available. - // iterator can throw exception when server disappears in the intermediate of response retransmission - // and retryInterruptedTransaction false - iterator.hasNext() + @Nested + @IntegrationTest + inner class SingleRequestMultipleResponseTest : AbstractGrpcRouterTest() { + @Test + override fun general() { + createServer().use { + val responses = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + .asSequence().toList() + + assertEquals(2, responses.size) + responses.forEachIndexed { index, response -> + assertEquals(index + 1, response.seq) + assertEquals(1, response.origSeq) + } + } } - K_LOGGER.error(exception) { "Handle exception" } - assertException( - exception, ExceptionMetadata( - "UNAVAILABLE: io exception", - cause = ExceptionMetadata( - "Connection refused: localhost/127.0.0.1:8080", + @Test + override fun `delayed server start`() { + val iterator = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + val exception = assertThrows { + // FIXME: gRPC router should retry to resend request when server isn't available. + // iterator can throw exception when server disappears in the intermediate of response retransmission + // and retryInterruptedTransaction false + iterator.hasNext() + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "UNAVAILABLE: io exception", cause = ExceptionMetadata( - "Connection refused" + "Connection refused: localhost/127.0.0.1:8080", + cause = ExceptionMetadata( + "Connection refused" + ) ) ) ) - ) - } - - @Test - fun `single request multiple response - server terminated intermediate session (retry false)`() { - val clientServerBaton = Baton("client-server") - val future = executor.submit> { - clientServerBaton.giveAndGet("client thread started", "wait server start") - createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) - .asSequence().toList() } - - val handlerBaton = Baton("handler") - - createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { - clientServerBaton.get("wait client thread start") - clientServerBaton.give("server started") - handlerBaton.get("wait response sent") - Thread.sleep(RETRY_TIMEOUT / 2) + @Test + override fun `cancel retry request`() { + // FIXME: implement after retry implementing in the `delayed server start` case } - - val exception = assertThrows { - future.get(1, TimeUnit.MINUTES) + @Test + override fun `deadline retry request`() { + // FIXME: implement after retry implementing in the `delayed server start` case } - K_LOGGER.error(exception) { "Handle exception" } - assertException( - exception, ExceptionMetadata( - "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", - ExceptionMetadata( - "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", - ) - ) - ) - } - - @Test - fun `single request multiple response - server terminated intermediate session (retry true)`() { - val clientServerBaton = Baton("client-server") - val future = executor.submit> { - clientServerBaton.giveAndGet("client thread started", "wait server start") - // FIXME: gRPC router should retry to resend request when server is terminated intermediate handling. - // iterator can throw exception when server disappears in the intermediate of response retransmission - // and retryInterruptedTransaction false - createClient(retryInterruptedTransaction = true).singleRequestMultipleResponse( - Request.newBuilder().setSeq(1).build() - ).asSequence().toList() + @Test + override fun `interrupt thread during retry request`() { + // FIXME: implement after retry implementing in the `delayed server start` case } + @Test + override fun `server terminated intermediate session (retry false)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit> { + clientServerBaton.giveAndGet("client thread started", "wait server start") + createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build()) + .asSequence().toList() + } - val handlerBaton = Baton("handler") + val handlerBaton = Baton("handler") - createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { - clientServerBaton.get("wait client thread start") - clientServerBaton.give("server started") - handlerBaton.get("wait response sent") - Thread.sleep(RETRY_TIMEOUT / 2) - } + createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } - val exception = assertThrows { - future.get(1, TimeUnit.MINUTES) - } - K_LOGGER.error(exception) { "Handle exception" } - assertException( - exception, ExceptionMetadata( - "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", - ExceptionMetadata( - "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) ) ) - ) - } + } + @Test + override fun `server terminated intermediate session (retry true)`() { + val clientServerBaton = Baton("client-server") + val future = executor.submit> { + clientServerBaton.giveAndGet("client thread started", "wait server start") + // FIXME: gRPC router should retry to resend request when server is terminated intermediate handling. + // iterator can throw exception when server disappears in the intermediate of response retransmission + // and retryInterruptedTransaction false + createClient(retryInterruptedTransaction = true).singleRequestMultipleResponse( + Request.newBuilder().setSeq(1).build() + ).asSequence().toList() + } + + val handlerBaton = Baton("handler") - private fun createClient( - configuration: GrpcServiceConfiguration = GrpcServiceConfiguration( - RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) }, - TestService::class.java, - mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT)) - ), - retryInterruptedTransaction: Boolean = false - ): TestService { - grpcRouterClient.init( - GrpcConfiguration(services = mapOf("test" to configuration)), - GrpcRouterConfiguration( - retryConfiguration = GrpcRetryConfiguration( - Int.MAX_VALUE, - RETRY_TIMEOUT, - RETRY_TIMEOUT, - retryInterruptedTransaction + createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) { + clientServerBaton.get("wait client thread start") + clientServerBaton.give("server started") + handlerBaton.get("wait response sent") + Thread.sleep(RETRY_TIMEOUT / 2) + } + + val exception = assertThrows { + future.get(1, TimeUnit.MINUTES) + } + K_LOGGER.error(exception) { "Handle exception" } + assertException( + exception, ExceptionMetadata( + "io.grpc.StatusRuntimeException: CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ExceptionMetadata( + "CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST", + ) ) ) - ) - return grpcRouterClient.getService(TestService::class.java) - } - - private fun createServer( - configuration: GrpcServerConfiguration = GrpcServerConfiguration( - null, - SERVER_PORT - ), - grpcRouter: DefaultGrpcRouter = grpcRouterServer, - completeResponse: Boolean = true, - handlerBaton: Baton? = null, - ): Server { - grpcRouter.init(GrpcConfiguration(serverConfiguration = configuration), GrpcRouterConfiguration()) - return grpcRouter.startServer(TestServiceHandler(completeResponse, handlerBaton)).apply(Server::start) + } } companion object { - private val K_LOGGER = KotlinLogging.logger { } + protected val K_LOGGER = KotlinLogging.logger { } - private const val SERVER_PORT = 8080 - private const val RETRY_TIMEOUT = 1_000L - private inline fun Server.use(force: Boolean = false, func: Server.() -> Unit) { + protected const val SERVER_PORT = 8080 + protected const val RETRY_TIMEOUT = 1_000L + protected inline fun Server.use(force: Boolean = false, func: Server.() -> Unit) { try { val startTime = Instant.now() func() @@ -478,7 +498,7 @@ internal class DefaultGrpcRouterTest { } } - private fun assertException( + protected fun assertException( exception: Throwable, exceptionMetadata: ExceptionMetadata, path: List = emptyList() @@ -512,10 +532,10 @@ internal class DefaultGrpcRouterTest { } ?: assertNull(exception.cause, "Cause for exception: $exception, path: ${path.printAsStackTrace()}") } - private fun List.printAsStackTrace() = asSequence() + protected fun List.printAsStackTrace() = asSequence() .joinToString(separator = "\n -> ", prefix = "\n -> ") - private fun ExecutorService.shutdownGracefully() { + protected fun ExecutorService.shutdownGracefully() { shutdown() if (!awaitTermination(1, TimeUnit.SECONDS)) { shutdownNow() @@ -523,13 +543,13 @@ internal class DefaultGrpcRouterTest { } } - private class ExceptionMetadata( + internal class ExceptionMetadata( val message: String? = null, val cause: ExceptionMetadata? = null, val suspended: List? = null ) - private class Baton( + internal class Baton( private val name: String ) { private val queue = ArrayBlockingQueue(1).apply { put(Any()) } @@ -552,7 +572,7 @@ internal class DefaultGrpcRouterTest { } } - private class TestServiceHandler( + protected class TestServiceHandler( private val complete: Boolean = true, private val responseBaton: Baton? = null, ) : TestImplBase() {