Skip to content

Commit

Permalink
[TH2-5102] Add message property with operation time (#66)
Browse files Browse the repository at this point in the history
* Use mutable map when sending message

* Update version and readme

* Update changelog
  • Loading branch information
OptimumCode authored Oct 30, 2023
1 parent 5403fa1 commit f79f830
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-conn-dirty-fix (1.4.2)
# th2-conn-dirty-fix (1.5.1)

This microservice allows sending and receiving messages via FIX protocol

Expand Down Expand Up @@ -335,6 +335,12 @@ spec:

# Changelog

## 1.5.1

* Property `th2.operation_timestamp` is added to metadata to each message
* Use mutable map for metadata when sending a messages from the handler
* Fix error when new property with operation timestamp added to the immutable map

## 1.5.0

* `minConnectionTimeoutOnSend` parameter is added.
Expand Down
28 changes: 16 additions & 12 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private
resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo).append(SOH);
resendRequest.append(END_SEQ_NO).append(endSeqNo).append(SOH);
setChecksumAndBodyLength(resendRequest);
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
resetHeartbeatTask();
}

Expand All @@ -598,7 +598,7 @@ void sendResendRequest(int beginSeqNo) { //do private
setChecksumAndBodyLength(resendRequest);

if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
resetHeartbeatTask();
} else {
sendLogon();
Expand Down Expand Up @@ -655,15 +655,15 @@ private void recovery(int beginSeqNo, int endSeqNo) {
if(sequence - 1 != lastProcessedSequence.get() ) {
StringBuilder sequenceReset =
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE);
resetHeartbeatTask();
}

setTime(buf);
setPossDup(buf);
updateLength(buf);
updateChecksum(buf);
channel.send(buf, Collections.emptyMap(), null, SendMode.MANGLE);
channel.send(buf, createMetadataMap(), null, SendMode.MANGLE);

resetHeartbeatTask();

Expand All @@ -681,15 +681,15 @@ private void recovery(int beginSeqNo, int endSeqNo) {
String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
createMetadataMap(), null, SendMode.MANGLE
);
}
} else {
String seqReset =
createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
createMetadataMap(), null, SendMode.MANGLE
);
}
resetHeartbeatTask();
Expand All @@ -700,7 +700,7 @@ private void recovery(int beginSeqNo, int endSeqNo) {
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
createMetadataMap(), null, SendMode.MANGLE
);
} finally {
recoveryLock.unlock();
Expand All @@ -716,7 +716,7 @@ private void sendSequenceReset() {
setChecksumAndBodyLength(sequenceReset);

if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
resetHeartbeatTask();
} else {
sendLogon();
Expand Down Expand Up @@ -880,7 +880,7 @@ private void sendHeartbeatTestReqId(String testReqId) {

if (enabled.get()) {
info("Send Heartbeat to server - %s", heartbeat);
channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
resetHeartbeatTask();

} else {
Expand All @@ -894,7 +894,7 @@ public void sendTestRequest() { //do private
testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet());
setChecksumAndBodyLength(testRequest);
if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
info("Send TestRequest to server - %s", testRequest);
resetTestRequestTask();
resetHeartbeatTask();
Expand Down Expand Up @@ -942,7 +942,7 @@ public void sendLogon() {

setChecksumAndBodyLength(logon);
info("Send logon - %s", logon);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, IChannel.SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, IChannel.SendMode.MANGLE);
}

private void sendLogout() {
Expand All @@ -963,7 +963,7 @@ private void sendLogout(String text) {
try {
channel.send(
Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(),
createMetadataMap(),
null,
IChannel.SendMode.MANGLE
).get();
Expand Down Expand Up @@ -1174,4 +1174,8 @@ private void debug(String message, Object... args) {
LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args));
}
}

private Map<String, String> createMetadataMap() {
return new HashMap<>(2);
}
}

0 comments on commit f79f830

Please sign in to comment.