From 89e07f4826e5704004d087cef5f7cfdcdc4ad9a6 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Wed, 31 May 2023 19:39:09 +0400 Subject: [PATCH 1/9] [TS-1306-v2] Handle server start after shutdown without sequence reset (#44) --- .github/workflows/dev-docker-publish.yml | 1 + README.md | 10 ++- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 68 ++++++++++++------- .../com/exactpro/th2/FixHandlerSettings.java | 9 +++ .../th2/conn/dirty/fix/SequenceLoader.kt | 4 +- 6 files changed, 64 insertions(+), 30 deletions(-) diff --git a/.github/workflows/dev-docker-publish.yml b/.github/workflows/dev-docker-publish.yml index 430474c..b8551de 100644 --- a/.github/workflows/dev-docker-publish.yml +++ b/.github/workflows/dev-docker-publish.yml @@ -10,6 +10,7 @@ on: paths-ignore: - README.md + jobs: build-job: uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main diff --git a/README.md b/README.md index fb91c74..21d1708 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.0.2) +# th2-conn-dirty-fix (1.1.0) This microservice allows sending and receiving messages via FIX protocol @@ -47,6 +47,7 @@ This microservice allows sending and receiving messages via FIX protocol + *sendingDateTimeFormat* - `SendingTime` field format for outgoing messages. (`nullable`, `default format` in this case is `"yyyyMMdd-HH:mm:ss.SSSSSSSSS"`) + *useNextExpectedSeqNum* - session management based on next expected sequence number. (`false` by default) + *saveAdminMessages* - defines if admin messages will be saved to internal outgoing buffer. (`false` by default) ++ *resetStateOnServerReset* - whether to reset the server sequence after receiving logout with text `Next Expected MSN too high, MSN to be sent is x but received y`. ### Security settings @@ -326,6 +327,9 @@ spec: # Changelog +## 1.1.0 +* state reset option on server update. + ## 1.0.2 * dev releases * apply changes from version-0 @@ -336,6 +340,10 @@ spec: ## 1.0.0 * Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support + +## 0.2.0 +* optional state reset on silent server reset. + ## 0.1.1 * correct sequence numbers increments. * update conn-dirty-tcp-core to `2.3.0` diff --git a/gradle.properties b/gradle.properties index c83ee73..6a3f359 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -release_version=1.0.2 +release_version=1.1.0 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 547fc9d..03413fa 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -321,6 +321,13 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu isDup = possDup.getValue().equals(IS_POSS_DUP); } + String msgTypeValue = requireNonNull(msgType.getValue()); + if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) { + serverMsgSeqNum.incrementAndGet(); + handleLogout(message); + return metadata; + } + int receivedMsgSeqNum = Integer.parseInt(requireNonNull(msgSeqNumValue.getValue())); if(receivedMsgSeqNum < serverMsgSeqNum.get() && !isDup) { @@ -337,7 +344,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu sendResendRequest(serverMsgSeqNum.get(), receivedMsgSeqNum); } - String msgTypeValue = requireNonNull(msgType.getValue()); + switch (msgTypeValue) { case MSG_TYPE_HEARTBEAT: if (LOGGER.isInfoEnabled()) LOGGER.info("Heartbeat received - {}", message.toString(US_ASCII)); @@ -386,31 +393,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu } break; case MSG_TYPE_LOGOUT: //extract logout reason - if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); - FixField sessionStatus = findField(message, SESSION_STATUS_TAG); - boolean isSequenceChanged = false; - if(sessionStatus != null) { - int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); - if(statusCode != SUCCESSFUL_LOGOUT_CODE) { - FixField text = findField(message, TEXT_TAG); - if (text != null) { - LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); - String value = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); - if (value != null) { - msgSeqNum.set(Integer.parseInt(value) - 1); - isSequenceChanged = true; - } - } - } - } - if(!enabled.get() && !isSequenceChanged) { - msgSeqNum.incrementAndGet(); - } - - cancelFuture(heartbeatTimer); - cancelFuture(testRequestTimer); - enabled.set(false); - context.send(CommonUtil.toEvent("logout for sender - " + settings.getSenderCompID()));//make more useful + handleLogout(message); break; case MSG_TYPE_RESEND_REQUEST: if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); @@ -429,6 +412,39 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu return metadata; } + private void handleLogout(@NotNull ByteBuf message) { + if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); + FixField sessionStatus = findField(message, SESSION_STATUS_TAG); + boolean isSequenceChanged = false; + if(sessionStatus != null) { + int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); + if(statusCode != SUCCESSFUL_LOGOUT_CODE) { + FixField text = findField(message, TEXT_TAG); + if (text != null) { + LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); + String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); + if (wrongClientSequence != null) { + msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); + isSequenceChanged = true; + } + String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); + if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { + serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); + } + } + } + } + + if(!enabled.get() && !isSequenceChanged) { + msgSeqNum.incrementAndGet(); + } + + cancelFuture(heartbeatTimer); + cancelFuture(testRequestTimer); + enabled.set(false); + context.send(CommonUtil.toEvent("logout for sender - " + settings.getSenderCompID()));//make more useful + } + private void resetSequence(ByteBuf message) { FixField gapFillMode = findField(message, GAP_FILL_FLAG_TAG); FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG); diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index ef6c4dc..c2563d4 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -55,6 +55,7 @@ public class FixHandlerSettings implements IHandlerSettings { private Boolean useNextExpectedSeqNum = false; private Boolean saveAdminMessages = false; private Boolean loadSequencesFromCradle = false; + private Boolean resetStateOnServerReset = false; @JsonDeserialize(using = LocalTimeDeserializer.class) private LocalTime sessionStartTime; @@ -233,6 +234,14 @@ public void setLoadSequencesFromCradle(Boolean loadSequencesFromCradle) { this.loadSequencesFromCradle = loadSequencesFromCradle; } + public Boolean getResetStateOnServerReset() { + return resetStateOnServerReset; + } + + public void setResetStateOnServerReset(Boolean resetStateOnServerReset) { + this.resetStateOnServerReset = resetStateOnServerReset; + } + public Boolean useNextExpectedSeqNum() { return useNextExpectedSeqNum; } diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt index c69f325..27f9dc0 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt @@ -65,7 +65,7 @@ class SequenceLoader( var message: MessageGroupResponse? = null for (response in dataProvider.searchMessages(request)) { message = response.message - if (sessionStartTime != null && compare(sessionStartDateTime, message.timestamp) > 0) { + if (sessionStartTime != null && compare(sessionStartDateTime, message.messageId.timestamp) > 0) { return 0 } val buffer = Unpooled.wrappedBuffer(message.bodyRaw.asReadOnlyByteBuffer()) @@ -73,7 +73,7 @@ class SequenceLoader( } return when (message) { null -> 0 - else -> searchSeq(createSearchRequest(message.timestamp, message.messageId.direction)) + else -> searchSeq(createSearchRequest(message.messageId.timestamp, message.messageId.direction)) } } From 34bf5bd90b72f4f941281e65e25e1bfab21d745b Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Fri, 9 Jun 2023 17:43:13 +0400 Subject: [PATCH 2/9] Scheduling fix (#47) --- README.md | 7 ++-- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 13 ++++++-- .../th2/conn/dirty/fix/SequenceLoader.kt | 33 ++++++++++++++++--- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 21d1708..14f5eb3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.1.0) +# th2-conn-dirty-fix (1.1.1) This microservice allows sending and receiving messages via FIX protocol @@ -327,6 +327,9 @@ spec: # Changelog +## 1.1.1 +* fix scheduling: hasn't worked for some ranges. + ## 1.1.0 * state reset option on server update. @@ -339,8 +342,6 @@ spec: ## 1.0.0 -* Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support - ## 0.2.0 * optional state reset on silent server reset. diff --git a/gradle.properties b/gradle.properties index 6a3f359..fa5af25 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -release_version=1.1.0 +release_version=1.1.1 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 03413fa..29bcad1 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -32,6 +32,7 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; @@ -181,8 +182,6 @@ public FixHandler(IHandlerContext context) { if(scheduleTime.isBefore(now)) { scheduleTime = now.plusDays(1).with(resetTime); - } else if(now.isBefore(now.with(settings.getSessionStartTime()))) { - sessionActive.set(false); } long time = now.until(scheduleTime, ChronoUnit.SECONDS); @@ -192,6 +191,16 @@ public FixHandler(IHandlerContext context) { channel.close(); sessionActive.set(false); }, time, DAY_SECONDS, TimeUnit.SECONDS); + + LocalDate today = LocalDate.now(ZoneOffset.UTC); + + LocalDateTime start = settings.getSessionStartTime().atDate(today); + LocalDateTime end = settings.getSessionEndTime().atDate(today); + + LocalDateTime nowDateTime = LocalDateTime.now(ZoneOffset.UTC); + if(nowDateTime.isAfter(end) && nowDateTime.isBefore(start)) { + sessionActive.set(false); + } } String host = settings.getHost(); diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt index 27f9dc0..eb33ed5 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt @@ -29,10 +29,13 @@ import com.google.protobuf.Timestamp import com.google.protobuf.util.Timestamps.compare import io.netty.buffer.Unpooled import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime import java.time.LocalTime import java.time.OffsetDateTime import java.time.ZoneId import java.time.ZoneOffset +import java.time.ZonedDateTime class SequenceLoader( @@ -41,10 +44,32 @@ class SequenceLoader( private val sessionAlias: String, private val bookName: String ) { - private val sessionStart = OffsetDateTime - .now(ZoneOffset.UTC) - .with(sessionStartTime ?: LocalTime.now()) - .atZoneSameInstant(ZoneId.systemDefault()); + private val sessionStart: ZonedDateTime + + init { + val today = LocalDate.now(ZoneOffset.UTC) + val start = sessionStartTime?.atDate(today) + val now = LocalDateTime.now() + if(start == null) { + sessionStart = OffsetDateTime + .now(ZoneOffset.UTC) + .with(LocalTime.now()) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + sessionStart = if(start.isAfter(now)) { + OffsetDateTime + .now(ZoneOffset.UTC) + .minusDays(1) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + OffsetDateTime + .now(ZoneOffset.UTC) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } + } + } private val sessionStartDateTime = sessionStart .toInstant() From 6ef6a5750ddbe3567bdd6842c4e036a878b6f4b6 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Wed, 28 Jun 2023 14:38:59 +0400 Subject: [PATCH 3/9] [TS-1290-v2] Recovery from cradle. (#45) --- .github/workflows/docker-publish.yml | 1 - README.md | 10 +- gradle.properties | 3 +- .../java/com/exactpro/th2/FixHandler.java | 194 ++++++++++-- .../com/exactpro/th2/FixHandlerSettings.java | 9 + .../com/exactpro/th2/constants/Constants.java | 4 +- .../th2/conn/dirty/fix/MessageLoader.kt | 275 +++++++++++++++++ .../th2/conn/dirty/fix/ProviderCall.kt | 28 ++ .../th2/conn/dirty/fix/SequenceLoader.kt | 123 -------- .../java/com/exactpro/th2/FixHandlerTest.java | 29 +- .../java/com/exactpro/th2/RecoveryTest.java | 281 ++++++++++++++++++ .../th2/conn/dirty/fix/MessageSearcher.kt | 41 +++ 12 files changed, 823 insertions(+), 175 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt create mode 100644 src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt delete mode 100644 src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt create mode 100644 src/test/java/com/exactpro/th2/RecoveryTest.java create mode 100644 src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 2cc6e05..cec50a6 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -5,7 +5,6 @@ on: branches: - master - version-* - - dev-version-* paths: - gradle.properties # - package_info.json diff --git a/README.md b/README.md index 14f5eb3..341df80 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.1.1) +# th2-conn-dirty-fix (1.2.0) This microservice allows sending and receiving messages via FIX protocol @@ -42,6 +42,7 @@ This microservice allows sending and receiving messages via FIX protocol + *resetSeqNumFlag* - resetting sequence number in initial Logon message (when conn started) + *resetOnLogon* - resetting the sequence number in Logon in other cases (e.g. disconnect) + *loadSequencesFromCradle* - defines if sequences will be loaded from cradle to use them in logon message. ++ *loadMissedMessagesFromCradle* - defines how retransmission will be handled. If true, then requested through `ResendRequest` messages (or messages requested on Logon with `NextExpectedSeqNum`) will be loaded from cradle. + *sessionStartTime* - UTC time when session starts. (`nullable`) + *sessionEndTime* - UTC time when session ends. required if startSessionTime is filled. + *sendingDateTimeFormat* - `SendingTime` field format for outgoing messages. (`nullable`, `default format` in this case is `"yyyyMMdd-HH:mm:ss.SSSSSSSSS"`) @@ -327,6 +328,9 @@ spec: # Changelog +## 1.2.0 +* loading requested messages from cradle. + ## 1.1.1 * fix scheduling: hasn't worked for some ranges. @@ -341,6 +345,10 @@ spec: * Add bookId to lw data provider query ## 1.0.0 +* Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support + +## 0.3.0 +* Ability to recover messages from cradle. ## 0.2.0 * optional state reset on silent server reset. diff --git a/gradle.properties b/gradle.properties index fa5af25..994869d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ -release_version=1.1.1 +release_version=1.2.0 jackson_version=2.11.2 + diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 29bcad1..24765d2 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -17,10 +17,11 @@ package com.exactpro.th2; import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.grpc.Direction; import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.common.grpc.RawMessage; import com.exactpro.th2.conn.dirty.fix.FixField; -import com.exactpro.th2.conn.dirty.fix.SequenceLoader; +import com.exactpro.th2.conn.dirty.fix.MessageLoader; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; @@ -51,6 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; @@ -66,6 +69,7 @@ import static com.exactpro.th2.conn.dirty.fix.KeyFileType.Companion.OperationMode.ENCRYPT_MODE; import static com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil.getEventId; import static com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil.toByteBuf; +import static com.exactpro.th2.constants.Constants.ADMIN_MESSAGES; import static com.exactpro.th2.constants.Constants.BEGIN_SEQ_NO; import static com.exactpro.th2.constants.Constants.BEGIN_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; @@ -98,7 +102,10 @@ import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; +import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; @@ -145,7 +152,8 @@ public class FixHandler implements AutoCloseable, IHandler { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); private final IHandlerContext context; private final InetSocketAddress address; - private final DataProviderService dataProvider; + private final MessageLoader messageLoader; + private final ReentrantLock recoveryLock = new ReentrantLock(); private AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); @@ -156,10 +164,14 @@ public class FixHandler implements AutoCloseable, IHandler { public FixHandler(IHandlerContext context) { this.context = context; this.settings = (FixHandlerSettings) context.getSettings(); - if(settings.isLoadSequencesFromCradle()) { - this.dataProvider = context.getGrpcService(DataProviderService.class); + if(settings.isLoadSequencesFromCradle() || settings.isLoadMissedMessagesFromCradle()) { + this.messageLoader = new MessageLoader( + context.getGrpcService(DataProviderService.class), + settings.getSessionStartTime(), + context.getBookName() + ); } else { - this.dataProvider = null; + this.messageLoader = null; } if(settings.getSessionStartTime() != null) { @@ -221,13 +233,7 @@ public FixHandler(IHandlerContext context) { public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); if(settings.isLoadSequencesFromCradle()) { - SequenceLoader seqLoader = new SequenceLoader( - dataProvider, - settings.getSessionStartTime(), - channel.getSessionAlias(), - context.getBookName() - ); - SequenceHolder sequences = seqLoader.load(); + SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias()); LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq()); msgSeqNum.set(sequences.getClientSeq()); serverMsgSeqNum.set(sequences.getServerSeq()); @@ -259,7 +265,14 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { } } - return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + CompletableFuture result = CompletableFuture.completedFuture(null); + try { + recoveryLock.lock(); + result = channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + } finally { + recoveryLock.unlock(); + } + return result; } @Override @@ -473,12 +486,13 @@ private void reset() { msgSeqNum.set(0); serverMsgSeqNum.set(0); sessionActive.set(true); + messageLoader.updateTime(); channel.open(); } public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo).append(SOH); resendRequest.append(END_SEQ_NO).append(endSeqNo).append(SOH); setChecksumAndBodyLength(resendRequest); @@ -488,7 +502,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private void sendResendRequest(int beginSeqNo) { //do private StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); resendRequest.append(END_SEQ_NO).append(0); setChecksumAndBodyLength(resendRequest); @@ -520,24 +534,96 @@ private void handleResendRequest(ByteBuf message) { } private void recovery(int beginSeqNo, int endSeqNo) { - if (endSeqNo == 0) { - endSeqNo = msgSeqNum.get() + 1; - } - LOGGER.info("Returning messages from {} to {}", beginSeqNo, endSeqNo); + AtomicInteger lastProcessedSequence = new AtomicInteger(beginSeqNo - 1); + try { + recoveryLock.lock(); - StringBuilder sequenceReset = new StringBuilder(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, beginSeqNo); - sequenceReset.append(GAP_FILL_FLAG).append("Y"); - sequenceReset.append(NEW_SEQ_NO).append(endSeqNo); - setChecksumAndBodyLength(sequenceReset); + if (endSeqNo == 0) { + endSeqNo = msgSeqNum.get() + 1; + } - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); - resetHeartbeatTask(); + int endSeq = endSeqNo; + LOGGER.info("Loading messages from {} to {}", beginSeqNo, endSeqNo); + if(settings.isLoadMissedMessagesFromCradle()) { + Function1 processMessage = (buf) -> { + FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); + FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + if(seqNum == null || seqNum.getValue() == null + || msgTypeField == null || msgTypeField.getValue() == null) { + return true; + } + Integer sequence = Integer.parseInt(seqNum.getValue()); + String msgType = msgTypeField.getValue(); + + if(sequence < beginSeqNo) return true; + if(sequence > endSeq) return false; + + if(ADMIN_MESSAGES.contains(msgType)) return true; + FixField possDup = findField(buf, POSS_DUP_TAG); + if(possDup != null && possDup.getValue() == IS_POSS_DUP) return true; + + if(sequence - 1 != lastProcessedSequence.get() ) { + int newSeqNo = sequence; + StringBuilder sequenceReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), newSeqNo); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); + resetHeartbeatTask(); + } + + setTime(buf); + setPossDup(buf); + updateLength(buf); + updateChecksum(buf); + channel.send(buf, Collections.emptyMap(), null, SendMode.MANGLE); + + resetHeartbeatTask(); + + lastProcessedSequence.set(sequence); + return true; + }; + + messageLoader.processMessagesInRange( + Direction.SECOND, + channel.getSessionAlias(), + beginSeqNo, + processMessage + ); + + if(lastProcessedSequence.get() < endSeq) { + String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), null, SendMode.MANGLE + ); + } + } else { + String seqReset = + createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), null, SendMode.MANGLE + ); + } + resetHeartbeatTask(); + + } catch (Exception e) { + LOGGER.error("Error while loading messages for recovery", e); + String seqReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), null, SendMode.MANGLE + ); + } finally { + recoveryLock.unlock(); + } } private void sendSequenceReset() { StringBuilder sequenceReset = new StringBuilder(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet()); + String time = getTime(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet(), time); + sequenceReset.append(ORIG_SENDING_TIME).append(time); sequenceReset.append(NEW_SEQ_NO).append(msgSeqNum.get() + 1); setChecksumAndBodyLength(sequenceReset); @@ -696,7 +782,7 @@ public void sendHeartbeat() { StringBuilder heartbeat = new StringBuilder(); int seqNum = msgSeqNum.incrementAndGet(); - setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum); + setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null); setChecksumAndBodyLength(heartbeat); if (enabled.get()) { @@ -711,7 +797,7 @@ public void sendHeartbeat() { public void sendTestRequest() { //do private StringBuilder testRequest = new StringBuilder(); - setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null); testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { @@ -736,7 +822,7 @@ public void sendLogon() { else reset = settings.getResetOnLogon(); if (reset) msgSeqNum.getAndSet(0); - setHeader(logon, MSG_TYPE_LOGON, msgSeqNum.get() + 1); + setHeader(logon, MSG_TYPE_LOGON, msgSeqNum.get() + 1, null); if (settings.useNextExpectedSeqNum()) logon.append(NEXT_EXPECTED_SEQ_NUM).append(serverMsgSeqNum.get() + 1); if (settings.getEncryptMethod() != null) logon.append(ENCRYPT_METHOD).append(settings.getEncryptMethod()); logon.append(HEART_BT_INT).append(settings.getHeartBtInt()); @@ -766,7 +852,7 @@ public void sendLogon() { private void sendLogout() { if (enabled.get()) { StringBuilder logout = new StringBuilder(); - setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet()); + setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null); setChecksumAndBodyLength(logout); LOGGER.debug("Sending logout - {}", logout); @@ -820,14 +906,19 @@ private void waitLogoutResponse() { } } - private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum) { + private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) { stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString()); stringBuilder.append(MSG_TYPE).append(msgType); stringBuilder.append(MSG_SEQ_NUM).append(seqNum); if (settings.getSenderCompID() != null) stringBuilder.append(SENDER_COMP_ID).append(settings.getSenderCompID()); if (settings.getTargetCompID() != null) stringBuilder.append(TARGET_COMP_ID).append(settings.getTargetCompID()); if (settings.getSenderSubID() != null) stringBuilder.append(SENDER_SUB_ID).append(settings.getSenderSubID()); - stringBuilder.append(SENDING_TIME).append(getTime()); + stringBuilder.append(SENDING_TIME); + if(time != null) { + stringBuilder.append(time); + } else { + stringBuilder.append(getTime()); + } } private void setChecksumAndBodyLength(StringBuilder stringBuilder) { @@ -882,6 +973,43 @@ public String getTime() { return formatter.format(datetime); } + private void setTime(ByteBuf buf) { + FixField sendingTime = findField(buf, SENDING_TIME_TAG); + FixField seqNum = requireNonNull(findField(buf, MSG_SEQ_NUM_TAG), "SeqNum field was null."); + + String time = getTime(); + if (sendingTime == null) { + seqNum.insertNext(SENDING_TIME, time).insertNext(ORIG_SENDING_TIME, time); + } else { + String value = sendingTime.getValue(); + + if (value == null || value.isEmpty() || value.equals("null")) { + sendingTime.setValue(time); + sendingTime.insertNext(ORIG_SENDING_TIME, time); + } else { + sendingTime.setValue(time); + sendingTime.insertNext(ORIG_SENDING_TIME, value); + } + } + } + + private void setPossDup(ByteBuf buf) { + FixField sendingTime = requireNonNull(findField(buf, SENDING_TIME_TAG)); + sendingTime.insertNext(POSS_DUP_TAG, IS_POSS_DUP); + } + + private StringBuilder createSequenceReset(int seqNo, int newSeqNo) { + StringBuilder sequenceReset = new StringBuilder(); + String time = getTime(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, seqNo, time); + sequenceReset.append(ORIG_SENDING_TIME).append(time); + sequenceReset.append(POSS_DUP).append(IS_POSS_DUP); + sequenceReset.append(GAP_FILL_FLAG).append("Y"); + sequenceReset.append(NEW_SEQ_NO).append(newSeqNo); + setChecksumAndBodyLength(sequenceReset); + return sequenceReset; + } + public AtomicBoolean getEnabled() { return enabled; } diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index c2563d4..0aaff99 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -55,6 +55,7 @@ public class FixHandlerSettings implements IHandlerSettings { private Boolean useNextExpectedSeqNum = false; private Boolean saveAdminMessages = false; private Boolean loadSequencesFromCradle = false; + private Boolean loadMissedMessagesFromCradle = false; private Boolean resetStateOnServerReset = false; @JsonDeserialize(using = LocalTimeDeserializer.class) @@ -234,6 +235,14 @@ public void setLoadSequencesFromCradle(Boolean loadSequencesFromCradle) { this.loadSequencesFromCradle = loadSequencesFromCradle; } + public Boolean isLoadMissedMessagesFromCradle() { + return loadMissedMessagesFromCradle; + } + + public void setLoadMissedMessagesFromCradle(Boolean loadMissedMessagesFromCradle) { + this.loadMissedMessagesFromCradle = loadMissedMessagesFromCradle; + } + public Boolean getResetStateOnServerReset() { return resetStateOnServerReset; } diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index b796a44..35252d7 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -33,6 +33,7 @@ public class Constants { public static final Integer TARGET_COMP_ID_TAG = 56; public static final Integer MSG_SEQ_NUM_TAG = 34; public static final Integer SENDING_TIME_TAG = 52; + public static final Integer ORIG_SENDING_TIME_TAG = 122; public static final Integer CHECKSUM_TAG = 10; public static final Integer DEFAULT_APPL_VER_ID_TAG = 1137; public static final Integer SENDER_SUB_ID_TAG = 50; @@ -79,7 +80,8 @@ public class Constants { public static final String SENDER_SUB_ID = SOH + SENDER_SUB_ID_TAG + "="; public static final String RESET_SEQ_NUM = SOH + RESET_SEQ_NUM_TAG + "="; public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; - public static final String POSS_DUP = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; + public static final String POSS_DUP = SOH + POSS_DUP_TAG + "="; + public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "="; //message types public static final String MSG_TYPE_LOGON = "A"; diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt new file mode 100644 index 0000000..2ff5fb5 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -0,0 +1,275 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.SequenceHolder +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.common.util.toInstant +import com.exactpro.th2.constants.Constants.IS_POSS_DUP +import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG +import com.exactpro.th2.constants.Constants.POSS_DUP_TAG +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.lw.grpc.MessageStream +import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation +import com.exactpro.th2.lme.oe.util.ProviderCall +import com.google.protobuf.Timestamp +import com.google.protobuf.util.Timestamps.compare +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneId +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import mu.KotlinLogging + +class MessageLoader( + private val dataProvider: DataProviderService, + private val sessionStartTime: LocalTime?, + private val bookName: String +) { + private var sessionStart: ZonedDateTime + private val searchLock = ReentrantLock() + + init { + val today = LocalDate.now(ZoneOffset.UTC) + val start = sessionStartTime?.atDate(today) + val now = LocalDateTime.now() + if(start == null) { + sessionStart = OffsetDateTime + .now(ZoneOffset.UTC) + .with(LocalTime.now()) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + sessionStart = if(start.isAfter(now)) { + OffsetDateTime + .now(ZoneOffset.UTC) + .minusDays(1) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + OffsetDateTime + .now(ZoneOffset.UTC) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } + } + } + + private var sessionStartTimestamp = sessionStart + .toInstant() + .toTimestamp() + + private var previousDaySessionStart = sessionStart + .minusDays(1) + .toInstant() + .toTimestamp() + + fun updateTime() { + searchLock.withLock { + sessionStart = ZonedDateTime + .now(ZoneOffset.UTC) + .with(OffsetTime.now(ZoneOffset.UTC)) + sessionStartTimestamp = sessionStart + .toInstant() + .toTimestamp() + previousDaySessionStart = sessionStart + .minusDays(1) + .toInstant() + .toTimestamp() + } + } + + fun loadInitialSequences(sessionAlias: String): SequenceHolder = searchLock.withLock { + val serverSeq = ProviderCall.withCancellation { + searchMessage( + dataProvider.searchMessages( + createSearchRequest( + Instant.now().toTimestamp(), + Direction.FIRST, + sessionAlias + ) + ) + ) { _, seqNum -> seqNum?.toInt() ?: 0 } + } + val clientSeq = ProviderCall.withCancellation { + searchMessage( + dataProvider.searchMessages( + createSearchRequest( + Instant.now().toTimestamp(), + Direction.SECOND, + sessionAlias + ) + ), + true + ) { _, seqNum -> seqNum?.toInt() ?: 0 } + } + K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" } + return SequenceHolder(clientSeq, serverSeq) + } + + fun processMessagesInRange( + direction: Direction, + sessionAlias: String, + fromSequence: Long, + processMessage: (ByteBuf) -> Boolean + ) = searchLock.withLock { + processMessagesInRangeInternal(direction, sessionAlias, fromSequence, processMessage) + } + + fun processMessagesInRangeInternal( + direction: Direction, + sessionAlias: String, + fromSequence: Long, + processMessage: (ByteBuf) -> Boolean + ) { + var timestamp: Timestamp? = null + ProviderCall.withCancellation { + val backwardIterator = dataProvider.searchMessages( + createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias) + ) + + var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation + + var messagesToSkip = firstValidMessage.payloadSequence - fromSequence + + timestamp = firstValidMessage.timestamp + + while (backwardIterator.hasNext() && messagesToSkip > 0) { + val message = backwardIterator.next().message + if(compare(message.messageId.timestamp, previousDaySessionStart) <= 0) { + continue + } + timestamp = message.messageId.timestamp + messagesToSkip -= 1 + if(messagesToSkip == 0L) { + + val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue + + if(checkPossDup(buf)) { + val validMessage = firstValidMessageDetails(backwardIterator) ?: break + + timestamp = validMessage.timestamp + if(validMessage.payloadSequence <= fromSequence) { + break + } else { + messagesToSkip = validMessage.payloadSequence - fromSequence + } + + } else { + + if(sequence <= fromSequence) { + break + } else { + messagesToSkip = sequence - fromSequence + } + } + } + } + } + + val startSearchTimestamp = timestamp ?: return + + K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" } + + ProviderCall.withCancellation { + + val iterator = dataProvider.searchMessages( + createSearchRequest( + startSearchTimestamp, + direction, + sessionAlias, + TimeRelation.NEXT, + Instant.now().toTimestamp() + ) + ) + + while (iterator.hasNext()) { + val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray()) + if (!processMessage(message)) break + } + } + } + + private fun searchMessage( + iterator: Iterator, + checkPossFlag: Boolean = false, + extractValue: (MessageGroupResponse?, String?) -> T + ): T { + var message: MessageGroupResponse? + while (iterator.hasNext()) { + message = iterator.next().message + if(sessionStartTime != null && compare(sessionStartTimestamp, message.messageId.timestamp) > 0) { + return extractValue(message, null) + } + + val bodyRaw = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val seqNum = bodyRaw.findField(MSG_SEQ_NUM_TAG)?.value ?: continue + + if(checkPossFlag && checkPossDup(bodyRaw)) continue + + return extractValue(message, seqNum) + } + return extractValue(null, null) + } + + private fun firstValidMessageDetails(iterator: Iterator): MessageDetails? = searchMessage( + iterator, + true + ) { message, seqNum -> + if(message == null || seqNum == null) return@searchMessage null + MessageDetails(seqNum.toInt(), message.messageId.sequence, message.messageId.timestamp) + } + + private fun createSearchRequest( + timestamp: Timestamp, + direction: Direction, + sessionAlias: String, + searchDirection: TimeRelation = TimeRelation.PREVIOUS, + endTimestamp: Timestamp = previousDaySessionStart + ) = MessageSearchRequest.newBuilder().apply { + startTimestamp = timestamp + this.endTimestamp = endTimestamp + addResponseFormats(BASE64_FORMAT) + addStream( + MessageStream.newBuilder() + .setName(sessionAlias) + .setDirection(direction) + ) + bookIdBuilder.name = bookName + this.searchDirection = searchDirection + }.build() + + private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP + + data class MessageDetails(val payloadSequence: Int, val messageSequence: Long, val timestamp: Timestamp) + + companion object { + val K_LOGGER = KotlinLogging.logger { } + private const val BASE64_FORMAT = "BASE_64" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt new file mode 100644 index 0000000..d2906f5 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright (c) 2023, Exactpro Systems LLC + * www.exactpro.com + * Build Software to Test Software + * + * All rights reserved. + * This is unpublished, licensed software, confidential and proprietary + * information which is the property of Exactpro Systems LLC or its licensors. + ******************************************************************************/ +package com.exactpro.th2.lme.oe.util + +import io.grpc.Context + +class ProviderCall { + companion object { + fun withCancellation(code: () -> T): T { + return Context.current().withCancellation().use { context -> + val toRestore = context.attach() + val result = try { + code() + } finally { + context.detach(toRestore) + } + return@use result + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt deleted file mode 100644 index eb33ed5..0000000 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2023 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.conn.dirty.fix - -import com.exactpro.th2.SequenceHolder -import com.exactpro.th2.common.grpc.Direction -import com.exactpro.th2.common.message.toTimestamp -import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG -import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService -import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse -import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest -import com.exactpro.th2.dataprovider.lw.grpc.MessageStream -import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation -import com.google.protobuf.Int32Value -import com.google.protobuf.Timestamp -import com.google.protobuf.util.Timestamps.compare -import io.netty.buffer.Unpooled -import java.time.Instant -import java.time.LocalDate -import java.time.LocalDateTime -import java.time.LocalTime -import java.time.OffsetDateTime -import java.time.ZoneId -import java.time.ZoneOffset -import java.time.ZonedDateTime - - -class SequenceLoader( - private val dataProvider: DataProviderService, - private val sessionStartTime: LocalTime?, - private val sessionAlias: String, - private val bookName: String -) { - private val sessionStart: ZonedDateTime - - init { - val today = LocalDate.now(ZoneOffset.UTC) - val start = sessionStartTime?.atDate(today) - val now = LocalDateTime.now() - if(start == null) { - sessionStart = OffsetDateTime - .now(ZoneOffset.UTC) - .with(LocalTime.now()) - .atZoneSameInstant(ZoneId.systemDefault()) - } else { - sessionStart = if(start.isAfter(now)) { - OffsetDateTime - .now(ZoneOffset.UTC) - .minusDays(1) - .with(sessionStartTime) - .atZoneSameInstant(ZoneId.systemDefault()) - } else { - OffsetDateTime - .now(ZoneOffset.UTC) - .with(sessionStartTime) - .atZoneSameInstant(ZoneId.systemDefault()) - } - } - } - - private val sessionStartDateTime = sessionStart - .toInstant() - .toTimestamp() - - private val sessionStartYesterday = sessionStart - .minusDays(1) - .toInstant() - .toTimestamp() - - fun load(): SequenceHolder { - val serverSeq = searchSeq(createSearchRequest(Instant.now().toTimestamp(), Direction.FIRST)) - val clientSeq = searchSeq(createSearchRequest(Instant.now().toTimestamp(), Direction.SECOND)) - return SequenceHolder(clientSeq, serverSeq) - } - - private fun searchSeq(request: MessageSearchRequest): Int { - var message: MessageGroupResponse? = null - for (response in dataProvider.searchMessages(request)) { - message = response.message - if (sessionStartTime != null && compare(sessionStartDateTime, message.messageId.timestamp) > 0) { - return 0 - } - val buffer = Unpooled.wrappedBuffer(message.bodyRaw.asReadOnlyByteBuffer()) - return buffer.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue - } - return when (message) { - null -> 0 - else -> searchSeq(createSearchRequest(message.messageId.timestamp, message.messageId.direction)) - } - } - - private fun createSearchRequest(timestamp: Timestamp, direction: Direction) = - MessageSearchRequest.newBuilder().apply { - startTimestamp = timestamp - endTimestamp = sessionStartYesterday - searchDirection = TimeRelation.PREVIOUS - addResponseFormats(BASE_64_FORMAT) - addStream( - MessageStream.newBuilder() - .setName(sessionAlias) - .setDirection(direction) - ) - bookIdBuilder.name = bookName - resultCountLimit = Int32Value.of(5) - }.build() - - companion object { - const val BASE_64_FORMAT = "BASE_64" - } -} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/FixHandlerTest.java b/src/test/java/com/exactpro/th2/FixHandlerTest.java index 0cd7c5e..0a61f8a 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerTest.java @@ -20,17 +20,10 @@ import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; import com.exactpro.th2.util.MessageUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import kotlin.Unit; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -43,13 +36,19 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import kotlin.Unit; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; import static com.exactpro.th2.constants.Constants.BODY_LENGTH_TAG; import static com.exactpro.th2.constants.Constants.CHECKSUM_TAG; import static com.exactpro.th2.constants.Constants.DEFAULT_APPL_VER_ID_TAG; -import static com.exactpro.th2.constants.Constants.END_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; @@ -60,7 +59,7 @@ class FixHandlerTest { - + private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); private Channel channel; private FixHandler fixHandler; private static ByteBuf buffer; @@ -77,10 +76,9 @@ static void init() { @BeforeEach void beforeEach() { - channel = new Channel(createHandlerSettings()); + channel = new Channel(createHandlerSettings(), null); fixHandler = channel.getFixHandler(); fixHandler.onOpen(channel); - ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); fixHandler.onIncoming(channel, logonResponse); } @@ -329,7 +327,7 @@ void handleResendRequestTest() { channel.clearQueue(); fixHandler.onIncoming(channel, resendRequest); ByteBuf sequenceReset = channel.getQueue().get(0); - assertEquals("8=FIXT.1.1\u00019=75\u000135=4\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001123=Y\u000136=5\u000110=115\u0001", new String(sequenceReset.array())); + assertEquals("8=FIXT.1.1\u00019=105\u000135=4\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001122=2014-12-22T10:15:30Z\u000143=Y\u0001123=Y\u000136=5\u000110=162\u0001", new String(sequenceReset.array())); channel.clearQueue(); fixHandler.sendResendRequest(2); ByteBuf resendRequestOutgoing = channel.getQueue().get(0); @@ -410,7 +408,6 @@ void updateTagTest() { MessageUtil.updateTag(buf, DEFAULT_APPL_VER_ID_TAG.toString(), "1"); assertEquals(expected2, buf.toString(StandardCharsets.US_ASCII)); } - } class Channel implements IChannel { @@ -418,10 +415,12 @@ class Channel implements IChannel { private final MyFixHandler fixHandler; private final List queue = new ArrayList<>(); - Channel(FixHandlerSettings fixHandlerSettings) { + Channel(FixHandlerSettings fixHandlerSettings, DataProviderService dataProviderService) { this.fixHandlerSettings = fixHandlerSettings; IHandlerContext context = Mockito.mock(IHandlerContext.class); Mockito.when(context.getSettings()).thenReturn(this.fixHandlerSettings); + Mockito.when(context.getGrpcService(DataProviderService.class)).thenReturn(dataProviderService); + Mockito.when(context.getBookName()).thenReturn("bookName"); this.fixHandler = new MyFixHandler(context); } diff --git a/src/test/java/com/exactpro/th2/RecoveryTest.java b/src/test/java/com/exactpro/th2/RecoveryTest.java new file mode 100644 index 0000000..f0618dd --- /dev/null +++ b/src/test/java/com/exactpro/th2/RecoveryTest.java @@ -0,0 +1,281 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2; + +import com.exactpro.th2.conn.dirty.fix.MessageSearcher; +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse; +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest; +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse; +import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static com.exactpro.th2.FixHandlerTest.createHandlerSettings; +import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; +import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_SEQUENCE_RESET; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; +import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RecoveryTest { + + private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + private Channel channel; + private FixHandler fixHandler; + + @Test + void testSequenceResetInRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(2), + messageSearchResponse(3), + messageSearchResponse(4), + messageSearchResponse(5) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // requesting resend from 2 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(5, channel.getQueue().size()); + + for(int i = 1; i <= 4; i++) { + ByteBuf buf = channel.getQueue().get(i); + assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); + assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); + } + } + + @Test + void testSequenceResetInsideRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(4), + messageSearchResponse(5) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 2 to 8 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=8\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(channel.getQueue().size(), 5); + + // for missed messages after beginSeqNo to 4 + ByteBuf firstSequenceReset = channel.getQueue().get(1); + assertEquals(findField(firstSequenceReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(firstSequenceReset, MSG_SEQ_NUM_TAG).getValue()), 2); + assertEquals(Integer.parseInt(findField(firstSequenceReset, NEW_SEQ_NO_TAG).getValue()), 4); + + ByteBuf message4 = channel.getQueue().get(2); + + assertEquals(findField(message4, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message4, MSG_SEQ_NUM_TAG).getValue()), 4); + assertEquals(findField(message4, POSS_DUP_TAG).getValue(), "Y"); + + ByteBuf message5 = channel.getQueue().get(3); + + assertEquals(findField(message5, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message5, MSG_SEQ_NUM_TAG).getValue()), 5); + assertEquals(findField(message5, POSS_DUP_TAG).getValue(), "Y"); + + // For missed messages after 4 + ByteBuf seqReset2 = channel.getQueue().get(4); + assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 6); + assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); + } + + @Test + void testSequenceResetOutOfRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(1), + messageSearchResponse(2), + messageSearchResponse(3), + messageSearchResponse(4), + messageSearchResponse(5), + messageSearchResponse(6) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // requesting resend from 2 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(5, channel.getQueue().size()); + for(int i = 1; i <= 4; i++) { + ByteBuf buf = channel.getQueue().get(i); + assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); + assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); + } + } + + @Test + void testSequenceResetAdminMessages() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponseAdmin(2), + messageSearchResponse(4), + messageSearchResponseAdmin(5), + messageSearchResponseAdmin(6) + ) + ); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgumentAt(0, MessageSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 1 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + + // sequence reset for meesages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset1 = channel.getQueue().get(1); + assertEquals(findField(seqReset1, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset1, MSG_SEQ_NUM_TAG).getValue()), 1); + assertEquals(Integer.parseInt(findField(seqReset1, NEW_SEQ_NO_TAG).getValue()), 4); + + ByteBuf message = channel.getQueue().get(2); + assertEquals(findField(message, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message, MSG_SEQ_NUM_TAG).getValue()), 4); + assertEquals(findField(message, POSS_DUP_TAG).getValue(), "Y"); + + // sequence reset for meesages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset2 = channel.getQueue().get(3); + assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 5); + assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); + + } + + @Test + void allMessagesMissed() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + Mockito.when(dataProviderService.searchMessages(Mockito.any())).thenReturn( + new ArrayList().iterator() + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 1 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + + // sequence reset for meesages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset = channel.getQueue().get(1); + assertEquals(findField(seqReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset, MSG_SEQ_NUM_TAG).getValue()), 1); + assertEquals(Integer.parseInt(findField(seqReset, NEW_SEQ_NO_TAG).getValue()), 23); + } + + private MessageSearchResponse messageSearchResponse(Integer sequence) { + return MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(message(sequence))) + ).build(); + } + + private MessageSearchResponse messageSearchResponseAdmin(Integer sequence) { + return MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(adminMessage(sequence))) + ).build(); + } + + private String message(Integer sequence) { + return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); + } + + private String messageWithoutSeqNum() { + return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"); + } + + private String adminMessage(Integer sequence) { + return String.format("8=FIXT.1.1\u00019=70\u000135=4\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); + } +} diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt new file mode 100644 index 0000000..8d7553b --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation +import com.google.protobuf.util.Timestamps + +class MessageSearcher(private val messages: List) { + + fun searchMessages(request: MessageSearchRequest): Iterator { + val limit = request.resultCountLimit.value + val startTimestamp = request.startTimestamp + val searchDirection = request.searchDirection + + var filteredMessages = if (searchDirection == TimeRelation.NEXT) { + messages.filter { + Timestamps.compare(it.message.messageId.timestamp, startTimestamp) >= 0 } + } else { + messages.filter { + Timestamps.compare(it.message.messageId.timestamp, startTimestamp) <= 0 + }.reversed() + } + + return filteredMessages.iterator() + } +} \ No newline at end of file From 1f02cee594666ab0827eba189271f62697e06f0c Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Fri, 11 Aug 2023 20:09:54 +0400 Subject: [PATCH 4/9] Test request handler (#56) --- README.md | 8 +- build.gradle | 8 +- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 88 ++++++++++++++----- .../com/exactpro/th2/FixHandlerSettings.java | 10 +-- .../com/exactpro/th2/constants/Constants.java | 2 + 6 files changed, 84 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 341df80..943996b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.2.0) +# th2-conn-dirty-fix (1.3.0) This microservice allows sending and receiving messages via FIX protocol @@ -49,6 +49,7 @@ This microservice allows sending and receiving messages via FIX protocol + *useNextExpectedSeqNum* - session management based on next expected sequence number. (`false` by default) + *saveAdminMessages* - defines if admin messages will be saved to internal outgoing buffer. (`false` by default) + *resetStateOnServerReset* - whether to reset the server sequence after receiving logout with text `Next Expected MSN too high, MSN to be sent is x but received y`. ++ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. ### Security settings @@ -328,6 +329,11 @@ spec: # Changelog +## 1.3.0 +* Added handling for incoming test request messages +* Fixed resetSeqNum flag handling on incoming logon messages. +* Added option to automatically reset server sequence when internal conn sequence doesn't match with sequence that server sent. + ## 1.2.0 * loading requested messages from cradle. diff --git a/build.gradle b/build.gradle index ab58b29..6d44354 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,8 @@ plugins { id 'java' id 'org.jetbrains.kotlin.jvm' version '1.6.21' id 'com.palantir.docker' version '0.25.0' - id "org.owasp.dependencycheck" version "8.1.2" + id "org.owasp.dependencycheck" version "8.2.1" + id "com.gorylenko.gradle-git-properties" version "2.4.1" } apply plugin: 'application' @@ -45,9 +46,9 @@ repositories { } dependencies { - api platform('com.exactpro.th2:bom:4.2.0') + api platform('com.exactpro.th2:bom:4.4.0') - implementation 'com.exactpro.th2:common:5.2.0-dev' + implementation 'com.exactpro.th2:common:5.2.2-dev' implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' implementation('com.exactpro.th2:conn-dirty-tcp-core:3.0.0-dev') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' @@ -57,6 +58,7 @@ dependencies { implementation 'org.slf4j:slf4j-api' implementation 'io.github.microutils:kotlin-logging:2.1.23' + implementation 'org.apache.commons:commons-lang3:3.13.0' implementation 'io.netty:netty-all' implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.21' diff --git a/gradle.properties b/gradle.properties index 994869d..a192dcd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.2.0 +release_version=1.3.0 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 24765d2..edd1490 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -86,6 +86,7 @@ import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG_TAG; import static com.exactpro.th2.constants.Constants.HEART_BT_INT; import static com.exactpro.th2.constants.Constants.IS_POSS_DUP; +import static com.exactpro.th2.constants.Constants.IS_SEQUENCE_RESET_FLAG; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE; @@ -103,11 +104,11 @@ import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; -import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; +import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.SENDER_SUB_ID; @@ -120,6 +121,7 @@ import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID_TAG; +import static com.exactpro.th2.constants.Constants.TEXT; import static com.exactpro.th2.constants.Constants.TEXT_TAG; import static com.exactpro.th2.constants.Constants.USERNAME; import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf; @@ -352,11 +354,24 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu int receivedMsgSeqNum = Integer.parseInt(requireNonNull(msgSeqNumValue.getValue())); + if(msgTypeValue.equals(MSG_TYPE_LOGON) && receivedMsgSeqNum < serverMsgSeqNum.get()) { + FixField resetSeqNumFlagField = findField(message, RESET_SEQ_NUM_TAG); + if(resetSeqNumFlagField != null && Objects.equals(resetSeqNumFlagField.getValue(), IS_SEQUENCE_RESET_FLAG)) { + serverMsgSeqNum.set(0); + } + } + if(receivedMsgSeqNum < serverMsgSeqNum.get() && !isDup) { - sendLogout(); - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if(settings.isLogoutOnIncorrectServerSequence()) { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); + sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); + reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get() + 1, message.toString(US_ASCII)); + } else { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); + serverMsgSeqNum.set(receivedMsgSeqNum - 1); + } metadata.put(REJECT_REASON, "SeqNum is less than expected."); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get(), message.toString(US_ASCII)); return metadata; } @@ -425,6 +440,10 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); resetSequence(message); break; + case MSG_TYPE_TEST_REQUEST: + if (LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); + handleTestRequest(message, metadata); + break; } resetTestRequestTask(); @@ -434,26 +453,32 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu return metadata; } + private Map handleTestRequest(ByteBuf message, Map metadata) { + FixField testReqId = findField(message, TEST_REQ_ID_TAG); + if(testReqId == null || testReqId.getValue() == null) { + metadata.put(REJECT_REASON, "Test Request message hasn't got TestReqId field."); + return metadata; + } + + sendHeartbeatTestReqId(testReqId.getValue()); + + return null; + } + private void handleLogout(@NotNull ByteBuf message) { if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); - FixField sessionStatus = findField(message, SESSION_STATUS_TAG); boolean isSequenceChanged = false; - if(sessionStatus != null) { - int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); - if(statusCode != SUCCESSFUL_LOGOUT_CODE) { - FixField text = findField(message, TEXT_TAG); - if (text != null) { - LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); - String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); - if (wrongClientSequence != null) { - msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); - isSequenceChanged = true; - } - String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); - if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { - serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); - } - } + FixField text = findField(message, TEXT_TAG); + if (text != null) { + LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); + String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); + if (wrongClientSequence != null) { + msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); + isSequenceChanged = true; + } + String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); + if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { + serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); } } @@ -778,11 +803,16 @@ public void onOpen(@NotNull IChannel channel) { sendLogon(); } - public void sendHeartbeat() { + public void sendHeartbeat() {sendHeartbeatTestReqId(null);} + + private void sendHeartbeatTestReqId(String testReqId) { StringBuilder heartbeat = new StringBuilder(); int seqNum = msgSeqNum.incrementAndGet(); setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null); + if(testReqId != null) { + heartbeat.append(TEST_REQ_ID).append(testReqId); + } setChecksumAndBodyLength(heartbeat); if (enabled.get()) { @@ -818,8 +848,11 @@ public void sendLogon() { } StringBuilder logon = new StringBuilder(); Boolean reset; - if (!connStarted.get()) reset = settings.getResetSeqNumFlag(); - else reset = settings.getResetOnLogon(); + if (!connStarted.get()) { + reset = settings.getResetSeqNumFlag(); + } else { + reset = settings.getResetOnLogon(); + } if (reset) msgSeqNum.getAndSet(0); setHeader(logon, MSG_TYPE_LOGON, msgSeqNum.get() + 1, null); @@ -850,9 +883,16 @@ public void sendLogon() { } private void sendLogout() { + sendLogout(null); + } + + private void sendLogout(String text) { if (enabled.get()) { StringBuilder logout = new StringBuilder(); setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null); + if(text != null) { + logout.append(TEXT).append(text); + } setChecksumAndBodyLength(logout); LOGGER.debug("Sending logout - {}", logout); diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 0aaff99..3417160 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -53,10 +53,10 @@ public class FixHandlerSettings implements IHandlerSettings { private Boolean resetSeqNumFlag = false; private Boolean resetOnLogon = false; private Boolean useNextExpectedSeqNum = false; - private Boolean saveAdminMessages = false; private Boolean loadSequencesFromCradle = false; private Boolean loadMissedMessagesFromCradle = false; private Boolean resetStateOnServerReset = false; + private Boolean logoutOnIncorrectServerSequence = false; @JsonDeserialize(using = LocalTimeDeserializer.class) private LocalTime sessionStartTime; @@ -259,12 +259,12 @@ public void setUseNextExpectedSeqNum(Boolean useNextExpectedSeqNum) { this.useNextExpectedSeqNum = useNextExpectedSeqNum; } - public Boolean isSaveAdminMessages() { - return saveAdminMessages; + public Boolean isLogoutOnIncorrectServerSequence() { + return logoutOnIncorrectServerSequence; } - public void setSaveAdminMessages(Boolean saveAdminMessages) { - this.saveAdminMessages = saveAdminMessages; + public void setLogoutOnIncorrectServerSequence(Boolean logoutOnIncorrectServerSequence) { + this.logoutOnIncorrectServerSequence = logoutOnIncorrectServerSequence; } public LocalTime getSessionStartTime() { diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index 35252d7..de4beed 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -82,6 +82,7 @@ public class Constants { public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; public static final String POSS_DUP = SOH + POSS_DUP_TAG + "="; public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "="; + public static final String TEXT = SOH + TEXT_TAG + "="; //message types public static final String MSG_TYPE_LOGON = "A"; @@ -100,5 +101,6 @@ public class Constants { ); public static final String IS_POSS_DUP = "Y"; + public static final String IS_SEQUENCE_RESET_FLAG = "Y"; public static final int SUCCESSFUL_LOGOUT_CODE = 4; } From 5a62f039eec8a7b62a5d678edb7e3eaa4a8fcfe3 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Tue, 15 Aug 2023 14:58:54 +0400 Subject: [PATCH 5/9] Fix recovery (#57) --- README.md | 4 +++- gradle.properties | 2 +- src/main/java/com/exactpro/th2/FixHandler.java | 9 +++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 943996b..ea5f10e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.3.0) +# th2-conn-dirty-fix (1.3.1) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,8 @@ spec: ``` # Changelog +# 1.3.1 +* fix multiple consequent SOH characters ## 1.3.0 * Added handling for incoming test request messages diff --git a/gradle.properties b/gradle.properties index a192dcd..81dbf81 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.3.0 +release_version=1.3.1 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index edd1490..60c5ffe 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -104,6 +104,7 @@ import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; @@ -585,7 +586,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { if(ADMIN_MESSAGES.contains(msgType)) return true; FixField possDup = findField(buf, POSS_DUP_TAG); - if(possDup != null && possDup.getValue() == IS_POSS_DUP) return true; + if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; if(sequence - 1 != lastProcessedSequence.get() ) { int newSeqNo = sequence; @@ -1019,16 +1020,16 @@ private void setTime(ByteBuf buf) { String time = getTime(); if (sendingTime == null) { - seqNum.insertNext(SENDING_TIME, time).insertNext(ORIG_SENDING_TIME, time); + seqNum.insertNext(SENDING_TIME_TAG, time).insertNext(SENDING_TIME_TAG, time); } else { String value = sendingTime.getValue(); if (value == null || value.isEmpty() || value.equals("null")) { sendingTime.setValue(time); - sendingTime.insertNext(ORIG_SENDING_TIME, time); + sendingTime.insertNext(ORIG_SENDING_TIME_TAG, time); } else { sendingTime.setValue(time); - sendingTime.insertNext(ORIG_SENDING_TIME, value); + sendingTime.insertNext(ORIG_SENDING_TIME_TAG, value); } } } From 226b57b57a6bfd16f8cf643ea885ec870cd72d71 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Tue, 29 Aug 2023 18:57:59 +0400 Subject: [PATCH 6/9] [TH2-5026] Improve logging. (#55) --- README.md | 5 +- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 84 +++++++++++-------- 3 files changed, 56 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index ea5f10e..16490a7 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.3.1) +# th2-conn-dirty-fix (1.3.2) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,9 @@ spec: ``` # Changelog +### 1.3.2 +* Improve logging: log session group and session alias for each log message. + # 1.3.1 * fix multiple consequent SOH characters diff --git a/gradle.properties b/gradle.properties index 81dbf81..56657b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.3.1 +release_version=1.3.2 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 60c5ffe..23829f0 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -237,7 +237,7 @@ public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); if(settings.isLoadSequencesFromCradle()) { SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionAlias()); - LOGGER.info("Loaded sequences are: client - {}, server - {}", sequences.getClientSeq(), sequences.getServerSeq()); + info("Loaded sequences are: client - %d, server - %d", sequences.getClientSeq(), sequences.getServerSeq()); msgSeqNum.set(sequences.getClientSeq()); serverMsgSeqNum.set(sequences.getServerSeq()); } @@ -260,11 +260,11 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { } while (channel.isOpen() && !enabled.get()) { - if (LOGGER.isWarnEnabled()) LOGGER.warn("Session is not yet logged in: {}", channel.getSessionAlias()); + warn("Session is not yet logged in"); try { Thread.sleep(1000); } catch (InterruptedException e) { - LOGGER.error("Error while sleeping."); + error("Error while sleeping.", null); } } @@ -329,14 +329,14 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField msgSeqNumValue = findField(message, MSG_SEQ_NUM_TAG); if (msgSeqNumValue == null) { metadata.put(REJECT_REASON, "No msgSeqNum Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } FixField msgType = findField(message, MSG_TYPE_TAG); if (msgType == null) { metadata.put(REJECT_REASON, "No msgType Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgType in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgType in message: %s", null, message.toString(US_ASCII)); return metadata; } @@ -367,7 +367,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get() + 1, message.toString(US_ASCII)); + if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII)); } else { context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); serverMsgSeqNum.set(receivedMsgSeqNum - 1); @@ -385,18 +385,18 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu switch (msgTypeValue) { case MSG_TYPE_HEARTBEAT: - if (LOGGER.isInfoEnabled()) LOGGER.info("Heartbeat received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Heartbeat received - %s", message.toString(US_ASCII)); checkHeartbeat(message); break; case MSG_TYPE_LOGON: - if (LOGGER.isInfoEnabled()) LOGGER.info("Logon received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); if (connectionSuccessful) { if(settings.useNextExpectedSeqNum()) { FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); if(nextExpectedSeqField == null) { metadata.put(REJECT_REASON, "No NextExpectedSeqNum field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No NextExpectedSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No NextExpectedSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } @@ -430,15 +430,13 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); } break; - case MSG_TYPE_LOGOUT: //extract logout reason - handleLogout(message); - break; + //extract logout reason case MSG_TYPE_RESEND_REQUEST: - if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Resend request received - %s", message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Sequence reset received - %s", message.toString(US_ASCII)); resetSequence(message); break; case MSG_TYPE_TEST_REQUEST: @@ -467,7 +465,7 @@ private Map handleTestRequest(ByteBuf message, Map processMessage = (buf) -> { FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); @@ -633,7 +631,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { resetHeartbeatTask(); } catch (Exception e) { - LOGGER.error("Error while loading messages for recovery", e); + error("Error while loading messages for recovery", e); String seqReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( @@ -690,9 +688,7 @@ private boolean checkLogon(ByteBuf message) { public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { onOutgoingUpdateTag(message, metadata); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Outgoing message: {}", message.toString(US_ASCII)); - } + if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII)); } public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map metadata) { @@ -715,9 +711,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map> future) { future.get().cancel(false); } + + private void info(String message, Object... args) { + if(LOGGER.isInfoEnabled()) { + LOGGER.info("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private void error(String message, Throwable throwable, Object... args) { + if(LOGGER.isErrorEnabled()) { + LOGGER.error("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args), throwable); + } + } + + private void warn(String message, Object... args) { + if(LOGGER.isWarnEnabled()) { + LOGGER.warn("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private void debug(String message, Object... args) { + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } } \ No newline at end of file From 689bf6c76223df7749163549eb3ee0613267273d Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:29:27 +0400 Subject: [PATCH 7/9] [TH2-5014] Ungracefull disconnect based on message property. (#62) * [TH2-5014] Ungraceful disconnect. --- README.md | 6 ++- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 42 ++++++++++++++++++- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 16490a7..0e9c4c6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.3.2) +# th2-conn-dirty-fix (1.4.0) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,10 @@ spec: ``` # Changelog +### 1.4.0 +* Ungraceful session disconnect support. +* Removed NPE when session is reset by schedule. + ### 1.3.2 * Improve logging: log session group and session alias for each log message. diff --git a/gradle.properties b/gradle.properties index 56657b9..dd536e5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.3.2 +release_version=1.4.0 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 23829f0..d61deed 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -117,7 +118,6 @@ import static com.exactpro.th2.constants.Constants.SENDING_TIME; import static com.exactpro.th2.constants.Constants.SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.SESSION_STATUS_TAG; -import static com.exactpro.th2.constants.Constants.SUCCESSFUL_LOGOUT_CODE; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; @@ -144,6 +144,7 @@ public class FixHandler implements AutoCloseable, IHandler { private static final byte BYTE_SOH = 1; private static final String STRING_MSG_TYPE = "MsgType"; private static final String REJECT_REASON = "Reject reason"; + private static final String UNGRACEFUL_DISCONNECT_PROPERTY = "ungracefulDisconnect"; private static final String STUBBING_VALUE = "XXX"; private final AtomicInteger msgSeqNum = new AtomicInteger(0); @@ -251,6 +252,31 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { throw new IllegalStateException("Session is not active. It is not possible to send messages."); } + ByteBuf buf = Unpooled.copiedBuffer(rawMessage.getBody().toByteArray()); + Map props = rawMessage.getMetadata().getPropertiesMap(); + + FixField msgType = findField(buf, MSG_TYPE_TAG); + boolean isLogout = msgType != null && Objects.equals(msgType.getValue(), MSG_TYPE_LOGOUT); + if(isLogout && !channel.isOpen()) { + String message = String.format("%s - %s: Logout ignored as channel is already closed.", channel.getSessionGroup(), channel.getSessionAlias()); + LOGGER.warn(message); + context.send(CommonUtil.toEvent(message)); + return CompletableFuture.completedFuture(null); + } + + boolean isUngracefulDisconnect = Boolean.parseBoolean(props.get(UNGRACEFUL_DISCONNECT_PROPERTY)); + if(isLogout) { + context.send(CommonUtil.toEvent(String.format("Closing session %s. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect))); + try { + disconnect(!isUngracefulDisconnect); + enabled.set(false); + channel.open().get(); + } catch (Exception e) { + context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e)); + } + return CompletableFuture.completedFuture(null); + } + if (!channel.isOpen()) { try { channel.open().get(); @@ -510,7 +536,9 @@ private void reset() { msgSeqNum.set(0); serverMsgSeqNum.set(0); sessionActive.set(true); - messageLoader.updateTime(); + if(messageLoader != null) { + messageLoader.updateTime(); + } channel.open(); } @@ -941,6 +969,16 @@ private void waitLogoutResponse() { } } + private void disconnect(Boolean graceful) throws ExecutionException, InterruptedException { + if(graceful) { + sendLogout(); + waitLogoutResponse(); + } + resetHeartbeatTask(); + resetTestRequestTask(); + channel.close().get(); + } + private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) { stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString()); stringBuilder.append(MSG_TYPE).append(msgType); From 2d8467fc2899c912ddef95e94fd22cf343c576c9 Mon Sep 17 00:00:00 2001 From: isengrims <104489572+isengrims@users.noreply.github.com> Date: Mon, 18 Sep 2023 15:40:21 +0400 Subject: [PATCH 8/9] Disable automatic dev releases for branches with name dev-version-* (#64) Co-authored-by: Denis Plotnikov --- .github/workflows/dev-docker-publish.yml | 1 - .github/workflows/dev-release-docker-publish.yml | 6 ++---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/.github/workflows/dev-docker-publish.yml b/.github/workflows/dev-docker-publish.yml index b8551de..eab42dc 100644 --- a/.github/workflows/dev-docker-publish.yml +++ b/.github/workflows/dev-docker-publish.yml @@ -5,7 +5,6 @@ on: branches-ignore: - master - version-* - - dev-version-* - dependabot** paths-ignore: - README.md diff --git a/.github/workflows/dev-release-docker-publish.yml b/.github/workflows/dev-release-docker-publish.yml index ec3a015..54c4d76 100644 --- a/.github/workflows/dev-release-docker-publish.yml +++ b/.github/workflows/dev-release-docker-publish.yml @@ -3,10 +3,8 @@ name: Build and publish dev-release Java distributions to sonatype. on: workflow_dispatch: push: - branches: - - dev-version-* - paths: - - gradle.properties + tags: + - \d+.\d+.\d+-dev jobs: build-job: From 6f8d220f2ab321da9d04cef07ff26f5e45662c17 Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Thu, 21 Sep 2023 12:10:46 +0400 Subject: [PATCH 9/9] =?UTF-8?q?[TH2-5076]=20Use=20UTC=20timezone=20for=20s?= =?UTF-8?q?ending=20time=20tag=20and=20for=20in=20MessageLo=E2=80=A6=20(#6?= =?UTF-8?q?5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 5 +++- gradle.properties | 2 +- .../java/com/exactpro/th2/FixHandler.java | 24 +++++++++---------- .../th2/conn/dirty/fix/MessageLoader.kt | 4 ++-- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 0e9c4c6..0a38d34 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.4.0) +# th2-conn-dirty-fix (1.4.1) This microservice allows sending and receiving messages via FIX protocol @@ -328,6 +328,9 @@ spec: ``` # Changelog +### 1.4.1 +* Use UTC time zone for sending time tag + ### 1.4.0 * Ungraceful session disconnect support. * Removed NPE when session is reset by schedule. diff --git a/gradle.properties b/gradle.properties index dd536e5..749e44a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.4.0 +release_version=1.4.1 jackson_version=2.11.2 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index d61deed..7de2846 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -159,8 +159,8 @@ public class FixHandler implements AutoCloseable, IHandler { private final MessageLoader messageLoader; private final ReentrantLock recoveryLock = new ReentrantLock(); - private AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); - private AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); + private final AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); + private final AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); private volatile IChannel channel; protected FixHandlerSettings settings; @@ -294,18 +294,16 @@ public CompletableFuture send(@NotNull RawMessage rawMessage) { } } - CompletableFuture result = CompletableFuture.completedFuture(null); try { recoveryLock.lock(); - result = channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); } finally { recoveryLock.unlock(); } - return result; } @Override - public ByteBuf onReceive(IChannel channel, ByteBuf buffer) { + public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) { int offset = buffer.readerIndex(); if (offset == buffer.writerIndex()) return null; @@ -369,7 +367,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField possDup = findField(message, POSS_DUP_TAG); boolean isDup = false; if(possDup != null) { - isDup = possDup.getValue().equals(IS_POSS_DUP); + isDup = IS_POSS_DUP.equals(possDup.getValue()); } String msgTypeValue = requireNonNull(msgType.getValue()); @@ -522,7 +520,7 @@ private void resetSequence(ByteBuf message) { FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG); if(seqNumValue != null) { - if(gapFillMode == null || gapFillMode.getValue().equals("N")) { + if(gapFillMode == null || "N".equals(gapFillMode.getValue())) { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue()))); } else { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1); @@ -604,7 +602,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { || msgTypeField == null || msgTypeField.getValue() == null) { return true; } - Integer sequence = Integer.parseInt(seqNum.getValue()); + int sequence = Integer.parseInt(seqNum.getValue()); String msgType = msgTypeField.getValue(); if(sequence < beginSeqNo) return true; @@ -727,7 +725,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map "First filed isn't found in message: " + message.toString(US_ASCII)) + .insertPrevious(BEGIN_STRING_TAG, settings.getBeginString()); } FixField bodyLength = findField(message, BODY_LENGTH_TAG, US_ASCII, beginString); @@ -749,7 +748,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map