Skip to content

Commit

Permalink
Merge remote-tracking branch 'original/dev-version-1' into affected_m…
Browse files Browse the repository at this point in the history
…essages_event
  • Loading branch information
Denis Plotnikov committed Nov 16, 2023
2 parents ef8292d + 65e2032 commit a4ff512
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 17 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ This microservice allows sending and receiving messages via FIX protocol
+ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message.
+ *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread
(please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem).
_Default, 30000 mls._
_Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_).
The timeout is reset to the original value after a successful sending attempt.
If connection is not established within the specified timeout an error will be reported.
+ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._

### Security settings

Expand Down Expand Up @@ -334,6 +336,17 @@ spec:

# Changelog

## 1.5.1

* Property `th2.operation_timestamp` is added to metadata to each message
* Use mutable map for metadata when sending a messages from the handler
* Fix error when new property with operation timestamp added to the immutable map

## 1.5.0

* `minConnectionTimeoutOnSend` parameter is added.
* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`).

## 1.4.2
* Ungraceful session disconnect support.
* Removed NPE when session is reset by schedule.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ dependencies {
implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'

implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-TH2-5001+'
implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-TH2-5001+'
implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev'

implementation 'org.slf4j:slf4j-api'
Expand Down
91 changes: 76 additions & 15 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyScheduler;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.StrategyState;
import com.exactpro.th2.conn.dirty.fix.brokenconn.strategy.api.CleanupHandler;
import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode;
import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler;
Expand Down Expand Up @@ -217,6 +218,8 @@ public class FixHandler implements AutoCloseable, IHandler {

private final AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));

private final SendingTimeoutHandler sendingTimeoutHandler;
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
protected FixHandlerSettings settings;
Expand Down Expand Up @@ -302,6 +305,12 @@ public FixHandler(IHandlerContext context) {
throw new IllegalStateException("`newPassword` contains password that was already used in the past.");
}

this.sendingTimeoutHandler = SendingTimeoutHandler.create(
settings.getMinConnectionTimeoutOnSend(),
settings.getConnectionTimeoutOnSend(),
context::send
);

if (settings.getBrokenConnConfiguration() == null) {
scheduler = new StrategyScheduler(SchedulerType.CONSECUTIVE, Collections.emptyList());
return;
Expand Down Expand Up @@ -349,21 +358,26 @@ public CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<Str
try {
disconnect(!isUngracefulDisconnect);
enabled.set(false);
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (Exception e) {
context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e));
}
return CompletableFuture.completedFuture(null);
}

long deadline = System.currentTimeMillis() + settings.getConnectionTimeoutOnSend();
// TODO: probably, this should be moved to the core part
// But those changes will break API
// So, let's keep it here for now
long deadline = sendingTimeoutHandler.getDeadline();
long currentTimeout = sendingTimeoutHandler.getCurrentTimeout();

if (!channel.isOpen()) {
try {
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
Expand Down Expand Up @@ -724,10 +738,14 @@ private void reset() {
if(!channel.isOpen()) channel.open();
}

public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private
public void sendResendRequest(int beginSeqNo, int endSeqNo) {
sendResendRequest(beginSeqNo, endSeqNo, false);
}

public void sendResendRequest(int beginSeqNo, int endSeqNo, boolean isPossDup) { //do private
LOGGER.info("Sending resend request: {} - {}", beginSeqNo, endSeqNo);
StringBuilder resendRequest = new StringBuilder();
setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null);
setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null, isPossDup);
resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo);
resendRequest.append(END_SEQ_NO).append(endSeqNo);
setChecksumAndBodyLength(resendRequest);
Expand All @@ -747,8 +765,6 @@ void sendResendRequest(int beginSeqNo) { //do private
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
resetHeartbeatTask();
} else {
sendLogon();
}
}

Expand Down Expand Up @@ -1030,15 +1046,23 @@ public void onOpen(@NotNull IChannel channel) {
sendLogon();
}

public void sendHeartbeatWithPossDup(boolean isPossDup) {
sendHeartbeatWithTestRequest(null, isPossDup);
}

private void sendHeartbeatWithTestRequest(String testRequestID) {
sendHeartbeatWithTestRequest(testRequestID, false);
}

public void sendHeartbeat() {
sendHeartbeatWithTestRequest(null);
sendHeartbeatWithTestRequest(null, false);
}

private void sendHeartbeatWithTestRequest(String testRequestId) {
private void sendHeartbeatWithTestRequest(String testRequestId, boolean possDup) {
StringBuilder heartbeat = new StringBuilder();
int seqNum = msgSeqNum.incrementAndGet();

setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null);
setHeader(heartbeat, MSG_TYPE_HEARTBEAT, seqNum, null, possDup);

if(testRequestId != null) {
heartbeat.append(TEST_REQ_ID).append(testRequestId);
Expand All @@ -1057,8 +1081,12 @@ private void sendHeartbeatWithTestRequest(String testRequestId) {
}

public void sendTestRequest() { //do private
sendTestRequestWithPossDup(false);
}

public void sendTestRequestWithPossDup(boolean isPossDup) { //do private
StringBuilder testRequest = new StringBuilder();
setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null);
setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null, isPossDup);
testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet());
setChecksumAndBodyLength(testRequest);
if (enabled.get()) {
Expand Down Expand Up @@ -1115,14 +1143,22 @@ public void sendLogon() {
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}

private void sendLogout() {
sendLogout(null);
private void sendLogout(boolean isPossDup) {
sendLogout(null, isPossDup);
}

private void sendLogout(String text) {
sendLogout(text, false);
}

private void sendLogout() {
sendLogout(null, false);
}

private void sendLogout(String text, boolean isPossDup) {
if (enabled.get()) {
StringBuilder logout = new StringBuilder();
setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null);
setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null, isPossDup);
if(text != null) {
logout.append(TEXT).append(text);
}
Expand Down Expand Up @@ -1582,6 +1618,22 @@ private void runLogonAfterLogonStrategy(RuleConfiguration configuration) {
ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs());
}

private void runPossDupSessionMessages(RuleConfiguration configuration) {
Instant start = Instant.now();
strategy.resetStrategyAndState(configuration);
ruleStartEvent(configuration.getRuleType(), strategy.getStartTime());
if(!enabled.get()) {
ruleErrorEvent(strategy.getType(), String.format("Session %s isn't logged in.", channel.getSessionAlias()), null);
return;
}

sendResendRequest(serverMsgSeqNum.get() - 2, serverMsgSeqNum.get(), true);
sendHeartbeatWithPossDup(true);
sendTestRequestWithPossDup(true);
sendLogout(true);
ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs());
}

private void setupDisconnectStrategy(RuleConfiguration configuration) {
strategy.resetStrategyAndState(configuration);
strategy.updateSendStrategy(x -> {x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; });
Expand Down Expand Up @@ -1975,7 +2027,9 @@ private Consumer<RuleConfiguration> getSetupFunction(RuleConfiguration config) {
case IGNORE_INCOMING_MESSAGES: return this::setupIgnoreIncomingMessagesStrategy;
case DISCONNECT_WITH_RECONNECT: return this::setupDisconnectStrategy;
case FAKE_RETRANSMISSION: return this::setupFakeRetransmissionStrategy;
case INVALID_CHECKSUM: return this::setupTransformMessageStrategy;
case LOGON_AFTER_LOGON: return this::runLogonAfterLogonStrategy;
case POSS_DUP_SESSION_MESSAGES: return this::runPossDupSessionMessages;
case DEFAULT: return configuration -> strategy.cleanupStrategy();
default: throw new IllegalStateException(String.format("Unknown strategy type %s.", config.getRuleType()));
}
Expand Down Expand Up @@ -2130,6 +2184,10 @@ private void waitLogoutResponse() {
}

private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time) {
setHeader(stringBuilder, msgType, seqNum, time, false);
}

private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqNum, String time, boolean isPossDup) {
stringBuilder.append(BEGIN_STRING_TAG).append("=").append(settings.getBeginString());
stringBuilder.append(MSG_TYPE).append(msgType);
stringBuilder.append(MSG_SEQ_NUM).append(seqNum);
Expand All @@ -2142,6 +2200,9 @@ private void setHeader(StringBuilder stringBuilder, String msgType, Integer seqN
} else {
stringBuilder.append(getTime());
}
if(isPossDup) {
stringBuilder.append(POSS_DUP).append(IS_POSS_DUP);
}
}

private void setChecksumAndBodyLength(StringBuilder stringBuilder) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class FixHandlerSettings implements IHandlerSettings {
*/
private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND;

private long minConnectionTimeoutOnSend = 1_000;

@JsonDeserialize(using = DateTimeFormatterDeserializer.class)
private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS");

Expand Down Expand Up @@ -338,4 +340,12 @@ public long getConnectionTimeoutOnSend() {
public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) {
this.connectionTimeoutOnSend = connectionTimeoutOnSend;
}

public long getMinConnectionTimeoutOnSend() {
return minConnectionTimeoutOnSend;
}

public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) {
this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ data class RuleConfiguration(
RuleType.TRANSFORM_MESSAGE_STRATEGY -> {
require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType"}
}
RuleType.INVALID_CHECKSUM -> {
require(transformMessageConfiguration != null) { "`transformMessageConfiguration` is required for $ruleType" }
}
RuleType.BI_DIRECTIONAL_RESEND_REQUEST -> {
require(missIncomingMessagesConfiguration != null) { "`blockIncomingMessagesConfiguration` is required for $ruleType" }
require(missOutgoingMessagesConfiguration != null) { "`blockOutgoingMessagesConfiguration` is required for $ruleType" }
Expand All @@ -79,6 +82,7 @@ data class RuleConfiguration(
RuleType.DEFAULT -> {}
RuleType.FAKE_RETRANSMISSION -> {}
RuleType.LOGON_AFTER_LOGON -> {}
RuleType.POSS_DUP_SESSION_MESSAGES -> {}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ enum class RuleType {
SPLIT_SEND,
LOGON_AFTER_LOGON,
FAKE_RETRANSMISSION,
INVALID_CHECKSUM,
POSS_DUP_SESSION_MESSAGES,
DEFAULT
}
2 changes: 2 additions & 0 deletions src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
Mockito.when(contextMock.getSettings())
.thenReturn(settings);
var fixHandler = new FixHandler(contextMock);
Expand Down Expand Up @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
LocalTime currentTime = LocalTime.now(ZoneOffset.UTC);
int deltaMinutes = currentTime.isAfter(LocalTime.NOON)
? -1
Expand Down

0 comments on commit a4ff512

Please sign in to comment.