Skip to content

Commit

Permalink
very basic work-stealing (#629)
Browse files Browse the repository at this point in the history
very basic work-stealing
  • Loading branch information
lihuiba authored Dec 2, 2024
1 parent 383d10e commit 4857bc7
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 67 deletions.
1 change: 1 addition & 0 deletions io/epoll-ng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ class EventEngineEPollNG : public MasterEventEngine,
Event event{fd, interests | ONE_SHOT, &waiter};
int ret = add_interest(event);
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ ok: entry.interests |= eint;
return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll
int ret = add_interest({fd, interest | ONE_SHOT, CURRENT});
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/fstack-dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine,
int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override {
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT);
SCOPED_PAUSE_WORK_STEALING;
int ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
io_uring_sqe_set_data(sqe, &timer_ctx);
}

SCOPED_PAUSE_WORK_STEALING;
photon::thread_sleep(-1);

if (likely(errno == EOK)) {
Expand Down
2 changes: 2 additions & 0 deletions io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include <vector>
#include <sys/event.h>
#include <photon/common/alog.h>
#include <photon/thread/thread.h>
#include "events_map.h"
#include "reset_handle.h"

Expand Down Expand Up @@ -107,6 +108,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current);
if (ret < 0) return ret;
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (likely(ret == -1 && err.no == EOK)) {
Expand Down
33 changes: 32 additions & 1 deletion thread/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1733,6 +1733,37 @@ static void* promise_worker(void* arg) {
return 0;
}

static void* ws_basic(void* arg) {
auto& stolen = *(bool*)arg;
EXPECT_FALSE(stolen);
LOG_INFO("this work (thread) has been stolen");
stolen = true;
return nullptr;
}

TEST(WorkStealing, basic) {
bool running = true, stolen = false;
std::thread vcpu([&](){
vcpu_init(VCPU_ENABLE_ACTIVE_WORK_STEALING);
while(running)
thread_usleep(1000);
DEFER(vcpu_fini());
});
auto th = thread_create(&ws_basic, &stolen, 0, 0,
THREAD_ENABLE_WORK_STEALING | THREAD_JOINABLE);
thread_pause_work_stealing(true, th);
::usleep(1000 * 10); // emulate a busy work of 10ms
EXPECT_FALSE(stolen);

thread_pause_work_stealing(false, th);
::usleep(1000 * 10); // emulate a busy work of 10ms
EXPECT_TRUE(stolen);

thread_join((join_handle*)th);
running = false;
vcpu.join();
}

TEST(future, test1) {
Future<int> fut;
auto th = thread_create(promise_worker, &fut);
Expand Down Expand Up @@ -1768,7 +1799,7 @@ int main(int argc, char** arg)
::testing::InitGoogleTest(&argc, arg);
gflags::ParseCommandLineFlags(&argc, &arg, true);
default_audit_logger.log_output = log_output_stdout;
photon::vcpu_init();
photon::vcpu_init(VCPU_ENABLE_PASSIVE_WORK_STEALING);
set_log_output_level(ALOG_INFO);

if (FLAGS_vcpus <= 1)
Expand Down
Loading

0 comments on commit 4857bc7

Please sign in to comment.