diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 147c5ec..6344683 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -57,6 +57,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + import kotlin.jvm.functions.Function1; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -65,6 +67,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.NotThreadSafe; + import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.firstField; @@ -239,7 +243,8 @@ public FixHandler(IHandlerContext context) { if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero"); this.timeoutSendHandler = new TimeoutSendHandler( settings.getConnectionTimeoutOnSend(), - settings.getMinConnectionTimeoutOnSend() + settings.getMinConnectionTimeoutOnSend(), + context::send ); } @@ -1172,12 +1177,15 @@ private void debug(String message, Object... args) { } } + @NotThreadSafe private static class TimeoutSendHandler { + private final Consumer eventConsumer; private final long maxTimeout; private final long minTimeout; private long currentTimeout; + private int attempts; - TimeoutSendHandler(long maxTimeout, long minTimeout) { + TimeoutSendHandler(long maxTimeout, long minTimeout, Consumer eventConsumer) { if (maxTimeout < minTimeout) { throw new IllegalArgumentException("max timeout must be greater than min timeout"); } @@ -1189,19 +1197,34 @@ private static class TimeoutSendHandler { } this.maxTimeout = maxTimeout; this.minTimeout = minTimeout; + this.eventConsumer = requireNonNull(eventConsumer, "event consumer"); currentTimeout = maxTimeout; } public void getWithTimeout(Future future) throws ExecutionException, InterruptedException, TimeoutException { try { future.get(currentTimeout, TimeUnit.MILLISECONDS); + if (attempts > 0) { + generateEvent(attempts); + } currentTimeout = maxTimeout; + attempts = 0; } catch (ExecutionException | InterruptedException | TimeoutException ex) { currentTimeout = Math.max(minTimeout, currentTimeout / 2); + attempts += 1; throw ex; } } + private void generateEvent(int attempts) { + eventConsumer.accept( + Event.start().endTimestamp() + .status(Event.Status.FAILED) + .name("Message sending attempt successful after " + attempts + " failed attempt(s)") + .type("MessageSendingAttempts") + ); + } + public long getCurrentTimeout() { return currentTimeout; }