Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5050] Decrease timeout for sending on each attempt #67

Merged
merged 8 commits into from
Oct 24, 2023
Merged
82 changes: 74 additions & 8 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -65,6 +67,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;

import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.firstField;
Expand Down Expand Up @@ -165,6 +169,8 @@ public class FixHandler implements AutoCloseable, IHandler {

private final AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));

private final TimeoutSendHandler timeoutSendHandler;
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
protected FixHandlerSettings settings;
Expand Down Expand Up @@ -235,9 +241,11 @@ public FixHandler(IHandlerContext context) {
if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero");
if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero");
if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero");
if (settings.getConnectionTimeoutOnSend() <= 0) {
throw new IllegalArgumentException("connectionTimeoutOnSend must be greater than zero");
}
this.timeoutSendHandler = new TimeoutSendHandler(
settings.getConnectionTimeoutOnSend(),
settings.getMinConnectionTimeoutOnSend(),
context::send
);
}

@Override
Expand Down Expand Up @@ -276,7 +284,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
try {
disconnect(!isUngracefulDisconnect);
enabled.set(false);
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
timeoutSendHandler.getWithTimeout(channel.open());
} catch (Exception e) {
context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e));
}
Expand All @@ -286,15 +294,16 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
// TODO: probably, this should be moved to the core part
// But those changes will break API
// So, let's keep it here for now
long deadline = System.currentTimeMillis() + settings.getConnectionTimeoutOnSend();
long deadline = timeoutSendHandler.getDeadline();
long currentTimeout = timeoutSendHandler.getCurrentTimeout();

if (!channel.isOpen()) {
try {
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
timeoutSendHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
Expand All @@ -310,7 +319,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
}
}

Expand Down Expand Up @@ -1167,4 +1176,61 @@ private void debug(String message, Object... args) {
LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args));
}
}

@NotThreadSafe
private static class TimeoutSendHandler {
private final Consumer<Event> eventConsumer;
private final long maxTimeout;
private final long minTimeout;
private long currentTimeout;
private int attempts;

TimeoutSendHandler(long maxTimeout, long minTimeout, Consumer<Event> eventConsumer) {
if (maxTimeout < minTimeout) {
throw new IllegalArgumentException("max timeout must be greater than min timeout");
}
if (maxTimeout <= 0) {
throw new IllegalArgumentException("connectionTimeoutOnSend must be greater than zero");
}
if (minTimeout <= 0) {
throw new IllegalArgumentException("minConnectionTimeoutOnSend must be greater than zero");
}
this.maxTimeout = maxTimeout;
this.minTimeout = minTimeout;
this.eventConsumer = requireNonNull(eventConsumer, "event consumer");
currentTimeout = maxTimeout;
}

public void getWithTimeout(Future<?> future) throws ExecutionException, InterruptedException, TimeoutException {
try {
future.get(currentTimeout, TimeUnit.MILLISECONDS);
if (attempts > 0) {
generateEvent(attempts);
}
currentTimeout = maxTimeout;
attempts = 0;
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
currentTimeout = Math.max(minTimeout, currentTimeout / 2);
attempts += 1;
throw ex;
}
}

private void generateEvent(int attempts) {
eventConsumer.accept(
Event.start().endTimestamp()
.status(Event.Status.FAILED)
.name("Message sending attempt successful after " + attempts + " failed attempt(s)")
.type("MessageSendingAttempts")
);
}

public long getCurrentTimeout() {
return currentTimeout;
}

public long getDeadline() {
return System.currentTimeMillis() + currentTimeout;
}
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class FixHandlerSettings implements IHandlerSettings {
*/
private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND;

private long minConnectionTimeoutOnSend = 1_000;

@JsonDeserialize(using = DateTimeFormatterDeserializer.class)
private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS");

Expand Down Expand Up @@ -317,4 +319,12 @@ public long getConnectionTimeoutOnSend() {
public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) {
this.connectionTimeoutOnSend = connectionTimeoutOnSend;
}

public long getMinConnectionTimeoutOnSend() {
return minConnectionTimeoutOnSend;
}

public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) {
this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend;
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
Mockito.when(contextMock.getSettings())
.thenReturn(settings);
var fixHandler = new FixHandler(contextMock);
Expand Down Expand Up @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
LocalTime currentTime = LocalTime.now(ZoneOffset.UTC);
int deltaMinutes = currentTime.isAfter(LocalTime.NOON)
? -1
Expand Down
Loading