Skip to content

Commit

Permalink
[TS-1928] added empty cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jul 12, 2024
1 parent 3cac653 commit e9b55b9
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 56 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.7.0)
# th2-conn-dirty-fix (1.8.0)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -56,6 +56,7 @@ This microservice allows sending and receiving messages via FIX protocol
The timeout is reset to the original value after a successful sending attempt.
If connection is not established within the specified timeout an error will be reported.
+ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._
+ *messageCacheSize* - size of in memory message cache used for fast handling recovery. Cache disabled if value is zero or negative. _Default value is 100_

### Security settings

Expand Down Expand Up @@ -264,6 +265,7 @@ spec:
testRequestDelay: 60
reconnectDelay": 5
disconnectRequestDelay: 5
messageCacheSize: 100
mangler:
rules:
- name: rule-1
Expand Down Expand Up @@ -337,7 +339,7 @@ spec:

## 1.8.0

* Provided configurable session cache to handle server resend request
* Provided configurable in memory message cache to handle server resend request
* Updated th2 gradle plugin `0.1.1`

## 1.7.0
Expand Down
137 changes: 83 additions & 54 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,8 @@
package com.exactpro.th2;

import com.exactpro.th2.common.event.Event;
import com.exactpro.th2.common.grpc.EventID;
import com.exactpro.th2.common.grpc.Direction;
import com.exactpro.th2.common.grpc.EventID;
import com.exactpro.th2.common.grpc.MessageID;
import com.exactpro.th2.common.grpc.RawMessage;
import com.exactpro.th2.common.utils.event.transport.EventUtilsKt;
Expand All @@ -31,8 +31,19 @@
import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext;
import com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil;
import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService;
import com.exactpro.th2.util.EmptyCache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
Expand All @@ -43,7 +54,6 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -59,14 +69,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.firstField;
Expand Down Expand Up @@ -144,6 +146,7 @@

public class FixHandler implements AutoCloseable, IHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(FixHandler.class);
private static final Cache<Integer, ByteBuf> EMPTY_MESSAGE_CACHE = EmptyCache.emptyCache();

private static final int DAY_SECONDS = 24 * 60 * 60;
private static final String SOH = "\001";
Expand Down Expand Up @@ -171,6 +174,7 @@ public class FixHandler implements AutoCloseable, IHandler {
private final SendingTimeoutHandler sendingTimeoutHandler;
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
private final Cache<Integer, ByteBuf> messageCache;
protected FixHandlerSettings settings;

public FixHandler(IHandlerContext context) {
Expand All @@ -186,6 +190,12 @@ public FixHandler(IHandlerContext context) {
this.messageLoader = null;
}

if (settings.getMessageCacheSize() > 0) {
this.messageCache = CacheBuilder.newBuilder().maximumSize(settings.getMessageCacheSize()).build();
} else {
this.messageCache = EMPTY_MESSAGE_CACHE;
}

if(settings.getSessionStartTime() != null) {
Objects.requireNonNull(settings.getSessionEndTime(), "Session end is required when session start is presented");
LocalTime resetTime = settings.getSessionStartTime();
Expand Down Expand Up @@ -299,11 +309,11 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
try {
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
ExceptionUtils.asRuntimeException(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
ExceptionUtils.asRuntimeException(e);
}
}

Expand All @@ -316,7 +326,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
}
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls",
currentTimeout)));
}
}
Expand Down Expand Up @@ -380,7 +390,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) {

@NotNull
@Override
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message) {
public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) {
Map<String, String> metadata = new HashMap<>();

int beginString = indexOf(message, "8=FIX");
Expand Down Expand Up @@ -516,16 +526,14 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
return metadata;
}

