Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5076] Use UTC timezone for sending time tag and for in MessageLo… #65

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.4.0)
# th2-conn-dirty-fix (1.4.1)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -328,6 +328,9 @@ spec:
```

# Changelog
### 1.4.1
* Use UTC time zone for sending time tag

### 1.4.0
* Ungraceful session disconnect support.
* Removed NPE when session is reset by schedule.
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
release_version=1.4.0
release_version=1.4.1
jackson_version=2.11.2

24 changes: 12 additions & 12 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ public class FixHandler implements AutoCloseable, IHandler {
private final MessageLoader messageLoader;
private final ReentrantLock recoveryLock = new ReentrantLock();

private AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
protected FixHandlerSettings settings;
Expand Down Expand Up @@ -294,18 +294,16 @@ public CompletableFuture<MessageID> send(@NotNull RawMessage rawMessage) {
}
}

CompletableFuture<MessageID> result = CompletableFuture.completedFuture(null);
try {
recoveryLock.lock();
result = channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE);
return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE);
} finally {
recoveryLock.unlock();
}
return result;
}

@Override
public ByteBuf onReceive(IChannel channel, ByteBuf buffer) {
public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) {
int offset = buffer.readerIndex();
if (offset == buffer.writerIndex()) return null;

Expand Down Expand Up @@ -369,7 +367,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
FixField possDup = findField(message, POSS_DUP_TAG);
boolean isDup = false;
if(possDup != null) {
isDup = possDup.getValue().equals(IS_POSS_DUP);
isDup = IS_POSS_DUP.equals(possDup.getValue());
}

String msgTypeValue = requireNonNull(msgType.getValue());
Expand Down Expand Up @@ -522,7 +520,7 @@ private void resetSequence(ByteBuf message) {
FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG);

if(seqNumValue != null) {
if(gapFillMode == null || gapFillMode.getValue().equals("N")) {
if(gapFillMode == null || "N".equals(gapFillMode.getValue())) {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())));
} else {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1);
Expand Down Expand Up @@ -604,7 +602,7 @@ private void recovery(int beginSeqNo, int endSeqNo) {
|| msgTypeField == null || msgTypeField.getValue() == null) {
return true;
}
Integer sequence = Integer.parseInt(seqNum.getValue());
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();

if(sequence < beginSeqNo) return true;
Expand Down Expand Up @@ -727,7 +725,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, S
FixField beginString = findField(message, BEGIN_STRING_TAG);

if (beginString == null) {
beginString = firstField(message).insertPrevious(BEGIN_STRING_TAG, settings.getBeginString());
beginString = requireNonNull(firstField(message), () -> "First filed isn't found in message: " + message.toString(US_ASCII))
.insertPrevious(BEGIN_STRING_TAG, settings.getBeginString());
}

FixField bodyLength = findField(message, BODY_LENGTH_TAG, US_ASCII, beginString);
Expand All @@ -749,7 +748,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, S
FixField checksum = findLastField(message, CHECKSUM_TAG);

if (checksum == null) {
checksum = lastField(message).insertNext(CHECKSUM_TAG, STUBBING_VALUE); //stubbing until finish checking message
checksum = requireNonNull(lastField(message), "Last filed isn't found in message: " + message.toString(US_ASCII))
.insertNext(CHECKSUM_TAG, STUBBING_VALUE); //stubbing until finish checking message
}

FixField msgSeqNum = findField(message, MSG_SEQ_NUM_TAG, US_ASCII, bodyLength);
Expand Down Expand Up @@ -1042,7 +1042,7 @@ public int getBodyLength(ByteBuf message) {

public String getTime() {
DateTimeFormatter formatter = settings.getSendingDateTimeFormat();
LocalDateTime datetime = LocalDateTime.now();
LocalDateTime datetime = LocalDateTime.now(ZoneOffset.UTC);
return formatter.format(datetime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class MessageLoader(
init {
val today = LocalDate.now(ZoneOffset.UTC)
val start = sessionStartTime?.atDate(today)
val now = LocalDateTime.now()
val now = LocalDateTime.now(ZoneOffset.UTC)
if(start == null) {
sessionStart = OffsetDateTime
.now(ZoneOffset.UTC)
Expand Down Expand Up @@ -152,7 +152,7 @@ class MessageLoader(
createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias)
)

var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation
val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

Expand Down