Skip to content

Commit

Permalink
[#139] Implemented bitset approach to command timeouts, completed imp…
Browse files Browse the repository at this point in the history
…lementation of timeouts
  • Loading branch information
More-Wrong committed Nov 21, 2023
1 parent e22eb47 commit 4e5941d
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ private static List<ResponseBuilder> createLinkedPath(ReadToken start) {
return builders;
}

public static ResponseExecutionPath blank() {
return new ResponseExecutionPath(null);
}
public static ResponseExecutionPath parse(ReadToken start) {
List<ResponseBuilder> builders = createLinkedPath(start);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package net.zscript.javaclient.nodes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -32,7 +35,7 @@ class BufferElement {
}

BufferElement(CommandExecutionPath cmd, long nanoTimeTimeout) {
CommandSequence seq = CommandSequence.from(cmd, currentEcho, supports32Locks, lockConditions);
CommandSequence seq = CommandSequence.from(cmd, echo.getEcho(), supports32Locks, lockConditions);
this.cmd = new AddressedCommand(seq);
this.sameLayer = true;
this.hadEchoBefore = false;
Expand Down Expand Up @@ -64,22 +67,17 @@ public long getNanoTimeTimeout() {
private final Connection connection;
private final Queue<BufferElement> buffer = new ArrayDeque<>();

private final EchoAssigner echo;

private int bufferSize;
private int currentBufferContent = 0;
private int currentEcho = 0x100;

private Collection<LockCondition> lockConditions = new ArrayList<>();
private boolean supports32Locks = false;

private void moveEchoValue() {
currentEcho++;
if (currentEcho > 0xffff) {
currentEcho = 0x100;
}
}

public ConnectionBuffer(Connection connection, int bufferSize) {
public ConnectionBuffer(Connection connection, EchoAssigner echo, int bufferSize) {
this.connection = connection;
this.echo = echo;
this.bufferSize = bufferSize;
}

Expand All @@ -106,10 +104,15 @@ public AddressedCommand match(ResponseSequence sequence) {
if (sequence.getResponseValue() != 0) {
throw new IllegalArgumentException("Cannot match notification sequence with command sequence");
}
if (!sequence.hasEchoValue()) {
return null;
}
boolean removeUpTo = true;
for (Iterator<BufferElement> iter = buffer.iterator(); iter.hasNext(); ) {
BufferElement element = iter.next();
if (element.isSameLayer() && element.getCommand().getContent().getEchoValue() == sequence.getEchoValue()) {
// if the echo value is auto-generated, clear the marker
echo.responseArrivedNormal(sequence.getEchoValue());
if (removeUpTo) {
clearOutTo(element);
} else {
Expand All @@ -133,6 +136,7 @@ public Collection<CommandSequence> checkTimeouts() {
if (currentNano - element.getNanoTimeTimeout() > 0) {
if (element.isSameLayer()) {
timedOut.add(element.getCommand().getContent());
echo.timeout(element.getCommand().getContent().getEchoValue());
}
iter.remove();
currentBufferContent -= element.length;
Expand Down Expand Up @@ -172,7 +176,14 @@ private boolean send(BufferElement element, boolean ignoreLength) {
if (!ignoreLength && element.length + currentBufferContent >= bufferSize) {
return false;
}
moveEchoValue();
// make sure echo system knows about echo usage...
if (element.hadEchoBefore) {
if (element.isSameLayer()) {
echo.manualEchoUse(element.getCommand().getContent().getEchoValue());
}
} else {
echo.moveEcho();
}
buffer.add(element);
currentBufferContent += element.length;
connection.send(element.getCommand());
Expand Down Expand Up @@ -208,16 +219,6 @@ public int getCurrentBufferContent() {
return currentBufferContent;
}

// checks if the target is going to be used as an echo value within 0xf00 uses of the current value.
// ignores the offset from wrapping by having a big enough window.
public boolean isApproachingEcho(int target) {
if (currentEcho > 0xf000) {
return target > currentEcho || target < currentEcho - 0xf000;
} else {
return target < currentEcho + 0x1000 && target > currentEcho;
}
}

public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package net.zscript.javaclient.nodes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.BitSet;

public class EchoAssigner {
private static final Logger LOG = LoggerFactory.getLogger(EchoAssigner.class);

private final static int SEGMENT_TIMEOUTS_BEFORE_CHANGE = 0x0010;
private final static int SEGMENT_MAX_WAITING = 0x00C0;
private final static int SEGMENT_SIZE = 0x0100;

private final long minSegmentChangeTimeNanos;

private BitSet previousMessages = new BitSet(0);
private BitSet sentMessages = new BitSet(SEGMENT_SIZE);

private int previousSegmentOffset = SEGMENT_SIZE;
private int currentSegmentOffset = SEGMENT_SIZE;

private int timeoutCount = 0;
private int waitingCount = 0;
private int currentEcho = 0;

private long lastSegmentChangeTimeNanos = System.nanoTime();

public EchoAssigner(long minSegmentChangeTimeNanos) {
this.minSegmentChangeTimeNanos = minSegmentChangeTimeNanos;
}

public void moveEcho() {
if (waitingCount >= SEGMENT_MAX_WAITING) {
LOG.error("Too many messages waiting for response ({}). Reduce command rate or latency.", waitingCount);
}
if (sentMessages.get(currentEcho)) {
throw new IllegalStateException("Current echo value invalid");
}
sentMessages.set(currentEcho);
currentEcho = sentMessages.nextClearBit(currentEcho + 1);
if (currentEcho == SEGMENT_SIZE) {
currentEcho = sentMessages.nextClearBit(0);
if (currentEcho == SEGMENT_SIZE) {
throw new IllegalStateException("Ran out of echo values to assign");
}
}
waitingCount++;
}

public void manualEchoUse(int echo) {
int relativeEcho = echo - currentSegmentOffset;
if (relativeEcho >= 0 && relativeEcho < SEGMENT_SIZE) {
if (sentMessages.get(relativeEcho)) {
LOG.warn("Echo manually reused when timed out: {}", Integer.toHexString(echo));
} else if (relativeEcho == currentEcho) {
moveEcho();
} else {
sentMessages.set(relativeEcho);
}
}
int relativeEchoPrev = echo - previousSegmentOffset;
if (relativeEchoPrev >= 0 && relativeEchoPrev < SEGMENT_SIZE) {
if (previousMessages.get(relativeEcho)) {
LOG.warn("Echo manually reused when timed out: {}", Integer.toHexString(echo));
}
previousMessages.set(relativeEcho);
}
}

public int getEcho() {
return currentEcho + currentSegmentOffset;
}

public boolean isWaiting(int echo) {
int relativeEcho = echo - currentSegmentOffset;
if (relativeEcho >= 0 && relativeEcho < SEGMENT_SIZE) {
return sentMessages.get(echo);
}
int relativeEchoPrev = echo - previousSegmentOffset;
if (relativeEchoPrev >= 0 && relativeEchoPrev < SEGMENT_SIZE) {
return previousMessages.get(echo);
}
return false;
}

public void responseArrivedNormal(int echo) {
int relativeEcho = echo - currentSegmentOffset;
BitSet messagesTarget = sentMessages;
boolean count = true;
if (relativeEcho < 0 || relativeEcho >= SEGMENT_SIZE) {
relativeEcho = echo - previousSegmentOffset;
if (relativeEcho < 0 || relativeEcho >= SEGMENT_SIZE) {
return;
}
messagesTarget = previousMessages;
count = false;
}
if (messagesTarget.get(relativeEcho)) {
messagesTarget.clear(relativeEcho);
if (count) {
waitingCount--;
}
}
}

public void timeout(int echo) {
int relativeEcho = echo - currentSegmentOffset;
if (relativeEcho < 0 || relativeEcho >= SEGMENT_SIZE) {
// if in previous, no action required
return;
}
// if not a current message, no action needed
if (sentMessages.get(relativeEcho)) {
timeoutCount++;
if (timeoutCount >= SEGMENT_TIMEOUTS_BEFORE_CHANGE) {
long time = System.nanoTime();
if (time - lastSegmentChangeTimeNanos < minSegmentChangeTimeNanos) {
LOG.error("Connection timing out too much.");
} else {
LOG.info("Lingering timeout count: ({}). Changing echo value segment.", timeoutCount);
}
timeoutCount = 0;
waitingCount = 0;
currentEcho = 0;
previousMessages = sentMessages;
sentMessages = new BitSet(SEGMENT_SIZE);
previousSegmentOffset = currentSegmentOffset;
currentSegmentOffset += SEGMENT_SIZE;
if (currentSegmentOffset + SEGMENT_SIZE > 0x10000) {
currentSegmentOffset = SEGMENT_SIZE; // Skip the first segment, to leave space for manual echo
}
}
}
}

public boolean unmatchedReceive(int echo) {
int relativeEcho = echo - currentSegmentOffset;
BitSet messagesTarget = sentMessages;
boolean count = true;
if (relativeEcho < 0 || relativeEcho >= SEGMENT_SIZE) {
relativeEcho = echo - previousSegmentOffset;
if (relativeEcho < 0 || relativeEcho >= SEGMENT_SIZE) {
// go to the unmatched handler, as message is very old (or not one we're keeping track of)
return false;
}
messagesTarget = previousMessages;
count = false;
}
if (messagesTarget.get(relativeEcho)) {
messagesTarget.clear(relativeEcho);
if (count) {
timeoutCount--;
}
return true;
} else {
// goes to the unmatched handler
return false;
}
}

}
Loading

0 comments on commit 4e5941d

Please sign in to comment.