Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5050] Decrease timeout for sending on each attempt #67

Merged
merged 8 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ This microservice allows sending and receiving messages via FIX protocol
+ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message.
+ *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread
(please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem).
_Default, 30000 mls._
_Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_).
The timeout is reset to the original value after a successful sending attempt.
If connection is not established within the specified timeout an error will be reported.
+ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._

### Security settings

Expand Down Expand Up @@ -333,6 +335,11 @@ spec:

# Changelog

## 1.5.0

* `minConnectionTimeoutOnSend` parameter is added.
* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`).

## 1.4.2
* Ungraceful session disconnect support.
* Removed NPE when session is reset by schedule.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies {
}
implementation "com.exactpro.th2:common-utils:2.2.0-dev"
implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1'
implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-dev'
implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-dev'
implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev'

implementation 'org.slf4j:slf4j-api'
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=1.4.2
release_version=1.5.0
23 changes: 15 additions & 8 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.exactpro.th2.common.utils.event.transport.EventUtilsKt;
import com.exactpro.th2.conn.dirty.fix.FixField;
import com.exactpro.th2.conn.dirty.fix.MessageLoader;
import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode;
import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -165,6 +167,8 @@ public class FixHandler implements AutoCloseable, IHandler {

private final AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));

private final SendingTimeoutHandler sendingTimeoutHandler;
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
protected FixHandlerSettings settings;
Expand Down Expand Up @@ -235,9 +239,11 @@ public FixHandler(IHandlerContext context) {
if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero");
if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero");
if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero");
if (settings.getConnectionTimeoutOnSend() <= 0) {
throw new IllegalArgumentException("connectionTimeoutOnSend must be greater than zero");
}
this.sendingTimeoutHandler = SendingTimeoutHandler.create(
settings.getMinConnectionTimeoutOnSend(),
settings.getConnectionTimeoutOnSend(),
context::send
);
}

@Override
Expand Down Expand Up @@ -276,7 +282,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
try {
disconnect(!isUngracefulDisconnect);
enabled.set(false);
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (Exception e) {
context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e));
}
Expand All @@ -286,15 +292,16 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
// TODO: probably, this should be moved to the core part
// But those changes will break API
// So, let's keep it here for now
long deadline = System.currentTimeMillis() + settings.getConnectionTimeoutOnSend();
long deadline = sendingTimeoutHandler.getDeadline();
long currentTimeout = sendingTimeoutHandler.getCurrentTimeout();

if (!channel.isOpen()) {
try {
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
Expand All @@ -310,7 +317,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class FixHandlerSettings implements IHandlerSettings {
*/
private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND;

private long minConnectionTimeoutOnSend = 1_000;

@JsonDeserialize(using = DateTimeFormatterDeserializer.class)
private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS");

Expand Down Expand Up @@ -317,4 +319,12 @@ public long getConnectionTimeoutOnSend() {
public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) {
this.connectionTimeoutOnSend = connectionTimeoutOnSend;
}

public long getMinConnectionTimeoutOnSend() {
return minConnectionTimeoutOnSend;
}

public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) {
this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend;
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
Mockito.when(contextMock.getSettings())
.thenReturn(settings);
var fixHandler = new FixHandler(contextMock);
Expand Down Expand Up @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
LocalTime currentTime = LocalTime.now(ZoneOffset.UTC);
int deltaMinutes = currentTime.isAfter(LocalTime.NOON)
? -1
Expand Down
Loading