From 730cce24c05fe264a899bed0cd888734b7d44106 Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Wed, 23 Oct 2024 16:05:20 +0300 Subject: [PATCH] Additional properties related to message corrupetion strategies. Avoid disconnect loop by skipping corrupted messages during recovery from cradle. --- src/main/java/com/exactpro/th2/FixHandler.java | 3 +++ .../kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index b40f0c0..73d496a 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -1496,6 +1496,7 @@ private void transformProcessor( strategyState.transformIfCondition( x -> x <= config.getNumberOfTimesToTransform(), () -> { + metadata.put("OriginalMessageType", msgTypeField.getValue()); messageTransformer.transformWithoutResults(message, transformation.getCombinedActions()); if(transformation.getNewPassword() != null) { if(transformation.getEncryptKey() != null) { @@ -1694,6 +1695,7 @@ private Map corruptMessageStructureProcessor(ByteBuf message, Ma } state.getMessageCorrupted().set(true); + metadata.put("isCorruptedMessage", "Y"); return null; } @@ -1720,6 +1722,7 @@ private Map adjustSendingTimeStrategyProcessor(ByteBuf message, AdjustSendingTimeConfiguration config = strategy.getAdjustSendingTimeConfiguration(); metadata.put("sendingTimeUpdated", "Y"); metadata.put("sendingTimeUpdateSeconds", Long.toString(config.getAdjustDuration().toSeconds())); + metadata.put("sendingTimeUpdateSign", config.getSubstract() ? "-" : "+"); metadata.put(ENCODE_MODE_PROPERTY_NAME, DIRTY_ENCODE_MODE_NAME); updateSendingTime(message, config.getAdjustDuration(), config.getSubstract()); diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt index 406b44d..d4215c0 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -234,7 +234,11 @@ class MessageLoader( ) while (iterator.hasNext()) { - val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray()) + val next = iterator.next().message + if(next.messagePropertiesMap.getOrDefault("isCorruptedMessage", "N") == "Y") { + continue + } + val message = Unpooled.buffer().writeBytes(next.bodyRaw.toByteArray()) K_LOGGER.info { "Sending message to recovery processor: ${message.toString(US_ASCII)}" } if (!processMessage(message)) break }