Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: socket engine stats functions #1353

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 78 additions & 19 deletions include/dpp/socketengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cstdint>
#include <unordered_map>
#include <memory>
#include <string_view>
#include <functional>
#include <shared_mutex>
#include <dpp/thread_pool.h>
Expand Down Expand Up @@ -74,6 +75,51 @@ using socket_write_event = std::function<void(dpp::socket fd, const struct socke
*/
using socket_error_event = std::function<void(dpp::socket fd, const struct socket_events&, int error_code)>;

/**
* @brief Contains statistics about the IO loop
*/
struct DPP_EXPORT socket_stats {
/**
* @brief Number of reads since startup
*/
uint64_t reads{0};

/**
* @brief Number of writes since startup
*/
uint64_t writes{0};

/**
* @brief Number of errors since startup
*/
uint64_t errors{0};

/**
* @brief Number of updates to file descriptors
*/
uint64_t updates{0};

/**
* @brief Number of deletions of file descriptors
*/
uint64_t deletions{0};

/**
* @brief Number of loop iterations since startup
*/
uint64_t iterations{0};

/**
* @brief Number of currently active file descriptors
*/
uint64_t active_fds{0};

/**
* @brief Socket engine type
*/
std::string_view engine_type;
};

/**
* @brief Represents an active socket event set in the socket engine.
*
Expand Down Expand Up @@ -150,21 +196,6 @@ using socket_container = std::unordered_map<dpp::socket, std::unique_ptr<socket_
*/
struct DPP_EXPORT socket_engine_base {

/**
* @brief Mutex for fds
*/
std::shared_mutex fds_mutex;

/**
* @brief File descriptors, and their states
*/
socket_container fds;

/**
* @brief Number of file descriptors we are waiting to delete
*/
size_t to_delete_count{0};

/**
* @brief Owning cluster
*/
Expand Down Expand Up @@ -238,21 +269,49 @@ struct DPP_EXPORT socket_engine_base {
*/
void prune();

/**
* @brief Merge new flags in with the given file descriptor
* @param fd file descriptor
* @param extra_flags extra flags to add
*/
void inplace_modify_fd(dpp::socket fd, uint8_t extra_flags);

/**
* @brief Get statistics for socket engine
* @return socket stats
*/
const socket_stats& get_stats() const;

protected:

/**
* @brief Called by the prune() function to remove sockets when safe to do so.
* This is normally at the end or before an iteration of the event loop.
* @param fd File descriptor to remove
* @brief Mutex for fds
*/
virtual bool remove_socket(dpp::socket fd);
std::shared_mutex fds_mutex;

/**
* @brief File descriptors, and their states
*/
socket_container fds;

/**
* @brief Socket engine statistics
*/
socket_stats stats{};

/**
* @brief Find a file descriptors socket events
* @param fd file descriptor
* @return file descriptor or nullptr if doesn't exist
*/
socket_events* get_fd(dpp::socket fd);

/**
* @brief Called by the prune() function to remove sockets when safe to do so.
* This is normally at the end or before an iteration of the event loop.
* @param fd File descriptor to remove
*/
virtual bool remove_socket(dpp::socket fd);
};

/**
Expand Down
27 changes: 27 additions & 0 deletions src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ bool socket_engine_base::register_socket(const socket_events &e) {
auto i = fds.find(e.fd);
if (e.fd != INVALID_SOCKET && i == fds.end()) {
fds.emplace(e.fd, std::make_unique<socket_events>(e));
stats.active_fds++;
return true;
}
if (e.fd != INVALID_SOCKET && i != fds.end()) {
remove_socket(e.fd);
fds.erase(i);
fds.emplace(e.fd, std::make_unique<socket_events>(e));
stats.updates++;
return true;
}
return false;
Expand All @@ -51,6 +53,7 @@ bool socket_engine_base::update_socket(const socket_events &e) {
if (e.fd != INVALID_SOCKET && fds.find(e.fd) != fds.end()) {
auto iter = fds.find(e.fd);
*(iter->second) = e;
stats.updates++;
return true;
}
return false;
Expand Down Expand Up @@ -83,6 +86,23 @@ socket_events* socket_engine_base::get_fd(dpp::socket fd) {
return iter->second.get();
}

void socket_engine_base::inplace_modify_fd(dpp::socket fd, uint8_t extra_flags) {
bool should_modify;
socket_events s{};
{
std::lock_guard lk(fds_mutex);
auto i = fds.find(fd);
should_modify = i != fds.end() && (i->second->flags & extra_flags) != extra_flags;
if (should_modify) {
i->second->flags |= extra_flags;
s = *(i->second);
}
}
if (should_modify) {
update_socket(s);
}
}

void socket_engine_base::prune() {
if (time(nullptr) != last_time) {
try {
Expand All @@ -101,6 +121,7 @@ void socket_engine_base::prune() {

last_time = time(nullptr);
}
stats.iterations++;
}

bool socket_engine_base::delete_socket(dpp::socket fd) {
Expand All @@ -110,11 +131,17 @@ bool socket_engine_base::delete_socket(dpp::socket fd) {
return false;
}
iter->second->flags |= WANT_DELETION;
stats.deletions++;
stats.active_fds--;
return true;
}

bool socket_engine_base::remove_socket(dpp::socket fd) {
return true;
}

const socket_stats& socket_engine_base::get_stats() const {
return stats;
}

}
8 changes: 7 additions & 1 deletion src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

int epoll_handle{INVALID_SOCKET};
static constexpr size_t MAX_EVENTS = 65536;
std::array<struct epoll_event, MAX_EVENTS> events;
std::array<struct epoll_event, MAX_EVENTS> events{};

socket_engine_epoll(const socket_engine_epoll&) = delete;
socket_engine_epoll(socket_engine_epoll&&) = delete;
Expand All @@ -65,6 +65,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if (epoll_handle == -1) {
throw dpp::connection_exception("Failed to initialise epoll()");
}
stats.engine_type = "epoll";
}

~socket_engine_epoll() override {
Expand All @@ -89,13 +90,15 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if ((eh->flags & WANT_DELETION) == 0L) try {

if ((ev.events & EPOLLHUP) != 0U) {
stats.errors++;
if (eh->on_error) {
eh->on_error(fd, *eh, EPIPE);
}
continue;
}

if ((ev.events & EPOLLERR) != 0U) {
stats.errors++;
socklen_t codesize = sizeof(int);
int errcode{};
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) {
Expand All @@ -111,18 +114,21 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
/* Should we have a flag to allow keeping WANT_WRITE? Maybe like WANT_WRITE_ONCE or GREEDY_WANT_WRITE, eh */
eh->flags = modify_event(epoll_handle, eh, eh->flags & ~WANT_WRITE);
if (eh->on_write) {
stats.writes++;
eh->on_write(fd, *eh);
}
}

if ((ev.events & EPOLLIN) != 0U) {
if (eh->on_read) {
stats.reads++;
eh->on_read(fd, *eh);
}
}

} catch (const std::exception& e) {
owner->log(ll_trace, "Socket loop exception: " + std::string(e.what()));
stats.errors++;
eh->on_error(fd, *eh, 0);
}

