Skip to content

Commit

Permalink
pass hash map instead of empty map for properties filling
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Plotnikov committed Oct 27, 2023
1 parent f700ec6 commit 8dce679
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private
resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo);
resendRequest.append(END_SEQ_NO).append(endSeqNo);
setChecksumAndBodyLength(resendRequest);
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE)
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
resetHeartbeatTask();
}
Expand All @@ -744,7 +744,7 @@ void sendResendRequest(int beginSeqNo) { //do private
setChecksumAndBodyLength(resendRequest);

if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE)
channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
resetHeartbeatTask();
} else {
Expand Down Expand Up @@ -804,7 +804,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
if(sequence - 1 != lastProcessedSequence.get() ) {
StringBuilder sequenceReset =
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE);
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.MANGLE);
resetHeartbeatTask();
}

Expand All @@ -813,7 +813,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
updateLength(buf);
updateChecksum(buf);
if(!skip.get()) {
channel.send(buf, Collections.emptyMap(), null, SendMode.MANGLE)
channel.send(buf, new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}

Expand All @@ -824,7 +824,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi

if(!skip.get() && recoveryConfig.getOutOfOrder()) {
skip.set(true);
channel.send(skipped.get(), Collections.emptyMap(), null, SendMode.MANGLE)
channel.send(skipped.get(), new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}

Expand All @@ -845,15 +845,15 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
new HashMap<String, String>(), null, SendMode.MANGLE
).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}
} else {
String seqReset =
createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
new HashMap<String, String>(), null, SendMode.MANGLE
);
}
resetHeartbeatTask();
Expand All @@ -864,7 +864,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi
createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString();
channel.send(
Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
new HashMap<String, String>(), null, SendMode.MANGLE
);
}
}
Expand All @@ -878,7 +878,7 @@ private void sendSequenceReset() {
setChecksumAndBodyLength(sequenceReset);

if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE)
channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
resetHeartbeatTask();
} else {
Expand Down Expand Up @@ -1048,7 +1048,7 @@ private void sendHeartbeatWithTestRequest(String testRequestId) {

if (enabled.get()) {
LOGGER.info("Send Heartbeat to server - {}", heartbeat);
channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE);
channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE);
resetHeartbeatTask();

} else {
Expand All @@ -1062,7 +1062,7 @@ public void sendTestRequest() { //do private
testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet());
setChecksumAndBodyLength(testRequest);
if (enabled.get()) {
channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE)
channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
LOGGER.info("Send TestRequest to server - {}", testRequest);
resetTestRequestTask();
Expand Down Expand Up @@ -1111,7 +1111,7 @@ public void sendLogon() {

setChecksumAndBodyLength(logon);
LOGGER.info("Send logon - {}", logon);
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE)
channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), new HashMap<String, String>(), null, SendMode.HANDLE_AND_MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}

Expand All @@ -1133,7 +1133,7 @@ private void sendLogout(String text) {
try {
MessageID messageID = channel.send(
Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(),
new HashMap<String, String>(),
null,
SendMode.HANDLE_AND_MANGLE
).get();
Expand Down Expand Up @@ -1892,7 +1892,7 @@ private void cleanupBatchSendStrategy() {
strategy.updateSendStrategy(x -> {
state.executeOnBatchCacheIfCondition(size -> size > 0, message -> {
try {
channel.send(message, Collections.emptyMap(), null, SendMode.DIRECT)
channel.send(message, new HashMap<String, String>(), null, SendMode.DIRECT)
.thenAcceptAsync(messageID -> strategy.getState().addMessageID(messageID), executorService);
} catch (Exception e) {
LOGGER.error("Error while sending batch.", e);
Expand Down Expand Up @@ -2006,7 +2006,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {

channel.send(
Unpooled.wrappedBuffer(seqReset.toString().getBytes(StandardCharsets.UTF_8)),
Collections.emptyMap(), null, SendMode.MANGLE
new HashMap<String, String>(), null, SendMode.MANGLE
).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
} else {
setTime(missedMessage);
Expand All @@ -2016,7 +2016,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {

LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII));
if(!skip) {
channel.send(missedMessage, Collections.emptyMap(), null, SendMode.MANGLE)
channel.send(missedMessage, new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
}

Expand All @@ -2026,7 +2026,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) {
}

if(!skip && recoveryConfig.getOutOfOrder()) {
channel.send(skipped, Collections.emptyMap(), null, SendMode.MANGLE)
channel.send(skipped, new HashMap<String, String>(), null, SendMode.MANGLE)
.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService);
skip = true;
}
Expand Down

0 comments on commit 8dce679

Please sign in to comment.