From c86a82dc6efae2e75f298bedab224adfca13294a Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Thu, 4 Jul 2024 18:32:29 +0400 Subject: [PATCH] [TS-2459] implemented test for reproducing problem --- .../th2/conn/dirty/fix/TestStrategies.kt | 177 +++++++++++++----- 1 file changed, 130 insertions(+), 47 deletions(-) diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt index c1208c5..8fa9d95 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestStrategies.kt @@ -58,6 +58,7 @@ import org.junit.jupiter.api.assertAll import org.mockito.kotlin.any import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.anyVararg +import org.mockito.kotlin.argThat import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.clearInvocations import org.mockito.kotlin.description @@ -75,6 +76,9 @@ import kotlin.test.assertNotNull import kotlin.text.Charsets.US_ASCII import com.exactpro.th2.common.grpc.Event as ProtoEvent +private const val TEST_BOOK = "test-book" +private const val TEST_SCOPE = "test-scope" + class TestStrategies { private class TestContext( @@ -374,18 +378,17 @@ class TestStrategies { @Test fun `outgoing gap strategy test`() { - val defaultRuleDuration = Duration.of(1, SECONDS) - val businessRuleDuration = Duration.of(1, SECONDS) - val businessRuleCleanupDuration = Duration.of(1, SECONDS) + val defaultRuleDuration = Duration.of(500, MILLIS) + val businessRuleDuration = Duration.of(500, MILLIS) val messages = mutableListOf, IChannel.SendMode>>() val testContext = createTestContext(BrokenConnConfiguration( SchedulerType.CONSECUTIVE, listOf( - RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.of(100, MILLIS)), + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.ZERO), RuleConfiguration( CREATE_OUTGOING_GAP, duration = businessRuleDuration, - cleanUpDuration = businessRuleCleanupDuration, + cleanUpDuration = Duration.ZERO, missOutgoingMessagesConfiguration = MissMessageConfiguration(3) ), ) @@ -395,54 +398,134 @@ class TestStrategies { val context = testContext.context val channel = testContext.channel - try { - testContext.fixHandler.use { handler -> - handler.onStart() - context.verifyInvocationsAndClean { - verify(this).createChannel(any(), any(), any(), any(), any(), any(), anyVararg()) - channel.verifyInvocationsAndClean { - verify( - this, - times(2).description("Check channel state in the onStart and sendLogon methods"), - ).isOpen - verify(this, description("open channel with default strategy first")).open() - verifySend(mapOf(35 to "A", 34 to "1"), 100) - } - verifySendEvent("successful login", "Info", 100) - } - verifyChangeStrategy(channel, defaultRuleDuration, DEFAULT, CREATE_OUTGOING_GAP, 2) - context.verifyInvocationsAndClean { - val events = captureSendEvents(2, 100) - assertAll( - { assertEquals("successful login", events[0].name) }, - { assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy started") }, - ) + testContext.fixHandler.use { handler -> + handler.onStart() + context.verifyInvocationsAndClean { + verify(this).createChannel(any(), any(), any(), any(), any(), any(), anyVararg()) + channel.verifyInvocationsAndClean { + verify( + this, + times(2).description("Check channel state in the onStart and sendLogon methods"), + ).isOpen + verify(this, description("open channel with default strategy first")).open() + verifySend(mapOf(35 to "A", 34 to "1"), 100) } + verifySendEvent("successful login", "Info", 100) + } - handler.onOutgoing(channel, businessMessage(3).asExpandable(), mutableMapOf()) - handler.onOutgoing(channel, businessMessage(4).asExpandable(), mutableMapOf()) - handler.onOutgoing(channel, businessMessage(5).asExpandable(), mutableMapOf()) - handler.onOutgoing(channel, businessMessage(6).asExpandable(), mutableMapOf()) + verifyChangeStrategy(channel, defaultRuleDuration, DEFAULT, CREATE_OUTGOING_GAP, 2) + context.verifyInvocationsAndClean { + val events = captureSendEvents(2, 100) + assertAll( + { assertEquals("successful login", events[0].name) }, + { assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy started") }, + ) + } - handler.onIncoming(channel, resendRequest(3, 3, 5), getMessageId()) - channel.verifyInvocationsAndClean { - verify(this, timeout(100).description("check status during handleResendRequest method")).isOpen - val byteBufs = captureSend(3, 100) - assertAll( - { byteBufs[0].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3")) }, - { byteBufs[1].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4")) }, - { byteBufs[2].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "5")) }, - ) + handler.onOutgoing(channel, businessMessage(3).asExpandable(), mutableMapOf()) + handler.onOutgoing(channel, businessMessage(4).asExpandable(), mutableMapOf()) + handler.onOutgoing(channel, businessMessage(5).asExpandable(), mutableMapOf()) + handler.onOutgoing(channel, businessMessage(6).asExpandable(), mutableMapOf()) + + handler.onIncoming(channel, resendRequest(3, 3, 5), getMessageId()) + channel.verifyInvocationsAndClean { + verify(this, timeout(100).description("check status during handleResendRequest method")).isOpen + val byteBufs = captureSend(3, 100) + assertAll( + { byteBufs[0].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "3")) }, + { byteBufs[1].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "4")) }, + { byteBufs[2].assertContains(mapOf(35 to "AE", 43 to "Y", 34 to "5")) }, + ) + } + + verifyChangeStrategy(channel, businessRuleDuration, CREATE_OUTGOING_GAP, DEFAULT, 7) + context.verifyInvocationsAndClean { + val events = captureSendEvents(2, 100) + assertAll( + { assertEquals("successful login", events[0].name) }, + { assertContains(events[1].name, "$CREATE_OUTGOING_GAP strategy finished") }, + ) + + } + } + } + + @Test + fun `outgoing gap strategy - long recovery in case of mixing recovery message with non-recovery messages test`() { + val defaultRuleDuration = Duration.of(500, MILLIS) + val businessRuleDuration = Duration.of(500, MILLIS) + val messages = mutableListOf, IChannel.SendMode>>() + val testContext = createTestContext(BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = defaultRuleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = businessRuleDuration, + cleanUpDuration = Duration.ZERO, + allowMessagesBeforeRetransmissionFinishes = true, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3) + ), + ) + )) { msg, mode, mtd -> + messages.add(Triple(msg, mode, mtd)) + } + + val context = testContext.context + val channel = testContext.channel + val handler = testContext.fixHandler + + var outgoingSequence = 0 + var incomingSequence = 0 + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + val byteBuf = it.arguments[0] as ByteBuf + LOGGER.info { "channel.send ${byteBuf.toString(US_ASCII)}" } + + val seq = requireNotNull(byteBuf.findField(34)?.value?.toInt()) + val posDup = byteBuf.findField(43)?.value == "Y" + + if (posDup) { + if (outgoingSequence + 1 == seq) { + outgoingSequence = seq } + } else { + if (byteBuf.findField(35)?.value == "A") { + incomingSequence += 1 + handler.onIncoming(channel, logonResponse(incomingSequence), getMessageId()) + } + if (outgoingSequence + 1 == seq) { + outgoingSequence = seq + } else { + incomingSequence += 1 + handler.onIncoming(channel, resendRequest(incomingSequence, outgoingSequence, 0), getMessageId()) + } + } - verifyChangeStrategy(channel, businessRuleDuration, CREATE_OUTGOING_GAP, DEFAULT, 7) - context.verifyInvocationsAndClean { - verifySendEvent("successful login", "Info", 100) + CompletableFuture.completedFuture(getMessageId()) + } + + handler.use { + handler.onStart() + verify(context, timeout(defaultRuleDuration.toMillis() + 100)).send(argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started") + }, anyOrNull()) + + val future = CompletableFuture.runAsync { + while (!Thread.currentThread().isInterrupted) { + val byteBuf = businessMessage(0).asExpandable() + handler.onOutgoing(channel, byteBuf, mutableMapOf()) + if (!byteBuf.isEmpty()) { + channel.send(byteBuf, mutableMapOf()) + } } } - } finally { - channel.close() + + verify(context, timeout(businessRuleDuration.toMillis() * 10)).send(argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, anyOrNull()) + + future.cancel(true) } } @@ -870,7 +953,7 @@ class TestStrategies { this.also { val captor = argumentCaptor { } verify(this, timeout(timeout)).send(captor.capture(), anyOrNull()) - val event = captor.allValues.single().toProto("test-book", "test-scope") + val event = captor.allValues.single().toProto(TEST_BOOK, TEST_SCOPE) assertAll( { name?.let { assertContains(event.name, name) } }, { type?.let { assertContains(event.type, type) } }, @@ -883,7 +966,7 @@ class TestStrategies { ): List { val captor = argumentCaptor { } verify(this, timeout(timeout).times(times)).send(captor.capture(), anyOrNull()) - return captor.allValues.map { it.toProto("test-book", "test-scope") } + return captor.allValues.map { it.toProto(TEST_BOOK, TEST_SCOPE) } } private fun ByteBuf.assertContains(values: Map) {