Skip to content

Commit

Permalink
Add event notification when sending successful after failed attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Oct 23, 2023
1 parent 1f25416 commit 1875f69
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -65,6 +67,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;

import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.firstField;
Expand Down Expand Up @@ -239,7 +243,8 @@ public FixHandler(IHandlerContext context) {
if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero");
this.timeoutSendHandler = new TimeoutSendHandler(
settings.getConnectionTimeoutOnSend(),
settings.getMinConnectionTimeoutOnSend()
settings.getMinConnectionTimeoutOnSend(),
context::send
);
}

Expand Down Expand Up @@ -1172,12 +1177,15 @@ private void debug(String message, Object... args) {
}
}

@NotThreadSafe
private static class TimeoutSendHandler {
private final Consumer<Event> eventConsumer;
private final long maxTimeout;
private final long minTimeout;
private long currentTimeout;
private int attempts;

TimeoutSendHandler(long maxTimeout, long minTimeout) {
TimeoutSendHandler(long maxTimeout, long minTimeout, Consumer<Event> eventConsumer) {
if (maxTimeout < minTimeout) {
throw new IllegalArgumentException("max timeout must be greater than min timeout");
}
Expand All @@ -1189,19 +1197,34 @@ private static class TimeoutSendHandler {
}
this.maxTimeout = maxTimeout;
this.minTimeout = minTimeout;
this.eventConsumer = requireNonNull(eventConsumer, "event consumer");
currentTimeout = maxTimeout;
}

public void getWithTimeout(Future<?> future) throws ExecutionException, InterruptedException, TimeoutException {
try {
future.get(currentTimeout, TimeUnit.MILLISECONDS);
if (attempts > 0) {
generateEvent(attempts);
}
currentTimeout = maxTimeout;
attempts = 0;
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
currentTimeout = Math.max(minTimeout, currentTimeout / 2);
attempts += 1;
throw ex;
}
}

private void generateEvent(int attempts) {
eventConsumer.accept(
Event.start().endTimestamp()
.status(Event.Status.FAILED)
.name("Message sending attempt successful after " + attempts + " failed attempt(s)")
.type("MessageSendingAttempts")
);
}

public long getCurrentTimeout() {
return currentTimeout;
}
Expand Down

0 comments on commit 1875f69

Please sign in to comment.