Skip to content

Commit

Permalink
[TH2-5076] Use UTC timezone for sending time tag and for in MessageLo…
Browse files Browse the repository at this point in the history
…ader

* Refactoring
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 21, 2023
1 parent 2d8467f commit 69ba280
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
24 changes: 12 additions & 12 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ public class FixHandler implements AutoCloseable, IHandler {
private final MessageLoader messageLoader;
private final ReentrantLock recoveryLock = new ReentrantLock();

private AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
protected FixHandlerSettings settings;
Expand Down Expand Up @@ -294,18 +294,16 @@ public CompletableFuture<MessageID> send(@NotNull RawMessage rawMessage) {
}
}

CompletableFuture<MessageID> result = CompletableFuture.completedFuture(null);
try {
recoveryLock.lock();
result = channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE);
return channel.send(toByteBuf(rawMessage.getBody()), rawMessage.getMetadata().getPropertiesMap(), getEventId(rawMessage), SendMode.HANDLE_AND_MANGLE);
} finally {
recoveryLock.unlock();
}
return result;
}

@Override
public ByteBuf onReceive(IChannel channel, ByteBuf buffer) {
public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) {
int offset = buffer.readerIndex();
if (offset == buffer.writerIndex()) return null;

Expand Down Expand Up @@ -369,7 +367,7 @@ public Map<String, String> onIncoming(@NotNull IChannel channel, @NotNull ByteBu
FixField possDup = findField(message, POSS_DUP_TAG);
boolean isDup = false;
if(possDup != null) {
isDup = possDup.getValue().equals(IS_POSS_DUP);
isDup = IS_POSS_DUP.equals(possDup.getValue());
}

String msgTypeValue = requireNonNull(msgType.getValue());
Expand Down Expand Up @@ -522,7 +520,7 @@ private void resetSequence(ByteBuf message) {
FixField seqNumValue = findField(message, NEW_SEQ_NO_TAG);

if(seqNumValue != null) {
if(gapFillMode == null || gapFillMode.getValue().equals("N")) {
if(gapFillMode == null || "N".equals(gapFillMode.getValue())) {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())));
} else {
serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1);
Expand Down Expand Up @@ -604,7 +602,7 @@ private void recovery(int beginSeqNo, int endSeqNo) {
|| msgTypeField == null || msgTypeField.getValue() == null) {
return true;
}
Integer sequence = Integer.parseInt(seqNum.getValue());
int sequence = Integer.parseInt(seqNum.getValue());
String msgType = msgTypeField.getValue();

if(sequence < beginSeqNo) return true;
Expand Down Expand Up @@ -727,7 +725,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, S
FixField beginString = findField(message, BEGIN_STRING_TAG);

if (beginString == null) {
beginString = firstField(message).insertPrevious(BEGIN_STRING_TAG, settings.getBeginString());
beginString = requireNonNull(firstField(message), () -> "First filed isn't found in message: " + message.toString(US_ASCII))
.insertPrevious(BEGIN_STRING_TAG, settings.getBeginString());
}

FixField bodyLength = findField(message, BODY_LENGTH_TAG, US_ASCII, beginString);
Expand All @@ -749,7 +748,8 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map<String, S
FixField checksum = findLastField(message, CHECKSUM_TAG);

if (checksum == null) {
checksum = lastField(message).insertNext(CHECKSUM_TAG, STUBBING_VALUE); //stubbing until finish checking message
checksum = requireNonNull(lastField(message), "Last filed isn't found in message: " + message.toString(US_ASCII))
.insertNext(CHECKSUM_TAG, STUBBING_VALUE); //stubbing until finish checking message
}

FixField msgSeqNum = findField(message, MSG_SEQ_NUM_TAG, US_ASCII, bodyLength);
Expand Down Expand Up @@ -1042,7 +1042,7 @@ public int getBodyLength(ByteBuf message) {

public String getTime() {
DateTimeFormatter formatter = settings.getSendingDateTimeFormat();
LocalDateTime datetime = LocalDateTime.now();
LocalDateTime datetime = LocalDateTime.now(ZoneOffset.UTC);
return formatter.format(datetime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class MessageLoader(
init {
val today = LocalDate.now(ZoneOffset.UTC)
val start = sessionStartTime?.atDate(today)
val now = LocalDateTime.now()
val now = LocalDateTime.now(ZoneOffset.UTC)
if(start == null) {
sessionStart = OffsetDateTime
.now(ZoneOffset.UTC)
Expand Down Expand Up @@ -152,7 +152,7 @@ class MessageLoader(
createSearchRequest(Instant.now().toTimestamp(), direction, sessionAlias)
)

var firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation
val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation

var messagesToSkip = firstValidMessage.payloadSequence - fromSequence

Expand Down

0 comments on commit 69ba280

Please sign in to comment.