diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index e31251e..d459f1f 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -731,7 +731,7 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo) { //do private resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); resendRequest.append(END_SEQ_NO).append(endSeqNo); setChecksumAndBodyLength(resendRequest); - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } @@ -744,7 +744,7 @@ void sendResendRequest(int beginSeqNo) { //do private setChecksumAndBodyLength(resendRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } else { @@ -804,7 +804,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(sequence - 1 != lastProcessedSequence.get() ) { StringBuilder sequenceReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.MANGLE); resetHeartbeatTask(); } @@ -813,7 +813,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi updateLength(buf); updateChecksum(buf); if(!skip.get()) { - channel.send(buf, Collections.emptyMap(), null, SendMode.MANGLE) + channel.send(buf, new HashMap(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } @@ -824,7 +824,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(!skip.get() && recoveryConfig.getOutOfOrder()) { skip.set(true); - channel.send(skipped.get(), Collections.emptyMap(), null, SendMode.MANGLE) + channel.send(skipped.get(), new HashMap(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } @@ -845,7 +845,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + new HashMap(), null, SendMode.MANGLE ).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } } else { @@ -853,7 +853,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + new HashMap(), null, SendMode.MANGLE ); } resetHeartbeatTask(); @@ -864,7 +864,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + new HashMap(), null, SendMode.MANGLE ); } } @@ -878,7 +878,7 @@ private void sendSequenceReset() { setChecksumAndBodyLength(sequenceReset); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } else { @@ -1048,7 +1048,7 @@ private void sendHeartbeatWithTestRequest(String testRequestId) { if (enabled.get()) { LOGGER.info("Send Heartbeat to server - {}", heartbeat); - channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE); + channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE); resetHeartbeatTask(); } else { @@ -1062,7 +1062,7 @@ public void sendTestRequest() { //do private testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); LOGGER.info("Send TestRequest to server - {}", testRequest); resetTestRequestTask(); @@ -1111,7 +1111,7 @@ public void sendLogon() { setChecksumAndBodyLength(logon); LOGGER.info("Send logon - {}", logon); - channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), Collections.emptyMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } @@ -1133,7 +1133,7 @@ private void sendLogout(String text) { try { MessageID messageID = channel.send( Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), + new HashMap(), null, SendMode.HANDLE_AND_MANGLE ).get(); @@ -1892,7 +1892,7 @@ private void cleanupBatchSendStrategy() { strategy.updateSendStrategy(x -> { state.executeOnBatchCacheIfCondition(size -> size > 0, message -> { try { - channel.send(message, Collections.emptyMap(), null, SendMode.DIRECT) + channel.send(message, new HashMap(), null, SendMode.DIRECT) .thenAcceptAsync(messageID -> strategy.getState().addMessageID(messageID), executorService); } catch (Exception e) { LOGGER.error("Error while sending batch.", e); @@ -2006,7 +2006,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { channel.send( Unpooled.wrappedBuffer(seqReset.toString().getBytes(StandardCharsets.UTF_8)), - Collections.emptyMap(), null, SendMode.MANGLE + new HashMap(), null, SendMode.MANGLE ).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } else { setTime(missedMessage); @@ -2016,7 +2016,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); if(!skip) { - channel.send(missedMessage, Collections.emptyMap(), null, SendMode.MANGLE) + channel.send(missedMessage, new HashMap(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } @@ -2026,7 +2026,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { } if(!skip && recoveryConfig.getOutOfOrder()) { - channel.send(skipped, Collections.emptyMap(), null, SendMode.MANGLE) + channel.send(skipped, new HashMap(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); skip = true; }