Skip to content

Commit

Permalink
[TS-2459] added tests for outOfOrder = true, sequenceResetForAdmin = …
Browse files Browse the repository at this point in the history
…false cases
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jul 10, 2024
1 parent 5516e29 commit 1647d07
Showing 1 changed file with 248 additions and 78 deletions.
326 changes: 248 additions & 78 deletions src/test/kotlin/com/exactpro/th2/conn/dirty/fix/StrategiesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BatchSendConfigu
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.BrokenConnConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ChangeSequenceConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.MissMessageConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RecoveryConfig
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.ResendRequestConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration
import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.SplitSendConfiguration
Expand Down Expand Up @@ -81,7 +82,6 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import kotlin.test.assertContains
Expand Down Expand Up @@ -501,39 +501,136 @@ class StrategiesTest {
val testContext =
createTestContext(
handlerSettings,
searchMessageGroups = { request ->
val from = if (request.hasStartTimestamp()) request.startTimestamp else null
val to = if (request.hasEndTimestamp()) request.endTimestamp else null
when (request.searchDirection) {
TimeRelation.NEXT ->
messages
.toMutableList()
.asSequence()
.filter { response ->
(from == null || Timestamps.compare(from, response.message.messageId.timestamp) <= 0) &&
(to == null || Timestamps.compare(to, response.message.messageId.timestamp) >= 0)
}
searchMessageGroups = { request -> searchMessageGroups(messages, request) },
)

val context = testContext.context
val channel = testContext.channel
val handler = testContext.fixHandler
val executor = Executors.newCachedThreadPool()

var msgSender: Future<*>? = null
try {
TestServerEmulator(
executor,
handler,
channel,
::getMessageId,
::logonResponse,
::resendRequest,
).use { server ->
whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer {
val sendMode = it.arguments[3] as IChannel.SendMode
val byteBuf = it.arguments[0] as ByteBuf
handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf)
}

TimeRelation.PREVIOUS ->
messages
.reversed()
.asSequence()
.filter { response ->
(from == null || Timestamps.compare(from, response.message.messageId.timestamp) >= 0) &&
(to == null || Timestamps.compare(to, response.message.messageId.timestamp) <= 0)
handler.use {
handler.onStart()
verify(context, timeout(ruleDuration.toMillis() + correction)).send(
argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started")
},
anyOrNull(),
)

msgSender = executor.submit { sendBusinessMessages(handler) }

verify(context, timeout(ruleDuration.toMillis() * 10)).send(
argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished")
},
anyOrNull())

}
}
} finally {
msgSender?.cancel(true)
executor.shutdownGracefully()
}
}

private fun handleOutgoingMessages(
handler: FixHandler,
channel: IChannel,
messages: MutableList<MessageSearchResponse>,
server: TestServerEmulator,
sendMode: IChannel.SendMode,
byteBuf: ByteBuf,
): CompletableFuture<MessageID> = try {

LOGGER.info { "put $sendMode ${byteBuf.toString(US_ASCII)}" }

if (sendMode.handle) {
handler.onOutgoing(channel, byteBuf, mutableMapOf())
}

if (!byteBuf.isEmpty()) {
if (sendMode.mqPublish) {
messages.add(
MessageSearchResponse
.newBuilder()
.apply {
messageBuilder.apply {
messageIdBuilder.apply {
timestamp = Timestamps.now()
sequence = byteBuf.findField(MSG_SEQ_NUM_TAG)?.value?.toLong() ?: 0L
}
bodyRaw = UnsafeByteOperations.unsafeWrap(byteBuf.toByteArray())
}
}.build(),
)
LOGGER.info { "publish to mq ${byteBuf.findField(MSG_SEQ_NUM_TAG)?.value}" }
}

else -> error("Unsupported search direction")
}.iterator()
},
if (sendMode.socketSend) {
server.consume(byteBuf)
}
}

CompletableFuture.completedFuture(getMessageId())
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
throw e
}

@Test
fun `outgoing gap strategy - long recovery in case of sequence reset for admin is false test`() {
val ruleDuration = Duration.of(500, MILLIS)
val correction = ruleDuration.millis() / 2
val messages = CopyOnWriteArrayList<MessageSearchResponse>()
val handlerSettings: FixHandlerSettings =
createHandlerSettings(
BrokenConnConfiguration(
SchedulerType.CONSECUTIVE,
listOf(
RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO),
RuleConfiguration(
CREATE_OUTGOING_GAP,
duration = ruleDuration,
cleanUpDuration = Duration.ZERO,
missOutgoingMessagesConfiguration = MissMessageConfiguration(3),
recoveryConfig = RecoveryConfig(
sequenceResetForAdmin = false
)
),
),
),
).apply {
isLoadMissedMessagesFromCradle = true
cradleSaveTimeoutMs = 500
}
val testContext =
createTestContext(
handlerSettings,
searchMessageGroups = { request -> searchMessageGroups(messages, request) },
)

val context = testContext.context
val channel = testContext.channel
val handler = testContext.fixHandler
val executor = Executors.newCachedThreadPool()

