Skip to content

Commit

Permalink
Migrate to atomics for holding current state
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Oct 23, 2023
1 parent 1875f69 commit c6f3b54
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions src/main/java/com/exactpro/th2/FixHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand All @@ -67,7 +68,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;

import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField;
import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField;
Expand Down Expand Up @@ -1177,13 +1178,13 @@ private void debug(String message, Object... args) {
}
}

@NotThreadSafe
@ThreadSafe
private static class TimeoutSendHandler {
private final Consumer<Event> eventConsumer;
private final long maxTimeout;
private final long minTimeout;
private long currentTimeout;
private int attempts;
private final AtomicLong currentTimeout;
private final AtomicInteger attempts = new AtomicInteger();

TimeoutSendHandler(long maxTimeout, long minTimeout, Consumer<Event> eventConsumer) {
if (maxTimeout < minTimeout) {
Expand All @@ -1198,20 +1199,20 @@ private static class TimeoutSendHandler {
this.maxTimeout = maxTimeout;
this.minTimeout = minTimeout;
this.eventConsumer = requireNonNull(eventConsumer, "event consumer");
currentTimeout = maxTimeout;
currentTimeout = new AtomicLong(maxTimeout);
}

public void getWithTimeout(Future<?> future) throws ExecutionException, InterruptedException, TimeoutException {
try {
future.get(currentTimeout, TimeUnit.MILLISECONDS);
if (attempts > 0) {
generateEvent(attempts);
future.get(currentTimeout.get(), TimeUnit.MILLISECONDS);
int spentAttempts = attempts.getAndSet(0);
if (spentAttempts > 0) {
generateEvent(spentAttempts);
}
currentTimeout = maxTimeout;
attempts = 0;
currentTimeout.set(maxTimeout);
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
currentTimeout = Math.max(minTimeout, currentTimeout / 2);
attempts += 1;
attempts.incrementAndGet();
currentTimeout.updateAndGet(cur -> Math.max(minTimeout, cur / 2));
throw ex;
}
}
Expand All @@ -1226,11 +1227,11 @@ private void generateEvent(int attempts) {
}

public long getCurrentTimeout() {
return currentTimeout;
return currentTimeout.get();
}

public long getDeadline() {
return System.currentTimeMillis() + currentTimeout;
return System.currentTimeMillis() + currentTimeout.get();
}
}
}

0 comments on commit c6f3b54

Please sign in to comment.