Skip to content

Commit

Permalink
improve threading
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis committed Nov 21, 2024
1 parent f80a975 commit e9ef7a7
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
1 change: 0 additions & 1 deletion src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ socket_engine_base::socket_engine_base(cluster* creator) : owner(creator) {
throw dpp::connection_exception(err_connect_failure, "WSAStartup failure");
}
#endif
//pool = std::make_unique<thread_pool>();
}

time_t last_time = time(nullptr);
Expand Down
22 changes: 12 additions & 10 deletions src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,15 @@ int modify_event(int epoll_handle, socket_events* eh, int new_events) {
struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

int epoll_handle{INVALID_SOCKET};
static const int epoll_hint = 128;
std::vector<struct epoll_event> events;
static constexpr size_t MAX_EVENTS = 65536;
std::array<struct epoll_event, MAX_EVENTS> events;

socket_engine_epoll(const socket_engine_epoll&) = delete;
socket_engine_epoll(socket_engine_epoll&&) = delete;
socket_engine_epoll& operator=(const socket_engine_epoll&) = delete;
socket_engine_epoll& operator=(socket_engine_epoll&&) = delete;

explicit socket_engine_epoll(cluster* creator) : socket_engine_base(creator), epoll_handle(epoll_create(socket_engine_epoll::epoll_hint)) {
events.resize(socket_engine_epoll::epoll_hint);
explicit socket_engine_epoll(cluster* creator) : socket_engine_base(creator), epoll_handle(epoll_create(MAX_EVENTS)) {
if (epoll_handle == -1) {
throw dpp::connection_exception("Failed to initialise epoll()");
}
Expand All @@ -75,7 +74,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

void process_events() final {
const int sleep_length = 1000;
int i = epoll_wait(epoll_handle, events.data(), static_cast<int>(events.size()), sleep_length);
int i = epoll_wait(epoll_handle, events.data(), MAX_EVENTS, sleep_length);

for (int j = 0; j < i; j++) {
epoll_event ev = events[j];
Expand Down Expand Up @@ -145,14 +144,14 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if ((e.flags & WANT_ERROR) != 0) {
ev.events |= EPOLLERR;
}
ev.data.ptr = fds.find(e.fd)->second.get();
{
std::unique_lock lock(fds_mutex);
ev.data.ptr = fds.find(e.fd)->second.get();
}
int i = epoll_ctl(epoll_handle, EPOLL_CTL_ADD, e.fd, &ev);
if (i < 0) {
throw dpp::connection_exception("Failed to register socket to epoll_ctl()");
}
if (fds.size() * 2 > events.size()) {
events.resize(fds.size() * 2);
}
}
return r;
}
Expand All @@ -171,7 +170,10 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if ((e.flags & WANT_ERROR) != 0) {
ev.events |= EPOLLERR;
}
ev.data.ptr = fds.find(e.fd)->second.get();
{
std::unique_lock lock(fds_mutex);
ev.data.ptr = fds.find(e.fd)->second.get();
}
int i = epoll_ctl(epoll_handle, EPOLL_CTL_MOD, e.fd, &ev);
if (i < 0) {
throw dpp::connection_exception("Failed to modify socket with epoll_ctl()");
Expand Down
24 changes: 13 additions & 11 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <memory>
#include <vector>
#include <array>
#include <cstdint>
#include <sys/types.h>
#include <unistd.h>
#include <dpp/cluster.h>
Expand All @@ -33,8 +34,10 @@ namespace dpp {

struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {

static constexpr size_t MAX_SOCKET_VALUE = 65536;

int kqueue_handle{INVALID_SOCKET};
std::array<struct kevent, 65536> ke_list;
std::array<struct kevent, MAX_SOCKET_VALUE> ke_list;

socket_engine_kqueue(const socket_engine_kqueue&) = delete;
socket_engine_kqueue(socket_engine_kqueue&&) = delete;
Expand All @@ -57,7 +60,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
struct timespec ts{};
ts.tv_sec = 1;

int i = kevent(kqueue_handle, NULL, 0, ke_list.data(), static_cast<int>(ke_list.size()), &ts);
int i = kevent(kqueue_handle, nullptr, 0, ke_list.data(), static_cast<int>(ke_list.size()), &ts);
if (i < 0) {
return;
}
Expand Down Expand Up @@ -101,15 +104,15 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
bool register_socket(const socket_events& e) final {
bool r = socket_engine_base::register_socket(e);
if (r) {
struct kevent ke;
struct kevent ke{};
socket_events* se = fds.find(e.fd)->second.get();
if ((se->flags & WANT_READ) != 0) {
EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if ((se->flags & WANT_WRITE) != 0) {
EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
}
return r;
Expand All @@ -118,15 +121,14 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
bool update_socket(const socket_events& e) final {
bool r = socket_engine_base::update_socket(e);
if (r) {
socket_events* se = fds.find(e.fd)->second.get();
struct kevent ke;
struct kevent ke{};
if ((e.flags & WANT_READ) != 0) {
EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if ((e.flags & WANT_WRITE) != 0) {
EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
}
return r;
Expand All @@ -139,9 +141,9 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
if (r) {
struct kevent ke;
EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
return r;
}
Expand Down

0 comments on commit e9ef7a7

Please sign in to comment.