Skip to content

Commit

Permalink
Load messages from cradle on Retransmission (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
isengrims authored Jun 28, 2023
1 parent 554565f commit f97ed1e
Show file tree
Hide file tree
Showing 11 changed files with 809 additions and 161 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (0.2.2)
# th2-conn-dirty-fix (0.3.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -41,6 +41,7 @@ This microservice allows sending and receiving messages via FIX protocol
+ *resetSeqNumFlag* - resetting sequence number in initial Logon message (when conn started)
+ *resetOnLogon* - resetting the sequence number in Logon in other cases (e.g. disconnect)
+ *loadSequencesFromCradle* - defines if sequences will be loaded from cradle to use them in logon message.
+ *loadMissedMessagesFromCradle* - defines how retransmission will be handled. If true, then requested through `ResendRequest` messages (or messages requested on Logon with `NextExpectedSeqNum`) will be loaded from cradle.
+ *sessionStartTime* - UTC time when session starts. (`nullable`)
+ *sessionEndTime* - UTC time when session ends. required if startSessionTime is filled.
+ *sendingDateTimeFormat* - `SendingTime` field format for outgoing messages. (`nullable`, `default format` in this case is `"yyyyMMdd-HH:mm:ss.SSSSSSSSS"`)
Expand Down Expand Up @@ -324,6 +325,9 @@ spec:
```

# Changelog
## 0.3.0
* Ability to recover messages from cradle.

## 0.2.2
* session scheduling fix: hasn't worked for some ranges.

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release_version=0.2.2
release_version=0.3.0
jackson_version=2.11.2
188 changes: 160 additions & 28 deletions src/main/java/com/exactpro/th2/FixHandler.java

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class FixHandlerSettings implements IHandlerSettings {
private Boolean useNextExpectedSeqNum = false;
private Boolean saveAdminMessages = false;
private Boolean loadSequencesFromCradle = false;
private Boolean loadMissedMessagesFromCradle = false;
private Boolean resetStateOnServerReset = false;

@JsonDeserialize(using = LocalTimeDeserializer.class)
Expand Down Expand Up @@ -234,6 +235,13 @@ public void setLoadSequencesFromCradle(Boolean loadSequencesFromCradle) {
this.loadSequencesFromCradle = loadSequencesFromCradle;
}

public Boolean isLoadMissedMessagesFromCradle() {
return loadMissedMessagesFromCradle;
}

public void setLoadMissedMessagesFromCradle(Boolean loadMissedMessagesFromCradle) {
this.loadMissedMessagesFromCradle = loadMissedMessagesFromCradle;
}
public Boolean getResetStateOnServerReset() {
return resetStateOnServerReset;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/exactpro/th2/constants/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Constants {
public static final Integer TARGET_COMP_ID_TAG = 56;
public static final Integer MSG_SEQ_NUM_TAG = 34;
public static final Integer SENDING_TIME_TAG = 52;
public static final Integer ORIG_SENDING_TIME_TAG = 122;
public static final Integer CHECKSUM_TAG = 10;
public static final Integer DEFAULT_APPL_VER_ID_TAG = 1137;
public static final Integer SENDER_SUB_ID_TAG = 50;
Expand Down Expand Up @@ -79,7 +80,8 @@ public class Constants {
public static final String SENDER_SUB_ID = SOH + SENDER_SUB_ID_TAG + "=";
public static final String RESET_SEQ_NUM = SOH + RESET_SEQ_NUM_TAG + "=";
public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "=";
public static final String POSS_DUP = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "=";
public static final String POSS_DUP = SOH + POSS_DUP_TAG + "=";
public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "=";

//message types
public static final String MSG_TYPE_LOGON = "A";
Expand Down
267 changes: 267 additions & 0 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.conn.dirty.fix

import com.exactpro.th2.SequenceHolder
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.message.toTimestamp
import com.exactpro.th2.common.util.toInstant
import com.exactpro.th2.constants.Constants.IS_POSS_DUP
import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG
import com.exactpro.th2.constants.Constants.POSS_DUP
import com.exactpro.th2.constants.Constants.POSS_DUP_TAG
import com.exactpro.th2.dataprovider.grpc.DataProviderService
import com.exactpro.th2.dataprovider.grpc.MessageGroupResponse
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest
import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse
import com.exactpro.th2.dataprovider.grpc.MessageStream
import com.exactpro.th2.dataprovider.grpc.TimeRelation
import com.exactpro.th2.lme.oe.util.ProviderCall
import com.google.protobuf.Int32Value
import com.google.protobuf.Timestamp
import com.google.protobuf.util.Timestamps.compare
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.math.ceil
import mu.KotlinLogging

class MessageLoader(
private val dataProvider: DataProviderService,
private val sessionStartTime: LocalTime?
) {
private var sessionStart: ZonedDateTime
private val searchLock = ReentrantLock()

init {
val today = LocalDate.now(ZoneOffset.UTC)
val start = sessionStartTime?.atDate(today)
val now = LocalDateTime.now()
if(start == null) {
sessionStart = OffsetDateTime
.now(ZoneOffset.UTC)
.with(LocalTime.now())
.atZoneSameInstant(ZoneId.systemDefault())
} else {
sessionStart = if(start.isAfter(now)) {
OffsetDateTime
.now(ZoneOffset.UTC)
.minusDays(1)
.with(sessionStartTime)
.atZoneSameInstant(ZoneId.systemDefault())
} else {
OffsetDateTime
.now(ZoneOffset.UTC)
.with(sessionStartTime)
.atZoneSameInstant(ZoneId.systemDefault())
}
}
}

private var sessionStartTimestamp = sessionStart
.toInstant()
.toTimestamp()

private var previousDaySessionStart = sessionStart
.minusDays(1)
.toInstant()
.toTimestamp()

fun updateTime() {
searchLock.withLock {
sessionStart = ZonedDateTime
.now(ZoneOffset.UTC)
.with(OffsetTime.now(ZoneOffset.UTC))
sessionStartTimestamp = sessionStart
.toInstant()
.toTimestamp()
previousDaySessionStart = sessionStart
.minusDays(1)
.toInstant()
.toTimestamp()
}
}

fun loadInitialSequences(sessionAlias: String): SequenceHolder {
val serverSeq = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessages(
createSearchRequest(
Instant.now().toTimestamp(),
Direction.FIRST,
sessionAlias
)
)
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}
val clientSeq = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessages(
createSearchRequest(
Instant.now().toTimestamp(),
Direction.SECOND,
sessionAlias
)
),
true
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}
K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" }
return SequenceHolder(clientSeq, serverSeq)
}

fun processMessagesInRange(
direction: Direction,
sessionAlias: String,
fromSequence: Long,
processMessage: (ByteBuf) -> Boolean
) {
var timestamp: Timestamp? = null
ProviderCall.withCancellation {
val backwardIterator = dataProvider.searchMessages(
createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias)
)

var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

timestamp = firstValidMessage.timestamp

while (backwardIterator.hasNext() && messagesToSkip > 0) {
val message = backwardIterator.next().message
if(compare(message.timestamp, previousDaySessionStart) <= 0) {
continue
}
timestamp = message.timestamp
messagesToSkip -= 1
if(messagesToSkip == 0L) {

val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue

if(checkPossDup(buf)) {
val validMessage = firstValidMessageDetails(backwardIterator) ?: break

timestamp = validMessage.timestamp
if(validMessage.payloadSequence <= fromSequence) {
break
} else {
messagesToSkip = validMessage.payloadSequence - fromSequence
}

} else {

if(sequence <= fromSequence) {
break
} else {
messagesToSkip = sequence - fromSequence
}
}
}
}
}

val startSearchTimestamp = timestamp ?: return

K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" }

ProviderCall.withCancellation {

val iterator = dataProvider.searchMessages(
createSearchRequest(
startSearchTimestamp,
direction,
sessionAlias,
TimeRelation.NEXT,
Instant.now().toTimestamp()
)
)

while (iterator.hasNext()) {
val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray())
if (!processMessage(message)) break
}
}
}

private fun <T> searchMessage(
iterator: Iterator<MessageSearchResponse>,
checkPossFlag: Boolean = false,
extractValue: (MessageGroupResponse?, String?) -> T
): T {
var message: MessageGroupResponse?
while (iterator.hasNext()) {
message = iterator.next().message
if(sessionStartTime != null && compare(sessionStartTimestamp, message.timestamp) > 0) {
return extractValue(message, null)
}

val bodyRaw = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val seqNum = bodyRaw.findField(MSG_SEQ_NUM_TAG)?.value ?: continue

if(checkPossFlag && checkPossDup(bodyRaw)) continue

return extractValue(message, seqNum)
}
return extractValue(null, null)
}

private fun firstValidMessageDetails(iterator: Iterator<MessageSearchResponse>): MessageDetails? = searchMessage(
iterator,
true
) { message, seqNum ->
if(message == null || seqNum == null) return@searchMessage null
MessageDetails(seqNum.toInt(), message.messageId.sequence, message.timestamp)
}

private fun createSearchRequest(
timestamp: Timestamp,
direction: Direction,
sessionAlias: String,
searchDirection: TimeRelation = TimeRelation.PREVIOUS,
endTimestamp: Timestamp = previousDaySessionStart
) = MessageSearchRequest.newBuilder().apply {
startTimestamp = timestamp
this.endTimestamp = endTimestamp
addResponseFormats(BASE64_FORMAT)
addStream(
MessageStream.newBuilder()
.setName(sessionAlias)
.setDirection(direction)
)
this.searchDirection = searchDirection
}.build()

private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP

data class MessageDetails(val payloadSequence: Int, val messageSequence: Long, val timestamp: Timestamp)

companion object {
val K_LOGGER = KotlinLogging.logger { }
private const val BASE64_FORMAT = "BASE_64"
}
}
28 changes: 28 additions & 0 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*******************************************************************************
* Copyright (c) 2023, Exactpro Systems LLC
* www.exactpro.com
* Build Software to Test Software
*
* All rights reserved.
* This is unpublished, licensed software, confidential and proprietary
* information which is the property of Exactpro Systems LLC or its licensors.
******************************************************************************/
package com.exactpro.th2.lme.oe.util

import io.grpc.Context

class ProviderCall {
companion object {
fun <T> withCancellation(code: () -> T): T {
return Context.current().withCancellation().use { context ->
val toRestore = context.attach()
val result = try {
code()
} finally {
context.detach(toRestore)
}
return@use result
}
}
}
}
Loading

0 comments on commit f97ed1e

Please sign in to comment.