Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-4991] Optional book per session configuration #51

Merged
merged 12 commits into from
Jun 20, 2024
1 change: 0 additions & 1 deletion .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ on:
branches:
- master
- version-*
- dev-version-*
paths:
- gradle.properties
# - package_info.json
Expand Down
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.2.0)
# th2-conn-dirty-fix (1.3.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -42,6 +42,7 @@ This microservice allows sending and receiving messages via FIX protocol
+ *resetSeqNumFlag* - resetting sequence number in initial Logon message (when conn started)
+ *resetOnLogon* - resetting the sequence number in Logon in other cases (e.g. disconnect)
+ *loadSequencesFromCradle* - defines if sequences will be loaded from cradle to use them in logon message.
+ *loadMissedMessagesFromCradle* - defines how retransmission will be handled. If true, then requested through `ResendRequest` messages (or messages requested on Logon with `NextExpectedSeqNum`) will be loaded from cradle.
+ *sessionStartTime* - UTC time when session starts. (`nullable`)
+ *sessionEndTime* - UTC time when session ends. required if startSessionTime is filled.
+ *sendingDateTimeFormat* - `SendingTime` field format for outgoing messages. (`nullable`, `default format` in this case is `"yyyyMMdd-HH:mm:ss.SSSSSSSSS"`)
Expand Down Expand Up @@ -327,8 +328,13 @@ spec:

# Changelog

## 1.2.0
## 1.3.0
* Added support for th2 transport protocol
* Added configuration option for non-default book per session.
* loading requested messages from cradle.

## 1.1.1
* fix scheduling: hasn't worked for some ranges.

## 1.1.0
* state reset option on server update.
Expand All @@ -343,6 +349,9 @@ spec:
## 1.0.0
* Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support

## 0.3.0
* Ability to recover messages from cradle.

## 0.2.0
* optional state reset on silent server reset.

Expand Down
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ repositories {
}

