diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 685bda9..d22b726 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -57,6 +57,7 @@ import com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil; import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import kotlin.Unit; @@ -217,7 +218,7 @@ public class FixHandler implements AutoCloseable, IHandler { private final AtomicBoolean enabled = new AtomicBoolean(false); private final AtomicBoolean connStarted = new AtomicBoolean(false); private final AtomicBoolean strategiesEnabled = new AtomicBoolean(true); - private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService executorService; private final IHandlerContext context; private final InetSocketAddress address; @@ -243,6 +244,12 @@ public FixHandler(IHandlerContext context) { this.context = context; strategyRootEvent = context.send(CommonUtil.toEvent("Strategy root event"), null); this.settings = (FixHandlerSettings) context.getSettings(); + + executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat(settings.getSenderCompID() + '/' + + settings.getSenderSubID() + '>' + + settings.getTargetCompID() + "-%d").build()); + if(settings.isLoadSequencesFromCradle() || settings.isLoadMissedMessagesFromCradle()) { this.messageLoader = new MessageLoader( executorService, @@ -756,9 +763,7 @@ private void resetSequence(ByteBuf message) { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue()))); } else { int newSeqNo = Integer.parseInt(requireNonNull(seqNumValue.getValue())); - serverMsgSeqNum.updateAndGet(sequence -> { - return Math.max(sequence, newSeqNo - 1); - }); + serverMsgSeqNum.updateAndGet(sequence -> Math.max(sequence, newSeqNo - 1)); } } @@ -1029,7 +1034,7 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map messageI ruleEndEvent(type, start, messageIDS, Collections.emptyMap()); } - private void ruleErrorEvent(RuleType type, String message, Throwable error) { + private void ruleErrorEvent(RuleType type, String message, @Nullable Throwable error) { String errorLog = String.format("Rule %s error event: message - %s, error - %s", type, message, error); LOGGER.error(errorLog, error); - context.send( - Event + Event event = Event .start() .endTimestamp() .type(STRATEGY_EVENT_TYPE) .name(errorLog) - .exception(error, true) - .status(Event.Status.FAILED), + .status(Event.Status.FAILED); + if (error != null) { + event.exception(error, true); + } + context.send( + event, strategyRootEvent ); }