diff --git a/README.md b/README.md index f312701..9c7220c 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,26 @@ -# th2-conn-dirty-fix (0.0.2) +# th2-conn-dirty-fix (0.0.4) This microservice allows sending and receiving messages via FIX protocol ## Configuration -+ *autoStart* - enables/disable auto-starting of session on box start (`true` by default) -+ *autoStopAfter* - time in seconds after which session will be automatically stopped (`0` by default = disabled) ++ *sessions* - list of session settings + *maxBatchSize* - max size of outgoing message batch (`1000` by default) + *maxFlushTime* - max message batch flush time (`1000` by default) + *publishSentEvents* - enables/disables publish of "message sent" events (`true` by default) + *publishConnectEvents* - enables/disables publish of "connect/disconnect" events (`true` by default) -+ *sessions* - list of session settings ## Session settings + *sessionAlias* - session alias for incoming/outgoing th2 messages -+ *host* - service host -+ *port* - service port -+ *security* - connection security settings -+ *maxMessageRate* - max outgoing message rate for this session (unlimited by default) -+ *autoReconnect* - enables/disables auto-reconnect (`true` by default) -+ *reconnectDelay* - delay between reconnects (`5000` by default) + *handler* - handler settings + *mangler* - mangler settings -### Security settings - -+ *ssl* - enables SSL on connection (`false` by default) -+ *sni* - enables SNI support (`false` by default) -+ *certFile* - path to server certificate (`null` by default) -+ *acceptAllCerts* - accept all server certificates (`false` by default, takes precedence over `certFile`) ## Handler settings ++ *host* - service host ++ *port* - service port ++ *security* - connection security settings + *beginString* - defines the start of a new message and the protocol version + *heartBtInt* - message waiting interval + *senderCompID* - ID of the sender of the message @@ -51,6 +40,18 @@ This microservice allows sending and receiving messages via FIX protocol + *disconnectRequestDelay* - the interval for the shutdown request + *resetSeqNumFlag* - resetting sequence number in initial Logon message (when conn started) + *resetOnLogon* - resetting the sequence number in Logon in other cases (e.g. disconnect) ++ *stateFilePath* - path to file where sequences will be saved to use with next login attempts. It is useful when acceptor does not support sequence reset. (`nullable`) ++ *sessionStartTime* - UTC time when session starts. (`nullable`) ++ *sessionEndTime* - UTC time when session ends. required if startSessionTime is filled. ++ *useNextExpectedSeqNum* - session management based on next expected sequence number. (`false` by default) ++ *saveAdminMessages* - defines if admin messages will be saved to internal outgoing buffer. (`false` by default) + +### Security settings + ++ *ssl* - enables SSL on connection (`false` by default) ++ *sni* - enables SNI support (`false` by default) ++ *certFile* - path to server certificate (`null` by default) ++ *acceptAllCerts* - accept all server certificates (`false` by default, takes precedence over `certFile`) ## Mangler settings @@ -222,9 +223,7 @@ spec: image-version: 0.0.1 type: th2-conn custom-config: - autoStart: true - autoStopAfter: 0 - maxBatchSize: 100 + maxBatchSize: 1000 maxFlushTime: 1000 publishSentEvents: true publishConnectEvents: true @@ -283,18 +282,28 @@ spec: settings: storageOnDemand: false queueLength: 1000 - - name: outgoing_messages + - name: incoming_messages connection-type: mq attributes: - - second - publish + - store - raw - - name: incoming_messages + filters: + - metadata: + - field-name: direction + expected-value: FIRST + operation: EQUAL + - name: outgoing_messages connection-type: mq attributes: - - first - publish + - store - raw + filters: + - metadata: + - field-name: direction + expected-value: SECOND + operation: EQUAL extended-settings: externalBox: enabled: false @@ -311,6 +320,13 @@ spec: # Changelog +## 0.0.4 +* Session management based on NextExpectedSeqNum field. +* Recovery handling + * outgoing messages are now saved + * if message wasn't saved sequence reset message with gap fill mode flag is sent. +* Session start and Session end configuration to handle sequence reset by exchange schedule. + ## 0.0.3 * Added new password option into settings diff --git a/build.gradle b/build.gradle index 140931d..b88c8a4 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,15 @@ version release_version sourceCompatibility = 11 targetCompatibility = 11 +ext { + sharedDir = file("${project.rootDir}/shared") +} + repositories { + maven { + name 'MavenLocal' + url sharedDir + } mavenCentral() maven { @@ -36,18 +44,20 @@ repositories { } dependencies { -// api platform('com.exactpro.th2:bom:4.0.2') - api platform('com.exactpro.th2:bom:3.1.0') + api platform('com.exactpro.th2:bom:4.1.0') - implementation 'com.exactpro.th2:conn-dirty-tcp-core:1.0.0' - implementation 'com.exactpro.th2:common:3.31.1' -// implementation 'com.exactpro.th2:common:3.42.1-TH2-2212-common-cannot-recover-channel-level-exceptions-3540133251-SNAPSHOT' + implementation 'com.exactpro.th2:common:3.44.0' + implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' + implementation ('com.exactpro.th2:conn-dirty-tcp-core:2.0.5') { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + because 'Projects should use only slf4j-api, without coupling to a certain implementation' + } - implementation 'org.slf4j:slf4j-api:1.7.33' - implementation 'io.github.microutils:kotlin-logging:2.1.21' + implementation 'org.slf4j:slf4j-api' + implementation 'io.github.microutils:kotlin-logging:2.1.23' - implementation 'io.netty:netty-all:4.1.72.Final' - implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.0' + implementation 'io.netty:netty-all:4.1.86.Final' + implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.21' implementation 'com.google.auto.service:auto-service:1.0.1' implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jackson_version @@ -56,7 +66,7 @@ dependencies { implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jackson_version testImplementation 'org.mockito:mockito-all:1.10.19' - testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.6.21' + testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.7.10' annotationProcessor 'com.google.auto.service:auto-service:1.0.1' kapt 'com.google.auto.service:auto-service:1.0.1' diff --git a/gradle.properties b/gradle.properties index 2259ea7..5fbfe13 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -release_version=0.0.3 +release_version=0.0.4 jackson_version=2.11.2 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index da58093..6838801 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -16,24 +16,33 @@ package com.exactpro.th2; +import com.exactpro.th2.common.grpc.MessageID; +import com.exactpro.th2.common.grpc.RawMessage; import com.exactpro.th2.conn.dirty.fix.FixField; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; -import com.exactpro.th2.conn.dirty.tcp.core.api.IContext; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandler; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings; +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; import com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil; -import com.google.auto.service.AutoService; +import com.exactpro.th2.util.Util; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -53,8 +62,9 @@ import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.updateChecksum; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.updateLength; import static com.exactpro.th2.conn.dirty.fix.KeyFileType.Companion.OperationMode.ENCRYPT_MODE; -import static com.exactpro.th2.conn.dirty.tcp.core.util.ByteBufUtil.indexOf; -import static com.exactpro.th2.conn.dirty.tcp.core.util.ByteBufUtil.isEmpty; +import static com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil.getEventId; +import static com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil.toByteBuf; +import static com.exactpro.th2.constants.Constants.ADMIN_MESSAGES; import static com.exactpro.th2.constants.Constants.BEGIN_SEQ_NO; import static com.exactpro.th2.constants.Constants.BEGIN_SEQ_NO_TAG; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; @@ -67,8 +77,10 @@ import static com.exactpro.th2.constants.Constants.ENCRYPT_METHOD; import static com.exactpro.th2.constants.Constants.END_SEQ_NO; import static com.exactpro.th2.constants.Constants.END_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG; import static com.exactpro.th2.constants.Constants.GAP_FILL_FLAG_TAG; import static com.exactpro.th2.constants.Constants.HEART_BT_INT; +import static com.exactpro.th2.constants.Constants.IS_POSS_DUP; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM; import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; import static com.exactpro.th2.constants.Constants.MSG_TYPE; @@ -83,7 +95,10 @@ import static com.exactpro.th2.constants.Constants.NEW_PASSWORD; import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO; import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; +import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUM; +import static com.exactpro.th2.constants.Constants.NEXT_EXPECTED_SEQ_NUMBER_TAG; import static com.exactpro.th2.constants.Constants.PASSWORD; +import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; import static com.exactpro.th2.constants.Constants.RESET_SEQ_NUM; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID; import static com.exactpro.th2.constants.Constants.SENDER_COMP_ID_TAG; @@ -92,12 +107,15 @@ import static com.exactpro.th2.constants.Constants.SENDING_TIME; import static com.exactpro.th2.constants.Constants.SENDING_TIME_TAG; import static com.exactpro.th2.constants.Constants.SESSION_STATUS_TAG; +import static com.exactpro.th2.constants.Constants.SUCCESSFUL_LOGOUT_CODE; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID; import static com.exactpro.th2.constants.Constants.TARGET_COMP_ID_TAG; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID; import static com.exactpro.th2.constants.Constants.TEST_REQ_ID_TAG; import static com.exactpro.th2.constants.Constants.TEXT_TAG; import static com.exactpro.th2.constants.Constants.USERNAME; +import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.indexOf; +import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.isEmpty; import static com.exactpro.th2.util.MessageUtil.findByte; import static java.nio.charset.StandardCharsets.US_ASCII; import static java.util.Objects.requireNonNull; @@ -107,47 +125,118 @@ //todo ring buffer as cache //todo add events -@AutoService(IProtocolHandler.class) -public class FixHandler implements AutoCloseable, IProtocolHandler { - +public class FixHandler implements AutoCloseable, IHandler { private static final Logger LOGGER = LoggerFactory.getLogger(FixHandler.class); + + private static final int DAY_SECONDS = 24 * 60 * 60; private static final String SOH = "\001"; private static final byte BYTE_SOH = 1; private static final String STRING_MSG_TYPE = "MsgType"; private static final String REJECT_REASON = "Reject reason"; private static final String STUBBING_VALUE = "XXX"; + private final Log outgoingMessages = new Log(10000); - private final AtomicInteger msgSeqNum = new AtomicInteger(0); - private final AtomicInteger serverMsgSeqNum = new AtomicInteger(0); + private final AtomicInteger msgSeqNum; + private final AtomicInteger serverMsgSeqNum; private final AtomicInteger testReqID = new AtomicInteger(0); + private final AtomicBoolean sessionActive = new AtomicBoolean(true); private final AtomicBoolean enabled = new AtomicBoolean(false); private final AtomicBoolean connStarted = new AtomicBoolean(false); - private final ScheduledExecutorService executorService; - private final IContext context; + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + private final IHandlerContext context; + private final InetSocketAddress address; + private Future heartbeatTimer = CompletableFuture.completedFuture(null); private Future testRequestTimer = CompletableFuture.completedFuture(null); private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); - private Future disconnectRequest = CompletableFuture.completedFuture(null); - private IChannel client; + private volatile IChannel channel; protected FixHandlerSettings settings; private long lastSendTime = System.currentTimeMillis(); - public FixHandler(IContext context) { + public FixHandler(IHandlerContext context) { this.context = context; - executorService = Executors.newScheduledThreadPool(1); this.settings = (FixHandlerSettings) context.getSettings(); - requireNonNull(settings.getBeginString(), "BeginString can not be null"); - requireNonNull(settings.getResetSeqNumFlag(), "ResetSeqNumFlag can not be null"); - requireNonNull(settings.getResetOnLogon(), "ResetOnLogon can not be null"); - if(settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero"); - if(settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero"); - if(settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero"); - if(settings.getPasswordEncryptKeyFilePath() != null) requireNonNull(settings.getPasswordEncryptKeyFileType(), - "PasswordEncryptKeyFileType can not be null, because the PasswordEncryptKeyFilePath `" + settings.getPasswordEncryptKeyFilePath() + "` isn't null"); + if(settings.getStateFilePath() == null || !settings.getStateFilePath().exists()) { + msgSeqNum = new AtomicInteger(0); + serverMsgSeqNum = new AtomicInteger(0); + } else { + SequenceHolder sequences = Util.readSequences(settings.getStateFilePath()); + msgSeqNum = new AtomicInteger(sequences.getClientSeq()); + serverMsgSeqNum = new AtomicInteger(sequences.getServerSeq()); + } + + LOGGER.info("Initial sequences are: client - {}, server - {}", msgSeqNum.get(), serverMsgSeqNum.get()); + + if(settings.getSessionStartTime() != null) { + Objects.requireNonNull(settings.getSessionEndTime(), "Session end is required when session start is presented"); + LocalTime resetTime = settings.getSessionStartTime(); + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + ZonedDateTime scheduleTime = now.with(resetTime); + + if(scheduleTime.isBefore(now)) { + scheduleTime = now.plusDays(1).with(resetTime); + } + long time = now.until(scheduleTime, ChronoUnit.SECONDS); + executorService.scheduleAtFixedRate(this::reset, time, DAY_SECONDS, TimeUnit.SECONDS); + } + + if(settings.getSessionEndTime() != null) { + LocalTime resetTime = settings.getSessionEndTime(); + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + ZonedDateTime scheduleTime = now.with(resetTime); + + if(scheduleTime.isBefore(now)) { + scheduleTime = now.plusDays(1).with(resetTime); + } else if(now.isBefore(now.with(settings.getSessionStartTime()))) { + sessionActive.set(false); + } + + long time = now.until(scheduleTime, ChronoUnit.SECONDS); + executorService.scheduleAtFixedRate(() -> { + this.close(); + sessionActive.set(false); + }, time, DAY_SECONDS, TimeUnit.SECONDS); + } + + String host = settings.getHost(); + if (host == null || host.isBlank()) throw new IllegalArgumentException("host cannot be blank"); + int port = settings.getPort(); + if (port < 1 || port > 65535) throw new IllegalArgumentException("port must be in 1..65535 range"); + address = new InetSocketAddress(host, port); + Objects.requireNonNull(settings.getSecurity(), "security cannot be null"); + Objects.requireNonNull(settings.getBeginString(), "BeginString can not be null"); + Objects.requireNonNull(settings.getResetSeqNumFlag(), "ResetSeqNumFlag can not be null"); + Objects.requireNonNull(settings.getResetOnLogon(), "ResetOnLogon can not be null"); + if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero"); + if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero"); + if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero"); } @Override - public ByteBuf onReceive(ByteBuf buffer) { + public void onStart() { + channel = context.createChannel(address, settings.getSecurity(), Map.of(), true, settings.getReconnectDelay() * 1000L, Integer.MAX_VALUE); + channel.open(); + } + + @NotNull + @Override + public CompletableFuture send(@NotNull RawMessage rawMessage) { + if (!sessionActive.get()) { + throw new IllegalStateException("Session is not active. It is not possible to send messages."); + } + if (!channel.isOpen()) { + try { + channel.open().get(); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + } + + return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE); + } + + @Override + public ByteBuf onReceive(IChannel channel, ByteBuf buffer) { int offset = buffer.readerIndex(); if (offset == buffer.writerIndex()) return null; @@ -184,7 +273,7 @@ public ByteBuf onReceive(ByteBuf buffer) { @NotNull @Override - public Map onIncoming(@NotNull ByteBuf message) { + public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message) { Map metadata = new HashMap<>(); int beginString = indexOf(message, "8=FIX"); @@ -208,13 +297,26 @@ public Map onIncoming(@NotNull ByteBuf message) { return metadata; } - serverMsgSeqNum.incrementAndGet(); + FixField possDup = findField(message, POSS_DUP_TAG); + boolean isDup = false; + if(possDup != null) { + isDup = possDup.getValue().equals(IS_POSS_DUP); + } + int receivedMsgSeqNum = Integer.parseInt(requireNonNull(msgSeqNumValue.getValue())); - if (serverMsgSeqNum.get() < receivedMsgSeqNum) { - if (enabled.get()) { - sendResendRequest(serverMsgSeqNum.get(), receivedMsgSeqNum); - } + if(receivedMsgSeqNum < serverMsgSeqNum.get() && !isDup) { + sendLogout(); + reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); + metadata.put(REJECT_REASON, "SeqNum is less than expected."); + if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. SeqNum is less than expected {}: {}", serverMsgSeqNum.get(), message.toString(US_ASCII)); + return metadata; + } + + serverMsgSeqNum.incrementAndGet(); + + if (serverMsgSeqNum.get() < receivedMsgSeqNum && !isDup && enabled.get()) { + sendResendRequest(serverMsgSeqNum.get(), receivedMsgSeqNum); } String msgTypeValue = requireNonNull(msgType.getValue()); @@ -227,8 +329,24 @@ public Map onIncoming(@NotNull ByteBuf message) { case MSG_TYPE_LOGON: if (LOGGER.isInfoEnabled()) LOGGER.info("Logon received - {}", message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); - enabled.set(connectionSuccessful); if (connectionSuccessful) { + if(settings.useNextExpectedSeqNum()) { + FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); + if(nextExpectedSeqField == null) { + metadata.put(REJECT_REASON, "No NextExpectedSeqNum field"); + if (LOGGER.isErrorEnabled()) LOGGER.error("Invalid message. No NextExpectedSeqNum in message: {}", message.toString(US_ASCII)); + return metadata; + } + + int nextExpectedSeqNumber = Integer.parseInt(requireNonNull(nextExpectedSeqField.getValue())); + int seqNum = msgSeqNum.get(); + if(nextExpectedSeqNumber < seqNum) { + recovery(nextExpectedSeqNumber, seqNum); + } + } + + enabled.set(true); + if (!connStarted.get()){ connStarted.set(true); } @@ -240,22 +358,37 @@ public Map onIncoming(@NotNull ByteBuf message) { testRequestTimer = executorService.schedule(this::sendTestRequest, settings.getTestRequestDelay(), TimeUnit.SECONDS); } else { + enabled.set(false); reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); } break; case MSG_TYPE_LOGOUT: //extract logout reason if (LOGGER.isInfoEnabled()) LOGGER.info("Logout received - {}", message.toString(US_ASCII)); - FixField text = findField(message, TEXT_TAG); - if (text != null) { - LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); - String value = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); - if (value != null) { - msgSeqNum.getAndSet(Integer.parseInt(value)-2); - serverMsgSeqNum.getAndSet(Integer.parseInt(msgSeqNumValue.getValue())); + FixField sessionStatus = findField(message, SESSION_STATUS_TAG); + + if(sessionStatus != null) { + int statusCode = Integer.parseInt(Objects.requireNonNull(sessionStatus.getValue())); + if(statusCode != SUCCESSFUL_LOGOUT_CODE) { + FixField text = findField(message, TEXT_TAG); + if (text != null) { + LOGGER.warn("Received Logout has text (58) tag: {}", text.getValue()); + String value = StringUtils.substringBetween(text.getValue(), "expecting ", " but received"); + if (value != null) { + msgSeqNum.set(Integer.parseInt(value) - 1); + } else { + msgSeqNum.set(msgSeqNum.get() - 1); + } + } else { + msgSeqNum.set(msgSeqNum.get() - 1); + } + serverMsgSeqNum.set(Integer.parseInt(msgSeqNumValue.getValue()) - 1); } } - if (disconnectRequest != null && !disconnectRequest.isCancelled()) { - disconnectRequest.cancel(false); + if (heartbeatTimer != null) { + heartbeatTimer.cancel(false); + } + if (testRequestTimer != null) { + testRequestTimer.cancel(false); } enabled.set(false); context.send(CommonUtil.toEvent("logout for sender - " + settings.getSenderCompID()));//make more useful @@ -280,16 +413,27 @@ public Map onIncoming(@NotNull ByteBuf message) { } private void resetSequence(ByteBuf message) { - FixField gapFillFlagValue = findField(message, GAP_FILL_FLAG_TAG); + FixField gapFillMode = findField(message, GAP_FILL_FLAG_TAG); FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG); - if (seqNumValue != null && (gapFillFlagValue == null || requireNonNull(gapFillFlagValue.getValue()).equals("N"))) { - serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue()))); - } else if (LOGGER.isTraceEnabled()) { + if(seqNumValue != null) { + if(gapFillMode == null || gapFillMode.equals("N")) { + serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue()))); + } else { + serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1); + } + } else { LOGGER.trace("Failed to reset servers MsgSeqNum. No such tag in message: {}", message.toString(US_ASCII)); } } + private void reset() { + msgSeqNum.set(0); + serverMsgSeqNum.set(0); + sessionActive.set(true); + sendLogon(); + } + public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private lastSendTime = System.currentTimeMillis(); StringBuilder resendRequest = new StringBuilder(); @@ -297,7 +441,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo).append(SOH); resendRequest.append(END_SEQ_NO).append(endSeqNo).append(SOH); setChecksumAndBodyLength(resendRequest); - client.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); } void sendResendRequest(int beginSeqNo) { //do private @@ -309,16 +453,13 @@ void sendResendRequest(int beginSeqNo) { //do private setChecksumAndBodyLength(resendRequest); if (enabled.get()) { - client.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); } else { sendLogon(); } } private void handleResendRequest(ByteBuf message) { - if (disconnectRequest != null && !disconnectRequest.isCancelled()) { - disconnectRequest.cancel(false); - } FixField strBeginSeqNo = findField(message, BEGIN_SEQ_NO_TAG); FixField strEndSeqNo = findField(message, END_SEQ_NO_TAG); @@ -329,28 +470,36 @@ private void handleResendRequest(ByteBuf message) { try { // FIXME: there is not syn on the outgoing sequence. Should make operations with seq more careful - if (endSeqNo == 0) { - endSeqNo = msgSeqNum.get(); - } - LOGGER.info("Returning messages from {} to {}", beginSeqNo, endSeqNo); - for (int i = beginSeqNo; i <= endSeqNo; i++) { - ByteBuf storedMsg = outgoingMessages.get(i); - if (storedMsg == null) { - StringBuilder heartbeat = new StringBuilder(); - setHeader(heartbeat, MSG_TYPE_HEARTBEAT, i); - setChecksumAndBodyLength(heartbeat); - client.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), IChannel.SendMode.MANGLE); - } else { - if (LOGGER.isInfoEnabled()) LOGGER.info("Resending message: {}", storedMsg.toString(US_ASCII)); - client.send(storedMsg, Collections.emptyMap(), IChannel.SendMode.MANGLE); - } - } + recovery(beginSeqNo, endSeqNo); } catch (Exception e) { sendSequenceReset(); } } } + private void recovery(int beginSeqNo, int endSeqNo) { + if (endSeqNo == 0) { + endSeqNo = msgSeqNum.get(); + } + LOGGER.info("Returning messages from {} to {}", beginSeqNo, endSeqNo); + for (int i = beginSeqNo; i <= endSeqNo; i++) { + ByteBuf storedMsg = outgoingMessages.get(i); + if (storedMsg == null) { + StringBuilder sequenceReset = new StringBuilder(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, i); + sequenceReset.append(GAP_FILL_FLAG).append("Y"); + sequenceReset.append(NEW_SEQ_NO).append(i + 1); + setChecksumAndBodyLength(sequenceReset); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); + } else { + if (LOGGER.isInfoEnabled()) LOGGER.info("Resending message: {}", storedMsg.toString(US_ASCII)); + FixField sendingTime = findField(storedMsg, SENDING_TIME_TAG); + sendingTime.insertNext(POSS_DUP_TAG, IS_POSS_DUP); + channel.send(storedMsg, Collections.emptyMap(), null, SendMode.MANGLE); + } + } + } + private void sendSequenceReset() { lastSendTime = System.currentTimeMillis(); StringBuilder sequenceReset = new StringBuilder(); @@ -359,7 +508,7 @@ private void sendSequenceReset() { setChecksumAndBodyLength(sequenceReset); if (enabled.get()) { - client.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), IChannel.SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE); } else { sendLogon(); } @@ -391,7 +540,7 @@ private boolean checkLogon(ByteBuf message) { } @Override - public void onOutgoing(@NotNull ByteBuf message, @NotNull Map metadata) { + public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { lastSendTime = System.currentTimeMillis(); onOutgoingUpdateTag(message, metadata); @@ -447,6 +596,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map enabled.set(false), settings.getDisconnectRequestDelay(), TimeUnit.SECONDS); + sendLogout(); } private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum) { @@ -668,7 +843,7 @@ public int getBodyLength(ByteBuf message) { } public String getTime() { - String FIX_DATE_TIME_FORMAT_MS = "yyyyMMdd-HH:mm:ss.SSS"; + String FIX_DATE_TIME_FORMAT_MS = "yyyyMMdd-HH:mm:ss.SSSSSSSSS"; LocalDateTime datetime = LocalDateTime.now(); return DateTimeFormatter.ofPattern(FIX_DATE_TIME_FORMAT_MS).format(datetime); } diff --git a/src/main/java/com/exactpro/th2/FixHandlerContext.java b/src/main/java/com/exactpro/th2/FixHandlerContext.java deleted file mode 100644 index 639cbd8..0000000 --- a/src/main/java/com/exactpro/th2/FixHandlerContext.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2022-2022 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2; - -import com.exactpro.th2.common.event.Event; -import com.exactpro.th2.common.schema.dictionary.DictionaryType; -import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; -import com.exactpro.th2.conn.dirty.tcp.core.api.IContext; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings; -import com.google.auto.service.AutoService; -import org.jetbrains.annotations.NotNull; - -import java.io.InputStream; - -@AutoService(IContext.class) -public class FixHandlerContext implements IContext { - - @NotNull - @Override - public IChannel getChannel() { - return null; - } - - @Override - public IProtocolHandlerSettings getSettings() { - return new FixHandlerSettings(); - } - - @NotNull - @Override - public InputStream get(@NotNull DictionaryType dictionaryType) { - return null; - } - - @Override - public void send(@NotNull Event event) { - } -} diff --git a/src/main/java/com/exactpro/th2/FixHandlerFactory.java b/src/main/java/com/exactpro/th2/FixHandlerFactory.java index c870314..a11165f 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerFactory.java +++ b/src/main/java/com/exactpro/th2/FixHandlerFactory.java @@ -16,19 +16,18 @@ package com.exactpro.th2; -import com.exactpro.th2.conn.dirty.tcp.core.api.IContext; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandler; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerFactory; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerFactory; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerSettings; import com.google.auto.service.AutoService; import org.jetbrains.annotations.NotNull; -@AutoService(IProtocolHandlerFactory.class) -public class FixHandlerFactory implements IProtocolHandlerFactory { - +@AutoService(IHandlerFactory.class) +public class FixHandlerFactory implements IHandlerFactory { @NotNull @Override - public Class getSettings() { + public Class getSettings() { return FixHandlerSettings.class; } @@ -38,9 +37,10 @@ public String getName() { return FixHandlerFactory.class.getSimpleName(); } + @NotNull @Override - public IProtocolHandler create(@NotNull IContext iContext) { - return new FixHandler(iContext); + public IHandler create(@NotNull IHandlerContext context) { + return new FixHandler(context); } } diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 7f0ee8f..673065f 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,18 @@ package com.exactpro.th2; import com.exactpro.th2.conn.dirty.fix.KeyFileType; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings; -import com.google.auto.service.AutoService; - -@AutoService(IProtocolHandlerSettings.class) -public class FixHandlerSettings implements IProtocolHandlerSettings { - +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.Security; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerSettings; +import com.exactpro.th2.util.LocalTimeDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import java.io.File; +import java.time.LocalTime; + +public class FixHandlerSettings implements IHandlerSettings { + private String host = null; + private int port = 0; + private Security security = new Security(); private String beginString = "FIXT.1.1"; private long heartBtInt = 30; private String senderCompID; @@ -35,6 +41,8 @@ public class FixHandlerSettings implements IProtocolHandlerSettings { private String newPassword; private String passwordEncryptKeyFilePath; private KeyFileType passwordEncryptKeyFileType = KeyFileType.PEM_PUBLIC_KEY; + + private File stateFilePath; /** * Value from Java Security Standard Algorithm Names */ @@ -45,10 +53,43 @@ public class FixHandlerSettings implements IProtocolHandlerSettings { private String passwordEncryptAlgorithm = "RSA"; private Boolean resetSeqNumFlag = false; private Boolean resetOnLogon = false; + private Boolean useNextExpectedSeqNum = false; + private Boolean saveAdminMessages = false; + + @JsonDeserialize(using = LocalTimeDeserializer.class) + private LocalTime sessionStartTime; + + @JsonDeserialize(using = LocalTimeDeserializer.class) + private LocalTime sessionEndTime; + private int testRequestDelay = 60; private int reconnectDelay = 5; private int disconnectRequestDelay = 5; + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public Security getSecurity() { + return security; + } + + public void setSecurity(Security security) { + this.security = security; + } + public String getBeginString() { return beginString; } @@ -173,6 +214,46 @@ public void setPasswordEncryptAlgorithm(String passwordEncryptAlgorithm) { this.passwordEncryptAlgorithm = passwordEncryptAlgorithm; } + public File getStateFilePath() { + return stateFilePath; + } + + public void setStateFilePath(File stateFilePath) { + this.stateFilePath = stateFilePath; + } + + public Boolean useNextExpectedSeqNum() { + return useNextExpectedSeqNum; + } + + public void setUseNextExpectedSeqNum(Boolean useNextExpectedSeqNum) { + this.useNextExpectedSeqNum = useNextExpectedSeqNum; + } + + public Boolean isSaveAdminMessages() { + return saveAdminMessages; + } + + public void setSaveAdminMessages(Boolean saveAdminMessages) { + this.saveAdminMessages = saveAdminMessages; + } + + public LocalTime getSessionStartTime() { + return sessionStartTime; + } + + public void setSessionStartTime(LocalTime sessionStartTime) { + this.sessionStartTime = sessionStartTime; + } + + public LocalTime getSessionEndTime() { + return sessionEndTime; + } + + public void setSessionEndTime(LocalTime sessionEndTime) { + this.sessionEndTime = sessionEndTime; + } + public void setResetSeqNumFlag(Boolean resetSeqNumFlag) { this.resetSeqNumFlag = resetSeqNumFlag; } public void setResetOnLogon(Boolean resetOnLogon) { this.resetOnLogon = resetOnLogon; } diff --git a/src/main/java/com/exactpro/th2/SequenceHolder.java b/src/main/java/com/exactpro/th2/SequenceHolder.java new file mode 100644 index 0000000..6fbfc05 --- /dev/null +++ b/src/main/java/com/exactpro/th2/SequenceHolder.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2; + +public class SequenceHolder { + private final int clientSeq; + private final int serverSeq; + + public SequenceHolder(int clientSeq, int serverSeq) { + this.clientSeq = clientSeq; + this.serverSeq = serverSeq; + } + + public int getClientSeq() { + return clientSeq; + } + + public int getServerSeq() { + return serverSeq; + } +} diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index 8d6e0b4..f953b8f 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -16,6 +16,10 @@ package com.exactpro.th2.constants; + +import java.util.Collections; +import java.util.Set; + public class Constants { public static final String SOH = "\001"; @@ -46,6 +50,8 @@ public class Constants { public static final Integer GAP_FILL_FLAG_TAG = 123; public static final Integer TEXT_TAG = 58; public static final Integer RESET_SEQ_NUM_TAG = 141; + public static final Integer NEXT_EXPECTED_SEQ_NUMBER_TAG = 789; + public static final Integer POSS_DUP_TAG = 43; //Fields public static final String BEGIN_STRING = SOH + BEGIN_STRING_TAG + "="; @@ -71,6 +77,8 @@ public class Constants { public static final String DEFAULT_APPL_VER_ID = SOH + DEFAULT_APPL_VER_ID_TAG + "="; public static final String SENDER_SUB_ID = SOH + SENDER_SUB_ID_TAG + "="; public static final String RESET_SEQ_NUM = SOH + RESET_SEQ_NUM_TAG + "="; + public static final String NEXT_EXPECTED_SEQ_NUM = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; + public static final String POSS_DUP = SOH + NEXT_EXPECTED_SEQ_NUMBER_TAG + "="; //message types public static final String MSG_TYPE_LOGON = "A"; @@ -80,4 +88,14 @@ public class Constants { public static final String MSG_TYPE_RESEND_REQUEST = "2"; public static final String MSG_TYPE_SEQUENCE_RESET = "4"; + public static final Set ADMIN_MESSAGES = Collections.unmodifiableSet( + Set.of( + MSG_TYPE_LOGON, MSG_TYPE_LOGOUT, + MSG_TYPE_HEARTBEAT, MSG_TYPE_RESEND_REQUEST, + MSG_TYPE_SEQUENCE_RESET, MSG_TYPE_TEST_REQUEST + ) + ); + + public static final String IS_POSS_DUP = "Y"; + public static final int SUCCESSFUL_LOGOUT_CODE = 4; } diff --git a/src/main/java/com/exactpro/th2/util/LocalTimeDeserializer.java b/src/main/java/com/exactpro/th2/util/LocalTimeDeserializer.java new file mode 100644 index 0000000..8cdb2d5 --- /dev/null +++ b/src/main/java/com/exactpro/th2/util/LocalTimeDeserializer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.util; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +public class LocalTimeDeserializer extends StdDeserializer { + + public LocalTimeDeserializer() { + super(LocalTime.class); + } + + @Override + public LocalTime deserialize(JsonParser parser, DeserializationContext context) throws IOException { + return LocalTime.parse(parser.getValueAsString(), DateTimeFormatter.ISO_TIME); + } +} diff --git a/src/main/java/com/exactpro/th2/util/Util.java b/src/main/java/com/exactpro/th2/util/Util.java new file mode 100644 index 0000000..59458fb --- /dev/null +++ b/src/main/java/com/exactpro/th2/util/Util.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.util; + +import com.exactpro.th2.SequenceHolder; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; + +public class Util { + public static SequenceHolder readSequences(File file) { + try (BufferedReader br = new BufferedReader(new FileReader(file))) { + int firstLine = Integer.parseInt(br.readLine()); + int secondLine = Integer.parseInt(br.readLine()); + return new SequenceHolder(firstLine, secondLine); + } catch (IOException e) { + throw new IllegalStateException("Error while reading sequence file " + file, e); + } + } + + public static void writeSequences(int msgSeqNum, int serverSeqNum, File file) throws IOException { + if(!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + file.createNewFile(); + try (BufferedWriter bw = new BufferedWriter(new FileWriter(file))) { + bw.write(String.valueOf(msgSeqNum)); + bw.newLine(); + bw.write(String.valueOf(serverSeqNum)); + } + } +} diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixByteBufUtil.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixByteBufUtil.kt index 291939f..7b6548c 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixByteBufUtil.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixByteBufUtil.kt @@ -16,15 +16,15 @@ package com.exactpro.th2.conn.dirty.fix -import com.exactpro.th2.conn.dirty.tcp.core.util.EMPTY_STRING -import com.exactpro.th2.conn.dirty.tcp.core.util.endsWith -import com.exactpro.th2.conn.dirty.tcp.core.util.get -import com.exactpro.th2.conn.dirty.tcp.core.util.indexOf -import com.exactpro.th2.conn.dirty.tcp.core.util.insert -import com.exactpro.th2.conn.dirty.tcp.core.util.lastIndexOf -import com.exactpro.th2.conn.dirty.tcp.core.util.replace -import com.exactpro.th2.conn.dirty.tcp.core.util.requireReadable -import com.exactpro.th2.conn.dirty.tcp.core.util.subsequence +import com.exactpro.th2.netty.bytebuf.util.EMPTY_STRING +import com.exactpro.th2.netty.bytebuf.util.endsWith +import com.exactpro.th2.netty.bytebuf.util.get +import com.exactpro.th2.netty.bytebuf.util.indexOf +import com.exactpro.th2.netty.bytebuf.util.insert +import com.exactpro.th2.netty.bytebuf.util.lastIndexOf +import com.exactpro.th2.netty.bytebuf.util.replace +import com.exactpro.th2.netty.bytebuf.util.requireReadable +import com.exactpro.th2.netty.bytebuf.util.subsequence import io.netty.buffer.ByteBuf import java.nio.charset.Charset import kotlin.text.Charsets.UTF_8 diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt index e734fd9..e491866 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/FixProtocolMangler.kt @@ -21,10 +21,11 @@ import com.exactpro.th2.common.event.Event.Status.PASSED import com.exactpro.th2.common.event.EventUtils.createMessageBean import com.exactpro.th2.common.event.bean.IRow import com.exactpro.th2.common.event.bean.builder.TableBuilder -import com.exactpro.th2.conn.dirty.tcp.core.api.IContext -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolMangler -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolManglerFactory -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolManglerSettings +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel +import com.exactpro.th2.conn.dirty.tcp.core.api.IMangler +import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerContext +import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerFactory +import com.exactpro.th2.conn.dirty.tcp.core.api.IManglerSettings import com.fasterxml.jackson.dataformat.yaml.YAMLMapper import com.fasterxml.jackson.module.kotlin.KotlinModule import com.fasterxml.jackson.module.kotlin.readValue @@ -42,10 +43,10 @@ private val MAPPER = YAMLMapper.builder() private const val RULE_NAME_PROPERTY = "rule-name" private const val RULE_ACTIONS_PROPERTY = "rule-actions" -class FixProtocolMangler(context: IContext) : IProtocolMangler { +class FixProtocolMangler(context: IManglerContext) : IMangler { private val rules = (context.settings as FixProtocolManglerSettings).rules - override fun onOutgoing(message: ByteBuf, metadata: MutableMap): Event? { + override fun onOutgoing(channel: IChannel, message: ByteBuf, metadata: MutableMap): Event? { LOGGER.trace { "Processing message: ${message.toString(Charsets.UTF_8)}" } val (rule, unconditionally) = getRule(message, metadata) ?: return null @@ -102,14 +103,14 @@ class FixProtocolMangler(context: IContext) : IProtoco } } -@AutoService(IProtocolManglerFactory::class) -class FixProtocolManglerFactory : IProtocolManglerFactory { +@AutoService(IManglerFactory::class) +class FixProtocolManglerFactory : IManglerFactory { override val name = "demo-fix-mangler" override val settings = FixProtocolManglerSettings::class.java - override fun create(context: IContext) = FixProtocolMangler(context) + override fun create(context: IManglerContext) = FixProtocolMangler(context) } -class FixProtocolManglerSettings(val rules: List = emptyList()) : IProtocolManglerSettings +class FixProtocolManglerSettings(val rules: List = emptyList()) : IManglerSettings private data class ActionRow( val corruptionType: String, diff --git a/src/test/java/com/exactpro/th2/FixHandlerTest.java b/src/test/java/com/exactpro/th2/FixHandlerTest.java index 0933d07..ece4a57 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,19 @@ package com.exactpro.th2; +import com.exactpro.th2.common.grpc.EventID; import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel; -import com.exactpro.th2.conn.dirty.tcp.core.api.IContext; -import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings; -import com.exactpro.th2.conn.dirty.tcp.core.api.impl.Channel.Security; +import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; import com.exactpro.th2.util.MessageUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import kotlin.Unit; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -41,7 +42,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; import static com.exactpro.th2.constants.Constants.BODY_LENGTH_TAG; @@ -55,15 +55,17 @@ class FixHandlerTest { - private static final Client client = new Client(); + private static final Channel channel = new Channel(); private static ByteBuf buffer; private static ByteBuf oneMessageBuffer; private static ByteBuf brokenBuffer; - private static FixHandler fixHandler = client.getFixHandler(); + private static FixHandler fixHandler = channel.getFixHandler(); @BeforeAll static void init() { - fixHandler.onOpen(); + fixHandler.onOpen(channel); + ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + fixHandler.onIncoming(channel, logonResponse); oneMessageBuffer = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=13\00135=AE\001552=1\00110=169\001".getBytes(StandardCharsets.US_ASCII)); buffer = Unpooled.wrappedBuffer(("8=FIXT.1.1\0019=13\00135=AE\001552=1\00110=169\0018=FIXT.1.1\0019=13\00135=NN" + "\001552=2\00110=100\0018=FIXT.1.1\0019=13\00135=NN\001552=2\00110=100\001").getBytes(StandardCharsets.US_ASCII)); @@ -85,18 +87,18 @@ void test3188(){ String body1 = "8=F"; String body2 = "IXT.1.1\0019=13\00135=AE\001552=1\00158=11111\00110=169\001"; ByteBuf byteBuf1 = Unpooled.buffer().writeBytes(body1.getBytes(StandardCharsets.UTF_8)); - fixHandler.onReceive(byteBuf1); + fixHandler.onReceive(channel, byteBuf1); assertEquals("8=F", byteBuf1.toString(StandardCharsets.US_ASCII)); byteBuf1.writeBytes(body2.getBytes(StandardCharsets.UTF_8)); - fixHandler.onReceive(byteBuf1); + fixHandler.onReceive(channel, byteBuf1); assertEquals("", byteBuf1.toString(StandardCharsets.US_ASCII)); } @Test void onDataBrokenMessageTest() { - ByteBuf result0 = fixHandler.onReceive(brokenBuffer); - ByteBuf result1 = fixHandler.onReceive(brokenBuffer); - ByteBuf result2 = fixHandler.onReceive(brokenBuffer); + ByteBuf result0 = fixHandler.onReceive(channel, brokenBuffer); + ByteBuf result1 = fixHandler.onReceive(channel, brokenBuffer); + ByteBuf result2 = fixHandler.onReceive(channel, brokenBuffer); String expected0 = "A"; assertNotNull(result0); @@ -111,9 +113,9 @@ void onReceiveCorrectMessagesTest() { buffer = Unpooled.wrappedBuffer(("8=FIXT.1.1\0019=13\00135=AE\001552=1\00158=11111\00110=169\0018=FIXT.1.1\0019=13\00135=NN" + "\001552=2\00110=100\0018=FIXT.1.1\0019=13\00135=NN\001552=2\00110=100\001").getBytes(StandardCharsets.US_ASCII)); - ByteBuf result0 = fixHandler.onReceive(buffer); - ByteBuf result1 = fixHandler.onReceive(buffer); - ByteBuf result2 = fixHandler.onReceive(buffer); + ByteBuf result0 = fixHandler.onReceive(channel, buffer); + ByteBuf result1 = fixHandler.onReceive(channel, buffer); + ByteBuf result2 = fixHandler.onReceive(channel, buffer); String expected1 = "8=FIXT.1.1\0019=13\00135=AE\001552=1\00158=11111\00110=169\001"; String expected2 = "8=FIXT.1.1\0019=13\00135=NN\001552=2\00110=100\001"; @@ -133,29 +135,34 @@ void onReceiveCorrectMessagesTest() { void sendResendRequestTest() { String expectedLogon = "8=FIXT.1.1\u00019=105\u000135=A\u000134=2\u000149=client\u000156=server\u0001" + "50=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=204\u0001"; // #1 sent logon + ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=2\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + // #2 sent resendRequest String expectedResendRequest = "8=FIXT.1.1\u00019=73\u000135=2\u000134=3\u000149=client\u000156=server" + // #2 sent resendRequest "\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=0\u000110=227\u0001"; - client.clearQueue(); + channel.clearQueue(); fixHandler.sendLogon(); + fixHandler.onIncoming(channel, logonResponse); fixHandler.sendResendRequest(1); - assertEquals(expectedLogon, new String(client.getQueue().get(0).array())); + assertEquals(expectedLogon, new String(channel.getQueue().get(0).array())); //assertEquals(expectedHeartbeat, new String(client.getQueue().get(1).array())); - assertEquals(expectedResendRequest, new String(client.getQueue().get(1).array())); + assertEquals(expectedResendRequest, new String(channel.getQueue().get(1).array())); } @Test void onConnectionTest() { - client.clearQueue(); - fixHandler.onOpen(); + channel.clearQueue(); + fixHandler.onOpen(channel); + ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); + fixHandler.onIncoming(channel, logonResponse); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } assertEquals("8=FIXT.1.1\u00019=105\u000135=A\u000134=7\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u0001553=username\u0001554=pass\u000110=209\u0001", - new String(client.getQueue().get(0).array())); + new String(channel.getQueue().get(0).array())); } @Test @@ -178,15 +185,15 @@ void onOutgoingMessageTest() { expected4.put("MsgType", "A"); Map actual = new HashMap<>(expected); - fixHandler.onOutgoing(bufferForPrepareMessage1, actual); + fixHandler.onOutgoing(channel, bufferForPrepareMessage1, actual); assertEquals(expected, actual); Map actual2 = new HashMap<>(); - fixHandler.onOutgoing(bufferForPrepareMessage2, actual2); + fixHandler.onOutgoing(channel, bufferForPrepareMessage2, actual2); assertEquals(expected2, actual2); - fixHandler.onOutgoing(bufferForPrepareMessage3, expected3); - fixHandler.onOutgoing(bufferForPrepareMessage4, expected4); + fixHandler.onOutgoing(channel, bufferForPrepareMessage3, expected3); + fixHandler.onOutgoing(channel, bufferForPrepareMessage4, expected4); bufferForPrepareMessage1.readerIndex(0); bufferForPrepareMessage2.readerIndex(0); @@ -232,9 +239,9 @@ void getByteByfBodyLengthTest() { @Test void sendTestRequestTest() { String expected = "8=FIXT.1.1\u00019=70\u000135=1\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001112=1\u000110=101\u0001"; - client.clearQueue(); + channel.clearQueue(); fixHandler.sendTestRequest(); - assertEquals(expected, new String(client.getQueue().get(0).array())); + assertEquals(expected, new String(channel.getQueue().get(0).array())); } @Test @@ -318,13 +325,15 @@ void updateTagTest() { } -class Client implements IChannel { +class Channel implements IChannel { private final FixHandlerSettings fixHandlerSettings; private final MyFixHandler fixHandler; private final List queue = new ArrayList<>(); - Client() { + Channel() { this.fixHandlerSettings = new FixHandlerSettings(); + fixHandlerSettings.setHost("127.0.0.1"); + fixHandlerSettings.setPort(8080); fixHandlerSettings.setBeginString("FIXT.1.1"); fixHandlerSettings.setHeartBtInt(30); fixHandlerSettings.setSenderCompID("client"); @@ -339,38 +348,32 @@ class Client implements IChannel { fixHandlerSettings.setResetOnLogon(false); fixHandlerSettings.setDefaultApplVerID("9"); fixHandlerSettings.setSenderSubID("trader"); - IContext context = Mockito.mock(IContext.class); + IHandlerContext context = Mockito.mock(IHandlerContext.class); Mockito.when(context.getSettings()).thenReturn(fixHandlerSettings); - Mockito.when(context.getChannel()).thenReturn(this); this.fixHandler = new MyFixHandler(context); } @Override - public void open() { - - } - - @Override - public void open(InetSocketAddress address, Security security) { - + public CompletableFuture open() { + return CompletableFuture.completedFuture(Unit.INSTANCE); } @NotNull @Override - public Future send(@NotNull ByteBuf byteBuf, @NotNull Map map, @NotNull IChannel.SendMode sendMode) { + public CompletableFuture send(@NotNull ByteBuf byteBuf, @NotNull Map map, EventID eventId, @NotNull IChannel.SendMode sendMode) { queue.add(byteBuf); return CompletableFuture.completedFuture(MessageID.getDefaultInstance()); } @Override public boolean isOpen() { - return false; + return true; } @Override - public void close() { - + public CompletableFuture close() { + return CompletableFuture.completedFuture(Unit.INSTANCE); } public FixHandlerSettings getFixHandlerSettings() { @@ -399,11 +402,29 @@ public InetSocketAddress getAddress() { public Security getSecurity() { return new Security(); } + + @NotNull + @Override + public Map getAttributes() { + return Map.of(); + } + + @NotNull + @Override + public String getSessionAlias() { + return "alias"; + } + + @NotNull + @Override + public String getSessionGroup() { + return "group"; + } } class MyFixHandler extends FixHandler { - public MyFixHandler(IContext context) { + public MyFixHandler(IHandlerContext context) { super(context); } diff --git a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt index f761318..8732e90 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/dirty/fix/TestMessageTransformer.kt @@ -101,7 +101,6 @@ class TestMessageTransformer { private infix fun Int.eq(value: String) = field(this, value) private infix fun Int.to(value: String) = field(this, value) private infix fun Int.oneOf(value: List) = FieldDefinition(this, null, null, value) - private infix fun List.oneOf(value: List) = FieldDefinition(null, null, this, value) private infix fun Int.matches(pattern: String) = select(this, pattern) private infix fun Int.matching(pattern: String) = select(this, pattern) private fun set(field: FieldDefinition) = Action(set = field)