Skip to content

Commit

Permalink
[TH2-5122] added SingleRequestMultipleResponseAsyncTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Nov 8, 2023
1 parent 75f9e8f commit 06aeffc
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 9 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ version = release_version
ext {
grpcVersion = '1.56.0'
protobufVersion = '3.23.2' // The protoc:3.23.3 https://github.com/protocolbuffers/protobuf/issues/13070
serviceGeneratorVersion = '3.5.1'
serviceGeneratorVersion = '3.5.1-th2-5122-+'

cradleVersion = '5.1.1-dev'
junitVersion = '5.10.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import com.exactpro.th2.common.test.grpc.Request
import com.exactpro.th2.common.test.grpc.Response
import com.exactpro.th2.common.test.grpc.TestGrpc.TestImplBase
import com.exactpro.th2.common.test.grpc.TestService
import com.exactpro.th2.service.AbstractGrpcService.MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE
import com.exactpro.th2.service.AbstractGrpcService.ROOT_RETRY_SYNC_EXCEPTION_MESSAGE
import com.exactpro.th2.service.AbstractGrpcService.STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST
import io.grpc.Context
import io.grpc.Deadline
Expand All @@ -42,6 +44,7 @@ import org.junit.jupiter.api.assertThrows
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
Expand Down Expand Up @@ -275,7 +278,7 @@ internal class DefaultGrpcRouterTest {
exception, ExceptionMetadata(
"java.lang.RuntimeException: Can not execute GRPC blocking request",
ExceptionMetadata(
"Can not execute GRPC blocking request",
ROOT_RETRY_SYNC_EXCEPTION_MESSAGE,
suspended = listOf(
ExceptionMetadata(
"CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST",
Expand Down Expand Up @@ -601,9 +604,9 @@ internal class DefaultGrpcRouterTest {
K_LOGGER.error(exception) { "Handle exception" }
assertException(
exception, ExceptionMetadata(
"java.lang.IllegalStateException: Request failures mid-transfer",
"java.lang.IllegalStateException: $MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE",
ExceptionMetadata(
"Request failures mid-transfer",
MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE,
ExceptionMetadata(
"CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST"
)
Expand Down Expand Up @@ -637,9 +640,9 @@ internal class DefaultGrpcRouterTest {
K_LOGGER.error(exception) { "Handle exception" }
assertException(
exception, ExceptionMetadata(
"java.lang.IllegalStateException: Request failures mid-transfer",
"java.lang.IllegalStateException: $MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE",
ExceptionMetadata(
"Request failures mid-transfer",
MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE,
ExceptionMetadata(
"CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST"
)
Expand All @@ -649,13 +652,175 @@ internal class DefaultGrpcRouterTest {
}
}

@Nested
inner class SingleRequestMultipleResponseAsyncTest : AbstractGrpcRouterTest() {
@Test
override fun general() {
val streamObserver = mock<StreamObserver<Response>> { }
createServer().execAndClose(true) {
createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver)

val captor = argumentCaptor<Response> { }
verify(streamObserver, timeout(60 * 1_000)).onCompleted()
verify(streamObserver, times(2)).onNext(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(2, captor.allValues.size)
captor.allValues.forEachIndexed { index, response ->
assertEquals(index + 1, response.seq)
assertEquals(1, response.origSeq)
}
}
}

@Test
override fun `delayed server start`() {
val streamObserver = mock<StreamObserver<Response>> { }
createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver)

Thread.sleep(RETRY_TIMEOUT / 2)

createServer().execAndClose {
val captor = argumentCaptor<Response> { }
verify(streamObserver, timeout(60 * 1_000)).onCompleted()
verify(streamObserver, times(2)).onNext(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(2, captor.allValues.size)
captor.allValues.forEachIndexed { index, response ->
assertEquals(index + 1, response.seq)
assertEquals(1, response.origSeq)
}
}
}

@Test
override fun `cancel retry request`() {
val grpcContext = Context.current().withCancellation()

val streamObserver = mock<StreamObserver<Response>> { }
grpcContext.call {
createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver)
}

Thread.sleep(RETRY_TIMEOUT / 2)

assertTrue(grpcContext.cancel(RuntimeException(CANCEL_REASON)))

val captor = argumentCaptor<Throwable> { }
verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
K_LOGGER.error(captor.firstValue) { "Handle exception" }

assertException(
captor.firstValue, ExceptionMetadata(
"CANCELLED: Context cancelled",
ExceptionMetadata(
CANCEL_REASON,
)
)
)
}

@Test
override fun `deadline retry request`() {
val grpcContext = Context.current()
.withDeadline(Deadline.after(RETRY_TIMEOUT / 2, TimeUnit.MILLISECONDS), deadlineExecutor)

val streamObserver = mock<StreamObserver<Response>> { }
grpcContext.call {
createClientAsync().singleRequestMultipleResponse(createRequest(), streamObserver)
}

val captor = argumentCaptor<Throwable> { }
verify(streamObserver, timeout(60 * 1_000)).onError(captor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, captor.allValues.size)
K_LOGGER.error(captor.firstValue) { "Handle exception" }

assertException(
captor.firstValue, ExceptionMetadata(
"DEADLINE_EXCEEDED: context timed out",
ExceptionMetadata(
"context timed out",
)
)
)
}

@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
}

@Test
override fun `server terminated intermediate session (retry false)`() {
val exception = `server terminated intermediate session`(false)
K_LOGGER.error(exception) { "Handle exception" }

assertException(
exception, ExceptionMetadata(
"CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST",
)
)
}

@Test
override fun `server terminated intermediate session (retry true)`() {
val exception = `server terminated intermediate session`(true)
K_LOGGER.error(exception) { "Handle exception" }

assertException(
exception, ExceptionMetadata(
MID_TRANSFER_FAILURE_EXCEPTION_MESSAGE,
suspended = listOf(
ExceptionMetadata(
"CANCELLED: $STATUS_DESCRIPTION_OF_INTERRUPTED_REQUEST",
)
)
)
)
}

private fun `server terminated intermediate session`(retryInterruptedTransaction: Boolean): Throwable {
val streamObserver = mock<StreamObserver<Response>> { }
val handlerBaton = Baton("handler")

createServer(completeResponse = false, handlerBaton = handlerBaton).execAndClose(true) {
createClientAsync(retryInterruptedTransaction = retryInterruptedTransaction)
.singleRequestMultipleResponse(createRequest(), streamObserver)

handlerBaton.get("wait response sent")
Thread.sleep(RETRY_TIMEOUT / 2)
}

val onErrorCaptor = argumentCaptor<Throwable> { }
verify(streamObserver, timeout(60 * 1_000)).onError(onErrorCaptor.capture())
val onNextCaptor = argumentCaptor<Response> { }
verify(streamObserver, times(2)).onNext(onNextCaptor.capture())
verifyNoMoreInteractions(streamObserver)

assertEquals(1, onErrorCaptor.allValues.size)
assertEquals(2, onNextCaptor.allValues.size)
onNextCaptor.allValues.forEachIndexed { index, response ->
assertEquals(index + 1, response.seq)
assertEquals(1, response.origSeq)
}

return onErrorCaptor.firstValue
}
}

private fun assertInterruptedSync(exception: ExecutionException) {
K_LOGGER.error(exception) { "Handle exception" }
assertException(
exception, ExceptionMetadata(
"java.lang.RuntimeException: Can not execute GRPC blocking request",
ExceptionMetadata(
"Can not execute GRPC blocking request",
ROOT_RETRY_SYNC_EXCEPTION_MESSAGE,
suspended = listOf(
ExceptionMetadata(
"UNAVAILABLE: io exception",
Expand All @@ -681,7 +846,7 @@ internal class DefaultGrpcRouterTest {
exception, ExceptionMetadata(
"java.lang.RuntimeException: Can not execute GRPC blocking request",
ExceptionMetadata(
"Can not execute GRPC blocking request",
ROOT_RETRY_SYNC_EXCEPTION_MESSAGE,
suspended = listOf(
ExceptionMetadata(
"UNAVAILABLE: io exception",
Expand Down Expand Up @@ -710,7 +875,7 @@ internal class DefaultGrpcRouterTest {
exception, ExceptionMetadata(
"java.lang.RuntimeException: Can not execute GRPC blocking request",
ExceptionMetadata(
"Can not execute GRPC blocking request",
ROOT_RETRY_SYNC_EXCEPTION_MESSAGE,
suspended = listOf(
ExceptionMetadata(
"UNAVAILABLE: io exception",
Expand Down

0 comments on commit 06aeffc

Please sign in to comment.