Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

very basic work-stealing #629

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions io/epoll-ng.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ class EventEngineEPollNG : public MasterEventEngine,
Event event{fd, interests | ONE_SHOT, &waiter};
int ret = add_interest(event);
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ ok: entry.interests |= eint;
return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll
int ret = add_interest({fd, interest | ONE_SHOT, CURRENT});
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/fstack-dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class FstackDpdkEngine : public MasterEventEngine, public CascadingEventEngine,
int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override {
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT);
SCOPED_PAUSE_WORK_STEALING;
int ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
Expand Down
1 change: 1 addition & 0 deletions io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
io_uring_sqe_set_data(sqe, &timer_ctx);
}

SCOPED_PAUSE_WORK_STEALING;
photon::thread_sleep(-1);

if (likely(errno == EOK)) {
Expand Down
2 changes: 2 additions & 0 deletions io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include <vector>
#include <sys/event.h>
#include <photon/common/alog.h>
#include <photon/thread/thread.h>
#include "events_map.h"
#include "reset_handle.h"

Expand Down Expand Up @@ -107,6 +108,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current);
if (ret < 0) return ret;
SCOPED_PAUSE_WORK_STEALING;
ret = thread_usleep(timeout);
ERRNO err;
if (likely(ret == -1 && err.no == EOK)) {
Expand Down
33 changes: 32 additions & 1 deletion thread/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1733,6 +1733,37 @@ static void* promise_worker(void* arg) {
return 0;
}

static void* ws_basic(void* arg) {
auto& stolen = *(bool*)arg;
EXPECT_FALSE(stolen);
LOG_INFO("this work (thread) has been stolen");
stolen = true;
return nullptr;
}

TEST(WorkStealing, basic) {
bool running = true, stolen = false;
std::thread vcpu([&](){
vcpu_init(VCPU_ENABLE_ACTIVE_WORK_STEALING);
while(running)
thread_usleep(1000);
DEFER(vcpu_fini());
});
auto th = thread_create(&ws_basic, &stolen, 0, 0,
THREAD_ENABLE_WORK_STEALING | THREAD_JOINABLE);
thread_pause_work_stealing(true, th);
::usleep(1000 * 10); // emulate a busy work of 10ms
EXPECT_FALSE(stolen);

thread_pause_work_stealing(false, th);
::usleep(1000 * 10); // emulate a busy work of 10ms
EXPECT_TRUE(stolen);

thread_join((join_handle*)th);
running = false;
vcpu.join();
}

TEST(future, test1) {
Future<int> fut;
auto th = thread_create(promise_worker, &fut);
Expand Down Expand Up @@ -1768,7 +1799,7 @@ int main(int argc, char** arg)
::testing::InitGoogleTest(&argc, arg);
gflags::ParseCommandLineFlags(&argc, &arg, true);
default_audit_logger.log_output = log_output_stdout;
photon::vcpu_init();
photon::vcpu_init(VCPU_ENABLE_PASSIVE_WORK_STEALING);
set_log_output_level(ALOG_INFO);

if (FLAGS_vcpus <= 1)
Expand Down
Loading
Loading