Expand Down
5 changes: 5 additions & 0 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
if (kqueue_handle == -1) {
throw dpp::connection_exception("Failed to initialise kqueue()");
}
stats.engine_type = "kqueue";
}

~socket_engine_kqueue() override {
Expand Down Expand Up @@ -78,6 +79,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
if (kev.flags & EV_EOF || kev.flags & EV_ERROR) {
if (eh->on_error) {
eh->on_error(kev.ident, *eh, kev.fflags);
stats.errors++;
}
continue;
}
Expand All @@ -86,15 +88,18 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
eh->flags &= ~bits_to_clr;
if (eh->on_write) {
eh->on_write(kev.ident, *eh);
stats.writes++;
}
}
else if (filter == EVFILT_READ) {
if (eh->on_read) {
eh->on_read(kev.ident, *eh);
stats.reads++;
}
}

} catch (const std::exception& e) {
stats.errors++;
owner->log(ll_trace, "Socket loop exception: " + std::string(e.what()));
eh->on_error(kev.ident, *eh, 0);
}
Expand Down
9 changes: 8 additions & 1 deletion src/dpp/socketengines/poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {

if ((revents & POLLHUP) != 0) {
eh->on_error(fd, *eh, 0);
stats.errors++;
continue;
}

Expand All @@ -109,21 +110,25 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *) &errcode, &codesize) < 0) {
errcode = errno;
}
stats.errors++;
eh->on_error(fd, *eh, errcode);
continue;
}

if ((revents & POLLIN) != 0) {
stats.reads++;
eh->on_read(fd, *eh);
}

if ((revents & POLLOUT) != 0) {
stats.writes++;
eh->flags &= ~WANT_WRITE;
update_socket(*eh);
eh->on_write(fd, *eh);
}

} catch (const std::exception &e) {
stats.errors++;
eh->on_error(fd, *eh, 0);
}

Expand Down Expand Up @@ -181,7 +186,9 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
return r;
}

explicit socket_engine_poll(cluster* creator) : socket_engine_base(creator) { };
explicit socket_engine_poll(cluster* creator) : socket_engine_base(creator) {
stats.engine_type = "poll";
};

protected:

Expand Down
17 changes: 1 addition & 16 deletions src/dpp/wsclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,7 @@ void websocket_client::write(const std::string_view data, ws_opcode _opcode)
ssl_client::socket_write(data);
}

bool should_append_want_write = false;
socket_events *new_se = nullptr;
{
std::lock_guard lk(owner->socketengine->fds_mutex);
auto i = owner->socketengine->fds.find(sfd);

should_append_want_write = i != owner->socketengine->fds.end() && (i->second->flags & WANT_WRITE) != WANT_WRITE;
if (should_append_want_write) {
new_se = i->second.get();
new_se->flags |= WANT_WRITE;
}
}

if (should_append_want_write) {
owner->socketengine->update_socket(*new_se);
}
owner->socketengine->inplace_modify_fd(sfd, WANT_WRITE);
}

bool websocket_client::handle_buffer(std::string& buffer)
Expand Down
Loading