Skip to content

Commit

Permalink
[TH2-5050] Decrease timeout for sending on each attempt (#67)
Browse files Browse the repository at this point in the history
* Descrease timeout for sending on each attempt

* Set min timeout in tests

* Add event notification when sending successful after failed attempts

* Migrate to atomics for holding current state

* Use locks under the hood

* Move handler to the core part

* Use dev release for tcp-dirty-core

* Update version and readme
  • Loading branch information
OptimumCode authored Oct 24, 2023
1 parent 27c30e2 commit 5403fa1
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 11 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ This microservice allows sending and receiving messages via FIX protocol
+ *logoutOnIncorrectServerSequence* - whether to logout session when server send message with sequence number less than expected. If `false` then internal conn sequence will be reset to sequence number from server message.
+ *connectionTimeoutOnSend* - timeout in milliseconds for sending message from queue thread
(please read about [acknowledgment timeout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout) to understand the problem).
_Default, 30000 mls._
_Default, 30000 mls._ Each failed sending attempt decreases the timeout in half (but not less than _minConnectionTimeoutOnSend_).
The timeout is reset to the original value after a successful sending attempt.
If connection is not established within the specified timeout an error will be reported.
+ *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._

### Security settings

Expand Down Expand Up @@ -333,6 +335,11 @@ spec:

# Changelog

## 1.5.0

* `minConnectionTimeoutOnSend` parameter is added.
* Sending timeout now decreases in half on each failed attempt (but not less than `minConnectionTimeoutOnSend`).

## 1.4.2
* Ungraceful session disconnect support.
* Removed NPE when session is reset by schedule.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies {
}
implementation "com.exactpro.th2:common-utils:2.2.0-dev"
implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1'
implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.2.1-dev'
implementation 'com.exactpro.th2:conn-dirty-tcp-core:3.3.0-dev'
implementation 'com.exactpro.th2:grpc-lw-data-provider:2.2.0-dev'

implementation 'org.slf4j:slf4j-api'
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=1.4.2
release_version=1.5.0
23 changes: 15 additions & 8 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.exactpro.th2.common.utils.event.transport.EventUtilsKt;
import com.exactpro.th2.conn.dirty.fix.FixField;
import com.exactpro.th2.conn.dirty.fix.MessageLoader;
import com.exactpro.th2.conn.dirty.tcp.core.SendingTimeoutHandler;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel;
import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode;
import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import kotlin.jvm.functions.Function1;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -165,6 +167,8 @@ public class FixHandler implements AutoCloseable, IHandler {

private final AtomicReference<Future<?>> heartbeatTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicReference<Future<?>> testRequestTimer = new AtomicReference<>(CompletableFuture.completedFuture(null));

private final SendingTimeoutHandler sendingTimeoutHandler;
private Future<?> reconnectRequestTimer = CompletableFuture.completedFuture(null);
private volatile IChannel channel;
protected FixHandlerSettings settings;
Expand Down Expand Up @@ -235,9 +239,11 @@ public FixHandler(IHandlerContext context) {
if (settings.getHeartBtInt() <= 0) throw new IllegalArgumentException("HeartBtInt cannot be negative or zero");
if (settings.getTestRequestDelay() <= 0) throw new IllegalArgumentException("TestRequestDelay cannot be negative or zero");
if (settings.getDisconnectRequestDelay() <= 0) throw new IllegalArgumentException("DisconnectRequestDelay cannot be negative or zero");
if (settings.getConnectionTimeoutOnSend() <= 0) {
throw new IllegalArgumentException("connectionTimeoutOnSend must be greater than zero");
}
this.sendingTimeoutHandler = SendingTimeoutHandler.create(
settings.getMinConnectionTimeoutOnSend(),
settings.getConnectionTimeoutOnSend(),
context::send
);
}

@Override
Expand Down Expand Up @@ -276,7 +282,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
try {
disconnect(!isUngracefulDisconnect);
enabled.set(false);
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (Exception e) {
context.send(CommonUtil.toErrorEvent(String.format("Error while ending session %s by user logout. Is graceful disconnect: %b", channel.getSessionAlias(), !isUngracefulDisconnect), e));
}
Expand All @@ -286,15 +292,16 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
// TODO: probably, this should be moved to the core part
// But those changes will break API
// So, let's keep it here for now
long deadline = System.currentTimeMillis() + settings.getConnectionTimeoutOnSend();
long deadline = sendingTimeoutHandler.getDeadline();
long currentTimeout = sendingTimeoutHandler.getCurrentTimeout();

if (!channel.isOpen()) {
try {
channel.open().get(settings.getConnectionTimeoutOnSend(), TimeUnit.MILLISECONDS);
sendingTimeoutHandler.getWithTimeout(channel.open());
} catch (TimeoutException e) {
ExceptionUtils.rethrow(new TimeoutException(
String.format("could not open connection before timeout %d mls elapsed",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
Expand All @@ -310,7 +317,7 @@ private CompletableFuture<MessageID> send(@NotNull ByteBuf body, @NotNull Map<St
if (System.currentTimeMillis() > deadline) {
// The method should have checked exception in signature...
ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls",
settings.getConnectionTimeoutOnSend())));
currentTimeout)));
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/exactpro/th2/FixHandlerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class FixHandlerSettings implements IHandlerSettings {
*/
private long connectionTimeoutOnSend = DEFAULT_CONNECTION_TIMEOUT_ON_SEND;

private long minConnectionTimeoutOnSend = 1_000;

@JsonDeserialize(using = DateTimeFormatterDeserializer.class)
private DateTimeFormatter sendingDateTimeFormat = DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSSSSSSSS");

Expand Down Expand Up @@ -317,4 +319,12 @@ public long getConnectionTimeoutOnSend() {
public void setConnectionTimeoutOnSend(long connectionTimeoutOnSend) {
this.connectionTimeoutOnSend = connectionTimeoutOnSend;
}

public long getMinConnectionTimeoutOnSend() {
return minConnectionTimeoutOnSend;
}

public void setMinConnectionTimeoutOnSend(long minConnectionTimeoutOnSend) {
this.minConnectionTimeoutOnSend = minConnectionTimeoutOnSend;
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/exactpro/th2/FixHandlerSendTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void sendTimeoutOnConnectionOpen() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
Mockito.when(contextMock.getSettings())
.thenReturn(settings);
var fixHandler = new FixHandler(contextMock);
Expand Down Expand Up @@ -100,6 +101,7 @@ void sendTimeoutOnSessionEnabled() {
settings.setPort(42);
settings.setHost("localhost");
settings.setConnectionTimeoutOnSend(300); // 300 millis
settings.setMinConnectionTimeoutOnSend(100);
LocalTime currentTime = LocalTime.now(ZoneOffset.UTC);
int deltaMinutes = currentTime.isAfter(LocalTime.NOON)
? -1
Expand Down

0 comments on commit 5403fa1

Please sign in to comment.