Skip to content

Commit

Permalink
Additional properties related to message corrupetion strategies. Avoi…
Browse files Browse the repository at this point in the history
…d disconnect loop by skipping corrupted messages during recovery from cradle.
  • Loading branch information
denis.plotnikov committed Oct 23, 2024
1 parent 0d69bcb commit 730cce2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,7 @@ private void transformProcessor(
strategyState.transformIfCondition(
x -> x <= config.getNumberOfTimesToTransform(),
() -> {
metadata.put("OriginalMessageType", msgTypeField.getValue());
messageTransformer.transformWithoutResults(message, transformation.getCombinedActions());
if(transformation.getNewPassword() != null) {
if(transformation.getEncryptKey() != null) {
Expand Down Expand Up @@ -1694,6 +1695,7 @@ private Map<String, String> corruptMessageStructureProcessor(ByteBuf message, Ma
}

state.getMessageCorrupted().set(true);
metadata.put("isCorruptedMessage", "Y");

return null;
}
Expand All @@ -1720,6 +1722,7 @@ private Map<String, String> adjustSendingTimeStrategyProcessor(ByteBuf message,
AdjustSendingTimeConfiguration config = strategy.getAdjustSendingTimeConfiguration();
metadata.put("sendingTimeUpdated", "Y");
metadata.put("sendingTimeUpdateSeconds", Long.toString(config.getAdjustDuration().toSeconds()));
metadata.put("sendingTimeUpdateSign", config.getSubstract() ? "-" : "+");
metadata.put(ENCODE_MODE_PROPERTY_NAME, DIRTY_ENCODE_MODE_NAME);

updateSendingTime(message, config.getAdjustDuration(), config.getSubstract());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ class MessageLoader(
)

while (iterator.hasNext()) {
val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray())
val next = iterator.next().message
if(next.messagePropertiesMap.getOrDefault("isCorruptedMessage", "N") == "Y") {
continue
}
val message = Unpooled.buffer().writeBytes(next.bodyRaw.toByteArray())
K_LOGGER.info { "Sending message to recovery processor: ${message.toString(US_ASCII)}" }
if (!processMessage(message)) break
}
Expand Down

0 comments on commit 730cce2

Please sign in to comment.