From 190343ca0ce28537ee7ffa9370fd787ac220fc5a Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Fri, 23 Feb 2024 20:46:11 +0400 Subject: [PATCH] [TS-2126] Property `th2.broken.strategy` is added to metadata to each message when a strategy is active --- README.md | 15 +-- .../java/com/exactpro/th2/FixHandler.java | 107 ++++++++++++------ .../fix/brokenconn/strategy/StrategyState.kt | 29 +++-- 3 files changed, 98 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index 030fc5f..00b2048 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.2.0) +# th2-conn-dirty-fix (1.5.2) This microservice allows sending and receiving messages via FIX protocol @@ -336,6 +336,10 @@ spec: # Changelog +## 1.5.2 + +* Property `th2.broken.strategy` is added to metadata to each message when a strategy is active + ## 1.5.1 * Property `th2.operation_timestamp` is added to metadata to each message @@ -404,18 +408,9 @@ spec: ## 1.0.0 * Bump `conn-dirty-tcp-core` to `3.0.0` for books and pages support -<<<<<<< HEAD -<<<<<<< HEAD -======= -## 0.3.0 -* Ability to recover messages from cradle. - ->>>>>>> original/dev-version-1 -======= ## 0.3.0 * Ability to recover messages from cradle. ->>>>>>> original/dev-version-1 ## 0.2.0 * optional state reset on silent server reset. diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index ca80f1e..132cd2a 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,7 +76,6 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -96,12 +95,11 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Collectors; -import javax.net.SocketFactory; + import kotlin.Unit; import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.checkerframework.checker.units.qual.A; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -501,8 +499,10 @@ public ByteBuf onReceive(@NotNull IChannel channel, @NotNull ByteBuf buffer) { public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, MessageID messageId) { Map metadata = new HashMap<>(); + StrategyState state = strategy.getState(); + state.enrichProperties(metadata); if(strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getIncomingMessagesPreprocessor).process(message, metadata) != null) { - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); return metadata; } @@ -536,7 +536,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu String msgTypeValue = requireNonNull(msgType.getValue()); if(msgTypeValue.equals(MSG_TYPE_LOGOUT)) { serverMsgSeqNum.incrementAndGet(); - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); strategy.getIncomingMessageStrategy(x -> x.getLogoutStrategy()).process(message, metadata); return metadata; } @@ -579,7 +579,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu handleHeartbeat(message); break; case MSG_TYPE_LOGON: - strategy.getState().addMessageID(messageId);strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); Map logonMetadata = strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getLogonStrategy).process(message, metadata); if (logonMetadata != null) return logonMetadata; if(serverMsgSeqNum.get() < receivedMsgSeqNum && !isDup && !enabled.get()) { @@ -589,17 +589,17 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu } break; case MSG_TYPE_RESEND_REQUEST: - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); if (LOGGER.isInfoEnabled()) LOGGER.info("Resend request received - {}", message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); if (LOGGER.isInfoEnabled()) LOGGER.info("Sequence reset received - {}", message.toString(US_ASCII)); resetSequence(message); break; case MSG_TYPE_TEST_REQUEST: - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); if(LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); if(strategy.getIncomingMessageStrategy(IncomingMessagesStrategy::getTestRequestProcessor).process(message, metadata) != null) { return metadata; @@ -607,7 +607,7 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu break; default: if(isDup) { - strategy.getState().addMessageID(messageId); + state.addMessageID(messageId); } if(LOGGER.isInfoEnabled()) LOGGER.info("Received message - {}", message.toString(US_ASCII)); } @@ -789,7 +789,10 @@ public void sendResendRequest(int beginSeqNo, int endSeqNo, boolean isPossDup) { resendRequest.append(BEGIN_SEQ_NO).append(beginSeqNo); resendRequest.append(END_SEQ_NO).append(endSeqNo); setChecksumAndBodyLength(resendRequest); - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } @@ -802,7 +805,10 @@ void sendResendRequest(int beginSeqNo) { //do private setChecksumAndBodyLength(resendRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(resendRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); } @@ -869,7 +875,10 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(sequence - 1 != lastProcessedSequence.get() ) { StringBuilder sequenceReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.MANGLE); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.MANGLE); resetHeartbeatTask(); } @@ -878,7 +887,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi updateLength(buf); updateChecksum(buf); if(!skip.get()) { - channel.send(buf, new HashMap(), null, SendMode.MANGLE) + channel.send(buf, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); @@ -894,7 +903,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(!skip.get() && recoveryConfig.getOutOfOrder()) { skip.set(true); - channel.send(skipped.get(), new HashMap(), null, SendMode.MANGLE) + channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); @@ -920,7 +929,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + strategy.getState().enrichProperties(), null, SendMode.MANGLE ).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } } else { @@ -928,7 +937,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + strategy.getState().enrichProperties(), null, SendMode.MANGLE ); } resetHeartbeatTask(); @@ -938,8 +947,10 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi String seqReset = createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), msgSeqNum.get() + 1).toString(); channel.send( - Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + Unpooled.buffer().writeBytes(seqReset.getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.MANGLE ); } } @@ -978,6 +989,7 @@ private boolean checkLogon(ByteBuf message) { @Override public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { + strategy.getState().enrichProperties(metadata); strategy.getOutgoingMessageStrategy(OutgoingMessagesStrategy::getOutgoingMessageProcessor).process(message, metadata); if (LOGGER.isInfoEnabled()) LOGGER.info("Outgoing message: {}", message.toString(US_ASCII)); @@ -1123,7 +1135,10 @@ private void sendHeartbeatWithTestRequest(String testRequestId, boolean possDup) if (enabled.get()) { LOGGER.info("Send Heartbeat to server - {}", heartbeat); - channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE); + channel.send(Unpooled.wrappedBuffer(heartbeat.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE); resetHeartbeatTask(); } else { @@ -1141,7 +1156,10 @@ public void sendTestRequestWithPossDup(boolean isPossDup) { //do private testRequest.append(TEST_REQ_ID).append(testReqID.incrementAndGet()); setChecksumAndBodyLength(testRequest); if (enabled.get()) { - channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(testRequest.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); LOGGER.info("Send TestRequest to server - {}", testRequest); resetTestRequestTask(); @@ -1176,7 +1194,10 @@ public void sendLogon() { StringBuilder logon = buildLogon(props); LOGGER.info("Send logon - {}", logon); - channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), props, null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(props), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } @@ -1250,7 +1271,7 @@ private void sendLogout(String text, boolean isPossDup) { try { MessageID messageID = channel.send( Unpooled.wrappedBuffer(logout.toString().getBytes(StandardCharsets.UTF_8)), - new HashMap(), + strategy.getState().enrichProperties(), null, SendMode.HANDLE_AND_MANGLE ).get(); @@ -1302,9 +1323,11 @@ public void close() { // defaultSend(IChannel channel, ByteBuf message, Map properties, EventID eventID) { - CompletableFuture messageId = channel.send(message, properties, eventID, SendMode.HANDLE_AND_MANGLE); - return messageId; + private CompletableFuture defaultSend(IChannel channel, + ByteBuf message, + Map properties, + EventID eventID) { + return channel.send(message, strategy.getState().enrichProperties(properties), eventID, SendMode.HANDLE_AND_MANGLE); } private CompletableFuture bulkSend(IChannel channel, ByteBuf message, Map properties, EventID eventID) { @@ -1316,7 +1339,10 @@ private CompletableFuture bulkSend(IChannel channel, ByteBuf message, strategyState.updateCacheAndRunOnCondition(message, x -> x >= config.getBatchSize(), buffer -> { try { LOGGER.info("Sending batch of size: {}", config.getBatchSize()); - channel.send(asExpandable(buffer), properties, eventID, SendMode.DIRECT) + channel.send(asExpandable(buffer), + strategy.getState().enrichProperties(properties), + eventID, + SendMode.DIRECT) .thenAcceptAsync(strategyState::addMessageID, executorService); } catch (Exception e) { LOGGER.error("Error while sending batch.", e); @@ -1349,7 +1375,10 @@ private CompletableFuture splitSend(IChannel channel, ByteBuf message } catch (InterruptedException e) { LOGGER.error("Error while sending messages in different tcp packets."); } - channel.send(asExpandable(slice), metadata, eventID, SendMode.DIRECT_SOCKET); + channel.send(asExpandable(slice), + strategy.getState().enrichProperties(metadata), + eventID, + SendMode.DIRECT_SOCKET); resetHeartbeatTask(); sendingTimes.add(Instant.now()); } @@ -1358,7 +1387,10 @@ private CompletableFuture splitSend(IChannel channel, ByteBuf message String slicesTimestamps = sendingTimes.stream().map(formatter::format).collect(Collectors.joining(",")); metadata.put(SPLIT_SEND_TIMESTAMPS_PROPERTY, slicesTimestamps); LOGGER.info("Sent message by slices: {}", slicesTimestamps); - CompletableFuture messageID = channel.send(asExpandable(message), metadata, eventID, SendMode.DIRECT_MSTORE); + CompletableFuture messageID = channel.send(asExpandable(message), + strategy.getState().enrichProperties(metadata), + eventID, + SendMode.DIRECT_MSTORE); messageID.thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); return messageID; } @@ -1739,7 +1771,7 @@ private void runLogonFromAnotherConnection(RuleConfiguration configuration) { props.put("sentUsingAnotherSocket", "True"); ByteBuf logonBuf = Unpooled.wrappedBuffer(logon.toString().getBytes(StandardCharsets.UTF_8)); - channel.send(logonBuf, props, null, SendMode.DIRECT_MSTORE) + channel.send(logonBuf, strategy.getState().enrichProperties(props), null, SendMode.DIRECT_MSTORE) .thenAcceptAsync(x -> { strategy.getState().addMessageID(x); }, executorService); @@ -2099,7 +2131,10 @@ private void sendSequenceReset(RuleConfiguration configuration) { } setChecksumAndBodyLength(sequenceReset); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), new HashMap(), null, SendMode.HANDLE_AND_MANGLE) + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), + strategy.getState().enrichProperties(), + null, + SendMode.HANDLE_AND_MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); resetHeartbeatTask(); strategy.cleanupStrategy(); @@ -2118,7 +2153,7 @@ private void cleanupBatchSendStrategy() { strategy.updateSendStrategy(x -> { state.executeOnBatchCacheIfCondition(size -> size > 0, message -> { try { - channel.send(message, new HashMap(), null, SendMode.DIRECT) + channel.send(message, strategy.getState().enrichProperties(), null, SendMode.DIRECT) .thenAcceptAsync(messageID -> strategy.getState().addMessageID(messageID), executorService); } catch (Exception e) { LOGGER.error("Error while sending batch.", e); @@ -2252,7 +2287,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { channel.send( Unpooled.wrappedBuffer(seqReset.toString().getBytes(StandardCharsets.UTF_8)), - new HashMap(), null, SendMode.MANGLE + strategy.getState().enrichProperties(), null, SendMode.MANGLE ).thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); } else { FixField possDup = findField(missedMessage, POSS_DUP_TAG); @@ -2271,7 +2306,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); if(!skip) { - channel.send(missedMessage, new HashMap(), null, SendMode.MANGLE) + channel.send(missedMessage, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); @@ -2286,7 +2321,7 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { } if(!skip && recoveryConfig.getOutOfOrder()) { - channel.send(skipped, new HashMap(), null, SendMode.MANGLE) + channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { Thread.sleep(settings.getRecoverySendIntervalMs()); diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt index 129f9ae..1c0507a 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/brokenconn/strategy/StrategyState.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,18 @@ package com.exactpro.th2.conn.dirty.fix.brokenconn.strategy import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.common.message.toJson import com.exactpro.th2.conn.dirty.fix.brokenconn.configuration.RuleConfiguration import com.exactpro.th2.netty.bytebuf.util.asExpandable -import com.google.protobuf.TextFormat.shortDebugString import io.netty.buffer.ByteBuf import io.netty.buffer.CompositeByteBuf import io.netty.buffer.Unpooled +import mu.KotlinLogging import java.time.Instant -import java.util.Collections import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.read import kotlin.concurrent.write -import mu.KotlinLogging class StrategyState(val config: RuleConfiguration? = null, private val missedMessagesCache: ConcurrentHashMap = ConcurrentHashMap() @@ -36,7 +35,7 @@ class StrategyState(val config: RuleConfiguration? = null, val startTime: Instant = Instant.now() val type = config?.ruleType ?: RuleType.DEFAULT private val batchMessageCache: CompositeByteBuf = Unpooled.compositeBuffer() - private val messageIDs: MutableList = ArrayList() + private val messageIDs: MutableList = ArrayList() private val lock = ReentrantReadWriteLock() private var batchMessageCacheSize = 0 @@ -102,9 +101,24 @@ class StrategyState(val config: RuleConfiguration? = null, } } + @JvmOverloads + fun enrichProperties(properties: MutableMap? = null): MutableMap { + if (type != RuleType.DEFAULT) { + return properties?.apply { + val previous = put(STRATEGY_PROPERTY, type.name) + when (previous) { + null -> { /* do noting */ } + type.name -> K_LOGGER.debug { "Strategy name $type is already set" } + else -> K_LOGGER.warn { "Strategy name $properties has been replaced to $type" } + } + } ?: hashMapOf(STRATEGY_PROPERTY to type.name) + } + return hashMapOf() + } + fun addMessageID(messageID: MessageID?) = lock.write { if (messageIDs.size + 1 >= TOO_BIG_MESSAGE_IDS_LIST) { - K_LOGGER.warn { "Strategy ${type} messageIDs list is too big. Skiping messageID: ${shortDebugString(messageID)}" } + K_LOGGER.warn { "Strategy $type messageIDs list is too big. Skipping messageID: ${messageID?.toJson()}" } } messageID?.let { messageIDs.add(it) } } @@ -114,7 +128,8 @@ class StrategyState(val config: RuleConfiguration? = null, } companion object { - private const val TOO_BIG_MESSAGE_IDS_LIST = 300; + private const val STRATEGY_PROPERTY: String = "th2.broken.strategy" + private const val TOO_BIG_MESSAGE_IDS_LIST = 300 private val K_LOGGER = KotlinLogging.logger { } fun StrategyState.resetAndCopyMissedMessages(ruleConfiguration: RuleConfiguration? = null): StrategyState = StrategyState(ruleConfiguration, this.missedMessagesCache)