Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1968. Remove unreached else block of trySendDelayed #989

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public void setFirstRequest() {
isFirst = true;
}

public long getCallId() {
return callId;
}

@Override
public long getSeqNum() {
return seqNum;
Expand Down Expand Up @@ -133,7 +137,7 @@ private OrderedAsync(RaftClientImpl client, RaftProperties properties) {
}

private void resetSlidingWindow(RaftClientRequest request) {
getSlidingWindow(request).resetFirstSeqNum();
getSlidingWindow(request).resetFirstSeqNum(request.getCallId());
}

private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public long getSeqNum() {
return seqNum;
}

public long getCallId() {
return -1;
}

@Override
public void setReply(DataStreamReply dataStreamReply) {
replyFuture.complete(dataStreamReply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -56,6 +57,7 @@ interface Request<REPLY> {

interface ClientSideRequest<REPLY> extends Request<REPLY> {
void setFirstRequest();
long getCallId();
}

interface ServerSideRequest<REPLY> extends Request<REPLY> {
Expand Down Expand Up @@ -228,13 +230,14 @@ class Client<REQUEST extends ClientSideRequest<REPLY>, REPLY> {
private final RequestMap<REQUEST, REPLY> requests;
/** Delayed requests. */
private final DelayedRequests delayedRequests = new DelayedRequests();
private final ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<>();

/** The seqNum for the next new request. */
private long nextSeqNum = 1;
/** The seqNum of the first request. */
private long firstSeqNum = -1;
private volatile long firstSeqNum = -1;
/** Is the first request replied? */
private boolean firstReplied;
private volatile boolean firstReplied;
/** The exception, if there is any. */
private Throwable exception;

Expand Down Expand Up @@ -300,6 +303,7 @@ private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod

if (firstReplied) {
// already received the reply for the first request, submit any request.
map.put(request.getCallId(), getFirstSeqNum());
sendMethod.accept(request);
return true;
}
Expand All @@ -309,6 +313,7 @@ private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod
LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this);
firstSeqNum = seqNum;
request.setFirstRequest();
map.put(request.getCallId(), getFirstSeqNum());
sendMethod.accept(request);
return true;
}
Expand All @@ -333,7 +338,9 @@ public synchronized void retry(REQUEST request, Consumer<REQUEST> sendMethod) {
private void removeRepliedFromHead() {
for (final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) {
final REQUEST r = i.next();
if (!r.hasReply()) {
if (r.hasReply()) {
map.remove(r.getCallId());
} else {
return;
}
}
Expand Down Expand Up @@ -374,10 +381,12 @@ private void trySendDelayed(Consumer<REQUEST> sendMethod) {
}

/** Reset the {@link #firstSeqNum} The stream has an error. */
public synchronized void resetFirstSeqNum() {
firstSeqNum = -1;
firstReplied = false;
LOG.debug("After resetFirstSeqNum: {}", this);
public synchronized void resetFirstSeqNum(long callId) {
if (callId == -1 || getFirstSeqNum() == map.get(callId)) {
firstSeqNum = -1;
firstReplied = false;
LOG.debug("After resetFirstSeqNum: {}", this);
}
}

/** Fail all requests starting from the given seqNum. */
Expand Down Expand Up @@ -409,6 +418,10 @@ private void alreadyClosed(REQUEST request, Throwable e) {
public synchronized boolean isFirst(long seqNum) {
return seqNum == (firstSeqNum != -1 ? firstSeqNum : requests.firstSeqNum());
}

public long getFirstSeqNum() {
return firstSeqNum != -1 ? firstSeqNum : requests.firstSeqNum();
}
}

/**
Expand Down
Loading