Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error handling in the event loop. #9990

Merged
merged 6 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 97 additions & 65 deletions src/collective/loop.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
/**
* Copyright 2023, XGBoost Contributors
* Copyright 2023-2024, XGBoost Contributors
*/
#include "loop.h"

#include <queue> // for queue
#include <cstddef> // for size_t
#include <cstdint> // for int32_t
#include <exception> // for exception, current_exception, rethrow_exception
#include <mutex> // for lock_guard, unique_lock
#include <queue> // for queue
#include <string> // for string
#include <thread> // for thread
#include <utility> // for move

#include "rabit/internal/socket.h" // for PollHelper
#include "xgboost/collective/result.h" // for Fail, Success
#include "xgboost/collective/socket.h" // for FailWithCode
#include "xgboost/logging.h" // for CHECK

Expand Down Expand Up @@ -109,62 +117,94 @@ Result Loop::EmptyQueue(std::queue<Op>* p_queue) const {
}

void Loop::Process() {
// consumer
auto set_rc = [this](Result&& rc) {
std::lock_guard lock{rc_lock_};
rc_ = std::forward<Result>(rc);
};

// This loop cannot exit unless `stop_` is set to true. There must always be a thread to
// answer the blocking call even if there are errors, otherwise the blocking will wait
// forever.
while (true) {
std::unique_lock lock{mu_};
cv_.wait(lock, [this] { return !this->queue_.empty() || stop_; });
if (stop_) {
break;
}
try {
std::unique_lock lock{mu_};
cv_.wait(lock, [this] { return !this->queue_.empty() || stop_; });
if (stop_) {
break; // only point where this loop can exit.
}

// Move the global queue into a local variable to unblock it.
std::queue<Op> qcopy;

bool is_blocking = false;
while (!queue_.empty()) {
auto op = queue_.front();
queue_.pop();
if (op.code == Op::kBlock) {
is_blocking = true;
// Block must be the last op in the current batch since no further submit can be
// issued until the blocking call is finished.
CHECK(queue_.empty());
} else {
qcopy.push(op);
}
}

auto unlock_notify = [&](bool is_blocking, bool stop) {
if (!is_blocking) {
std::lock_guard guard{mu_};
stop_ = stop;
} else {
stop_ = stop;
// Unblock, we can write to the global queue again.
lock.unlock();
}
cv_.notify_one();
};

// move the queue
std::queue<Op> qcopy;
bool is_blocking = false;
while (!queue_.empty()) {
auto op = queue_.front();
queue_.pop();
if (op.code == Op::kBlock) {
is_blocking = true;

// Clear the local queue, this is blocking the current worker thread (but not the
// client thread), wait until all operations are finished.
auto rc = this->EmptyQueue(&qcopy);

if (is_blocking) {
// The unlock is delayed if this is a blocking call
lock.unlock();
}

// Notify the client thread who called block after all error conditions are set.
auto notify_if_block = [&] {
if (is_blocking) {
std::unique_lock lock{mu_};
block_done_ = true;
lock.unlock();
block_cv_.notify_one();
}
};

// Handle error
if (!rc.OK()) {
set_rc(std::move(rc));
} else {
qcopy.push(op);
CHECK(qcopy.empty());
}
}
// unblock the queue
if (!is_blocking) {
lock.unlock();
}
// clear the queue
auto rc = this->EmptyQueue(&qcopy);
// Handle error
if (!rc.OK()) {
unlock_notify(is_blocking, true);
std::lock_guard<std::mutex> guard{rc_lock_};
this->rc_ = std::move(rc);
return;
}

CHECK(qcopy.empty());
unlock_notify(is_blocking, false);
notify_if_block();
} catch (std::exception const& e) {
curr_exce_ = std::current_exception();
set_rc(Fail("Exception inside the event loop:" + std::string{e.what()}));
} catch (...) {
curr_exce_ = std::current_exception();
set_rc(Fail("Unknown exception inside the event loop."));
}
}
}

Result Loop::Stop() {
// Finish all remaining tasks
CHECK_EQ(this->Block().OK(), this->rc_.OK());

// Notify the loop to stop
std::unique_lock lock{mu_};
stop_ = true;
lock.unlock();
this->cv_.notify_one();

CHECK_EQ(this->Block().OK(), this->rc_.OK());
if (this->worker_.joinable()) {
this->worker_.join();
}

if (curr_exce_) {
std::rethrow_exception(curr_exce_);
Expand All @@ -175,44 +215,36 @@ Result Loop::Stop() {

[[nodiscard]] Result Loop::Block() {
{
// Check whether the last op was successful, stop if not.
std::lock_guard<std::mutex> guard{rc_lock_};
if (!rc_.OK()) {
return std::move(rc_);
stop_ = true;
}
}

if (!this->worker_.joinable()) {
std::lock_guard<std::mutex> guard{rc_lock_};
return Fail("Worker has stopped.", std::move(rc_));
}

this->Submit(Op{Op::kBlock});

{
// Wait for the block call to finish.
std::unique_lock lock{mu_};
cv_.wait(lock, [this] { return (this->queue_.empty()) || stop_; });
block_cv_.wait(lock, [this] { return block_done_ || stop_; });
block_done_ = false;
}

{
// Transfer the rc.
std::lock_guard<std::mutex> lock{rc_lock_};
return std::move(rc_);
}
}

Loop::Loop(std::chrono::seconds timeout) : timeout_{timeout} {
timer_.Init(__func__);
worker_ = std::thread{[this] {
try {
this->Process();
} catch (std::exception const& e) {
std::lock_guard<std::mutex> guard{mu_};
if (!curr_exce_) {
curr_exce_ = std::current_exception();
rc_ = Fail("Exception was thrown");
}
stop_ = true;
cv_.notify_all();
} catch (...) {
std::lock_guard<std::mutex> guard{mu_};
if (!curr_exce_) {
curr_exce_ = std::current_exception();
rc_ = Fail("Exception was thrown");
}
stop_ = true;
cv_.notify_all();
}
}};
worker_ = std::thread{[this] { this->Process(); }};
}
} // namespace xgboost::collective
30 changes: 19 additions & 11 deletions src/collective/loop.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2023, XGBoost Contributors
* Copyright 2023-2024, XGBoost Contributors
*/
#pragma once
#include <chrono> // for seconds
Expand All @@ -10,7 +10,6 @@
#include <mutex> // for unique_lock, mutex
#include <queue> // for queue
#include <thread> // for thread
#include <utility> // for move

#include "../common/timer.h" // for Monitor
#include "xgboost/collective/result.h" // for Result
Expand All @@ -37,10 +36,15 @@ class Loop {
};

private:
std::thread worker_;
std::condition_variable cv_;
std::mutex mu_;
std::queue<Op> queue_;
std::thread worker_; // thread worker to execute the tasks

std::condition_variable cv_; // CV used to notify a new submit call
std::condition_variable block_cv_; // CV used to notify the blocking call
bool block_done_{false}; // Flag to indicate whether the blocking call has finished.

std::queue<Op> queue_; // event queue
std::mutex mu_; // mutex to protect the queue, cv, and block_done

std::chrono::seconds timeout_;

Result rc_;
Expand All @@ -51,29 +55,33 @@ class Loop {
common::Monitor mutable timer_;

Result EmptyQueue(std::queue<Op>* p_queue) const;
// The consumer function that runs inside a worker thread.
void Process();

public:
/**
* @brief Stop the worker thread.
*/
Result Stop();

void Submit(Op op) {
  // Producer side: push the op onto the shared queue while holding the queue mutex.
  {
    std::lock_guard guard{mu_};
    queue_.push(op);
  }
  // The mutex is released before signalling the submit condition variable.
  cv_.notify_one();
}

/**
* @brief Block the event loop until all ops are finished. In the case of failure, this
* loop should not be used for new operations.
*/
[[nodiscard]] Result Block();

explicit Loop(std::chrono::seconds timeout);

~Loop() noexcept(false) {
// The worker will be joined in the stop function.
this->Stop();

if (worker_.joinable()) {
worker_.join();
}
}
};
} // namespace xgboost::collective
Loading