val activeSending = AtomicBoolean(true)
var msgSender: Future<*>? = null
try {
TestServerEmulator(
Expand All @@ -545,43 +642,87 @@ class StrategiesTest {
::resendRequest,
).use { server ->
whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer {
try {
val sendMode = it.arguments[3] as IChannel.SendMode
val byteBuf = it.arguments[0] as ByteBuf
LOGGER.info { "put $sendMode ${byteBuf.toString(US_ASCII)}" }

if (sendMode.handle) {
handler.onOutgoing(channel, byteBuf, mutableMapOf())
}

if (!byteBuf.isEmpty()) {
if (sendMode.mqPublish) {
messages.add(
MessageSearchResponse
.newBuilder()
.apply {
messageBuilder.apply {
messageIdBuilder.apply {
timestamp = Timestamps.now()
sequence = byteBuf.findField(MSG_SEQ_NUM_TAG)?.value?.toLong() ?: 0L
}
bodyRaw = UnsafeByteOperations.unsafeWrap(byteBuf.toByteArray())
}
}.build(),
)
LOGGER.info { "publish to mq ${byteBuf.findField(MSG_SEQ_NUM_TAG)?.value}" }
}
val sendMode = it.arguments[3] as IChannel.SendMode
val byteBuf = it.arguments[0] as ByteBuf
handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf)
}

if (sendMode.socketSend) {
server.consume(byteBuf)
}
}
handler.use {
handler.onStart()
verify(context, timeout(ruleDuration.toMillis() + correction)).send(
argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy started")
},
anyOrNull(),
)

msgSender = executor.submit { sendBusinessMessages(handler) }

verify(context, timeout(ruleDuration.toMillis() * 10)).send(
argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished")
},
anyOrNull())

CompletableFuture.completedFuture(getMessageId())
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
throw e
}
}
}
} finally {
msgSender?.cancel(true)
executor.shutdownGracefully()
}
}

@Test
fun `outgoing gap strategy - long recovery in case of out of order is true test`() {
val ruleDuration = Duration.of(500, MILLIS)
val correction = ruleDuration.millis() / 2
val messages = CopyOnWriteArrayList<MessageSearchResponse>()
val handlerSettings: FixHandlerSettings =
createHandlerSettings(
BrokenConnConfiguration(
SchedulerType.CONSECUTIVE,
listOf(
RuleConfiguration(DEFAULT, duration = ruleDuration, cleanUpDuration = Duration.ZERO),
RuleConfiguration(
CREATE_OUTGOING_GAP,
duration = ruleDuration,
cleanUpDuration = Duration.ZERO,
missOutgoingMessagesConfiguration = MissMessageConfiguration(3),
recoveryConfig = RecoveryConfig(
outOfOrder = true
)
),
),
),
).apply {
isLoadMissedMessagesFromCradle = true
cradleSaveTimeoutMs = 500
}
val testContext =
createTestContext(
handlerSettings,
searchMessageGroups = { request -> searchMessageGroups(messages, request) },
)

val context = testContext.context
val channel = testContext.channel
val handler = testContext.fixHandler
val executor = Executors.newCachedThreadPool()

var msgSender: Future<*>? = null
try {
TestServerEmulator(
executor,
handler,
channel,
::getMessageId,
::logonResponse,
::resendRequest,
).use { server ->
whenever(channel.send(any(), any(), anyOrNull(), any())).doAnswer {
val sendMode = it.arguments[3] as IChannel.SendMode
val byteBuf = it.arguments[0] as ByteBuf
handleOutgoingMessages(handler, channel, messages, server, sendMode, byteBuf)
}

handler.use {
Expand All @@ -593,37 +734,66 @@ class StrategiesTest {
anyOrNull(),
)

msgSender =
executor.submit {
try {
var seq = 2
while (activeSending.get()) {
seq += 1
handler.send(businessMessage(seq).asExpandable(), mutableMapOf(), null)
Thread.sleep(1)
}
} catch (e: Exception) {
LOGGER.error(e) { "Send message problem" }
} finally {
LOGGER.info { "Stopped sending" }
}
}
msgSender = executor.submit { sendBusinessMessages(handler) }

verify(context, timeout(ruleDuration.toMillis() * 10)).send(
argThat {
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished")
},
anyOrNull())
toProto(TEST_BOOK, TEST_SCOPE).name.contains("$CREATE_OUTGOING_GAP strategy finished")
},
anyOrNull())

}
}
} finally {
activeSending.set(false)
msgSender?.cancel(true)
executor.shutdownGracefully()
}
}

private fun searchMessageGroups(
messages: List<MessageSearchResponse>,
request: MessageGroupsSearchRequest
): Iterator<MessageSearchResponse> {
val from = if (request.hasStartTimestamp()) request.startTimestamp else null
val to = if (request.hasEndTimestamp()) request.endTimestamp else null
return when (request.searchDirection) {
TimeRelation.NEXT ->
messages
.toMutableList()
.asSequence()
.filter { response ->
(from == null || Timestamps.compare(from, response.message.messageId.timestamp) <= 0) &&
(to == null || Timestamps.compare(to, response.message.messageId.timestamp) >= 0)
}

TimeRelation.PREVIOUS ->
messages
.reversed()
.asSequence()
.filter { response ->
(from == null || Timestamps.compare(from, response.message.messageId.timestamp) >= 0) &&
(to == null || Timestamps.compare(to, response.message.messageId.timestamp) <= 0)
}

else -> error("Unsupported search direction")
}.iterator()
}

private fun sendBusinessMessages(handler: FixHandler) {
try {
var seq = 2
while (!Thread.currentThread().isInterrupted) {
seq += 1
handler.send(businessMessage(seq).asExpandable(), mutableMapOf(), null)
Thread.sleep(1)
}
} catch (e: Exception) {
LOGGER.error(e) { "Send message problem" }
} finally {
LOGGER.info { "Stopped sending" }
}
}

private fun verifyChangeStrategy(
channel: IChannel,
ruleDuration: Duration,
Expand Down

0 comments on commit 1647d07

Please sign in to comment.