diff --git a/.github/workflows/build-dev-release.yml b/.github/workflows/build-dev-release.yml new file mode 100644 index 0000000..b438d11 --- /dev/null +++ b/.github/workflows/build-dev-release.yml @@ -0,0 +1,15 @@ +name: Build and publish dev release Docker image to Github Container Registry ghcr.io + +on: workflow_dispatch + +jobs: + build: + uses: th2-net/.github/.github/workflows/compound-java.yml@main + with: + build-target: 'Docker' + devRelease: true + createTag: true + docker-username: ${{ github.actor }} + secrets: + docker-password: ${{ secrets.GITHUB_TOKEN }} + nvd-api-key: ${{ secrets.NVD_APIKEY }} \ No newline at end of file diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/build-release.yml similarity index 53% rename from .github/workflows/docker-publish.yml rename to .github/workflows/build-release.yml index cec50a6..dcf70be 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/build-release.yml @@ -1,20 +1,15 @@ -name: Build and publish Docker distributions to Github Container Registry ghcr.io +name: Build and publish release Docker image to Github Container Registry ghcr.io -on: - push: - branches: - - master - - version-* - paths: - - gradle.properties -# - package_info.json +on: workflow_dispatch jobs: - build-job: + build: uses: th2-net/.github/.github/workflows/compound-java.yml@main with: build-target: 'Docker' + devRelease: false + createTag: true docker-username: ${{ github.actor }} secrets: docker-password: ${{ secrets.GITHUB_TOKEN }} - \ No newline at end of file + nvd-api-key: ${{ secrets.NVD_APIKEY }} \ No newline at end of file diff --git a/.github/workflows/dev-docker-publish.yml b/.github/workflows/build-sanpshot.yml similarity index 75% rename from .github/workflows/dev-docker-publish.yml rename to .github/workflows/build-sanpshot.yml index 6f77755..9366a44 100644 --- a/.github/workflows/dev-docker-publish.yml +++ b/.github/workflows/build-sanpshot.yml @@ -1,4 +1,4 @@ -name: Dev build and publish Docker distributions to Github Container Registry ghcr.io +name: Build and publish Docker image to Github Container Registry ghcr.io on: push: @@ -17,3 +17,4 @@ jobs: docker-username: ${{ github.actor }} secrets: docker-password: ${{ secrets.GITHUB_TOKEN }} + nvd-api-key: ${{ secrets.NVD_APIKEY }} \ No newline at end of file diff --git a/.github/workflows/ci-unwelcome-words.yml b/.github/workflows/ci-unwelcome-words.yml index 686d0fd..4e5f3a6 100644 --- a/.github/workflows/ci-unwelcome-words.yml +++ b/.github/workflows/ci-unwelcome-words.yml @@ -7,17 +7,17 @@ jobs: test: runs-on: ubuntu-20.04 steps: - - uses: actions/checkout@v3 - with: - ref: ${{ github.sha }} - - name: Checkout tool - uses: actions/checkout@v3 - with: - repository: exactpro-th2/ci-github-action - ref: master - token: ${{ secrets.PAT_CI_ACTION }} - path: ci-github-action - - name: Run CI action - uses: ./ci-github-action - with: - ref: ${{ github.sha }} \ No newline at end of file + - uses: actions/checkout@v4 + with: + ref: ${{ github.sha }} + - name: Checkout tool + uses: actions/checkout@v4 + with: + repository: exactpro-th2/ci-github-action + ref: master + token: ${{ secrets.PAT_CI_ACTION }} + path: ci-github-action + - name: Run CI action + uses: ./ci-github-action + with: + ref: ${{ github.sha }} diff --git a/Dockerfile b/Dockerfile index 8b11ebe..3ee4f84 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:7.1-jdk11 AS build +FROM gradle:7.6-jdk11 AS build ARG release_version COPY ./ . RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version} diff --git a/README.md b/README.md index 2aefac7..eeb37ac 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (0.0.5) +# th2-conn-dirty-fix (1.7.0) This microservice allows sending and receiving messages via FIX protocol @@ -12,6 +12,7 @@ This microservice allows sending and receiving messages via FIX protocol ## Session settings ++ *sessionGroup* - session group for incoming/outgoing th2 messages (equal to session alias by default) + *sessionAlias* - session alias for incoming/outgoing th2 messages + *handler* - handler settings + *mangler* - mangler settings @@ -40,11 +41,21 @@ This microservice allows sending and receiving messages via FIX protocol + *disconnectRequestDelay* - the interval for the shutdown request + *resetSeqNumFlag* - resetting sequence number in initial Logon message (when conn started) + *resetOnLogon* - resetting the sequence number in Logon in other cases (e.g. disconnect) -+ *stateFilePath* - path to file where sequences will be saved to use with next login attempts. It is useful when acceptor does not support sequence reset. (`nullable`) ++ *loadSequencesFromCradle* - defines if sequences will be loaded from cradle to use them in logon message. ++ *loadMissedMessagesFromCradle* - defines how retransmission will be handled. If true, then requested through `ResendRequest` messages (or messages requested on Logon with `NextExpectedSeqNum`) will be loaded from cradle. + *sessionStartTime* - UTC time when session starts. (`nullable`) + *sessionEndTime* - UTC time when session ends. required if startSessionTime is filled. ++ *sendingDateTimeFormat* - `SendingTime` field format for outgoing messages. (`nullable`, `default format` in this case is `"yyyyMMdd-HH:mm:ss.SSSSSSSSS"`) + *useNextExpectedSeqNum* - session management based on next expected sequence number. (`false` by default) + *saveAdminMessages* - defines if admin messages will be saved to internal outgoing buffer. (`false` by default) ++ *resetStateOnServerReset* - whether to reset the server sequence after receiving logout with text `Next Expected MSN too high, MSN to be sent is x but received y`. ++ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message. ++ *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread + (please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem). + _Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_). + The timeout is reset to the original value after a successful sending attempt. + If connection is not established within the specified timeout an error will be reported. ++ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._ ### Security settings @@ -220,11 +231,12 @@ metadata: name: fix-client spec: image-name: ghcr.io/th2-net/th2-conn-dirty-fix - image-version: 0.0.1 + image-version: 1.0.0 type: th2-conn custom-config: maxBatchSize: 1000 maxFlushTime: 1000 + batchByGroup: true publishSentEvents: true publishConnectEvents: true sessions: @@ -273,6 +285,9 @@ spec: - remove: { tag: 110, matches: (.*) } update-checksum: false pins: + - name: to_data_provider + connection-type: grpc-client + service-class: com.exactpro.th2.dataprovider.grpc.DataProviderService - name: to_send connection-type: mq attributes: @@ -320,22 +335,135 @@ spec: # Changelog +## 1.7.0 +* Added support for th2 transport protocol +* Added configuration option for non-default book per session. +* Migrated to th2 gradle plugin `0.0.8` +* Updated: + * common: `5.13.1-dev` + * conn-dirty-tcp-core: `3.6.0-dev` + +## 1.6.1 + +* Channel subscriptions recovery on failure +* Updated bom: `4.6.1-dev` +* Updated common: `5.10.0-dev` +* Updated common-utils: `2.2.3-dev` +* Updated conn-dirty-tcp-core: `3.5.0-dev` + +## 1.5.1 + +* Property `th2.operation_timestamp` is added to metadata to each message +* Use mutable map for metadata when sending a messages from the handler + * Fix error when new property with operation timestamp added to the immutable map + +## 1.5.0 + +* `minConnectionTimeoutOnSend` parameter is added. +* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`). + +## 1.4.2 +* Ungraceful session disconnect support. +* Removed NPE when session is reset by schedule. +* Use UTC time zone for sending time tag + +## 1.4.1 +* Timeout on send from queue thread + * Parameter `connectionTimeoutOnSend` was added + +## 1.4.0 +* Updated bom: `4.5.0-dev` +* Updated common: `5.4.0-dev` +* Updated common-utils: `2.2.0-dev` +* Updated grpc-lw-data-provider: `2.1.0-dev` +* Updated kotlin: `1.8.22` +* Added support for th2 transport protocol + +## 1.3.2 +* Improve logging: log session group and session alias for each log message. + +## 1.3.1 +* fix multiple consequent SOH characters + +## 1.3.0 +* Added handling for incoming test request messages +* Fixed resetSeqNum flag handling on incoming logon messages. +* Added option to automatically reset server sequence when internal conn sequence doesn't match with sequence that server sent. + +## 1.2.1 +* fix multiple consequent SOH characters + +## 1.2.0 +* loading requested messages from cradle. + +## 1.1.1 +* fix scheduling: hasn't worked for some ranges. + +## 1.1.0 +* state reset option on server update. + +## 1.0.2 +* dev releases +* apply changes from version-0 + +## 1.0.1 +* Add bookId to lw data provider query + +## 1.0.0 +* Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support + +## 0.3.0 +* Ability to recover messages from cradle. + +## 0.2.0 +* optional state reset on silent server reset. + +## 0.1.1 +* correct sequence numbers increments. +* update conn-dirty-tcp-core to `2.3.0` + +## 0.1.0 +* correct handling of sequence reset with `endSeqNo = 0` +* Skip messages mangling on error in `demo-fix-mangler` with error event instead of throwing exception. +* allow unconditional rule application + +## 0.0.10 +* disable reconnect when session is in not-active state. + +## 0.0.9 +* correct heartbeat and test request handling + +## 0.0.8 + +* th2-common upgrade to `3.44.1` +* th2-bom upgrade to `4.2.0` + +## 0.0.7 + +* wait for acceptor logout response on close +* load sequences from lwdp + +## 0.0.6 + +* wait for logout to be sent + ## 0.0.5 * copy messages before putting them into cache ## 0.0.4 + * Session management based on NextExpectedSeqNum field. * Recovery handling - * outgoing messages are now saved - * if message wasn't saved sequence reset message with gap fill mode flag is sent. + * outgoing messages are now saved + * if message wasn't saved sequence reset message with gap fill mode flag is sent. * Session start and Session end configuration to handle sequence reset by exchange schedule. ## 0.0.3 * Added new password option into settings -* Provided ability to specify encrypt algorithm for reading key from file and encrypting password and new password fields +* Provided ability to specify encrypt algorithm for reading key from file and encrypting password and new password fields ## 0.0.2 -* Supported the password encryption via `RSA` algorithm. +* Supported the password encryption via `RSA` algorithm. \ No newline at end of file diff --git a/build.gradle b/build.gradle index b88c8a4..8c930be 100644 --- a/build.gradle +++ b/build.gradle @@ -1,28 +1,20 @@ +import org.jetbrains.kotlin.gradle.tasks.KotlinCompile + plugins { - id 'java' - id 'org.jetbrains.kotlin.jvm' version '1.6.21' - id 'com.palantir.docker' version '0.25.0' + id "application" + id "com.exactpro.th2.gradle.component" version "0.0.8" + id 'org.jetbrains.kotlin.jvm' version '1.8.22' + id "org.jetbrains.kotlin.kapt" version "1.8.22" } -apply plugin: 'application' -apply plugin: 'com.palantir.docker' -apply plugin: 'kotlin-kapt' - group 'com.exactpro.th2' version release_version -sourceCompatibility = 11 -targetCompatibility = 11 - -ext { - sharedDir = file("${project.rootDir}/shared") +kotlin { + jvmToolchain(11) } repositories { - maven { - name 'MavenLocal' - url sharedDir - } mavenCentral() maven { @@ -37,39 +29,37 @@ repositories { mavenLocal() - configurations.all { + configurations.configureEach { resolutionStrategy.cacheChangingModulesFor 0, 'seconds' resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' } } dependencies { - api platform('com.exactpro.th2:bom:4.1.0') - - implementation 'com.exactpro.th2:common:3.44.0' - implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' - implementation ('com.exactpro.th2:conn-dirty-tcp-core:2.0.5') { - exclude group: 'org.slf4j', module: 'slf4j-log4j12' - because 'Projects should use only slf4j-api, without coupling to a certain implementation' + implementation("com.exactpro.th2:common:5.13.1-dev") { + exclude group: 'com.exactpro.th2', module: 'task-utils' } + implementation "com.exactpro.th2:common-utils:2.2.3-dev" + implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' + implementation'com.exactpro.th2:conn-dirty-tcp-core:3.6.0-dev' + implementation 'com.exactpro.th2:grpc-lw-data-provider:2.3.1-dev' implementation 'org.slf4j:slf4j-api' - implementation 'io.github.microutils:kotlin-logging:2.1.23' + implementation 'io.github.microutils:kotlin-logging:3.0.5' + implementation 'org.apache.commons:commons-lang3' - implementation 'io.netty:netty-all:4.1.86.Final' - implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.21' - implementation 'com.google.auto.service:auto-service:1.0.1' + implementation 'io.netty:netty-all' + implementation 'com.google.auto.service:auto-service:1.1.1' - implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jackson_version - implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: jackson_version - implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jackson_version - implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jackson_version + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.module:jackson-module-kotlin' - testImplementation 'org.mockito:mockito-all:1.10.19' - testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.7.10' + testImplementation 'org.mockito:mockito-core:5.12.0' + testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.8.22' - annotationProcessor 'com.google.auto.service:auto-service:1.0.1' - kapt 'com.google.auto.service:auto-service:1.0.1' + annotationProcessor 'com.google.auto.service:auto-service:1.1.1' + kapt 'com.google.auto.service:auto-service:1.1.1' } test { @@ -77,31 +67,11 @@ test { } application { - mainClassName 'com.exactpro.th2.conn.dirty.tcp.core.Main' -} - -applicationName = 'service' - -distTar { - archiveName "${applicationName}.tar" -} - -dockerPrepare { - dependsOn distTar -} - -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) + mainClass.set('com.exactpro.th2.conn.dirty.tcp.core.Main') } -compileKotlin { - kotlinOptions { - jvmTarget = '11' - } -} - -compileTestKotlin { - kotlinOptions { - jvmTarget = '11' - } -} +dependencyCheck { + suppressionFile="suppressions.xml" + //FIXME: we should check all used dependencies + skipConfigurations = ['kapt', 'kaptClasspath_kaptKotlin', 'kaptTest', 'kaptTestFixtures', 'annotationProcessor'] +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 5858fb1..538334b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ -release_version=0.0.5 -jackson_version=2.11.2 \ No newline at end of file +release_version=1.7.0 +description='Dirty-TCP client' +vcs_url=https://github.com/th2-net/th2-conn-dirty-fix \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 69a9715..070cb70 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 80bd069..cb3b06e 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -16,27 +16,27 @@ package com.exactpro.th2; +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.grpc.Direction; import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.common.grpc.RawMessage; +import com.exactpro.th2.common.utils.event.transport.EventUtilsKt; import com.exactpro.th2.conn.dirty.fix.FixField; +import com.exactpro.th2.conn.dirty.fix.MessageLoader; +import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; import com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil; -import com.exactpro.th2.util.Util; +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; @@ -48,12 +48,24 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import kotlin.jvm.functions.Function1; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField; @@ -81,6 +93,7 @@ import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG_TAG; import static com.exactpro.th2.constants.Constants.HEART_BT_INT; import static com.exactpro.th2.constants.Constants.IS_POSS_DUP; +import static com.exactpro.th2.constants.Constants.IS_SEQUENCE_RESET_FLAG; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE; @@ -97,9 +110,13 @@ import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME; +import static com.exactpro.th2.constants.Constants.ORIG_SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; +import static com.exactpro.th2.constants.Constants.POSS_DUP; import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; +import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.SENDER_SUB_ID; @@ -107,11 +124,11 @@ import static com.exactpro.th2.constants.Constants.SENDING_TIME; import static com.exactpro.th2.constants.Constants.SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.SESSION_STATUS_TAG; -import static com.exactpro.th2.constants.Constants.SUCCESSFUL_LOGOUT_CODE; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID_TAG; +import static com.exactpro.th2.constants.Constants.TEXT; import static com.exactpro.th2.constants.Constants.TEXT_TAG; import static com.exactpro.th2.constants.Constants.USERNAME; import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf; @@ -133,11 +150,11 @@ public class FixHandler implements AutoCloseable, IHandler { private static final byte BYTE_SOH = 1; private static final String STRING_MSG_TYPE = "MsgType"; private static final String REJECT_REASON = "Reject reason"; + private static final String UNGRACEFUL_DISCONNECT_PROPERTY = "ungracefulDisconnect"; private static final String STUBBING_VALUE = "XXX"; - private final Log outgoingMessages = new Log(10000); - private final AtomicInteger msgSeqNum; - private final AtomicInteger serverMsgSeqNum; + private final AtomicInteger msgSeqNum = new AtomicInteger(0); + private final AtomicInteger serverMsgSeqNum = new AtomicInteger(0); private final AtomicInteger testReqID = new AtomicInteger(0); private final AtomicBoolean sessionActive = new AtomicBoolean(true); private final AtomicBoolean enabled = new AtomicBoolean(false); @@ -145,28 +162,30 @@ public class FixHandler implements AutoCloseable, IHandler { private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); private final IHandlerContext context; private final InetSocketAddress address; + private final MessageLoader messageLoader; + private final ReentrantLock recoveryLock = new ReentrantLock(); + + private final AtomicReference> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); + private final AtomicReference> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null)); - private Future heartbeatTimer = CompletableFuture.completedFuture(null); - private Future testRequestTimer = CompletableFuture.completedFuture(null); + private final SendingTimeoutHandler sendingTimeoutHandler; private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); private volatile IChannel channel; protected FixHandlerSettings settings; - private long lastSendTime = System.currentTimeMillis(); public FixHandler(IHandlerContext context) { this.context = context; this.settings = (FixHandlerSettings) context.getSettings(); - if(settings.getStateFilePath() == null || !settings.getStateFilePath().exists()) { - msgSeqNum = new AtomicInteger(0); - serverMsgSeqNum = new AtomicInteger(0); + if(settings.isLoadSequencesFromCradle() || settings.isLoadMissedMessagesFromCradle()) { + this.messageLoader = new MessageLoader( + context.getGrpcService(DataProviderService.class), + settings.getSessionStartTime(), + context.getBookName() + ); } else { - SequenceHolder sequences = Util.readSequences(settings.getStateFilePath()); - msgSeqNum = new AtomicInteger(sequences.getClientSeq()); - serverMsgSeqNum = new AtomicInteger(sequences.getServerSeq()); + this.messageLoader = null; } - LOGGER.info("Initial sequences are: client - {}, server - {}", msgSeqNum.get(), serverMsgSeqNum.get()); - if(settings.getSessionStartTime() != null) { Objects.requireNonNull(settings.getSessionEndTime(), "Session end is required when session start is presented"); LocalTime resetTime = settings.getSessionStartTime(); @@ -187,15 +206,25 @@ public FixHandler(IHandlerContext context) { if(scheduleTime.isBefore(now)) { scheduleTime = now.plusDays(1).with(resetTime); - } else if(now.isBefore(now.with(settings.getSessionStartTime()))) { - sessionActive.set(false); } long time = now.until(scheduleTime, ChronoUnit.SECONDS); executorService.scheduleAtFixedRate(() -> { - this.close(); + sendLogout(); + waitLogoutResponse(); + channel.close(); sessionActive.set(false); }, time, DAY_SECONDS, TimeUnit.SECONDS); + + LocalDate today = LocalDate.now(ZoneOffset.UTC); + + LocalDateTime start = settings.getSessionStartTime().atDate(today); + LocalDateTime end = settings.getSessionEndTime().atDate(today); + + LocalDateTime nowDateTime = LocalDateTime.now(ZoneOffset.UTC); + if(nowDateTime.isAfter(end) && nowDateTime.isBefore(start)) { + sessionActive.set(false); + } } String host = settings.getHost(); @@ -210,33 +239,111 @@ public FixHandler(IHandlerContext context) { if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero"); if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero"); if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero"); + this.sendingTimeoutHandler = SendingTimeoutHandler.create( + settings.getMinConnectionTimeoutOnSend(), + settings.getConnectionTimeoutOnSend(), + context::send + ); } @Override public void onStart() { channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); + if(settings.isLoadSequencesFromCradle()) { + SequenceHolder sequences = messageLoader.loadInitialSequences(channel.getSessionGroup(), channel.getSessionAlias()); + info("Loaded sequences are: client - %d, server - %d", sequences.getClientSeq(), sequences.getServerSeq()); + msgSeqNum.set(sequences.getClientSeq()); + serverMsgSeqNum.set(sequences.getServerSeq()); + } + // This method returns CompletableFuture, but we don't handle it + // Probably, this is because we don't care in the current moment + // whether we are connected or not - just initial trigger for connection channel.open(); } @NotNull - @Override - public CompletableFuture send(@NotNull RawMessage rawMessage) { + private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map properties, @Nullable EventID eventID) { if (!sessionActive.get()) { throw new IllegalStateException("Session is not active. It is not possible to send messages."); } + + FixField msgType = findField(body, MSG_TYPE_TAG); + boolean isLogout = msgType != null && Objects.equals(msgType.getValue(), MSG_TYPE_LOGOUT); + if(isLogout && !channel.isOpen()) { + String message = String.format("%s - %s: Logout ignored as channel is already closed.", channel.getSessionGroup(), channel.getSessionAlias()); + LOGGER.warn(message); + context.send(CommonUtil.toEvent(message)); + return CompletableFuture.completedFuture(null); + } + + boolean isUngracefulDisconnect = Boolean.parseBoolean(properties.get(UNGRACEFUL_DISCONNECT_PROPERTY)); + if(isLogout) { + context.send(CommonUtil.toEvent(String.format("Closing session %s. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect))); + try { + disconnect(!isUngracefulDisconnect); + enabled.set(false); + sendingTimeoutHandler.getWithTimeout(channel.open()); + } catch (Exception e) { + context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e)); + } + return CompletableFuture.completedFuture(null); + } + + // TODO: probably, this should be moved to the core part + // But those changes will break API + // So, let's keep it here for now + long deadline = sendingTimeoutHandler.getDeadline(); + long currentTimeout = sendingTimeoutHandler.getCurrentTimeout(); + if (!channel.isOpen()) { try { - channel.open().get(); + sendingTimeoutHandler.getWithTimeout(channel.open()); + } catch (TimeoutException e) { + ExceptionUtils.rethrow(new TimeoutException( + String.format("could not open connection before timeout %d mls elapsed", + currentTimeout))); } catch (Exception e) { ExceptionUtils.rethrow(e); } } - return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + while (channel.isOpen() && !enabled.get()) { + warn("Session is not yet logged in"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + error("Error while sleeping.", null); + } + if (System.currentTimeMillis() > deadline) { + // The method should have checked exception in signature... + ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls", + currentTimeout))); + } + } + + recoveryLock.lock(); + try { + return channel.send(body, properties, eventID, SendMode.HANDLE_AND_MANGLE); + } finally { + recoveryLock.unlock(); + } + } + + @NotNull + @Override + public CompletableFuture send(@NotNull RawMessage rawMessage) { + return send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage)); + } + + @NotNull + @Override + public CompletableFuture send(@NotNull com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage message) { + final var id = message.getEventId(); + return send(message.getBody(), message.getMetadata(), id != null ? EventUtilsKt.toProto(id) : null); } @Override - public ByteBuf onReceive(IChannel channel, ByteBuf buffer) { + public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) { int offset = buffer.readerIndex(); if (offset == buffer.writerIndex()) return null; @@ -286,30 +393,50 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField msgSeqNumValue = findField(message, MSG_SEQ_NUM_TAG); if (msgSeqNumValue == null) { metadata.put(REJECT_REASON, "No msgSeqNum Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } FixField msgType = findField(message, MSG_TYPE_TAG); if (msgType == null) { metadata.put(REJECT_REASON, "No msgType Field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No MsgType in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgType in message: %s", null, message.toString(US_ASCII)); return metadata; } FixField possDup = findField(message, POSS_DUP_TAG); boolean isDup = false; if(possDup != null) { - isDup = possDup.getValue().equals(IS_POSS_DUP); + isDup = Objects.equals(possDup.getValue(), IS_POSS_DUP); + } + + String msgTypeValue = requireNonNull(msgType.getValue()); + if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) { + serverMsgSeqNum.incrementAndGet(); + handleLogout(message); + return metadata; } int receivedMsgSeqNum = Integer.parseInt(requireNonNull(msgSeqNumValue.getValue())); + if(msgTypeValue.equals(MSG_TYPE_LOGON) && receivedMsgSeqNum < serverMsgSeqNum.get()) { + FixField resetSeqNumFlagField = findField(message, RESET_SEQ_NUM_TAG); + if(resetSeqNumFlagField != null && Objects.equals(resetSeqNumFlagField.getValue(), IS_SEQUENCE_RESET_FLAG)) { + serverMsgSeqNum.set(0); + } + } + if(receivedMsgSeqNum < serverMsgSeqNum.get() && !isDup) { - sendLogout(); - reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if(settings.isLogoutOnIncorrectServerSequence()) { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); + sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); + reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII)); + } else { + context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); + serverMsgSeqNum.set(receivedMsgSeqNum - 1); + } metadata.put(REJECT_REASON, "SeqNum is less than expected."); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get(), message.toString(US_ASCII)); return metadata; } @@ -319,30 +446,38 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu sendResendRequest(serverMsgSeqNum.get(), receivedMsgSeqNum); } - String msgTypeValue = requireNonNull(msgType.getValue()); + switch (msgTypeValue) { case MSG_TYPE_HEARTBEAT: - if (LOGGER.isInfoEnabled()) LOGGER.info("Heartbeat received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Heartbeat received - %s", message.toString(US_ASCII)); checkHeartbeat(message); - testRequestTimer = executorService.schedule(this::sendTestRequest, settings.getTestRequestDelay(), TimeUnit.SECONDS); break; case MSG_TYPE_LOGON: - if (LOGGER.isInfoEnabled()) LOGGER.info("Logon received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); if (connectionSuccessful) { if(settings.useNextExpectedSeqNum()) { FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); if(nextExpectedSeqField == null) { metadata.put(REJECT_REASON, "No NextExpectedSeqNum field"); - if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No NextExpectedSeqNum in message: {}", message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) error("Invalid message. No NextExpectedSeqNum in message: %s", null, message.toString(US_ASCII)); return metadata; } int nextExpectedSeqNumber = Integer.parseInt(requireNonNull(nextExpectedSeqField.getValue())); - int seqNum = msgSeqNum.get(); + int seqNum = msgSeqNum.incrementAndGet() + 1; if(nextExpectedSeqNumber < seqNum) { recovery(nextExpectedSeqNumber, seqNum); + } else if (nextExpectedSeqNumber > seqNum) { + context.send( + Event.start() + .name(String.format("Corrected next client seq num from %s to %s", seqNum, nextExpectedSeqNumber)) + .type("Logon") + ); + msgSeqNum.set(nextExpectedSeqNumber - 1); } + } else { + msgSeqNum.incrementAndGet(); } enabled.set(true); @@ -351,79 +486,87 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu connStarted.set(true); } - if (heartbeatTimer != null) { - heartbeatTimer.cancel(false); - } - heartbeatTimer = executorService.scheduleWithFixedDelay(this::sendHeartbeat, 1, 1, TimeUnit.SECONDS); + resetHeartbeatTask(); - testRequestTimer = executorService.schedule(this::sendTestRequest, settings.getTestRequestDelay(), TimeUnit.SECONDS); + resetTestRequestTask(); } else { enabled.set(false); reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); } break; - case MSG_TYPE_LOGOUT: //extract logout reason - if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); - FixField sessionStatus = findField(message, SESSION_STATUS_TAG); - - if(sessionStatus != null) { - int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); - if(statusCode != SUCCESSFUL_LOGOUT_CODE) { - FixField text = findField(message, TEXT_TAG); - if (text != null) { - LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); - String value = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); - if (value != null) { - msgSeqNum.set(Integer.parseInt(value) - 1); - } else { - msgSeqNum.set(msgSeqNum.get() - 1); - } - } else { - msgSeqNum.set(msgSeqNum.get() - 1); - } - serverMsgSeqNum.set(Integer.parseInt(msgSeqNumValue.getValue()) - 1); - } - } - if (heartbeatTimer != null) { - heartbeatTimer.cancel(false); - } - if (testRequestTimer != null) { - testRequestTimer.cancel(false); - } - enabled.set(false); - context.send(CommonUtil.toEvent("logout for sender - " + settings.getSenderCompID()));//make more useful - break; + //extract logout reason case MSG_TYPE_RESEND_REQUEST: - if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Resend request received - %s", message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Sequence reset received - %s", message.toString(US_ASCII)); resetSequence(message); break; + case MSG_TYPE_TEST_REQUEST: + if (LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); + handleTestRequest(message, metadata); + break; } - if (testRequestTimer != null && !testRequestTimer.isCancelled()) { - testRequestTimer.cancel(false); - } + resetTestRequestTask(); metadata.put(STRING_MSG_TYPE, msgTypeValue); return metadata; } + private Map handleTestRequest(ByteBuf message, Map metadata) { + FixField testReqId = findField(message, TEST_REQ_ID_TAG); + if(testReqId == null || testReqId.getValue() == null) { + metadata.put(REJECT_REASON, "Test Request message hasn't got TestReqId field."); + return metadata; + } + + sendHeartbeatTestReqId(testReqId.getValue()); + + return null; + } + + private void handleLogout(@NotNull ByteBuf message) { + if(LOGGER.isInfoEnabled()) info("Logout received - %s", message.toString(US_ASCII)); + boolean isSequenceChanged = false; + FixField text = findField(message, TEXT_TAG); + if (text != null) { + LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); + String wrongClientSequence = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); + if (wrongClientSequence != null) { + msgSeqNum.set(Integer.parseInt(wrongClientSequence) - 1); + isSequenceChanged = true; + } + String wrongClientNextExpectedSequence = StringUtils.substringBetween(text.getValue(), "MSN to be sent is ", " but received"); + if(wrongClientNextExpectedSequence != null && settings.getResetStateOnServerReset()) { + serverMsgSeqNum.set(Integer.parseInt(wrongClientNextExpectedSequence)); + } + } + + if(!enabled.get() && !isSequenceChanged) { + msgSeqNum.incrementAndGet(); + } + + cancelFuture(heartbeatTimer); + cancelFuture(testRequestTimer); + enabled.set(false); + context.send(CommonUtil.toEvent("logout for sender - " + settings.getSenderCompID()));//make more useful + } + private void resetSequence(ByteBuf message) { FixField gapFillMode = findField(message, GAP_FILL_FLAG_TAG); FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG); if(seqNumValue != null) { - if(gapFillMode == null || gapFillMode.equals("N")) { + if(gapFillMode == null || "N".equals(gapFillMode.getValue())) { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue()))); } else { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1); } } else { - LOGGER.trace("Failed to reset servers MsgSeqNum. No such tag in message: {}", message.toString(US_ASCII)); + if(LOGGER.isWarnEnabled()) warn("Failed to reset servers MsgSeqNum. No such tag in message: %s", message.toString(US_ASCII)); } } @@ -431,29 +574,32 @@ private void reset() { msgSeqNum.set(0); serverMsgSeqNum.set(0); sessionActive.set(true); - sendLogon(); + if(messageLoader != null) { + messageLoader.updateTime(); + } + channel.open(); } public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private - lastSendTime = System.currentTimeMillis(); StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo).append(SOH); resendRequest.append(END_SEQ_NO).append(endSeqNo).append(SOH); setChecksumAndBodyLength(resendRequest); - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); + resetHeartbeatTask(); } void sendResendRequest(int beginSeqNo) { //do private - lastSendTime = System.currentTimeMillis(); StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet()); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); resendRequest.append(END_SEQ_NO).append(0); setChecksumAndBodyLength(resendRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); + resetHeartbeatTask(); } else { sendLogon(); } @@ -478,37 +624,100 @@ private void handleResendRequest(ByteBuf message) { } private void recovery(int beginSeqNo, int endSeqNo) { - if (endSeqNo == 0) { - endSeqNo = msgSeqNum.get(); - } - LOGGER.info("Returning messages from {} to {}", beginSeqNo, endSeqNo); - for (int i = beginSeqNo; i <= endSeqNo; i++) { - ByteBuf storedMsg = outgoingMessages.get(i); - if (storedMsg == null) { - StringBuilder sequenceReset = new StringBuilder(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, i); - sequenceReset.append(GAP_FILL_FLAG).append("Y"); - sequenceReset.append(NEW_SEQ_NO).append(i + 1); - setChecksumAndBodyLength(sequenceReset); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); + AtomicInteger lastProcessedSequence = new AtomicInteger(beginSeqNo - 1); + try { + recoveryLock.lock(); + + if (endSeqNo == 0) { + endSeqNo = msgSeqNum.get() + 1; + } + + int endSeq = endSeqNo; + info("Loading messages from %d to %d", beginSeqNo, endSeqNo); + if(settings.isLoadMissedMessagesFromCradle()) { + Function1 processMessage = (buf) -> { + FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); + FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + if(seqNum == null || seqNum.getValue() == null + || msgTypeField == null || msgTypeField.getValue() == null) { + return true; + } + int sequence = Integer.parseInt(seqNum.getValue()); + String msgType = msgTypeField.getValue(); + + if(sequence < beginSeqNo) return true; + if(sequence > endSeq) return false; + + if(ADMIN_MESSAGES.contains(msgType)) return true; + FixField possDup = findField(buf, POSS_DUP_TAG); + if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; + + if(sequence - 1 != lastProcessedSequence.get() ) { + StringBuilder sequenceReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE); + resetHeartbeatTask(); + } + + setTime(buf); + setPossDup(buf); + updateLength(buf); + updateChecksum(buf); + channel.send(buf, createMetadataMap(), null, SendMode.MANGLE); + + resetHeartbeatTask(); + + lastProcessedSequence.set(sequence); + return true; + }; + + messageLoader.processMessagesInRange( + channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND, + beginSeqNo, + processMessage + ); + + if(lastProcessedSequence.get() < endSeq) { + String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), + createMetadataMap(), null, SendMode.MANGLE + ); + } } else { - if (LOGGER.isInfoEnabled()) LOGGER.info("Resending message: {}", storedMsg.toString(US_ASCII)); - FixField sendingTime = findField(storedMsg, SENDING_TIME_TAG); - sendingTime.insertNext(POSS_DUP_TAG, IS_POSS_DUP); - channel.send(storedMsg, Collections.emptyMap(), null, SendMode.MANGLE); + String seqReset = + createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), + createMetadataMap(), null, SendMode.MANGLE + ); } + resetHeartbeatTask(); + + } catch (Exception e) { + error("Error while loading messages for recovery", e); + String seqReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); + channel.send( + Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), + createMetadataMap(), null, SendMode.MANGLE + ); + } finally { + recoveryLock.unlock(); } } private void sendSequenceReset() { - lastSendTime = System.currentTimeMillis(); StringBuilder sequenceReset = new StringBuilder(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet()); + String time = getTime(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet(), time); + sequenceReset.append(ORIG_SENDING_TIME).append(time); sequenceReset.append(NEW_SEQ_NO).append(msgSeqNum.get() + 1); setChecksumAndBodyLength(sequenceReset); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE); + resetHeartbeatTask(); } else { sendLogon(); } @@ -541,12 +750,11 @@ private boolean checkLogon(ByteBuf message) { @Override public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { - lastSendTime = System.currentTimeMillis(); onOutgoingUpdateTag(message, metadata); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Outgoing message: {}", message.toString(US_ASCII)); - } + if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII)); + + if(enabled.get()) resetHeartbeatTask(); } public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map metadata) { @@ -557,7 +765,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map "First filed isn't found in message: " + message.toString(US_ASCII)) + .insertPrevious(BEGIN_STRING_TAG, settings.getBeginString()); } FixField bodyLength = findField(message, BODY_LENGTH_TAG, US_ASCII, beginString); @@ -569,9 +778,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map> future) { + future.get().cancel(false); + } + + private void info(String message, Object... args) { + if(LOGGER.isInfoEnabled()) { + LOGGER.info("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private void error(String message, Throwable throwable, Object... args) { + if(LOGGER.isErrorEnabled()) { + LOGGER.error("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args), throwable); + } + } + + private void warn(String message, Object... args) { + if(LOGGER.isWarnEnabled()) { + LOGGER.warn("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private void debug(String message, Object... args) { + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + } + } + + private Map createMetadataMap() { + return new HashMap<>(2); } } \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 673065f..4e7e0a2 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -19,13 +19,15 @@ import com.exactpro.th2.conn.dirty.fix.KeyFileType; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.Security; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerSettings; +import com.exactpro.th2.util.DateTimeFormatterDeserializer; import com.exactpro.th2.util.LocalTimeDeserializer; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import java.io.File; import java.time.LocalTime; +import java.time.format.DateTimeFormatter; public class FixHandlerSettings implements IHandlerSettings { + private static final int DEFAULT_CONNECTION_TIMEOUT_ON_SEND = 30_000; private String host = null; private int port = 0; private Security security = new Security(); @@ -41,8 +43,6 @@ public class FixHandlerSettings implements IHandlerSettings { private String newPassword; private String passwordEncryptKeyFilePath; private KeyFileType passwordEncryptKeyFileType = KeyFileType.PEM_PUBLIC_KEY; - - private File stateFilePath; /** * Value from Java Security Standard Algorithm Names */ @@ -54,7 +54,10 @@ public class FixHandlerSettings implements IHandlerSettings { private Boolean resetSeqNumFlag = false; private Boolean resetOnLogon = false; private Boolean useNextExpectedSeqNum = false; - private Boolean saveAdminMessages = false; + private Boolean loadSequencesFromCradle = false; + private Boolean loadMissedMessagesFromCradle = false; + private Boolean resetStateOnServerReset = false; + private Boolean logoutOnIncorrectServerSequence = false; @JsonDeserialize(using = LocalTimeDeserializer.class) private LocalTime sessionStartTime; @@ -66,6 +69,25 @@ public class FixHandlerSettings implements IHandlerSettings { private int reconnectDelay = 5; private int disconnectRequestDelay = 5; + /** + * Timeout in milliseconds during which the connection should be opened and session is logged in. + * Otherwise, the send operation will be interrupted + */ + private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND; + + private long minConnectionTimeoutOnSend = 1_000; + + @JsonDeserialize(using = DateTimeFormatterDeserializer.class) + private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS"); + + public DateTimeFormatter getSendingDateTimeFormat() { + return this.sendingDateTimeFormat; + } + + public void setSendingDateTimeFormat(DateTimeFormatter sendingDateTimeFormat) { + this.sendingDateTimeFormat = sendingDateTimeFormat; + } + public String getHost() { return host; } @@ -214,12 +236,28 @@ public void setPasswordEncryptAlgorithm(String passwordEncryptAlgorithm) { this.passwordEncryptAlgorithm = passwordEncryptAlgorithm; } - public File getStateFilePath() { - return stateFilePath; + public Boolean isLoadSequencesFromCradle() { + return loadSequencesFromCradle; + } + + public void setLoadSequencesFromCradle(Boolean loadSequencesFromCradle) { + this.loadSequencesFromCradle = loadSequencesFromCradle; + } + + public Boolean isLoadMissedMessagesFromCradle() { + return loadMissedMessagesFromCradle; + } + + public void setLoadMissedMessagesFromCradle(Boolean loadMissedMessagesFromCradle) { + this.loadMissedMessagesFromCradle = loadMissedMessagesFromCradle; } - public void setStateFilePath(File stateFilePath) { - this.stateFilePath = stateFilePath; + public Boolean getResetStateOnServerReset() { + return resetStateOnServerReset; + } + + public void setResetStateOnServerReset(Boolean resetStateOnServerReset) { + this.resetStateOnServerReset = resetStateOnServerReset; } public Boolean useNextExpectedSeqNum() { @@ -230,12 +268,12 @@ public void setUseNextExpectedSeqNum(Boolean useNextExpectedSeqNum) { this.useNextExpectedSeqNum = useNextExpectedSeqNum; } - public Boolean isSaveAdminMessages() { - return saveAdminMessages; + public Boolean isLogoutOnIncorrectServerSequence() { + return logoutOnIncorrectServerSequence; } - public void setSaveAdminMessages(Boolean saveAdminMessages) { - this.saveAdminMessages = saveAdminMessages; + public void setLogoutOnIncorrectServerSequence(Boolean logoutOnIncorrectServerSequence) { + this.logoutOnIncorrectServerSequence = logoutOnIncorrectServerSequence; } public LocalTime getSessionStartTime() { @@ -273,4 +311,20 @@ public int getDisconnectRequestDelay() { public void setDisconnectRequestDelay(int disconnectRequestDelay) { this.disconnectRequestDelay = disconnectRequestDelay; } + + public long getConnectionTimeoutOnSend() { + return connectionTimeoutOnSend; + } + + public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) { + this.connectionTimeoutOnSend = connectionTimeoutOnSend; + } + + public long getMinConnectionTimeoutOnSend() { + return minConnectionTimeoutOnSend; + } + + public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) { + this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend; + } } diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index f953b8f..de4beed 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -17,6 +17,7 @@ package com.exactpro.th2.constants; +import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.Set; @@ -32,6 +33,7 @@ public class Constants { public static final Integer TARGET_COMP_ID_TAG = 56; public static final Integer MSG_SEQ_NUM_TAG = 34; public static final Integer SENDING_TIME_TAG = 52; + public static final Integer ORIG_SENDING_TIME_TAG = 122; public static final Integer CHECKSUM_TAG = 10; public static final Integer DEFAULT_APPL_VER_ID_TAG = 1137; public static final Integer SENDER_SUB_ID_TAG = 50; @@ -78,7 +80,9 @@ public class Constants { public static final String SENDER_SUB_ID = SOH + SENDER_SUB_ID_TAG + "="; public static final String RESET_SEQ_NUM = SOH + RESET_SEQ_NUM_TAG + "="; public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; - public static final String POSS_DUP = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; + public static final String POSS_DUP = SOH + POSS_DUP_TAG + "="; + public static final String ORIG_SENDING_TIME = SOH + ORIG_SENDING_TIME_TAG + "="; + public static final String TEXT = SOH + TEXT_TAG + "="; //message types public static final String MSG_TYPE_LOGON = "A"; @@ -97,5 +101,6 @@ public class Constants { ); public static final String IS_POSS_DUP = "Y"; + public static final String IS_SEQUENCE_RESET_FLAG = "Y"; public static final int SUCCESSFUL_LOGOUT_CODE = 4; } diff --git a/src/main/java/com/exactpro/th2/util/DateTimeFormatterDeserializer.java b/src/main/java/com/exactpro/th2/util/DateTimeFormatterDeserializer.java new file mode 100644 index 0000000..45371b2 --- /dev/null +++ b/src/main/java/com/exactpro/th2/util/DateTimeFormatterDeserializer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.util; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +public class DateTimeFormatterDeserializer extends StdDeserializer { + + public DateTimeFormatterDeserializer() { + super(DateTimeFormatter.class); + } + + @Override + public DateTimeFormatter deserialize(JsonParser parser, DeserializationContext context) throws IOException { + return DateTimeFormatter.ofPattern(parser.getValueAsString()); + } +} diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt index e491866..31db7f3 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt @@ -17,6 +17,7 @@ package com.exactpro.th2.conn.dirty.fix import com.exactpro.th2.common.event.Event +import com.exactpro.th2.common.event.Event.Status.FAILED import com.exactpro.th2.common.event.Event.Status.PASSED import com.exactpro.th2.common.event.EventUtils.createMessageBean import com.exactpro.th2.common.event.bean.IRow @@ -26,7 +27,8 @@ import com.exactpro.th2.conn.dirty.tcp.core.api.IMangler import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerContext import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerFactory import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerSettings -import com.fasterxml.jackson.dataformat.yaml.YAMLMapper +import com.fasterxml.jackson.databind.json.JsonMapper +import com.fasterxml.jackson.module.kotlin.KotlinFeature import com.fasterxml.jackson.module.kotlin.KotlinModule import com.fasterxml.jackson.module.kotlin.readValue import com.google.auto.service.AutoService @@ -36,12 +38,22 @@ import mu.KotlinLogging private val LOGGER = KotlinLogging.logger {} -private val MAPPER = YAMLMapper.builder() - .addModule(KotlinModule(nullIsSameAsDefault = true)) +private val MAPPER = JsonMapper.builder() + .addModule( + KotlinModule.Builder() + .withReflectionCacheSize(512) + .configure(KotlinFeature.NullToEmptyCollection, false) + .configure(KotlinFeature.NullToEmptyMap, false) + .configure(KotlinFeature.NullIsSameAsDefault, enabled = true) + .configure(KotlinFeature.SingletonSupport, false) + .configure(KotlinFeature.StrictNullChecks, false) + .build() + ) .build() private const val RULE_NAME_PROPERTY = "rule-name" private const val RULE_ACTIONS_PROPERTY = "rule-actions" +private const val MANGLE_EVENT_TYPE = "Mangle" class FixProtocolMangler(context: IManglerContext) : IMangler { private val rules = (context.settings as FixProtocolManglerSettings).rules @@ -49,22 +61,47 @@ class FixProtocolMangler(context: IManglerContext) : IMangler { override fun onOutgoing(channel: IChannel, message: ByteBuf, metadata: MutableMap): Event? { LOGGER.trace { "Processing message: ${message.toString(Charsets.UTF_8)}" } - val (rule, unconditionally) = getRule(message, metadata) ?: return null - val (name, results, message) = MessageTransformer.transform(message, rule, unconditionally) ?: return null + val (rule, unconditionally) = try { + getRule(message, metadata) ?: return null + } catch (e: Exception) { + return Event.start().apply { + name("Message wasn't mangled. Configuration error.") + type(MANGLE_EVENT_TYPE) + status(FAILED) + bodyData(createMessageBean("Message metadata: $metadata")) + exception(e, true) + } + } + + val (name, results, byteBuf) = MessageTransformer.transform(message, rule, unconditionally) ?: return null return Event.start().apply { - name("Message mangled") - type("Mangle") - status(PASSED) + type(MANGLE_EVENT_TYPE) + if(results.any { it.statusDesc.status == ActionStatus.FAIL }) { + name("Message was partially mangled.") + status(FAILED) + + if (metadata[RULE_ACTIONS_PROPERTY] != null) { + bodyData(createMessageBean("Action source is $RULE_ACTIONS_PROPERTY. " + + "Data: ${metadata[RULE_ACTIONS_PROPERTY]}")) + } else { + bodyData(createMessageBean("Action source is service configuration. " + + "Data: ${MAPPER.writeValueAsString(rules)}")) + } + } else { + name("Message mangled.") + status(PASSED) + } bodyData(createMessageBean("Original message:")) - bodyData(createMessageBean(ByteBufUtil.prettyHexDump(message))) + bodyData(createMessageBean(ByteBufUtil.prettyHexDump(byteBuf))) TableBuilder().run { results.forEach { result -> - row(ActionRow(name, result.tag, result.value, result.action.toString())) + row(ActionRow(name, result.tag, result.value, + result.action.toString(), result.statusDesc.status.name, + result.statusDesc.description)) } - bodyData(build()) } } @@ -72,18 +109,18 @@ class FixProtocolMangler(context: IManglerContext) : IMangler { private fun getRule(message: ByteBuf, metadata: MutableMap): Pair? { metadata[RULE_NAME_PROPERTY]?.also { name -> - val rule = rules.find { it.name == name } ?: throw IllegalArgumentException("No rule with name: $name") + val rule = rules.find { it.name == name } + ?: throw IllegalArgumentException("Invalid '$RULE_NAME_PROPERTY' value - $name. No rule with name found in configuration: $name. Searched in [ ${rules.joinToString(",") {it.name}} ]") return rule to true } metadata[RULE_ACTIONS_PROPERTY]?.also { yaml -> - val actions = try { - MAPPER.readValue>(yaml) + return try { + val actions = MAPPER.readValue>(yaml) + Rule("custom", listOf(Transform(listOf(), actions))) to true } catch (e: Exception) { - throw IllegalArgumentException("Invalid '$RULE_ACTIONS_PROPERTY' value", e) + throw IllegalArgumentException("Invalid '$RULE_ACTIONS_PROPERTY' value - $yaml", e) } - - return Rule("custom", listOf(Transform(listOf(), actions))) to true } if (rules.isEmpty()) return null @@ -117,4 +154,6 @@ private data class ActionRow( val corruptedTag: Int, val corruptedValue: String?, val corruptionDescription: String, + val status: String, + val errorDescription: String? ) : IRow \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt new file mode 100644 index 0000000..2a4977d --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -0,0 +1,282 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.SequenceHolder +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.common.util.toInstant +import com.exactpro.th2.constants.Constants.IS_POSS_DUP +import com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG +import com.exactpro.th2.constants.Constants.POSS_DUP_TAG +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.lw.grpc.MessageStream +import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation +import com.google.protobuf.Timestamp +import com.google.protobuf.util.Timestamps.compare +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneId +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import mu.KotlinLogging + +class MessageLoader( + private val dataProvider: DataProviderService, + private val sessionStartTime: LocalTime?, + private val bookName: String +) { + private var sessionStart: ZonedDateTime + private val searchLock = ReentrantLock() + + init { + val today = LocalDate.now(ZoneOffset.UTC) + val start = sessionStartTime?.atDate(today) + val now = LocalDateTime.now(ZoneOffset.UTC) + if(start == null) { + sessionStart = OffsetDateTime + .now(ZoneOffset.UTC) + .with(LocalTime.now()) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + sessionStart = if(start.isAfter(now)) { + OffsetDateTime + .now(ZoneOffset.UTC) + .minusDays(1) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } else { + OffsetDateTime + .now(ZoneOffset.UTC) + .with(sessionStartTime) + .atZoneSameInstant(ZoneId.systemDefault()) + } + } + } + + private var sessionStartTimestamp = sessionStart + .toInstant() + .toTimestamp() + + private var previousDaySessionStart = sessionStart + .minusDays(1) + .toInstant() + .toTimestamp() + + fun updateTime() { + searchLock.withLock { + sessionStart = ZonedDateTime + .now(ZoneOffset.UTC) + .with(OffsetTime.now(ZoneOffset.UTC)) + sessionStartTimestamp = sessionStart + .toInstant() + .toTimestamp() + previousDaySessionStart = sessionStart + .minusDays(1) + .toInstant() + .toTimestamp() + } + } + + fun loadInitialSequences(sessionGroup: String, sessionAlias: String): SequenceHolder = searchLock.withLock { + val serverSeq = searchMessage(sessionGroup, sessionAlias, Direction.FIRST, false) + val clientSeq = searchMessage(sessionGroup, sessionAlias, Direction.SECOND, true) + K_LOGGER.info { "Loaded sequences: client sequence - $clientSeq; server sequence - $serverSeq" } + return SequenceHolder(clientSeq, serverSeq) + } + + fun processMessagesInRange( + sessionGroup: String, + sessionAlias: String, + direction: Direction, + fromSequence: Long, + processMessage: (ByteBuf) -> Boolean + ) = searchLock.withLock { + processMessagesInRangeInternal(sessionGroup, sessionAlias, direction, fromSequence, processMessage) + } + + private fun processMessagesInRangeInternal( + sessionGroup: String, + sessionAlias: String, + direction: Direction, + fromSequence: Long, + processMessage: (ByteBuf) -> Boolean + ) { + var timestamp: Timestamp? = null + ProviderCall.withCancellation { + val backwardIterator = dataProvider.searchMessageGroups( + createSearchGroupRequest( + from = Instant.now().toTimestamp(), + sessionGroup = sessionGroup, + sessionAlias = sessionAlias, + direction = direction + ) + ) + + val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation + + var messagesToSkip = firstValidMessage.payloadSequence - fromSequence + + timestamp = firstValidMessage.timestamp + + while (backwardIterator.hasNext() && messagesToSkip > 0) { + val message = backwardIterator.next().message + if(compare(message.messageId.timestamp, previousDaySessionStart) <= 0) { + continue + } + timestamp = message.messageId.timestamp + messagesToSkip -= 1 + if(messagesToSkip == 0L) { + + val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue + + if(checkPossDup(buf)) { + val validMessage = firstValidMessageDetails(backwardIterator) ?: break + + timestamp = validMessage.timestamp + if(validMessage.payloadSequence <= fromSequence) { + break + } else { + messagesToSkip = validMessage.payloadSequence - fromSequence + } + + } else { + + if(sequence <= fromSequence) { + break + } else { + messagesToSkip = sequence - fromSequence + } + } + } + } + } + + val startSearchTimestamp = timestamp ?: return + + K_LOGGER.info { "Loading retransmission messages from ${startSearchTimestamp.toInstant()}" } + + ProviderCall.withCancellation { + + val iterator = dataProvider.searchMessageGroups( + createSearchGroupRequest( + from = startSearchTimestamp, + to = Instant.now().toTimestamp(), + sessionGroup = sessionGroup, + sessionAlias = sessionAlias, + direction = direction, + timeRelation = TimeRelation.NEXT, + ) + ) + + while (iterator.hasNext()) { + val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray()) + if (!processMessage(message)) break + } + } + } + + private fun searchMessage( + sessionGroup: String, + sessionAlias: String, + direction: Direction, + checkPossFlag: Boolean + ) = ProviderCall.withCancellation { + searchMessage( + dataProvider.searchMessageGroups( + createSearchGroupRequest( + from = Instant.now().toTimestamp(), + sessionGroup = sessionGroup, + sessionAlias = sessionAlias, + direction = direction + ) + ), + checkPossFlag + ) { _, seqNum -> seqNum?.toInt() ?: 0 } + } + + private fun searchMessage( + iterator: Iterator, + checkPossFlag: Boolean = false, + extractValue: (MessageGroupResponse?, String?) -> T + ): T { + var message: MessageGroupResponse? + while (iterator.hasNext()) { + message = iterator.next().message + if(sessionStartTime != null && compare(sessionStartTimestamp, message.messageId.timestamp) > 0) { + return extractValue(message, null) + } + + val bodyRaw = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val seqNum = bodyRaw.findField(MSG_SEQ_NUM_TAG)?.value ?: continue + + if(checkPossFlag && checkPossDup(bodyRaw)) continue + + return extractValue(message, seqNum) + } + return extractValue(null, null) + } + + private fun firstValidMessageDetails(iterator: Iterator): MessageDetails? = searchMessage( + iterator, + true + ) { message, seqNum -> + if(message == null || seqNum == null) return@searchMessage null + MessageDetails(seqNum.toInt(), message.messageId.sequence, message.messageId.timestamp) + } + + private fun createSearchGroupRequest( + from: Timestamp, + to: Timestamp = previousDaySessionStart, + sessionGroup: String, + sessionAlias: String, + direction: Direction, + timeRelation: TimeRelation = TimeRelation.PREVIOUS, + ) = MessageGroupsSearchRequest.newBuilder().apply { + startTimestamp = from + endTimestamp = to + addResponseFormats(BASE64_FORMAT) + addStream( + MessageStream.newBuilder() + .setName(sessionAlias) + .setDirection(direction) + ) + addMessageGroup(MessageGroupsSearchRequest.Group.newBuilder().setName(sessionGroup)) + bookIdBuilder.name = bookName + searchDirection = timeRelation + }.build() + + private fun checkPossDup(buf: ByteBuf): Boolean = buf.findField(POSS_DUP_TAG)?.value == IS_POSS_DUP + + data class MessageDetails(val payloadSequence: Int, val messageSequence: Long, val timestamp: Timestamp) + + companion object { + val K_LOGGER = KotlinLogging.logger { } + private const val BASE64_FORMAT = "BASE_64" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformer.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformer.kt index 43460f8..2afe615 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformer.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageTransformer.kt @@ -63,62 +63,67 @@ object MessageTransformer { private fun transform(message: ByteBuf, actions: List) = sequence { actions.forEach { action -> - action.set?.apply { - val tag = singleTag - val value = singleValue - - if (message.setField(tag, value)) { - yield(ActionResult(tag, value, action)) + try { + action.set?.apply { + val tag = singleTag + val value = singleValue + + if (message.setField(tag, value)) { + yield(ActionResult(tag, value, action)) + } } - } - action.add?.also { field -> - val tag = field.singleTag - val value = field.singleValue + action.add?.also { field -> + val tag = field.singleTag + val value = field.singleValue - action.before?.find(message)?.let { next -> - next.insertPrevious(tag, value) - yield(ActionResult(tag, value, action)) - } + action.before?.find(message)?.let { next -> + next.insertPrevious(tag, value) + yield(ActionResult(tag, value, action)) + } - action.after?.find(message)?.let { previous -> - previous.insertNext(tag, value) - yield(ActionResult(tag, value, action)) + action.after?.find(message)?.let { previous -> + previous.insertNext(tag, value) + yield(ActionResult(tag, value, action)) + } } - } - - action.move?.find(message)?.let { field -> - val tag = checkNotNull(field.tag) { "Field tag for move was empty" } - val value = field.value - action.before?.find(message)?.let { next -> - field.clear() - next.insertPrevious(tag, value) - yield(ActionResult(tag, value, action)) + action.move?.find(message)?.let { field -> + val tag = checkNotNull(field.tag) { "Field tag for move was empty." } + val value = field.value + + action.before?.find(message)?.let { next -> + field.clear() + next.insertPrevious(tag, value) + yield(ActionResult(tag, value, action)) + } + + action.after?.find(message)?.let { previous -> + previous.insertNext(tag, value) + field.clear() + yield(ActionResult(tag, value, action)) + } } - action.after?.find(message)?.let { previous -> - previous.insertNext(tag, value) + action.remove?.find(message)?.let { field -> + val tag = checkNotNull(field.tag) { "Field tag for remove was empty." } field.clear() - yield(ActionResult(tag, value, action)) + yield(ActionResult(tag, null, action)) } - } - action.remove?.find(message)?.let { field -> - val tag = checkNotNull(field.tag) { "Field tag for remove was empty" } - field.clear() - yield(ActionResult(tag, null, action)) - } - - action.replace?.find(message)?.let { field -> - val with = action.with!! - val tag = with.singleTag - val value = with.singleValue + action.replace?.find(message)?.let { field -> + val with = action.with!! + val tag = with.singleTag + val value = with.singleValue - field.tag = tag - field.value = value + field.tag = tag + field.value = value - yield(ActionResult(tag, value, action)) + yield(ActionResult(tag, value, action)) + } + } catch (e: Exception) { + logger.error(e) { "Error while applying action $action" } + yield(ActionResult(-1, null, action, ActionStatusDescription("Error while applying action: $action. Message: ${e.message}", ActionStatus.FAIL))) } } } @@ -223,13 +228,12 @@ data class Action( data class Transform( - @JsonAlias("when") val conditions: List, + @JsonAlias("when") val conditions: List = listOf(), @JsonAlias("then") val actions: List, @JsonAlias("update-length") val updateLength: Boolean = true, @JsonAlias("update-checksum") val updateChecksum: Boolean = true, ) { init { - require(conditions.isNotEmpty()) { "Transformation must have at least one condition" } require(actions.isNotEmpty()) { "Transformation must have at least one action" } } @@ -257,4 +261,7 @@ data class Rule( } data class TransformResult(val rule: RuleID, val results: List, val message: ByteBuf) -data class ActionResult(val tag: Tag, val value: String?, val action: Action) \ No newline at end of file +data class ActionResult(val tag: Tag, val value: String?, val action: Action, val statusDesc: ActionStatusDescription = SUCCESS_ACTION_DESCRIPTION) +data class ActionStatusDescription(val description: String? = null, val status: ActionStatus) +enum class ActionStatus { SUCCESS, FAIL; } +private val SUCCESS_ACTION_DESCRIPTION = ActionStatusDescription(status = ActionStatus.SUCCESS) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt new file mode 100644 index 0000000..3b42685 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/ProviderCall.kt @@ -0,0 +1,22 @@ +/******************************************************************************* + * Copyright (c) 2023, Exactpro Systems LLC + * www.exactpro.com + * Build Software to Test Software + * + * All rights reserved. + * This is unpublished, licensed software, confidential and proprietary + * information which is the property of Exactpro Systems LLC or its licensors. + ******************************************************************************/ +package com.exactpro.th2.conn.dirty.fix + +import io.grpc.Context + +class ProviderCall { + companion object { + fun withCancellation(code: () -> T): T { + return Context.current().withCancellation().use { context -> + context.call { code() } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/SequenceLoader.kt new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java new file mode 100644 index 0000000..938e66c --- /dev/null +++ b/src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2; + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; +import kotlin.Unit; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.time.Instant; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; + +class FixHandlerSendTimeoutTest { + @Test + void sendTimeoutOnConnectionOpen() { + var contextMock = Mockito.mock(IHandlerContext.class); + var channelMock = Mockito.mock(IChannel.class); + Mockito.when(contextMock.createChannel( + any(), + any(), + any(), + anyBoolean(), + anyLong(), + anyInt(), + any(String[].class) + )) + .thenReturn(channelMock); + Mockito.when(channelMock.open()) + .thenReturn(new CompletableFuture<>()); // future never completes + var settings = new FixHandlerSettings(); + settings.setPort(42); + settings.setHost("localhost"); + settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); + Mockito.when(contextMock.getSettings()) + .thenReturn(settings); + try(var fixHandler = new FixHandler(contextMock)) { + fixHandler.onStart(); + var exception = Assertions.assertThrows(TimeoutException.class, () -> + fixHandler.send(RawMessage.builder() + .setId(MessageId.builder() + .setDirection(Direction.OUTGOING) + .setSessionAlias("test") + .setSequence(1) + .setTimestamp(Instant.now()) + .build()) + .build())); + Assertions.assertEquals( + "could not open connection before timeout 300 mls elapsed", + exception.getMessage(), + "unexpected message" + ); + } + } + + @Test + void sendTimeoutOnSessionEnabled() { + var contextMock = Mockito.mock(IHandlerContext.class); + var channelMock = Mockito.mock(IChannel.class); + Mockito.when(contextMock.createChannel( + any(), + any(), + any(), + anyBoolean(), + anyLong(), + anyInt(), + any(String[].class) + )) + .thenReturn(channelMock); + Mockito.when(channelMock.open()) + .thenReturn(CompletableFuture.completedFuture(Unit.INSTANCE)); // completed immediately + Mockito.when(channelMock.isOpen()).thenReturn(true); + var settings = createSettings(); + Mockito.when(contextMock.getSettings()) + .thenReturn(settings); + try(var fixHandler = new FixHandler(contextMock)) { + fixHandler.onStart(); + var exception = Assertions.assertThrows(TimeoutException.class, () -> + fixHandler.send(RawMessage.builder() + .setId(MessageId.builder() + .setDirection(Direction.OUTGOING) + .setSessionAlias("test") + .setSequence(1) + .setTimestamp(Instant.now()) + .build()) + .build())); + Assertions.assertEquals( + "session was not established within 300 mls", + exception.getMessage(), + "unexpected message" + ); + } + } + + @NotNull + private static FixHandlerSettings createSettings() { + var settings = new FixHandlerSettings(); + settings.setPort(42); + settings.setHost("localhost"); + settings.setConnectionTimeoutOnSend(300); // 300 millis + settings.setMinConnectionTimeoutOnSend(100); + LocalTime currentTime = LocalTime.now(ZoneOffset.UTC); + int deltaMinutes = currentTime.isAfter(LocalTime.NOON) + ? -1 + : 1; + if (deltaMinutes > 0) { + settings.setSessionStartTime(currentTime.plusMinutes(deltaMinutes)); + settings.setSessionEndTime(currentTime.plusMinutes(deltaMinutes * 2)); + } else { + settings.setSessionStartTime(currentTime.plusMinutes(deltaMinutes * 2)); + settings.setSessionEndTime(currentTime.plusMinutes(deltaMinutes)); + } + return settings; + } +} diff --git a/src/test/java/com/exactpro/th2/FixHandlerTest.java b/src/test/java/com/exactpro/th2/FixHandlerTest.java index ece4a57..0a61f8a 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerTest.java @@ -20,34 +20,38 @@ import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; import com.exactpro.th2.util.MessageUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import kotlin.Unit; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Instant; +import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import kotlin.Unit; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; import static com.exactpro.th2.constants.Constants.BODY_LENGTH_TAG; import static com.exactpro.th2.constants.Constants.CHECKSUM_TAG; import static com.exactpro.th2.constants.Constants.DEFAULT_APPL_VER_ID_TAG; +import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; +import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -55,26 +59,27 @@ class FixHandlerTest { - private static final Channel channel = new Channel(); + private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + private Channel channel; + private FixHandler fixHandler; private static ByteBuf buffer; private static ByteBuf oneMessageBuffer; private static ByteBuf brokenBuffer; - private static FixHandler fixHandler = channel.getFixHandler(); @BeforeAll static void init() { - fixHandler.onOpen(channel); - ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); - fixHandler.onIncoming(channel, logonResponse); - oneMessageBuffer = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=13\00135=AE\001552=1\00110=169\001".getBytes(StandardCharsets.US_ASCII)); buffer = Unpooled.wrappedBuffer(("8=FIXT.1.1\0019=13\00135=AE\001552=1\00110=169\0018=FIXT.1.1\0019=13\00135=NN" + "\001552=2\00110=100\0018=FIXT.1.1\0019=13\00135=NN\001552=2\00110=100\001").getBytes(StandardCharsets.US_ASCII)); brokenBuffer = Unpooled.wrappedBuffer("A8=FIXT.1.1\0019=13\00135=AE\001552=1\00110=16913138=FIXT.1.1\0019=13\00135=AE\001552=1\00110=169\001".getBytes(StandardCharsets.US_ASCII)); + oneMessageBuffer = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=13\00135=AE\001552=1\00110=169\001".getBytes(StandardCharsets.US_ASCII)); } @BeforeEach void beforeEach() { - fixHandler.getEnabled().set(true); + channel = new Channel(createHandlerSettings(), null); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); } @AfterAll @@ -150,6 +155,61 @@ void sendResendRequestTest() { assertEquals(expectedResendRequest, new String(channel.getQueue().get(1).array())); } + @NotNull + public static FixHandlerSettings createHandlerSettings() { + final FixHandlerSettings fixHandlerSettings = new FixHandlerSettings(); + fixHandlerSettings.setHost("127.0.0.1"); + fixHandlerSettings.setPort(8080); + fixHandlerSettings.setBeginString("FIXT.1.1"); + fixHandlerSettings.setHeartBtInt(30); + fixHandlerSettings.setSenderCompID("client"); + fixHandlerSettings.setTargetCompID("server"); + fixHandlerSettings.setEncryptMethod("0"); + fixHandlerSettings.setUsername("username"); + fixHandlerSettings.setPassword("pass"); + fixHandlerSettings.setTestRequestDelay(10); + fixHandlerSettings.setReconnectDelay(5); + fixHandlerSettings.setDisconnectRequestDelay(5); + fixHandlerSettings.setResetSeqNumFlag(false); + fixHandlerSettings.setResetOnLogon(false); + fixHandlerSettings.setDefaultApplVerID("9"); + fixHandlerSettings.setSenderSubID("trader"); + return fixHandlerSettings; + } + + @NotNull + private static FixHandler createFixHandler() { + FixHandlerSettings fixHandlerSettings = createHandlerSettings(); + IHandlerContext context = Mockito.mock(IHandlerContext.class); + Mockito.when(context.getSettings()).thenReturn(fixHandlerSettings); + return new FixHandler(context); + } + + @Test + void getTimeTestWithSendingDateTimeFormatBeingNull() { + FixHandler originalFixHandler = createFixHandler(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS"); + String actual = originalFixHandler.getTime(); + LocalDateTime time = LocalDateTime.parse(actual, formatter); + String expected = formatter.format(time); + + assertEquals(expected, actual); + } + + @Test + void getTimeTestWithNewSendingDateTimeFormat() { + FixHandler originalFixHandler = createFixHandler(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSS"); + originalFixHandler.settings.setSendingDateTimeFormat(formatter); + String actual = originalFixHandler.getTime(); + + LocalDateTime time = LocalDateTime.parse(actual, formatter); + String expected = formatter.format(time); + + assertEquals(expected, actual); + } + + @Test void onConnectionTest() { channel.clearQueue(); @@ -161,10 +221,25 @@ void onConnectionTest() { } catch (InterruptedException e) { e.printStackTrace(); } - assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=7\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=209\u0001", + assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=204\u0001", new String(channel.getQueue().get(0).array())); } + @Test + void logoutDisconnectTest() { + channel.clearQueue(); + channel.close(); + fixHandler.onOpen(channel); + channel.close(); + var logon = "8=FIXT.1.1\u00019=105\u000135=A\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=204\u0001"; + assertEquals(channel.getQueue().size(), 1); + assertEquals(logon, new String(channel.getQueue().get(0).array())); + channel.clearQueue(); + fixHandler.onOpen(channel); + assertEquals(channel.getQueue().size(), 1); + assertEquals(logon, new String(channel.getQueue().get(0).array())); + } + @Test void onOutgoingMessageTest() { ByteBuf bufferForPrepareMessage1 = Unpooled.buffer().writeBytes("8=FIXT.1.1\0019=13\001552=1\00149=client\00134=8\00156=null\00110=169\001".getBytes(StandardCharsets.US_ASCII)); @@ -173,9 +248,9 @@ void onOutgoingMessageTest() { ByteBuf bufferForPrepareMessage4 = Unpooled.buffer().writeBytes("8=FIXT.1.1\0019=192\00135=A\00111=3428785\00122=8\00138=30\00140=2\00144=55\00148=INSTR1\00154=1\00159=0\00160=20220127-18:38:35\001526=11111\001528=A\001581=1\001453=4\001448=DEMO-CONN1\001447=D\001452=76\001448=0\001447=P\001452=3\001448=0\00147=P\001452=122\001448=3\001447=P\001452=12\00110=228\001".getBytes(StandardCharsets.US_ASCII)); String expectedMessage1 = "8=FIXT.1.1\u00019=70\u000135=A\u0001552=1\u000149=client\u000134=8\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"; - String expectedMessage2 = "8=FIXT.1.1\u00019=65\u000134=4\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u0001552=1\u000110=157\u0001"; - String expectedMessage3 = "8=FIXT.1.1\u00019=243\u000135=A\u000134=5\u000149=client\u000156=server\u000150=trader\u000111=9977764\u000122=8\u000138=100\u000140=2\u000144=55\u000152=20220127-12:00:40.775\u000148=INSTR2\u000154=2\u000159=3\u000160=20220127-15:00:36\u0001528=A\u0001581=1\u0001453=4\u0001448=DEMO-CONN2\u0001447=D\u0001452=76\u0001448=0\u0001447=P\u0001452=3\u0001448=0\u0001447=P\u0001452=122\u0001448=3\u0001447=P\u0001452=12\u000110=120\u0001"; - String expectedMessage4 = "8=FIXT.1.1\u00019=250\u000135=A\u000134=6\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000111=3428785\u000122=8\u000138=30\u000140=2\u000144=55\u000148=INSTR1\u000154=1\u000159=0\u000160=20220127-18:38:35\u0001526=11111\u0001528=A\u0001581=1\u0001453=4\u0001448=DEMO-CONN1\u0001447=D\u0001452=76\u0001448=0\u0001447=P\u0001452=3\u0001448=0\u000147=P\u0001452=122\u0001448=3\u0001447=P\u0001452=12\u000110=235\u0001"; + String expectedMessage2 = "8=FIXT.1.1\u00019=65\u000134=2\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u0001552=1\u000110=155\u0001"; + String expectedMessage3 = "8=FIXT.1.1\u00019=243\u000135=A\u000134=3\u000149=client\u000156=server\u000150=trader\u000111=9977764\u000122=8\u000138=100\u000140=2\u000144=55\u000152=20220127-12:00:40.775\u000148=INSTR2\u000154=2\u000159=3\u000160=20220127-15:00:36\u0001528=A\u0001581=1\u0001453=4\u0001448=DEMO-CONN2\u0001447=D\u0001452=76\u0001448=0\u0001447=P\u0001452=3\u0001448=0\u0001447=P\u0001452=122\u0001448=3\u0001447=P\u0001452=12\u000110=118\u0001"; + String expectedMessage4 = "8=FIXT.1.1\u00019=250\u000135=A\u000134=4\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000111=3428785\u000122=8\u000138=30\u000140=2\u000144=55\u000148=INSTR1\u000154=1\u000159=0\u000160=20220127-18:38:35\u0001526=11111\u0001528=A\u0001581=1\u0001453=4\u0001448=DEMO-CONN1\u0001447=D\u0001452=76\u0001448=0\u0001447=P\u0001452=3\u0001448=0\u000147=P\u0001452=122\u0001448=3\u0001447=P\u0001452=12\u000110=233\u0001"; Map expected = new HashMap<>(); expected.put("MsgType", "A"); Map expected2 = new HashMap<>(); @@ -191,7 +266,6 @@ void onOutgoingMessageTest() { Map actual2 = new HashMap<>(); fixHandler.onOutgoing(channel, bufferForPrepareMessage2, actual2); assertEquals(expected2, actual2); - fixHandler.onOutgoing(channel, bufferForPrepareMessage3, expected3); fixHandler.onOutgoing(channel, bufferForPrepareMessage4, expected4); @@ -238,7 +312,7 @@ void getByteByfBodyLengthTest() { @Test void sendTestRequestTest() { - String expected = "8=FIXT.1.1\u00019=70\u000135=1\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001112=1\u000110=101\u0001"; + String expected = "8=FIXT.1.1\u00019=70\u000135=1\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001112=1\u000110=102\u0001"; channel.clearQueue(); fixHandler.sendTestRequest(); assertEquals(expected, new String(channel.getQueue().get(0).array())); @@ -246,7 +320,19 @@ void sendTestRequestTest() { @Test void handleResendRequestTest() { - //later + for (int i = 0; i < 3; i++) { + fixHandler.sendResendRequest(1); + } + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=70\u000135=2\u000134=2\u00017=1\u000116=0\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000110=101\u0001".getBytes(StandardCharsets.US_ASCII)); + channel.clearQueue(); + fixHandler.onIncoming(channel, resendRequest); + ByteBuf sequenceReset = channel.getQueue().get(0); + assertEquals("8=FIXT.1.1\u00019=105\u000135=4\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001122=2014-12-22T10:15:30Z\u000143=Y\u0001123=Y\u000136=5\u000110=162\u0001", new String(sequenceReset.array())); + channel.clearQueue(); + fixHandler.sendResendRequest(2); + ByteBuf resendRequestOutgoing = channel.getQueue().get(0); + assertEquals("8=FIXT.1.1\u00019=73\u000135=2\u000134=5\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=0\u000110=230\u0001", new String(resendRequestOutgoing.array())); + assertEquals(findField(resendRequestOutgoing, MSG_SEQ_NUM_TAG).getValue(), findField(sequenceReset, NEW_SEQ_NO_TAG).getValue()); } @Test @@ -322,7 +408,6 @@ void updateTagTest() { MessageUtil.updateTag(buf, DEFAULT_APPL_VER_ID_TAG.toString(), "1"); assertEquals(expected2, buf.toString(StandardCharsets.US_ASCII)); } - } class Channel implements IChannel { @@ -330,26 +415,12 @@ class Channel implements IChannel { private final MyFixHandler fixHandler; private final List queue = new ArrayList<>(); - Channel() { - this.fixHandlerSettings = new FixHandlerSettings(); - fixHandlerSettings.setHost("127.0.0.1"); - fixHandlerSettings.setPort(8080); - fixHandlerSettings.setBeginString("FIXT.1.1"); - fixHandlerSettings.setHeartBtInt(30); - fixHandlerSettings.setSenderCompID("client"); - fixHandlerSettings.setTargetCompID("server"); - fixHandlerSettings.setEncryptMethod("0"); - fixHandlerSettings.setUsername("username"); - fixHandlerSettings.setPassword("pass"); - fixHandlerSettings.setTestRequestDelay(10); - fixHandlerSettings.setReconnectDelay(5); - fixHandlerSettings.setDisconnectRequestDelay(5); - fixHandlerSettings.setResetSeqNumFlag(false); - fixHandlerSettings.setResetOnLogon(false); - fixHandlerSettings.setDefaultApplVerID("9"); - fixHandlerSettings.setSenderSubID("trader"); + Channel(FixHandlerSettings fixHandlerSettings, DataProviderService dataProviderService) { + this.fixHandlerSettings = fixHandlerSettings; IHandlerContext context = Mockito.mock(IHandlerContext.class); - Mockito.when(context.getSettings()).thenReturn(fixHandlerSettings); + Mockito.when(context.getSettings()).thenReturn(this.fixHandlerSettings); + Mockito.when(context.getGrpcService(DataProviderService.class)).thenReturn(dataProviderService); + Mockito.when(context.getBookName()).thenReturn("bookName"); this.fixHandler = new MyFixHandler(context); } diff --git a/src/test/java/com/exactpro/th2/RecoveryTest.java b/src/test/java/com/exactpro/th2/RecoveryTest.java new file mode 100644 index 0000000..9399681 --- /dev/null +++ b/src/test/java/com/exactpro/th2/RecoveryTest.java @@ -0,0 +1,281 @@ +/* + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2; + +import com.exactpro.th2.conn.dirty.fix.MessageSearcher; +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse; +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest; +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse; +import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static com.exactpro.th2.FixHandlerTest.createHandlerSettings; +import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; +import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_SEQUENCE_RESET; +import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; +import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SuppressWarnings("DataFlowIssue") +public class RecoveryTest { + + private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + private Channel channel; + private FixHandler fixHandler; + + @Test + void testSequenceResetInRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(2), + messageSearchResponse(3), + messageSearchResponse(4), + messageSearchResponse(5) + ) + ); + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // requesting resend from 2 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(5, channel.getQueue().size()); + + for(int i = 1; i <= 4; i++) { + ByteBuf buf = channel.getQueue().get(i); + assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); + assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); + } + } + + @Test + void testSequenceResetInsideRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(4), + messageSearchResponse(5) + ) + ); + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 2 to 8 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=8\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(channel.getQueue().size(), 5); + + // for missed messages after beginSeqNo to 4 + ByteBuf firstSequenceReset = channel.getQueue().get(1); + assertEquals(findField(firstSequenceReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(firstSequenceReset, MSG_SEQ_NUM_TAG).getValue()), 2); + assertEquals(Integer.parseInt(findField(firstSequenceReset, NEW_SEQ_NO_TAG).getValue()), 4); + + ByteBuf message4 = channel.getQueue().get(2); + + assertEquals(findField(message4, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message4, MSG_SEQ_NUM_TAG).getValue()), 4); + assertEquals(findField(message4, POSS_DUP_TAG).getValue(), "Y"); + + ByteBuf message5 = channel.getQueue().get(3); + + assertEquals(findField(message5, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message5, MSG_SEQ_NUM_TAG).getValue()), 5); + assertEquals(findField(message5, POSS_DUP_TAG).getValue(), "Y"); + + // For missed messages after 4 + ByteBuf seqReset2 = channel.getQueue().get(4); + assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 6); + assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); + } + + @Test + void testSequenceResetOutOfRange() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponse(1), + messageSearchResponse(2), + messageSearchResponse(3), + messageSearchResponse(4), + messageSearchResponse(5), + messageSearchResponse(6) + ) + ); + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // requesting resend from 2 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + assertEquals(5, channel.getQueue().size()); + for(int i = 1; i <= 4; i++) { + ByteBuf buf = channel.getQueue().get(i); + assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); + assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); + } + } + + @Test + void testSequenceResetAdminMessages() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + MessageSearcher ms = new MessageSearcher( + List.of( + messageSearchResponseAdmin(2), + messageSearchResponse(4), + messageSearchResponseAdmin(5), + messageSearchResponseAdmin(6) + ) + ); + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( + x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) + ); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 1 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + + // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset1 = channel.getQueue().get(1); + assertEquals(findField(seqReset1, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset1, MSG_SEQ_NUM_TAG).getValue()), 1); + assertEquals(Integer.parseInt(findField(seqReset1, NEW_SEQ_NO_TAG).getValue()), 4); + + ByteBuf message = channel.getQueue().get(2); + assertEquals(findField(message, MSG_TYPE_TAG).getValue(), "C"); + assertEquals(Integer.parseInt(findField(message, MSG_SEQ_NUM_TAG).getValue()), 4); + assertEquals(findField(message, POSS_DUP_TAG).getValue(), "Y"); + + // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset2 = channel.getQueue().get(3); + assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 5); + assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); + + } + + @Test + void allMessagesMissed() { + FixHandlerSettings settings = createHandlerSettings(); + settings.setLoadMissedMessagesFromCradle(true); + DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); + Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenReturn(Collections.emptyIterator()); + channel = new Channel(settings, dataProviderService); + fixHandler = channel.getFixHandler(); + fixHandler.onOpen(channel); + fixHandler.onIncoming(channel, logonResponse); + // handler sequence after loop is 22 + for(int i = 0; i <= 20; i++) { + fixHandler.onOutgoing( + channel, + Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), + new HashMap<>() + ); + } + // requesting resend from 1 to 5 + ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); + fixHandler.onIncoming(channel, resendRequest); + + // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + ByteBuf seqReset = channel.getQueue().get(1); + assertEquals(findField(seqReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); + assertEquals(Integer.parseInt(findField(seqReset, MSG_SEQ_NUM_TAG).getValue()), 1); + assertEquals(Integer.parseInt(findField(seqReset, NEW_SEQ_NO_TAG).getValue()), 23); + } + + private MessageSearchResponse messageSearchResponse(Integer sequence) { + return MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(message(sequence))) + ).build(); + } + + private MessageSearchResponse messageSearchResponseAdmin(Integer sequence) { + return MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(adminMessage(sequence))) + ).build(); + } + + private String message(Integer sequence) { + return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); + } + + private String messageWithoutSeqNum() { + return "8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"; + } + + private String adminMessage(Integer sequence) { + return String.format("8=FIXT.1.1\u00019=70\u000135=4\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); + } +} diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt new file mode 100644 index 0000000..44210da --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/MessageSearcher.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn.dirty.fix + +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse +import com.exactpro.th2.dataprovider.lw.grpc.TimeRelation +import com.google.protobuf.util.Timestamps + +class MessageSearcher(private val messages: List) { + + fun searchMessages(request: MessageGroupsSearchRequest): Iterator { + val startTimestamp = request.startTimestamp + val searchDirection = request.searchDirection + + val filteredMessages = if (searchDirection == TimeRelation.NEXT) { + messages.filter { + Timestamps.compare(it.message.messageId.timestamp, startTimestamp) >= 0 } + } else { + messages.filter { + Timestamps.compare(it.message.messageId.timestamp, startTimestamp) <= 0 + }.reversed() + } + + return filteredMessages.iterator() + } +} \ No newline at end of file diff --git a/suppressions.xml b/suppressions.xml new file mode 100644 index 0000000..70a0bc0 --- /dev/null +++ b/suppressions.xml @@ -0,0 +1,16 @@ + + + + + + + ^pkg:maven/com\.exactpro\.th2/grpc-.*@.*$ + cpe:/a:grpc:grpc + + + + + ^pkg:maven/com\.exactpro\.th2/task-utils@.*$ + cpe:/a:utils_project:utils + + \ No newline at end of file