Skip to content

Commit

Permalink
[TH2-5107] added integration test for gRPCRouter
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Oct 25, 2023
1 parent 9fb6778 commit 0b54a21
Showing 1 changed file with 211 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.exactpro.th2.common.schema.grpc.configuration.GrpcRouterConfiguration
import com.exactpro.th2.common.schema.grpc.configuration.GrpcServerConfiguration
import com.exactpro.th2.common.schema.grpc.configuration.GrpcServiceConfiguration
import com.exactpro.th2.common.schema.strategy.route.impl.RobinRoutingStrategy
import com.exactpro.th2.common.test.grpc.AsyncTestService
import com.exactpro.th2.common.test.grpc.Request
import com.exactpro.th2.common.test.grpc.Response
import com.exactpro.th2.common.test.grpc.TestGrpc.TestImplBase
Expand All @@ -39,6 +40,11 @@ import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
import java.time.Duration
import java.time.Instant
Expand Down Expand Up @@ -83,7 +89,7 @@ internal open class DefaultGrpcRouterTest {
abstract fun `server terminated intermediate session (retry false)`()
abstract fun `server terminated intermediate session (retry true)`()

protected fun createClient(
protected fun createClientSync(
configuration: GrpcServiceConfiguration = GrpcServiceConfiguration(
RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) },
TestService::class.java,
Expand All @@ -105,6 +111,28 @@ internal open class DefaultGrpcRouterTest {
return grpcRouterClient.getService(TestService::class.java)
}

protected fun createClientAsync(
configuration: GrpcServiceConfiguration = GrpcServiceConfiguration(
RobinRoutingStrategy().apply { init(GrpcRawRobinStrategy(listOf("endpoint"))) },
TestService::class.java,
mapOf("endpoint" to GrpcEndpointConfiguration("localhost", SERVER_PORT))
),
retryInterruptedTransaction: Boolean = false
): AsyncTestService {
grpcRouterClient.init(
GrpcConfiguration(services = mapOf("test" to configuration)),
GrpcRouterConfiguration(
retryConfiguration = GrpcRetryConfiguration(
Int.MAX_VALUE,
RETRY_TIMEOUT,
RETRY_TIMEOUT,
retryInterruptedTransaction
)
)
)
return grpcRouterClient.getService(AsyncTestService::class.java)
}

protected fun createServer(
configuration: GrpcServerConfiguration = GrpcServerConfiguration(
null,
Expand All @@ -120,12 +148,12 @@ internal open class DefaultGrpcRouterTest {
}

@Nested
inner class SingleRequestSingleResponseTest : AbstractGrpcRouterTest() {
inner class SingleRequestSingleResponseSyncTest : AbstractGrpcRouterTest() {
@Test
override fun general() {
createServer().use(true) {
createServer().execAndClose(true) {
val response = executor.submit<Response> {
return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
return@submit createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
}.get(1, TimeUnit.MINUTES)

assertEquals(1, response.seq)
Expand All @@ -137,13 +165,13 @@ internal open class DefaultGrpcRouterTest {
val clientServerBaton = Baton("client-server")
val future = executor.submit<Response> {
clientServerBaton.give("client thread started")
return@submit createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
return@submit createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
}

clientServerBaton.get("wait client thread start")
Thread.sleep(RETRY_TIMEOUT * 2)

createServer().use {
createServer().execAndClose {
val response = future.get(1, TimeUnit.MINUTES)
assertEquals(1, response.seq)
assertEquals(1, response.origSeq)
Expand All @@ -162,7 +190,7 @@ internal open class DefaultGrpcRouterTest {

clientServerBaton.give("client thread started")
grpcContext.get().call {
createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
}
}

Expand Down Expand Up @@ -211,7 +239,7 @@ internal open class DefaultGrpcRouterTest {
Context.current()
.withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor)
.call {
createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
}
}

Expand Down Expand Up @@ -254,7 +282,7 @@ internal open class DefaultGrpcRouterTest {

val future = executor.submit<Response> {
clientServerBaton.give("client thread started")
createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
}

clientServerBaton.get("wait client thread start")
Expand Down Expand Up @@ -295,12 +323,12 @@ internal open class DefaultGrpcRouterTest {
val clientServerBaton = Baton("client-server")
val future = executor.submit<Response> {
clientServerBaton.giveAndGet("client thread started", "wait server start")
createClient().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
createClientSync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build())
}

val handlerBaton = Baton("handler")

createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) {
createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) {
clientServerBaton.get("wait client thread start")
clientServerBaton.give("server started")
handlerBaton.get("wait response sent")
Expand Down Expand Up @@ -331,14 +359,14 @@ internal open class DefaultGrpcRouterTest {
val clientServerBaton = Baton("client-server")
val future = executor.submit<Response> {
clientServerBaton.giveAndGet("client thread started", "wait server start")
createClient(retryInterruptedTransaction = true).singleRequestSingleResponse(
createClientSync(retryInterruptedTransaction = true).singleRequestSingleResponse(
Request.newBuilder().setSeq(1).build()
)
}

val handlerBaton = Baton("handler")

createServer(completeResponse = false, handlerBaton = handlerBaton).use(true) {
createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) {
clientServerBaton.get("wait client thread start")
clientServerBaton.give("server started")
handlerBaton.get("wait response sent")
Expand All @@ -348,7 +376,7 @@ internal open class DefaultGrpcRouterTest {
assertFalse(future.isDone)

DefaultGrpcRouter().use { grpcRouter ->
createServer(grpcRouter = grpcRouter).use {
createServer(grpcRouter = grpcRouter).execAndClose {
val response = future.get(1, TimeUnit.MINUTES)
assertEquals(1, response.seq)
assertEquals(1, response.origSeq)
Expand All @@ -358,12 +386,171 @@ internal open class DefaultGrpcRouterTest {
}

@Nested
@IntegrationTest
inner class SingleRequestMultipleResponseTest : AbstractGrpcRouterTest() {
inner class SingleRequestSingleResponseAsyncTest : AbstractGrpcRouterTest() {
@Test
override fun general() {
val streamObserver = mock<StreamObserver<Response>> { }
createServer().execAndClose(true) {
createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver)

val captor = argumentCaptor<Response> { }
verify(streamObserver, timeout(60 * 1_000)).onCompleted()
verify(streamObserver).onNext(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
assertEquals(1, captor.firstValue.seq)
assertEquals(1, captor.firstValue.origSeq)
}
}
@Test
override fun `delayed server start`() {
val streamObserver = mock<StreamObserver<Response>> { }
createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver)

Thread.sleep(RETRY_TIMEOUT * 2)

createServer().execAndClose {
val captor = argumentCaptor<Response> { }
verify(streamObserver, timeout(60 * 1_000)).onCompleted()
verify(streamObserver).onNext(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
assertEquals(1, captor.firstValue.seq)
assertEquals(1, captor.firstValue.origSeq)
}
}
@Test
override fun `cancel retry request`() {
val grpcContext = Context.current().withCancellation()

val streamObserver = mock<StreamObserver<Response>> { }
grpcContext.call {
createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver)
}

Thread.sleep(RETRY_TIMEOUT / 2)

val cancelExceptionMessage = "test request is canceled"
assertTrue(grpcContext.cancel(RuntimeException(cancelExceptionMessage)))

val captor = argumentCaptor<Throwable> { }
verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
K_LOGGER.error(captor.firstValue) { "Handle exception" }

assertException(
captor.firstValue, ExceptionMetadata(
"CANCELLED: Context cancelled",
ExceptionMetadata(
cancelExceptionMessage,
)
)
)
}
@Test
override fun `deadline retry request`() {
val grpcContext = Context.current()
.withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor)

val streamObserver = mock<StreamObserver<Response>> { }
grpcContext.call {
createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver)
}

val captor = argumentCaptor<Throwable> { }
verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
K_LOGGER.error(captor.firstValue) { "Handle exception" }

assertException(
captor.firstValue, ExceptionMetadata(
"DEADLINE_EXCEEDED: context timed out",
ExceptionMetadata(
"context timed out",
)
)
)
}
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
}
@Test
override fun `server terminated intermediate session (retry false)`() {
val streamObserver = mock<StreamObserver<Response>> { }
val handlerBaton = Baton("handler")

createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) {
createClientAsync().singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver)

handlerBaton.get("wait response sent")
Thread.sleep(RETRY_TIMEOUT / 2)
}

val captor = argumentCaptor<Throwable> { }
verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
K_LOGGER.error(captor.firstValue) { "Handle exception" }

assertException(
captor.firstValue, ExceptionMetadata(
"CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST",
)
)
}
@Test
override fun `server terminated intermediate session (retry true)`() {
val streamObserver = mock<StreamObserver<Response>> { }
val handlerBaton = Baton("handler")

// val clientServerBaton = Baton("client-server")
// val future = executor.submit<Response> {
// clientServerBaton.giveAndGet("client thread started", "wait server start")
// createClientSync(retryInterruptedTransaction = true).singleRequestSingleResponse(
// Request.newBuilder().setSeq(1).build()
// )
// }
//
// val handlerBaton = Baton("handler")

createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) {
createClientAsync(retryInterruptedTransaction = true).singleRequestSingleResponse(Request.newBuilder().setSeq(1).build(), streamObserver)
handlerBaton.get("wait response sent")
Thread.sleep(RETRY_TIMEOUT / 2)
}

verifyNoMoreInteractions(streamObserver)
// assertFalse(future.isDone)

DefaultGrpcRouter().use { grpcRouter ->
createServer(grpcRouter = grpcRouter).execAndClose {
val captor = argumentCaptor<Response> { }
verify(streamObserver, timeout(60 * 1_000)).onCompleted()
verify(streamObserver).onNext(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
assertEquals(1, captor.firstValue.seq)
assertEquals(1, captor.firstValue.origSeq)
}
}
}
}

@Nested
inner class SingleRequestMultipleResponseSyncTest : AbstractGrpcRouterTest() {
@Test
override fun general() {
createServer().use {
val responses = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build())
createServer().execAndClose {
val responses = createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build())
.asSequence().toList()

assertEquals(2, responses.size)
Expand All @@ -375,7 +562,7 @@ internal open class DefaultGrpcRouterTest {
}
@Test
override fun `delayed server start`() {
val iterator = createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build())
val iterator = createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build())
val exception = assertThrows<StatusRuntimeException> {
// FIXME: gRPC router should retry to resend request when server isn't available.
// iterator can throw exception when server disappears in the intermediate of response retransmission
Expand Down Expand Up @@ -412,13 +599,13 @@ internal open class DefaultGrpcRouterTest {
val clientServerBaton = Baton("client-server")
val future = executor.submit<List<Response>> {
clientServerBaton.giveAndGet("client thread started", "wait server start")
createClient().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build())
createClientSync().singleRequestMultipleResponse(Request.newBuilder().setSeq(1).build())
.asSequence().toList()
}

val handlerBaton = Baton("handler")

createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) {
createServer(completeResponse = true, handlerBaton = handlerBaton).execAndClose(true) {
clientServerBaton.get("wait client thread start")
clientServerBaton.give("server started")
handlerBaton.get("wait response sent")
Expand Down Expand Up @@ -446,14 +633,14 @@ internal open class DefaultGrpcRouterTest {
// FIXME: gRPC router should retry to resend request when server is terminated intermediate handling.
// iterator can throw exception when server disappears in the intermediate of response retransmission
// and retryInterruptedTransaction false
createClient(retryInterruptedTransaction = true).singleRequestMultipleResponse(
createClientSync(retryInterruptedTransaction = true).singleRequestMultipleResponse(
Request.newBuilder().setSeq(1).build()
).asSequence().toList()
}

val handlerBaton = Baton("handler")

createServer(completeResponse = true, handlerBaton = handlerBaton).use(true) {
createServer(completeResponse = true, handlerBaton = handlerBaton).execAndClose(true) {
clientServerBaton.get("wait client thread start")
clientServerBaton.give("server started")
handlerBaton.get("wait response sent")
Expand All @@ -480,7 +667,7 @@ internal open class DefaultGrpcRouterTest {

protected const val SERVER_PORT = 8080
protected const val RETRY_TIMEOUT = 1_000L
protected inline fun Server.use(force: Boolean = false, func: Server.() -> Unit) {
protected inline fun Server.execAndClose(force: Boolean = false, func: Server.() -> Unit = { }) {
try {
val startTime = Instant.now()
func()
Expand Down

0 comments on commit 0b54a21

Please sign in to comment.