dependencies {
api platform('com.exactpro.th2:bom:4.2.0')
api platform('com.exactpro.th2:bom:4.4.0')

implementation "com.exactpro.th2:common:5.3.0-json-raw-body-5200549463-bac6816-SNAPSHOT"
implementation "com.exactpro.th2:common-utils:2.1.0-transport-protocol-5153730274-14792e9-SNAPSHOT"
implementation "com.exactpro.th2:common:5.3.0-separate-executor+"
implementation "com.exactpro.th2:common-utils:2.1.0-transport-protocol+"

implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1'
implementation('com.exactpro.th2:conn-dirty-tcp-core:3.1.0-TH2-4907-5201637508-59b18cc-SNAPSHOT') {
implementation('com.exactpro.th2:conn-dirty-tcp-core:3.1.0-TH2-4991+') {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
because 'Projects should use only slf4j-api, without coupling to a certain implementation'
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release_version=1.2.0
release_version=1.3.0
jackson_version=2.15.1
207 changes: 172 additions & 35 deletions src/main/java/com/exactpro/th2/FixHandler.java

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class FixHandlerSettings implements IHandlerSettings {
private Boolean useNextExpectedSeqNum = false;
private Boolean saveAdminMessages = false;
private Boolean loadSequencesFromCradle = false;
private Boolean loadMissedMessagesFromCradle = false;
private Boolean resetStateOnServerReset = false;

@JsonDeserialize(using = LocalTimeDeserializer.class)
Expand Down Expand Up @@ -234,6 +235,14 @@ public void setLoadSequencesFromCradle(Boolean loadSequencesFromCradle) {
this.loadSequencesFromCradle = loadSequencesFromCradle;
}

public Boolean isLoadMissedMessagesFromCradle() {
return loadMissedMessagesFromCradle;
}

public void setLoadMissedMessagesFromCradle(Boolean loadMissedMessagesFromCradle) {
this.loadMissedMessagesFromCradle = loadMissedMessagesFromCradle;
}

public Boolean getResetStateOnServerReset() {
return resetStateOnServerReset;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/exactpro/th2/constants/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Constants {
public static final Integer TARGET_COMP_ID_TAG = 56;
public static final Integer MSG_SEQ_NUM_TAG = 34;
public static final Integer SENDING_TIME_TAG = 52;
public static final Integer ORIG_SENDING_TIME_TAG = 122;
public static final Integer CHECKSUM_TAG = 10;
public static final Integer DEFAULT_APPL_VER_ID_TAG = 1137;
public static final Integer SENDER_SUB_ID_TAG = 50;
Expand Down Expand Up @@ -79,7 +80,8 @@ public class Constants {
public static final String SENDER_SUB_ID = SOH + SENDER_SUB_ID_TAG + "=";
public static final String RESET_SEQ_NUM = SOH + RESET_SEQ_NUM_TAG + "=";
public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "=";
public static final String POSS_DUP = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "=";
public static final String POSS_DUP = SOH + POSS_DUP_TAG + "=";
public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "=";

//message types
public static final String MSG_TYPE_LOGON = "A";
Expand Down
275 changes: 275 additions & 0 deletions src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.conn.dirty.fix

import com.exactpro.th2.SequenceHolder
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.message.toTimestamp
import com.exactpro.th2.common.util.toInstant
import com.exactpro.th2.constants.Constants.IS_POSS_DUP
import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG
import com.exactpro.th2.constants.Constants.POSS_DUP_TAG
import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService
import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse
import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest
import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse
import com.exactpro.th2.dataprovider.lw.grpc.MessageStream
import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation
import com.exactpro.th2.lme.oe.util.ProviderCall
import com.google.protobuf.Timestamp
import com.google.protobuf.util.Timestamps.compare
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import mu.KotlinLogging

class MessageLoader(
private val dataProvider: DataProviderService,
private val sessionStartTime: LocalTime?,
private val bookName: String
) {
private var sessionStart: ZonedDateTime
private val searchLock = ReentrantLock()

init {
val today = LocalDate.now(ZoneOffset.UTC)
val start = sessionStartTime?.atDate(today)
val now = LocalDateTime.now()
if(start == null) {
sessionStart = OffsetDateTime
.now(ZoneOffset.UTC)
.with(LocalTime.now())
.atZoneSameInstant(ZoneId.systemDefault())
} else {
sessionStart = if(start.isAfter(now)) {
OffsetDateTime
.now(ZoneOffset.UTC)
.minusDays(1)
.with(sessionStartTime)
.atZoneSameInstant(ZoneId.systemDefault())
} else {
OffsetDateTime
.now(ZoneOffset.UTC)
.with(sessionStartTime)
.atZoneSameInstant(ZoneId.systemDefault())
}
}
}

private var sessionStartTimestamp = sessionStart
.toInstant()
.toTimestamp()

private var previousDaySessionStart = sessionStart
.minusDays(1)
.toInstant()
.toTimestamp()

fun updateTime() {
searchLock.withLock {
sessionStart = ZonedDateTime
.now(ZoneOffset.UTC)
.with(OffsetTime.now(ZoneOffset.UTC))
sessionStartTimestamp = sessionStart
.toInstant()
.toTimestamp()
previousDaySessionStart = sessionStart
.minusDays(1)
.toInstant()
.toTimestamp()
}
}

fun loadInitialSequences(sessionAlias: String): SequenceHolder = searchLock.withLock {
val serverSeq = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessages(
createSearchRequest(
Instant.now().toTimestamp(),
Direction.FIRST,
sessionAlias
)
)
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}
val clientSeq = ProviderCall.withCancellation {
searchMessage(
dataProvider.searchMessages(
createSearchRequest(
Instant.now().toTimestamp(),
Direction.SECOND,
sessionAlias
)
),
true
) { _, seqNum -> seqNum?.toInt() ?: 0 }
}
K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" }
return SequenceHolder(clientSeq, serverSeq)
}

fun processMessagesInRange(
direction: Direction,
sessionAlias: String,
fromSequence: Long,
processMessage: (ByteBuf) -> Boolean
) = searchLock.withLock {
processMessagesInRangeInternal(direction, sessionAlias, fromSequence, processMessage)
}

fun processMessagesInRangeInternal(
direction: Direction,
sessionAlias: String,
fromSequence: Long,
processMessage: (ByteBuf) -> Boolean
) {
var timestamp: Timestamp? = null
ProviderCall.withCancellation {
val backwardIterator = dataProvider.searchMessages(
createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias)
)

var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

timestamp = firstValidMessage.timestamp

while (backwardIterator.hasNext() && messagesToSkip > 0) {
val message = backwardIterator.next().message
if(compare(message.messageId.timestamp, previousDaySessionStart) <= 0) {
continue
}
timestamp = message.messageId.timestamp
messagesToSkip -= 1
if(messagesToSkip == 0L) {

val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue

if(checkPossDup(buf)) {
val validMessage = firstValidMessageDetails(backwardIterator) ?: break

timestamp = validMessage.timestamp
if(validMessage.payloadSequence <= fromSequence) {
break
} else {
messagesToSkip = validMessage.payloadSequence - fromSequence
}

} else {

if(sequence <= fromSequence) {
break
} else {
messagesToSkip = sequence - fromSequence
}
}
}
}
}

val startSearchTimestamp = timestamp ?: return

K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" }

ProviderCall.withCancellation {

val iterator = dataProvider.searchMessages(
createSearchRequest(
startSearchTimestamp,
direction,
sessionAlias,
TimeRelation.NEXT,
Instant.now().toTimestamp()
)
)

while (iterator.hasNext()) {
val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray())
if (!processMessage(message)) break
}
}
}

private fun <T> searchMessage(
iterator: Iterator<MessageSearchResponse>,
checkPossFlag: Boolean = false,
extractValue: (MessageGroupResponse?, String?) -> T
): T {
var message: MessageGroupResponse?
while (iterator.hasNext()) {
message = iterator.next().message
if(sessionStartTime != null && compare(sessionStartTimestamp, message.messageId.timestamp) > 0) {
return extractValue(message, null)
}

val bodyRaw = Unpooled.copiedBuffer(message.bodyRaw.toByteArray())
val seqNum = bodyRaw.findField(MSG_SEQ_NUM_TAG)?.value ?: continue

if(checkPossFlag && checkPossDup(bodyRaw)) continue

return extractValue(message, seqNum)
}
return extractValue(null, null)
}

private fun firstValidMessageDetails(iterator: Iterator<MessageSearchResponse>): MessageDetails? = searchMessage(
iterator,
true
) { message, seqNum ->
if(message == null || seqNum == null) return@searchMessage null
MessageDetails(seqNum.toInt(), message.messageId.sequence, message.messageId.timestamp)
}

private fun createSearchRequest(
timestamp: Timestamp,
direction: Direction,
sessionAlias: String,
searchDirection: TimeRelation = TimeRelation.PREVIOUS,
endTimestamp: Timestamp = previousDaySessionStart
) = MessageSearchRequest.newBuilder().apply {
startTimestamp = timestamp
this.endTimestamp = endTimestamp
addResponseFormats(BASE64_FORMAT)
addStream(
MessageStream.newBuilder()
.setName(sessionAlias)
.setDirection(direction)
)
bookIdBuilder.name = bookName
this.searchDirection = searchDirection
}.build()

private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP

data class MessageDetails(val payloadSequence: Int, val messageSequence: Long, val timestamp: Timestamp)

companion object {
val K_LOGGER = KotlinLogging.logger { }
private const val BASE64_FORMAT = "BASE_64"
}
}
Loading