diff --git a/README.md b/README.md index 6c91ce5..923fa67 100644 --- a/README.md +++ b/README.md @@ -336,6 +336,7 @@ spec: ## 1.4.2 * Add property `encode-mode: dirty` for messages that are corrupted with transformation strategies: `TRANSFORM_MESSAGE_STRATEGY`, `INVALID_CHECKSUM` and `TRANSFORM_LOGON` +* Added synchronization for business messages sent via rabbitmq and messages that are sent from handler. ## 1.4.1 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 78dbaf0..ab7b453 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -93,13 +93,7 @@ import java.util.Objects; import java.util.Random; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -239,9 +233,12 @@ public class FixHandler implements AutoCloseable, IHandler { private volatile IChannel channel; protected FixHandlerSettings settings; private final MessageTransformer messageTransformer = MessageTransformer.INSTANCE; - private final PasswordManager passwordManager; + private final ReentrantLock communicationLock = new ReentrantLock(); + + private final ReentrantLock disconnectStrategyLock = new ReentrantLock(); + public FixHandler(IHandlerContext context) { this.context = context; strategyRootEvent = context.send(CommonUtil.toEvent("Strategy root event"), null); @@ -352,7 +349,14 @@ public void onStart() { @NotNull public CompletableFuture send(@NotNull ByteBuf body, @NotNull Map properties, @Nullable EventID eventID) { - strategy.getSendStrategy(SendStrategy::getSendPreprocessor).process(body, properties); + try { + disconnectStrategyLock.lock(); + strategy.getSendStrategy(SendStrategy::getSendPreprocessor).process(body, properties); + } + finally { + disconnectStrategyLock.unlock(); + } + if (!sessionActive.get()) { throw new IllegalStateException("Session is not active. It is not possible to send messages."); } @@ -415,7 +419,7 @@ public CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map strategy.getState().addMessageID(x), executorService); - resetHeartbeatTask(); - } - - void sendResendRequest(int beginSeqNo) { //do private - StringBuilder resendRequest = new StringBuilder(); - setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); - resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); - resendRequest.append(END_SEQ_NO).append(0); - setChecksumAndBodyLength(resendRequest); - - if (enabled.get()) { + try { + communicationLock.lock(); + + LOGGER.info("Sending resend request: {} - {}", beginSeqNo, endSeqNo); + StringBuilder resendRequest = new StringBuilder(); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null, isPossDup); + resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); + resendRequest.append(END_SEQ_NO).append(endSeqNo); + setChecksumAndBodyLength(resendRequest); channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), strategy.getState().enrichProperties(), null, SendMode.HANDLE_AND_MANGLE) - .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); + .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); + + } finally { + communicationLock.unlock(); + } + } + + void sendResendRequest(int beginSeqNo) { //do private + try { + communicationLock.lock(); + + StringBuilder resendRequest = new StringBuilder(); + setHeader(resendRequest, MSG_TYPE_RESEND_REQUEST, msgSeqNum.incrementAndGet(), null); + resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); + resendRequest.append(END_SEQ_NO).append(0); + setChecksumAndBodyLength(resendRequest); + + if (enabled.get()) { + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) + .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); + resetHeartbeatTask(); + } + + } finally { + communicationLock.unlock(); } } @@ -1036,7 +1060,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map strategy.getState().addMessageID(x), executorService); - LOGGER.info("Send TestRequest to server - {}", testRequest); - resetTestRequestTask(); - resetHeartbeatTask(); + try { + communicationLock.lock(); + + StringBuilder testRequest = new StringBuilder(); + setHeader(testRequest, MSG_TYPE_TEST_REQUEST, msgSeqNum.incrementAndGet(), null, isPossDup); + testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); + setChecksumAndBodyLength(testRequest); + channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) + .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); + LOGGER.info("Send TestRequest to server - {}", testRequest); + resetTestRequestTask(); + resetHeartbeatTask(); + } finally { + communicationLock.unlock(); + } } else { sendLogon(); } @@ -1261,29 +1296,35 @@ private void sendLogout() { } private void sendLogout(String text, boolean isPossDup) { - if (enabled.get()) { - StringBuilder logout = new StringBuilder(); - setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null, isPossDup); - if(text != null) { - logout.append(TEXT).append(text); - } - setChecksumAndBodyLength(logout); + try { + communicationLock.lock(); - LOGGER.debug("Sending logout - {}", logout); + if (enabled.get()) { + StringBuilder logout = new StringBuilder(); + setHeader(logout, MSG_TYPE_LOGOUT, msgSeqNum.incrementAndGet(), null, isPossDup); + if(text != null) { + logout.append(TEXT).append(text); + } + setChecksumAndBodyLength(logout); - try { - MessageID messageID = channel.send( - Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)), - strategy.getState().enrichProperties(), - null, - SendMode.HANDLE_AND_MANGLE - ).get(); - strategy.getState().addMessageID(messageID); + LOGGER.debug("Sending logout - {}", logout); - LOGGER.info("Sent logout - {}", logout); - } catch (Exception e) { - LOGGER.error("Failed to send logout - {}", logout, e); + try { + MessageID messageID = channel.send( + Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE + ).get(); + strategy.getState().addMessageID(messageID); + + LOGGER.info("Sent logout - {}", logout); + } catch (Exception e) { + LOGGER.error("Failed to send logout - {}", logout, e); + } } + } finally { + communicationLock.unlock(); } } @@ -1517,7 +1558,7 @@ private void transformProcessor( long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime <= timeToBlock) { try { - Thread.sleep(100); + Thread.sleep(10); } catch (Exception e) { LOGGER.error("Error while blocking send.", e); } @@ -1680,7 +1721,7 @@ private Map blockReceiveQueue(ByteBuf message, Map {x.setSendPreprocessor(this::blockSend); return Unit.INSTANCE; }); - strategy.setCleanupHandler(this::cleanupDisconnectStrategy); - ruleStartEvent(configuration.getRuleType(), strategy.getStartTime()); try { - disconnect(configuration.getGracefulDisconnect()); - } catch (Exception e) { - String message = String.format("Error while setting up %s", strategy.getType()); - LOGGER.error(message, e); - context.send(CommonUtil.toErrorEvent(message, e), strategyRootEvent); + disconnectStrategyLock.lock(); + strategy.resetStrategyAndState(configuration); + strategy.setCleanupHandler(this::cleanupDisconnectStrategy); + ruleStartEvent(configuration.getRuleType(), strategy.getStartTime()); + try { + disconnect(configuration.getGracefulDisconnect()); + } catch (Exception e) { + String message = String.format("Error while setting up %s", strategy.getType()); + LOGGER.error(message, e); + context.send(CommonUtil.toErrorEvent(message, e), strategyRootEvent); + } + } finally { + disconnectStrategyLock.unlock(); } } private void cleanupDisconnectStrategy() { var state = strategy.getState(); - strategy.updateSendStrategy(x -> {x.setSendPreprocessor(this::defaultMessageProcessor); return Unit.INSTANCE;}); try { openChannelAndWaitForLogon(); Thread.sleep(strategy.getConfig().getCleanUpDuration().toMillis()); @@ -2132,31 +2176,39 @@ private void runReconnectWithSequenceResetStrategy(RuleConfiguration configurati } private void sendSequenceReset(RuleConfiguration configuration) { - strategy.resetStrategyAndState(configuration); Instant start = Instant.now(); - SendSequenceResetConfiguration config = configuration.getSendSequenceResetConfiguration(); + try { + communicationLock.lock(); + + strategy.resetStrategyAndState(configuration); + SendSequenceResetConfiguration config = configuration.getSendSequenceResetConfiguration(); + + StringBuilder sequenceReset = new StringBuilder(); + String time = getTime(); + setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet(), time); + sequenceReset.append(ORIG_SENDING_TIME).append(time); + if(config.getChangeUp()) { + int seqNum = msgSeqNum.get(); + sequenceReset.append(NEW_SEQ_NO).append(seqNum + 5); + msgSeqNum.set(seqNum + 5); + } else { + sequenceReset.append(NEW_SEQ_NO).append(msgSeqNum.get() - 5); + } + setChecksumAndBodyLength(sequenceReset); - StringBuilder sequenceReset = new StringBuilder(); - String time = getTime(); - setHeader(sequenceReset, MSG_TYPE_SEQUENCE_RESET, msgSeqNum.incrementAndGet(), time); - sequenceReset.append(ORIG_SENDING_TIME).append(time); - if(config.getChangeUp()) { - int seqNum = msgSeqNum.get(); - sequenceReset.append(NEW_SEQ_NO).append(seqNum + 5); - msgSeqNum.set(seqNum + 5); - } else { - sequenceReset.append(NEW_SEQ_NO).append(msgSeqNum.get() - 5); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) + .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); + resetHeartbeatTask(); + strategy.cleanupStrategy(); + } catch (Exception e) { + ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs()); + } finally { + communicationLock.unlock(); } - setChecksumAndBodyLength(sequenceReset); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), - strategy.getState().enrichProperties(), - null, - SendMode.HANDLE_AND_MANGLE) - .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); - resetHeartbeatTask(); - strategy.cleanupStrategy(); - ruleEndEvent(configuration.getRuleType(), start, strategy.getState().getMessageIDs()); } private void setupBatchSendStrategy(RuleConfiguration configuration) { @@ -2443,7 +2495,7 @@ private void waitUntilLoggedIn() { while (!enabled.get() && System.currentTimeMillis() - start < 2000) { LOGGER.info("Waiting until session will be logged in: {}", channel.getSessionAlias()); try { - Thread.sleep(100); + Thread.sleep(10); } catch (Exception e) { LOGGER.error("Error while waiting session login.", e); } @@ -2456,7 +2508,7 @@ private void waitLogoutResponse() { if (LOGGER.isWarnEnabled()) LOGGER.warn("Waiting session logout: {}", channel.getSessionAlias()); try { //noinspection BusyWait - Thread.sleep(1000); + Thread.sleep(10); } catch (InterruptedException e) { LOGGER.error("Error while sleeping."); } @@ -2509,15 +2561,6 @@ public String getRandomOldPassword() { return previouslyUsedPasswords.get(random.nextInt(previouslyUsedPasswords.size())); } - private T getRandomElementFromList(List elements) { - if(elements.isEmpty()) return null; - return elements.get(random.nextInt(elements.size())); - } - - public AtomicBoolean getEnabled() { - return enabled; - } - private void resetHeartbeatTask() { heartbeatTimer.getAndSet( executorService.schedule(