private Map<String, String> handleTestRequest(ByteBuf message, Map<String, String> metadata) {
private void handleTestRequest(ByteBuf message, Map<String, String> metadata) {
FixField testReqId = findField(message, TEST_REQ_ID_TAG);
if(testReqId == null || testReqId.getValue() == null) {
metadata.put(REJECT_REASON, "Test Request message hasn't got TestReqId field.");
return metadata;
return;
}

sendHeartbeatTestReqId(testReqId.getValue());

return null;
}

private void handleLogout(@NotNull ByteBuf message) {
Expand Down Expand Up @@ -627,66 +635,87 @@ private void recovery(int beginSeqNo, int endSeqNo) {
AtomicInteger lastProcessedSequence = new AtomicInteger(beginSeqNo - 1);
try {
recoveryLock.lock();

if (endSeqNo == 0) {
endSeqNo = msgSeqNum.get() + 1;
}

int endSeq = endSeqNo;
info("Loading messages from %d to %d", beginSeqNo, endSeqNo);
if(settings.isLoadMissedMessagesFromCradle()) {
Function1<ByteBuf, Boolean> processMessage = (buf) -> {
FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG);
FixField msgTypeField = findField(buf, MSG_TYPE_TAG);
if(seqNum == null || seqNum.getValue() == null
|| msgTypeField == null || msgTypeField.getValue() == null) {
return true;
}
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();
info("Recovery messages from %d to %d", beginSeqNo, endSeqNo);

if(sequence < beginSeqNo) return true;
if(sequence > endSeq) return false;
int beginSeq = beginSeqNo;
int endSeq = endSeqNo;

if(ADMIN_MESSAGES.contains(msgType)) return true;
FixField possDup = findField(buf, POSS_DUP_TAG);
if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true;
Function1<ByteBuf, Boolean> processMessage = (buf) -> {
FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG);
FixField msgTypeField = findField(buf, MSG_TYPE_TAG);
if(seqNum == null || seqNum.getValue() == null
|| msgTypeField == null || msgTypeField.getValue() == null) {
return true;
}
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();

if(sequence - 1 != lastProcessedSequence.get() ) {
StringBuilder sequenceReset =
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE);
resetHeartbeatTask();
}
if(sequence < beginSeqNo) return true;
if(sequence > endSeq) return false;

setTime(buf);
setPossDup(buf);
updateLength(buf);
updateChecksum(buf);
channel.send(buf, createMetadataMap(), null, SendMode.MANGLE);
if(ADMIN_MESSAGES.contains(msgType)) return true;
FixField possDup = findField(buf, POSS_DUP_TAG);
if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true;

if(sequence - 1 != lastProcessedSequence.get() ) {
StringBuilder sequenceReset =
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE);
resetHeartbeatTask();
}

lastProcessedSequence.set(sequence);
return true;
};
setTime(buf);
setPossDup(buf);
updateLength(buf);
updateChecksum(buf);
channel.send(buf, createMetadataMap(), null, SendMode.MANGLE);

resetHeartbeatTask();

lastProcessedSequence.set(sequence);
return true;
};

if (messageCache != EMPTY_MESSAGE_CACHE) { // check aren't references equal
info("Loading messages from %d to %d from message cache", beginSeq, endSeqNo);
for (int i = beginSeq; i < endSeqNo; i++) {
ByteBuf message = messageCache.getIfPresent(i);
if (message == null) {
info("Messages from %d included to %d excluded have been recovered from message cache", beginSeq, i);
beginSeq = i;
break;
}

if (!processMessage.invoke(message)) {
if (LOGGER.isWarnEnabled()) warn(
"Message from message cache has been rejected by process function, message: %s",
message.toString(US_ASCII));
}
}
}

if(settings.isLoadMissedMessagesFromCradle()) {
info("Loading messages from %d to %d from cradle", beginSeq, endSeqNo);
messageLoader.processMessagesInRange(
channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND,
beginSeqNo,
beginSeq,
processMessage
);

if(lastProcessedSequence.get() < endSeq) {
String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString();
String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeq), msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
createMetadataMap(), null, SendMode.MANGLE
);
}
} else {
String seqReset =
createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString();
createSequenceReset(beginSeq, msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
createMetadataMap(), null, SendMode.MANGLE
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class FixHandlerSettings implements IHandlerSettings {
* Value from Java Security Standard Algorithm Names
*/
private String passwordEncryptAlgorithm = "RSA";
private int messageCacheSize = 100;
private Boolean resetSeqNumFlag = false;
private Boolean resetOnLogon = false;
private Boolean useNextExpectedSeqNum = false;
Expand Down Expand Up @@ -196,6 +197,10 @@ public String getPasswordEncryptAlgorithm() {
return passwordEncryptAlgorithm;
}

public int getMessageCacheSize() {
return messageCacheSize;
}

public Boolean getResetSeqNumFlag() { return resetSeqNumFlag; }

public Boolean getResetOnLogon() { return resetOnLogon; }
Expand Down Expand Up @@ -292,6 +297,8 @@ public void setSessionEndTime(LocalTime sessionEndTime) {
this.sessionEndTime = sessionEndTime;
}

public void setMessageCacheSize(int messageCacheSize) { this.messageCacheSize = messageCacheSize; }

public void setResetSeqNumFlag(Boolean resetSeqNumFlag) { this.resetSeqNumFlag = resetSeqNumFlag; }

public void setResetOnLogon(Boolean resetOnLogon) { this.resetOnLogon = resetOnLogon; }
Expand Down
Loading

0 comments on commit e9b55b9

Please sign in to comment.