diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt index 666cfc7..95aa95b 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt @@ -29,6 +29,7 @@ import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfigu import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BrokenConnConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ChangeSequenceConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration +import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RecoveryConfig import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ResendRequestConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration @@ -81,7 +82,6 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.Executors import java.util.concurrent.Future -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern import kotlin.test.assertContains @@ -501,31 +501,129 @@ class StrategiesTest { val testContext = createTestContext( handlerSettings, - searchMessageGroups = { request -> - val from = if (request.hasStartTimestamp()) request.startTimestamp else null - val to = if (request.hasEndTimestamp()) request.endTimestamp else null - when (request.searchDirection) { - TimeRelation.NEXT -> - messages - .toMutableList() - .asSequence() - .filter { response -> - (from == null || Timestamps.compare(from, response.message.messageId.timestamp) <= 0) && - (to == null || Timestamps.compare(to, response.message.messageId.timestamp) >= 0) - } + searchMessageGroups = { request -> searchMessageGroups(messages, request) }, + ) + + val context = testContext.context + val channel = testContext.channel + val handler = testContext.fixHandler + val executor = Executors.newCachedThreadPool() + + var msgSender: Future<*>? = null + try { + TestServerEmulator( + executor, + handler, + channel, + ::getMessageId, + ::logonResponse, + ::resendRequest, + ).use { server -> + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + val sendMode = it.arguments[3] as IChannel.SendMode + val byteBuf = it.arguments[0] as ByteBuf + handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf) + } - TimeRelation.PREVIOUS -> - messages - .reversed() - .asSequence() - .filter { response -> - (from == null || Timestamps.compare(from, response.message.messageId.timestamp) >= 0) && - (to == null || Timestamps.compare(to, response.message.messageId.timestamp) <= 0) + handler.use { + handler.onStart() + verify(context, timeout(ruleDuration.toMillis() + correction)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started") + }, + anyOrNull(), + ) + + msgSender = executor.submit { sendBusinessMessages(handler) } + + verify(context, timeout(ruleDuration.toMillis() * 10)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, + anyOrNull()) + + } + } + } finally { + msgSender?.cancel(true) + executor.shutdownGracefully() + } + } + + private fun handleOutgoingMessages( + handler: FixHandler, + channel: IChannel, + messages: MutableList, + server: TestServerEmulator, + sendMode: IChannel.SendMode, + byteBuf: ByteBuf, + ): CompletableFuture = try { + + LOGGER.info { "put $sendMode ${byteBuf.toString(US_ASCII)}" } + + if (sendMode.handle) { + handler.onOutgoing(channel, byteBuf, mutableMapOf()) + } + + if (!byteBuf.isEmpty()) { + if (sendMode.mqPublish) { + messages.add( + MessageSearchResponse + .newBuilder() + .apply { + messageBuilder.apply { + messageIdBuilder.apply { + timestamp = Timestamps.now() + sequence = byteBuf.findField(MSG_SEQ_NUM_TAG)?.value?.toLong() ?: 0L } + bodyRaw = UnsafeByteOperations.unsafeWrap(byteBuf.toByteArray()) + } + }.build(), + ) + LOGGER.info { "publish to mq ${byteBuf.findField(MSG_SEQ_NUM_TAG)?.value}" } + } - else -> error("Unsupported search direction") - }.iterator() - }, + if (sendMode.socketSend) { + server.consume(byteBuf) + } + } + + CompletableFuture.completedFuture(getMessageId()) + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + throw e + } + + @Test + fun `outgoing gap strategy - long recovery in case of sequence reset for admin is false test`() { + val ruleDuration = Duration.of(500, MILLIS) + val correction = ruleDuration.millis() / 2 + val messages = CopyOnWriteArrayList() + val handlerSettings: FixHandlerSettings = + createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = ruleDuration, + cleanUpDuration = Duration.ZERO, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3), + recoveryConfig = RecoveryConfig( + sequenceResetForAdmin = false + ) + ), + ), + ), + ).apply { + isLoadMissedMessagesFromCradle = true + cradleSaveTimeoutMs = 500 + } + val testContext = + createTestContext( + handlerSettings, + searchMessageGroups = { request -> searchMessageGroups(messages, request) }, ) val context = testContext.context @@ -533,7 +631,6 @@ class StrategiesTest { val handler = testContext.fixHandler val executor = Executors.newCachedThreadPool() - val activeSending = AtomicBoolean(true) var msgSender: Future<*>? = null try { TestServerEmulator( @@ -545,43 +642,87 @@ class StrategiesTest { ::resendRequest, ).use { server -> whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { - try { - val sendMode = it.arguments[3] as IChannel.SendMode - val byteBuf = it.arguments[0] as ByteBuf - LOGGER.info { "put $sendMode ${byteBuf.toString(US_ASCII)}" } - - if (sendMode.handle) { - handler.onOutgoing(channel, byteBuf, mutableMapOf()) - } - - if (!byteBuf.isEmpty()) { - if (sendMode.mqPublish) { - messages.add( - MessageSearchResponse - .newBuilder() - .apply { - messageBuilder.apply { - messageIdBuilder.apply { - timestamp = Timestamps.now() - sequence = byteBuf.findField(MSG_SEQ_NUM_TAG)?.value?.toLong() ?: 0L - } - bodyRaw = UnsafeByteOperations.unsafeWrap(byteBuf.toByteArray()) - } - }.build(), - ) - LOGGER.info { "publish to mq ${byteBuf.findField(MSG_SEQ_NUM_TAG)?.value}" } - } + val sendMode = it.arguments[3] as IChannel.SendMode + val byteBuf = it.arguments[0] as ByteBuf + handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf) + } - if (sendMode.socketSend) { - server.consume(byteBuf) - } - } + handler.use { + handler.onStart() + verify(context, timeout(ruleDuration.toMillis() + correction)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started") + }, + anyOrNull(), + ) + + msgSender = executor.submit { sendBusinessMessages(handler) } + + verify(context, timeout(ruleDuration.toMillis() * 10)).send( + argThat { + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, + anyOrNull()) - CompletableFuture.completedFuture(getMessageId()) - } catch (e: InterruptedException) { - Thread.currentThread().interrupt() - throw e - } + } + } + } finally { + msgSender?.cancel(true) + executor.shutdownGracefully() + } + } + + @Test + fun `outgoing gap strategy - long recovery in case of out of order is true test`() { + val ruleDuration = Duration.of(500, MILLIS) + val correction = ruleDuration.millis() / 2 + val messages = CopyOnWriteArrayList() + val handlerSettings: FixHandlerSettings = + createHandlerSettings( + BrokenConnConfiguration( + SchedulerType.CONSECUTIVE, + listOf( + RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO), + RuleConfiguration( + CREATE_OUTGOING_GAP, + duration = ruleDuration, + cleanUpDuration = Duration.ZERO, + missOutgoingMessagesConfiguration = MissMessageConfiguration(3), + recoveryConfig = RecoveryConfig( + outOfOrder = true + ) + ), + ), + ), + ).apply { + isLoadMissedMessagesFromCradle = true + cradleSaveTimeoutMs = 500 + } + val testContext = + createTestContext( + handlerSettings, + searchMessageGroups = { request -> searchMessageGroups(messages, request) }, + ) + + val context = testContext.context + val channel = testContext.channel + val handler = testContext.fixHandler + val executor = Executors.newCachedThreadPool() + + var msgSender: Future<*>? = null + try { + TestServerEmulator( + executor, + handler, + channel, + ::getMessageId, + ::logonResponse, + ::resendRequest, + ).use { server -> + whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer { + val sendMode = it.arguments[3] as IChannel.SendMode + val byteBuf = it.arguments[0] as ByteBuf + handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf) } handler.use { @@ -593,37 +734,66 @@ class StrategiesTest { anyOrNull(), ) - msgSender = - executor.submit { - try { - var seq = 2 - while (activeSending.get()) { - seq += 1 - handler.send(businessMessage(seq).asExpandable(), mutableMapOf(), null) - Thread.sleep(1) - } - } catch (e: Exception) { - LOGGER.error(e) { "Send message problem" } - } finally { - LOGGER.info { "Stopped sending" } - } - } + msgSender = executor.submit { sendBusinessMessages(handler) } verify(context, timeout(ruleDuration.toMillis() * 10)).send( argThat { - toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") - }, - anyOrNull()) + toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished") + }, + anyOrNull()) } } } finally { - activeSending.set(false) msgSender?.cancel(true) executor.shutdownGracefully() } } + private fun searchMessageGroups( + messages: List, + request: MessageGroupsSearchRequest + ): Iterator { + val from = if (request.hasStartTimestamp()) request.startTimestamp else null + val to = if (request.hasEndTimestamp()) request.endTimestamp else null + return when (request.searchDirection) { + TimeRelation.NEXT -> + messages + .toMutableList() + .asSequence() + .filter { response -> + (from == null || Timestamps.compare(from, response.message.messageId.timestamp) <= 0) && + (to == null || Timestamps.compare(to, response.message.messageId.timestamp) >= 0) + } + + TimeRelation.PREVIOUS -> + messages + .reversed() + .asSequence() + .filter { response -> + (from == null || Timestamps.compare(from, response.message.messageId.timestamp) >= 0) && + (to == null || Timestamps.compare(to, response.message.messageId.timestamp) <= 0) + } + + else -> error("Unsupported search direction") + }.iterator() + } + + private fun sendBusinessMessages(handler: FixHandler) { + try { + var seq = 2 + while (!Thread.currentThread().isInterrupted) { + seq += 1 + handler.send(businessMessage(seq).asExpandable(), mutableMapOf(), null) + Thread.sleep(1) + } + } catch (e: Exception) { + LOGGER.error(e) { "Send message problem" } + } finally { + LOGGER.info { "Stopped sending" } + } + } + private fun verifyChangeStrategy( channel: IChannel, ruleDuration: Duration,