diff --git a/README.md b/README.md index 1975bd6..530b50c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (0.2.2) +# th2-conn-dirty-fix (0.3.0) This microservice allows sending and receiving messages via FIX protocol @@ -41,6 +41,7 @@ This microservice allows sending and receiving messages via FIX protocol + *resetSeqNumFlag* - resetting sequence number in initial Logon message (when conn started) + *resetOnLogon* - resetting the sequence number in Logon in other cases (e.g. disconnect) + *loadSequencesFromCradle* - defines if sequences will be loaded from cradle to use them in logon message. ++ *loadMissedMessagesFromCradle* - defines how retransmission will be handled. If true, then requested through `ResendRequest` messages (or messages requested on Logon with `NextExpectedSeqNum`) will be loaded from cradle. + *sessionStartTime* - UTC time when session starts. (`nullable`) + *sessionEndTime* - UTC time when session ends. required if startSessionTime is filled. + *sendingDateTimeFormat* - `SendingTime` field format for outgoing messages. (`nullable`, `default format` in this case is `"yyyyMMdd-HH:mm:ss.SSSSSSSSS"`) @@ -324,6 +325,9 @@ spec: ``` # Changelog +## 0.3.0 +* Ability to recover messages from cradle. + ## 0.2.2 * session scheduling fix: hasn't worked for some ranges. diff --git a/gradle.properties b/gradle.properties index b0771f8..bf2b61a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -release_version=0.2.2 +release_version=0.3.0 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 7ce70cc..e5a075f 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -17,10 +17,11 @@ package com.exactpro.th2; import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.grpc.Direction; import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.common.grpc.RawMessage; import com.exactpro.th2.conn.dirty.fix.FixField; -import com.exactpro.th2.conn.dirty.fix.SequenceLoader; +import com.exactpro.th2.conn.dirty.fix.MessageLoader; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; @@ -51,6 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; @@ -66,6 +69,7 @@ import static com.exactpro.th2.conn.dirty.fix.KeyFileType.Companion.OperationMode.ENCRYPT_MODE; import static com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil.getEventId; import static com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil.toByteBuf; +import static com.exactpro.th2.constants.Constants.ADMIN_MESSAGES; import static com.exactpro.th2.constants.Constants.BEGIN_SEQ_NO; import static com.exactpro.th2.constants.Constants.BEGIN_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; @@ -98,7 +102,10 @@ import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; +import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; @@ -145,7 +152,8 @@ public class FixHandler implements AutoCloseable, IHandler { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); private final IHandlerContext context; private final InetSocketAddress address; - private final DataProviderService dataProvider; + private final MessageLoader messageLoader; + private final ReentrantLock recoveryLock = new ReentrantLock(); private AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); @@ -156,10 +164,13 @@ public class FixHandler implements AutoCloseable, IHandler { public FixHandler(IHandlerContext context) { this.context = context; this.settings = (FixHandlerSettings) context.getSettings(); - if(settings.isLoadSequencesFromCradle()) { - this.dataProvider = context.getGrpcService(DataProviderService.class); + if(settings.isLoadSequencesFromCradle() || settings.isLoadMissedMessagesFromCradle()) { + this.messageLoader = new MessageLoader( + context.getGrpcService(DataProviderService.class), + settings.getSessionStartTime() + ); } else { - this.dataProvider = null; + this.messageLoader = null; } if(settings.getSessionStartTime() != null) { @@ -221,8 +232,7 @@ public FixHandler(IHandlerContext context) { public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); if(settings.isLoadSequencesFromCradle()) { - SequenceLoader seqLoader = new SequenceLoader(dataProvider, settings.getSessionStartTime(), channel.getSessionAlias()); - SequenceHolder sequences = seqLoader.load(); + SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias()); LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq()); msgSeqNum.set(sequences.getClientSeq()); serverMsgSeqNum.set(sequences.getServerSeq()); @@ -254,7 +264,14 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { } } - return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + CompletableFuture result = CompletableFuture.completedFuture(null); + try { + recoveryLock.lock(); + result = channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + } finally { + recoveryLock.unlock(); + } + return result; } @Override @@ -468,12 +485,13 @@ private void reset() { msgSeqNum.set(0); serverMsgSeqNum.set(0); sessionActive.set(true); + messageLoader.updateTime(); channel.open(); } public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo).append(SOH); resendRequest.append(END_SEQ_NO).append(endSeqNo).append(SOH); setChecksumAndBodyLength(resendRequest); @@ -483,7 +501,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private void sendResendRequest(int beginSeqNo) { //do private StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); resendRequest.append(END_SEQ_NO).append(0); setChecksumAndBodyLength(resendRequest); @@ -515,24 +533,96 @@ private void handleResendRequest(ByteBuf message) { } private void recovery(int beginSeqNo, int endSeqNo) { - if (endSeqNo == 0) { - endSeqNo = msgSeqNum.get() + 1; - } - LOGGER.info("Returning messages from {} to {}", beginSeqNo, endSeqNo); + AtomicInteger lastProcessedSequence = new AtomicInteger(beginSeqNo - 1); + try { + recoveryLock.lock(); - StringBuilder sequenceReset = new StringBuilder(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, beginSeqNo); - sequenceReset.append(GAP_FILL_FLAG).append("Y"); - sequenceReset.append(NEW_SEQ_NO).append(endSeqNo); - setChecksumAndBodyLength(sequenceReset); + if (endSeqNo == 0) { + endSeqNo = msgSeqNum.get() + 1; + } - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); - resetHeartbeatTask(); + int endSeq = endSeqNo; + LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo); + if(settings.isLoadMissedMessagesFromCradle()) { + Function1 processMessage = (buf) -> { + FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); + FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + if(seqNum == null || seqNum.getValue() == null + || msgTypeField == null || msgTypeField.getValue() == null) { + return true; + } + Integer sequence = Integer.parseInt(seqNum.getValue()); + String msgType = msgTypeField.getValue(); + + if(sequence < beginSeqNo) return true; + if(sequence > endSeq) return false; + + if(ADMIN_MESSAGES.contains(msgType)) return true; + FixField possDup = findField(buf, POSS_DUP_TAG); + if(possDup != null && possDup.getValue() == IS_POSS_DUP) return true; + + if(sequence - 1 != lastProcessedSequence.get() ) { + int newSeqNo = sequence; + StringBuilder sequenceReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), newSeqNo); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); + resetHeartbeatTask(); + } + + setTime(buf); + setPossDup(buf); + updateLength(buf); + updateChecksum(buf); + channel.send(buf, Collections.emptyMap(), null, SendMode.MANGLE); + + resetHeartbeatTask(); + + lastProcessedSequence.set(sequence); + return true; + }; + + messageLoader.processMessagesInRange( + Direction.SECOND, + channel.getSessionAlias(), + beginSeqNo, + processMessage + ); + + if(lastProcessedSequence.get() < endSeq) { + String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), null, SendMode.MANGLE + ); + } + } else { + String seqReset = + createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), null, SendMode.MANGLE + ); + } + resetHeartbeatTask(); + + } catch (Exception e) { + LOGGER.error("Error while loading messages for recovery", e); + String seqReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), null, SendMode.MANGLE + ); + } finally { + recoveryLock.unlock(); + } } private void sendSequenceReset() { StringBuilder sequenceReset = new StringBuilder(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet()); + String time = getTime(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet(), time); + sequenceReset.append(ORIG_SENDING_TIME).append(time); sequenceReset.append(NEW_SEQ_NO).append(msgSeqNum.get() + 1); setChecksumAndBodyLength(sequenceReset); @@ -691,7 +781,7 @@ public void sendHeartbeat() { StringBuilder heartbeat = new StringBuilder(); int seqNum = msgSeqNum.incrementAndGet(); - setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum); + setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null); setChecksumAndBodyLength(heartbeat); if (enabled.get()) { @@ -706,7 +796,7 @@ public void sendHeartbeat() { public void sendTestRequest() { //do private StringBuilder testRequest = new StringBuilder(); - setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null); testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { @@ -731,7 +821,7 @@ public void sendLogon() { else reset = settings.getResetOnLogon(); if (reset) msgSeqNum.getAndSet(0); - setHeader(logon, MSG_TYPE_LOGON, msgSeqNum.get() + 1); + setHeader(logon, MSG_TYPE_LOGON, msgSeqNum.get() + 1, null); if (settings.useNextExpectedSeqNum()) logon.append(NEXT_EXPECTED_SEQ_NUM).append(serverMsgSeqNum.get() + 1); if (settings.getEncryptMethod() != null) logon.append(ENCRYPT_METHOD).append(settings.getEncryptMethod()); logon.append(HEART_BT_INT).append(settings.getHeartBtInt()); @@ -761,7 +851,7 @@ public void sendLogon() { private void sendLogout() { if (enabled.get()) { StringBuilder logout = new StringBuilder(); - setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet()); + setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null); setChecksumAndBodyLength(logout); LOGGER.debug("Sending logout - {}", logout); @@ -815,14 +905,19 @@ private void waitLogoutResponse() { } } - private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum) { + private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) { stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString()); stringBuilder.append(MSG_TYPE).append(msgType); stringBuilder.append(MSG_SEQ_NUM).append(seqNum); if (settings.getSenderCompID() != null) stringBuilder.append(SENDER_COMP_ID).append(settings.getSenderCompID()); if (settings.getTargetCompID() != null) stringBuilder.append(TARGET_COMP_ID).append(settings.getTargetCompID()); if (settings.getSenderSubID() != null) stringBuilder.append(SENDER_SUB_ID).append(settings.getSenderSubID()); - stringBuilder.append(SENDING_TIME).append(getTime()); + stringBuilder.append(SENDING_TIME); + if(time != null) { + stringBuilder.append(time); + } else { + stringBuilder.append(getTime()); + } } private void setChecksumAndBodyLength(StringBuilder stringBuilder) { @@ -877,6 +972,43 @@ public String getTime() { return formatter.format(datetime); } + private void setTime(ByteBuf buf) { + FixField sendingTime = findField(buf, SENDING_TIME_TAG); + FixField seqNum = requireNonNull(findField(buf, MSG_SEQ_NUM_TAG), "SeqNum field was null."); + + String time = getTime(); + if (sendingTime == null) { + seqNum.insertNext(SENDING_TIME, time).insertNext(ORIG_SENDING_TIME, time); + } else { + String value = sendingTime.getValue(); + + if (value == null || value.isEmpty() || value.equals("null")) { + sendingTime.setValue(time); + sendingTime.insertNext(ORIG_SENDING_TIME, time); + } else { + sendingTime.setValue(time); + sendingTime.insertNext(ORIG_SENDING_TIME, value); + } + } + } + + private void setPossDup(ByteBuf buf) { + FixField sendingTime = requireNonNull(findField(buf, SENDING_TIME_TAG)); + sendingTime.insertNext(POSS_DUP_TAG, IS_POSS_DUP); + } + + private StringBuilder createSequenceReset(int seqNo, int newSeqNo) { + StringBuilder sequenceReset = new StringBuilder(); + String time = getTime(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, seqNo, time); + sequenceReset.append(ORIG_SENDING_TIME).append(time); + sequenceReset.append(POSS_DUP).append(IS_POSS_DUP); + sequenceReset.append(GAP_FILL_FLAG).append("Y"); + sequenceReset.append(NEW_SEQ_NO).append(newSeqNo); + setChecksumAndBodyLength(sequenceReset); + return sequenceReset; + } + public AtomicBoolean getEnabled() { return enabled; } diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index c2563d4..c6995ca 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -55,6 +55,7 @@ public class FixHandlerSettings implements IHandlerSettings { private Boolean useNextExpectedSeqNum = false; private Boolean saveAdminMessages = false; private Boolean loadSequencesFromCradle = false; + private Boolean loadMissedMessagesFromCradle = false; private Boolean resetStateOnServerReset = false; @JsonDeserialize(using = LocalTimeDeserializer.class) @@ -234,6 +235,13 @@ public void setLoadSequencesFromCradle(Boolean loadSequencesFromCradle) { this.loadSequencesFromCradle = loadSequencesFromCradle; } + public Boolean isLoadMissedMessagesFromCradle() { + return loadMissedMessagesFromCradle; + } + + public void setLoadMissedMessagesFromCradle(Boolean loadMissedMessagesFromCradle) { + this.loadMissedMessagesFromCradle = loadMissedMessagesFromCradle; + } public Boolean getResetStateOnServerReset() { return resetStateOnServerReset; } diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index b796a44..35252d7 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -33,6 +33,7 @@ public class Constants { public static final Integer TARGET_COMP_ID_TAG = 56; public static final Integer MSG_SEQ_NUM_TAG = 34; public static final Integer SENDING_TIME_TAG = 52; + public static final Integer ORIG_SENDING_TIME_TAG = 122; public static final Integer CHECKSUM_TAG = 10; public static final Integer DEFAULT_APPL_VER_ID_TAG = 1137; public static final Integer SENDER_SUB_ID_TAG = 50; @@ -79,7 +80,8 @@ public class Constants { public static final String SENDER_SUB_ID = SOH + SENDER_SUB_ID_TAG + "="; public static final String RESET_SEQ_NUM = SOH + RESET_SEQ_NUM_TAG + "="; public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; - public static final String POSS_DUP = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; + public static final String POSS_DUP = SOH + POSS_DUP_TAG + "="; + public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "="; //message types public static final String MSG_TYPE_LOGON = "A"; diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt new file mode 100644 index 0000000..8bcdd54 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -0,0 +1,267 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.SequenceHolder +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.common.util.toInstant +import com.exactpro.th2.constants.Constants.IS_POSS_DUP +import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG +import com.exactpro.th2.constants.Constants.POSS_DUP +import com.exactpro.th2.constants.Constants.POSS_DUP_TAG +import com.exactpro.th2.dataprovider.grpc.DataProviderService +import com.exactpro.th2.dataprovider.grpc.MessageGroupResponse +import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest +import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.grpc.MessageStream +import com.exactpro.th2.dataprovider.grpc.TimeRelation +import com.exactpro.th2.lme.oe.util.ProviderCall +import com.google.protobuf.Int32Value +import com.google.protobuf.Timestamp +import com.google.protobuf.util.Timestamps.compare +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneId +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import kotlin.math.ceil +import mu.KotlinLogging + +class MessageLoader( + private val dataProvider: DataProviderService, + private val sessionStartTime: LocalTime? +) { + private var sessionStart: ZonedDateTime + private val searchLock = ReentrantLock() + + init { + val today = LocalDate.now(ZoneOffset.UTC) + val start = sessionStartTime?.atDate(today) + val now = LocalDateTime.now() + if(start == null) { + sessionStart = OffsetDateTime + .now(ZoneOffset.UTC) + .with(LocalTime.now()) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + sessionStart = if(start.isAfter(now)) { + OffsetDateTime + .now(ZoneOffset.UTC) + .minusDays(1) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + OffsetDateTime + .now(ZoneOffset.UTC) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } + } + } + + private var sessionStartTimestamp = sessionStart + .toInstant() + .toTimestamp() + + private var previousDaySessionStart = sessionStart + .minusDays(1) + .toInstant() + .toTimestamp() + + fun updateTime() { + searchLock.withLock { + sessionStart = ZonedDateTime + .now(ZoneOffset.UTC) + .with(OffsetTime.now(ZoneOffset.UTC)) + sessionStartTimestamp = sessionStart + .toInstant() + .toTimestamp() + previousDaySessionStart = sessionStart + .minusDays(1) + .toInstant() + .toTimestamp() + } + } + + fun loadInitialSequences(sessionAlias: String): SequenceHolder { + val serverSeq = ProviderCall.withCancellation { + searchMessage( + dataProvider.searchMessages( + createSearchRequest( + Instant.now().toTimestamp(), + Direction.FIRST, + sessionAlias + ) + ) + ) { _, seqNum -> seqNum?.toInt() ?: 0 } + } + val clientSeq = ProviderCall.withCancellation { + searchMessage( + dataProvider.searchMessages( + createSearchRequest( + Instant.now().toTimestamp(), + Direction.SECOND, + sessionAlias + ) + ), + true + ) { _, seqNum -> seqNum?.toInt() ?: 0 } + } + K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" } + return SequenceHolder(clientSeq, serverSeq) + } + + fun processMessagesInRange( + direction: Direction, + sessionAlias: String, + fromSequence: Long, + processMessage: (ByteBuf) -> Boolean + ) { + var timestamp: Timestamp? = null + ProviderCall.withCancellation { + val backwardIterator = dataProvider.searchMessages( + createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias) + ) + + var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation + + var messagesToSkip = firstValidMessage.payloadSequence - fromSequence + + timestamp = firstValidMessage.timestamp + + while (backwardIterator.hasNext() && messagesToSkip > 0) { + val message = backwardIterator.next().message + if(compare(message.timestamp, previousDaySessionStart) <= 0) { + continue + } + timestamp = message.timestamp + messagesToSkip -= 1 + if(messagesToSkip == 0L) { + + val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue + + if(checkPossDup(buf)) { + val validMessage = firstValidMessageDetails(backwardIterator) ?: break + + timestamp = validMessage.timestamp + if(validMessage.payloadSequence <= fromSequence) { + break + } else { + messagesToSkip = validMessage.payloadSequence - fromSequence + } + + } else { + + if(sequence <= fromSequence) { + break + } else { + messagesToSkip = sequence - fromSequence + } + } + } + } + } + + val startSearchTimestamp = timestamp ?: return + + K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" } + + ProviderCall.withCancellation { + + val iterator = dataProvider.searchMessages( + createSearchRequest( + startSearchTimestamp, + direction, + sessionAlias, + TimeRelation.NEXT, + Instant.now().toTimestamp() + ) + ) + + while (iterator.hasNext()) { + val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray()) + if (!processMessage(message)) break + } + } + } + + private fun searchMessage( + iterator: Iterator, + checkPossFlag: Boolean = false, + extractValue: (MessageGroupResponse?, String?) -> T + ): T { + var message: MessageGroupResponse? + while (iterator.hasNext()) { + message = iterator.next().message + if(sessionStartTime != null && compare(sessionStartTimestamp, message.timestamp) > 0) { + return extractValue(message, null) + } + + val bodyRaw = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val seqNum = bodyRaw.findField(MSG_SEQ_NUM_TAG)?.value ?: continue + + if(checkPossFlag && checkPossDup(bodyRaw)) continue + + return extractValue(message, seqNum) + } + return extractValue(null, null) + } + + private fun firstValidMessageDetails(iterator: Iterator): MessageDetails? = searchMessage( + iterator, + true + ) { message, seqNum -> + if(message == null || seqNum == null) return@searchMessage null + MessageDetails(seqNum.toInt(), message.messageId.sequence, message.timestamp) + } + + private fun createSearchRequest( + timestamp: Timestamp, + direction: Direction, + sessionAlias: String, + searchDirection: TimeRelation = TimeRelation.PREVIOUS, + endTimestamp: Timestamp = previousDaySessionStart + ) = MessageSearchRequest.newBuilder().apply { + startTimestamp = timestamp + this.endTimestamp = endTimestamp + addResponseFormats(BASE64_FORMAT) + addStream( + MessageStream.newBuilder() + .setName(sessionAlias) + .setDirection(direction) + ) + this.searchDirection = searchDirection + }.build() + + private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP + + data class MessageDetails(val payloadSequence: Int, val messageSequence: Long, val timestamp: Timestamp) + + companion object { + val K_LOGGER = KotlinLogging.logger { } + private const val BASE64_FORMAT = "BASE_64" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt new file mode 100644 index 0000000..d2906f5 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright (c) 2023, Exactpro Systems LLC + * www.exactpro.com + * Build Software to Test Software + * + * All rights reserved. + * This is unpublished, licensed software, confidential and proprietary + * information which is the property of Exactpro Systems LLC or its licensors. + ******************************************************************************/ +package com.exactpro.th2.lme.oe.util + +import io.grpc.Context + +class ProviderCall { + companion object { + fun withCancellation(code: () -> T): T { + return Context.current().withCancellation().use { context -> + val toRestore = context.attach() + val result = try { + code() + } finally { + context.detach(toRestore) + } + return@use result + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt deleted file mode 100644 index 46ad81d..0000000 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2023 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.conn.dirty.fix - -import com.exactpro.th2.SequenceHolder -import com.exactpro.th2.common.grpc.Direction -import com.exactpro.th2.common.message.toTimestamp -import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG -import com.exactpro.th2.dataprovider.grpc.DataProviderService -import com.exactpro.th2.dataprovider.grpc.MessageGroupResponse -import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest -import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse -import com.exactpro.th2.dataprovider.grpc.MessageStream -import com.exactpro.th2.dataprovider.grpc.TimeRelation -import com.google.protobuf.Int32Value -import com.google.protobuf.Timestamp -import com.google.protobuf.util.Timestamps -import com.google.protobuf.util.Timestamps.compare -import io.netty.buffer.Unpooled -import java.time.Instant -import java.time.LocalDate -import java.time.LocalDateTime -import java.time.LocalTime -import java.time.OffsetDateTime -import java.time.ZoneId -import java.time.ZoneOffset -import java.time.ZonedDateTime - - -class SequenceLoader( - private val dataProvider: DataProviderService, - private val sessionStartTime: LocalTime?, - private val sessionAlias: String, -) { - private val sessionStart: ZonedDateTime - - init { - val today = LocalDate.now(ZoneOffset.UTC) - val start = sessionStartTime?.atDate(today) - val now = LocalDateTime.now() - if(start == null) { - sessionStart = OffsetDateTime - .now(ZoneOffset.UTC) - .with(LocalTime.now()) - .atZoneSameInstant(ZoneId.systemDefault()) - } else { - sessionStart = if(start.isAfter(now)) { - OffsetDateTime - .now(ZoneOffset.UTC) - .minusDays(1) - .with(sessionStartTime) - .atZoneSameInstant(ZoneId.systemDefault()) - } else { - OffsetDateTime - .now(ZoneOffset.UTC) - .with(sessionStartTime) - .atZoneSameInstant(ZoneId.systemDefault()) - } - } - } - - private val sessionStartDateTime = sessionStart - .toInstant() - .toTimestamp() - - private val sessionStartYesterday = sessionStart - .minusDays(1) - .toInstant() - .toTimestamp() - - fun load(): SequenceHolder { - val serverSeq = searchSeq(createSearchRequest(Instant.now().toTimestamp(), Direction.FIRST)) - val clientSeq = searchSeq(createSearchRequest(Instant.now().toTimestamp(), Direction.SECOND)) - return SequenceHolder(clientSeq, serverSeq) - } - - private fun searchSeq(request: MessageSearchRequest): Int { - var message: MessageGroupResponse? = null - for (response in dataProvider.searchMessages(request)) { - message = response.message - if (sessionStartTime != null && compare(sessionStartDateTime, message.timestamp) > 0) { - return 0 - } - val buffer = Unpooled.wrappedBuffer(message.bodyRaw.asReadOnlyByteBuffer()) - return buffer.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue - } - return when (message) { - null -> 0 - else -> searchSeq(createSearchRequest(message.timestamp, message.messageId.direction)) - } - } - - private fun createSearchRequest(timestamp: Timestamp, direction: Direction) = - MessageSearchRequest.newBuilder().apply { - startTimestamp = timestamp - endTimestamp = sessionStartYesterday - searchDirection = TimeRelation.PREVIOUS - addResponseFormats(BASE_64_FORMAT) - addStream( - MessageStream.newBuilder() - .setName(sessionAlias) - .setDirection(direction) - ) - resultCountLimit = Int32Value.of(5) - }.build() - - companion object { - const val BASE_64_FORMAT = "BASE_64" - } -} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/FixHandlerTest.java b/src/test/java/com/exactpro/th2/FixHandlerTest.java index 0cd7c5e..919696a 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerTest.java @@ -20,9 +20,14 @@ import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; +import com.exactpro.th2.dataprovider.grpc.DataProviderService; +import com.exactpro.th2.dataprovider.grpc.MessageGroupResponse; +import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse; import com.exactpro.th2.util.MessageUtil; +import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.nio.charset.Charset; import kotlin.Unit; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; @@ -49,18 +54,22 @@ import static com.exactpro.th2.constants.Constants.BODY_LENGTH_TAG; import static com.exactpro.th2.constants.Constants.CHECKSUM_TAG; import static com.exactpro.th2.constants.Constants.DEFAULT_APPL_VER_ID_TAG; -import static com.exactpro.th2.constants.Constants.END_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG_TAG; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_SEQUENCE_RESET; import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.POSS_DUP; +import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class FixHandlerTest { - + private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); private Channel channel; private FixHandler fixHandler; private static ByteBuf buffer; @@ -77,10 +86,9 @@ static void init() { @BeforeEach void beforeEach() { - channel = new Channel(createHandlerSettings()); + channel = new Channel(createHandlerSettings(), null); fixHandler = channel.getFixHandler(); fixHandler.onOpen(channel); - ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); fixHandler.onIncoming(channel, logonResponse); } @@ -329,7 +337,7 @@ void handleResendRequestTest() { channel.clearQueue(); fixHandler.onIncoming(channel, resendRequest); ByteBuf sequenceReset = channel.getQueue().get(0); - assertEquals("8=FIXT.1.1\u00019=75\u000135=4\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001123=Y\u000136=5\u000110=115\u0001", new String(sequenceReset.array())); + assertEquals("8=FIXT.1.1\u00019=105\u000135=4\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001122=2014-12-22T10:15:30Z\u000143=Y\u0001123=Y\u000136=5\u000110=162\u0001", new String(sequenceReset.array())); channel.clearQueue(); fixHandler.sendResendRequest(2); ByteBuf resendRequestOutgoing = channel.getQueue().get(0); @@ -410,7 +418,6 @@ void updateTagTest() { MessageUtil.updateTag(buf, DEFAULT_APPL_VER_ID_TAG.toString(), "1"); assertEquals(expected2, buf.toString(StandardCharsets.US_ASCII)); } - } class Channel implements IChannel { @@ -418,10 +425,11 @@ class Channel implements IChannel { private final MyFixHandler fixHandler; private final List queue = new ArrayList<>(); - Channel(FixHandlerSettings fixHandlerSettings) { + Channel(FixHandlerSettings fixHandlerSettings, DataProviderService dataProviderService) { this.fixHandlerSettings = fixHandlerSettings; IHandlerContext context = Mockito.mock(IHandlerContext.class); Mockito.when(context.getSettings()).thenReturn(this.fixHandlerSettings); + Mockito.when(context.getGrpcService(DataProviderService.class)).thenReturn(dataProviderService); this.fixHandler = new MyFixHandler(context); } diff --git a/src/test/java/com/exactpro/th2/RecoveryTest.java b/src/test/java/com/exactpro/th2/RecoveryTest.java new file mode 100644 index 0000000..622c157 --- /dev/null +++ b/src/test/java/com/exactpro/th2/RecoveryTest.java @@ -0,0 +1,281 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2; + +import com.exactpro.th2.conn.dirty.fix.MessageSearcher; +import com.exactpro.th2.dataprovider.grpc.DataProviderService; +import com.exactpro.th2.dataprovider.grpc.MessageGroupResponse; +import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest; +import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse; +import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static com.exactpro.th2.FixHandlerTest.createHandlerSettings; +import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; +import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_SEQUENCE_RESET; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; +import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RecoveryTest { + + private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + private Channel channel; + private FixHandler fixHandler; + + @Test + void testSequenceResetInRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(2), + messageSearchResponse(3), + messageSearchResponse(4), + messageSearchResponse(5) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // requesting resend from 2 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(5, channel.getQueue().size()); + + for(int i = 1; i <= 4; i++) { + ByteBuf buf = channel.getQueue().get(i); + assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); + assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); + } + } + + @Test + void testSequenceResetInsideRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(4), + messageSearchResponse(5) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 2 to 8 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=8\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(channel.getQueue().size(), 5); + + // for missed messages after beginSeqNo to 4 + ByteBuf firstSequenceReset = channel.getQueue().get(1); + assertEquals(findField(firstSequenceReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(firstSequenceReset, MSG_SEQ_NUM_TAG).getValue()), 2); + assertEquals(Integer.parseInt(findField(firstSequenceReset, NEW_SEQ_NO_TAG).getValue()), 4); + + ByteBuf message4 = channel.getQueue().get(2); + + assertEquals(findField(message4, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message4, MSG_SEQ_NUM_TAG).getValue()), 4); + assertEquals(findField(message4, POSS_DUP_TAG).getValue(), "Y"); + + ByteBuf message5 = channel.getQueue().get(3); + + assertEquals(findField(message5, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message5, MSG_SEQ_NUM_TAG).getValue()), 5); + assertEquals(findField(message5, POSS_DUP_TAG).getValue(), "Y"); + + // For missed messages after 4 + ByteBuf seqReset2 = channel.getQueue().get(4); + assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 6); + assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); + } + + @Test + void testSequenceResetOutOfRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(1), + messageSearchResponse(2), + messageSearchResponse(3), + messageSearchResponse(4), + messageSearchResponse(5), + messageSearchResponse(6) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // requesting resend from 2 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(5, channel.getQueue().size()); + for(int i = 1; i <= 4; i++) { + ByteBuf buf = channel.getQueue().get(i); + assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); + assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); + } + } + + @Test + void testSequenceResetAdminMessages() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponseAdmin(2), + messageSearchResponse(4), + messageSearchResponseAdmin(5), + messageSearchResponseAdmin(6) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 1 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + + // sequence reset for meesages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset1 = channel.getQueue().get(1); + assertEquals(findField(seqReset1, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset1, MSG_SEQ_NUM_TAG).getValue()), 1); + assertEquals(Integer.parseInt(findField(seqReset1, NEW_SEQ_NO_TAG).getValue()), 4); + + ByteBuf message = channel.getQueue().get(2); + assertEquals(findField(message, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message, MSG_SEQ_NUM_TAG).getValue()), 4); + assertEquals(findField(message, POSS_DUP_TAG).getValue(), "Y"); + + // sequence reset for meesages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset2 = channel.getQueue().get(3); + assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 5); + assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); + + } + + @Test + void allMessagesMissed() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenReturn( + new ArrayList().iterator() + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 1 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + + // sequence reset for meesages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset = channel.getQueue().get(1); + assertEquals(findField(seqReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset, MSG_SEQ_NUM_TAG).getValue()), 1); + assertEquals(Integer.parseInt(findField(seqReset, NEW_SEQ_NO_TAG).getValue()), 23); + } + + private MessageSearchResponse messageSearchResponse(Integer sequence) { + return MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(message(sequence))) + ).build(); + } + + private MessageSearchResponse messageSearchResponseAdmin(Integer sequence) { + return MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(adminMessage(sequence))) + ).build(); + } + + private String message(Integer sequence) { + return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); + } + + private String messageWithoutSeqNum() { + return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"); + } + + private String adminMessage(Integer sequence) { + return String.format("8=FIXT.1.1\u00019=70\u000135=4\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); + } +} diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt new file mode 100644 index 0000000..a82a073 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest +import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.grpc.TimeRelation +import com.google.protobuf.util.Timestamps + +class MessageSearcher(private val messages: List) { + + fun searchMessages(request: MessageSearchRequest): Iterator { + val limit = request.resultCountLimit.value + val startTimestamp = request.startTimestamp + val searchDirection = request.searchDirection + + var filteredMessages = if (searchDirection == TimeRelation.NEXT) { + messages.filter { + Timestamps.compare(it.message.timestamp, startTimestamp) >= 0 } + } else { + messages.filter { + Timestamps.compare(it.message.timestamp, startTimestamp) <= 0 + }.reversed() + } + + return filteredMessages.iterator() + } +} \ No newline at end of file