Skip to content

Commit

Permalink
thread_pause_work_stealing()
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Dec 2, 2024
1 parent 586c167 commit e07b13d
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 12 deletions.
1 change: 1 addition & 0 deletions io/epoll-ng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ class EventEngineEPollNG : public MasterEventEngine,
Event event{fd, interests | ONE_SHOT, &waiter};
int ret = add_interest(event);
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ ok: entry.interests |= eint;
return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll
int ret = add_interest({fd, interest | ONE_SHOT, CURRENT});
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/fstack-dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine,
int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override {
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT);
SCOPED_PAUSE_WORK_STEALING;
int ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
io_uring_sqe_set_data(sqe, &timer_ctx);
}

SCOPED_PAUSE_WORK_STEALING;
photon::thread_sleep(-1);

if (likely(errno == EOK)) {
Expand Down
2 changes: 2 additions & 0 deletions io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include <vector>
#include <sys/event.h>
#include <photon/common/alog.h>
#include <photon/thread/thread.h>
#include "events_map.h"
#include "reset_handle.h"

Expand Down Expand Up @@ -107,6 +108,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current);
if (ret < 0) return ret;
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (likely(ret == -1 && err.no == EOK)) {
Expand Down
6 changes: 6 additions & 0 deletions thread/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1751,8 +1751,14 @@ TEST(WorkStealing, basic) {
});
auto th = thread_create(&ws_basic, &stolen, 0, 0,
THREAD_ENABLE_WORK_STEALING | THREAD_JOINABLE);
thread_pause_work_stealing(true, th);
::usleep(1000 * 10); // emulate a busy work of 10ms
EXPECT_FALSE(stolen);

thread_pause_work_stealing(false, th);
::usleep(1000 * 10); // emulate a busy work of 10ms
EXPECT_TRUE(stolen);

thread_join((join_handle*)th);
running = false;
vcpu.join();
Expand Down
23 changes: 13 additions & 10 deletions thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ namespace photon
int idx = -1; /* index in the sleep queue array */
int error_number = 0;
thread_list* waitq = nullptr; /* the q if WAITING in a queue */
uint32_t flags = 0;
uint16_t state = states::READY;
spinlock lock, _; // Current usage of thread.lock:
// interrupt()
Expand All @@ -186,7 +187,6 @@ namespace photon
// thread_join()
// ScopedLockHead
// try_work_stealing()?
int flags = 0;
uint64_t ts_wakeup = 0; /* Wakeup time when thread is sleeping */
// offset 64B
union {
Expand All @@ -208,7 +208,8 @@ namespace photon
enum shift {
joinable = 0,
enable_work_stealing = 1,
shutting_down = 2, // the thread should cancel what is doing, and quit
pause_work_stealing = 2,
shutting_down = 3, // the thread should cancel what is doing, and quit
}; // current job ASAP; not allowed to sleep or block more
// than 10ms, otherwise -1 will be returned and errno == EPERM
bool is_bit(int i) { return flags & (1<<i); }
Expand All @@ -219,8 +220,8 @@ namespace photon
void set_joinable(bool flag = true) { set_bit(shift::joinable, flag); }
bool is_shutting_down() { return is_bit(shift::shutting_down); }
void set_shutting_down(bool flag = true) { set_bit(shift::shutting_down, flag); }
bool allow_work_stealing() { return is_bit(shift::enable_work_stealing); }

bool allow_work_stealing() { return is_bit(shift::enable_work_stealing) &&
!(flags & THREAD_PAUSE_WORK_STEALING); }
int set_error_number() {
if (likely(error_number)) {
errno = error_number;
Expand Down Expand Up @@ -296,6 +297,7 @@ namespace photon
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
static_assert(offsetof(thread, vcpu) == offsetof(partial_thread, vcpu), "...");
static_assert(offsetof(thread, tls) == offsetof(partial_thread, tls), "...");
static_assert(offsetof(thread,flags) == offsetof(partial_thread,flags), "...");
#pragma GCC diagnostic pop

struct thread_list : public intrusive_list<thread>
Expand Down Expand Up @@ -657,12 +659,6 @@ namespace photon
}
};

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
static_assert(offsetof(thread, arg) == 0x40, "...");
static_assert(offsetof(thread, start) == 0x48, "...");
#pragma GCC diagnostic pop

inline void thread::dequeue_ready_atomic(states newstat)
{
assert("this is not in runq, and this->lock is locked");
Expand All @@ -687,6 +683,13 @@ namespace photon
to->get_vcpu()->switch_count++;
}

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
// the offsets are used in _photon_thread_stub() assembly code
static_assert(offsetof(thread, arg) == 0x40, "...");
static_assert(offsetof(thread, start) == 0x48, "...");
#pragma GCC diagnostic pop

#if defined(__x86_64__)
#if !defined(_WIN64)
asm(
Expand Down
19 changes: 17 additions & 2 deletions thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace photon
constexpr uint8_t VCPU_ENABLE_PASSIVE_WORK_STEALING = 2; // allow this vCPU to be stolen by other vCPUs
constexpr uint32_t THREAD_JOINABLE = 1; // allow this thread to be joined
constexpr uint32_t THREAD_ENABLE_WORK_STEALING = 2; // allow this thread to be stolen by other vCPUs
constexpr uint32_t THREAD_PAUSE_WORK_STEALING = 4; // temporarily pause work-stealing for a thread

int vcpu_init(uint64_t flags = 0);
int vcpu_fini();
Expand Down Expand Up @@ -149,12 +150,26 @@ namespace photon
// A helper struct in order to make some function calls inline.
// The memory layout of its first 4 fields is the same as the one of thread.
struct partial_thread {
uint64_t _, __;
uint64_t _[2];
volatile vcpu_base* vcpu;
uint64_t ___[5];
uint64_t __[3];
uint32_t flags, ___[3];
void* tls;
};

// this function doesn't affect whether WS is enabled or not for the thread
inline void thread_pause_work_stealing(bool flag, thread* th = CURRENT) {
auto& flags = ((partial_thread*)th)->flags;
if (flag) {
flags |= THREAD_PAUSE_WORK_STEALING;
} else {
flags &= ~THREAD_PAUSE_WORK_STEALING;
}
}
#define SCOPED_PAUSE_WORK_STEALING \
thread_pause_work_stealing(true); \
DEFER(thread_pause_work_stealing(false));

inline vcpu_base* get_vcpu(thread* th = CURRENT) {
auto vcpu = ((partial_thread*)th) -> vcpu;
return (vcpu_base*)vcpu;
Expand Down

0 comments on commit e07b13d

Please sign in to comment.