Skip to content

Commit

Permalink
Fixing receiver notification logic and setting error code where needed
Browse files Browse the repository at this point in the history
Summary: Fixing receiver notification logic and setting error code where needed

Reviewed By: nikunjy, ldemailly

Differential Revision: D2585049

fb-gh-sync-id: 54b5c278fde95d7c81283ec14c711c216d178e87
  • Loading branch information
uddipta authored and ldemailly committed Oct 28, 2015
1 parent f940090 commit 27ed573
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
7 changes: 5 additions & 2 deletions ReceiverThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ ReceiverState ReceiverThread::sendDoneCmd() {
if (socket_.write(buf_, 1) != 1) {
PLOG(ERROR) << "unable to send DONE " << threadIndex_;
doneSendFailure_ = true;
threadStats_.setErrorCode(SOCKET_WRITE_ERROR);
return ACCEPT_WITH_TIMEOUT;
}

Expand All @@ -756,13 +757,15 @@ ReceiverState ReceiverThread::sendDoneCmd() {
if (read != 1 || buf_[0] != Protocol::DONE_CMD) {
LOG(ERROR) << *this << " did not receive ack for DONE";
doneSendFailure_ = true;
threadStats_.setErrorCode(SOCKET_READ_ERROR);
return ACCEPT_WITH_TIMEOUT;
}

read = socket_.read(buf_, Protocol::kMinBufLength);
if (read != 0) {
LOG(ERROR) << *this << " EOF not found where expected";
doneSendFailure_ = true;
threadStats_.setErrorCode(SOCKET_READ_ERROR);
return ACCEPT_WITH_TIMEOUT;
}
socket_.closeCurrentConnection();
Expand All @@ -781,7 +784,7 @@ ReceiverState ReceiverThread::finishWithError() {
auto guard = cv->acquire();
wdtParent_->addCheckpoint(checkpoint_);
controller_->markState(threadIndex_, FINISHED);
// guard deletion notifies one thread
guard.notifyOne();
return END;
}

Expand Down Expand Up @@ -813,7 +816,7 @@ ReceiverState ReceiverThread::waitForFinishOrNewCheckpoint() {
auto guard = cv->acquire();
auto state = checkForFinishOrNewCheckpoints();
if (state != WAIT_FOR_FINISH_OR_NEW_CHECKPOINT) {
// guard automatically notfies one
guard.notifyOne();
return state;
}
START_PERF_TIMER
Expand Down
1 change: 0 additions & 1 deletion SenderThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ SenderState SenderThread::processDoneCmd() {

SenderState SenderThread::processWaitCmd() {
LOG(INFO) << *this << " entered PROCESS_WAIT_CMD state ";
;
ThreadTransferHistory &transferHistory = getTransferHistory();
VLOG(1) << "received WAIT_CMD, port " << port_;
transferHistory.markAllAcknowledged();
Expand Down
1 change: 0 additions & 1 deletion util/ThreadsController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ ConditionGuardImpl::~ConditionGuardImpl() {
if (lock_ != nullptr) {
delete lock_;
}
cv_.notify_one();
}

ConditionGuardImpl::ConditionGuardImpl(mutex &guardMutex,
Expand Down

0 comments on commit 27ed573

Please sign in to comment.