Skip to content

Commit

Permalink
document socket_engine_base
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis committed Nov 11, 2024
1 parent 22bcdda commit 8ce80b2
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 12 deletions.
166 changes: 165 additions & 1 deletion include/dpp/socketengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,48 +29,212 @@

namespace dpp {

/**
* @brief Types of IO events a socket may subscribe to.
*/
enum socket_event_flags : uint8_t {
/**
* @brief Socket wants to receive events when it can be read from.
* This is provided by the underlying implementation.
*/
WANT_READ = 1,
/**
* @brief Socket wants to receive events when it can be written to.
* This is provided by the underlying implementation, and will be
* a one-off event. If you want to receive ongoing write events you
* must re-request this event type each time.
*/
WANT_WRITE = 2,
/**
* @brief Socket wants to receive events that indicate an error condition.
* Note that EOF (graceful close) is not an error condition and is indicated
* by errno being 0 and ::read() returning 0.
*/
WANT_ERROR = 4,
/**
* @brief Socket should be removed as soon as is safe to do so. Generally, this is
* after the current iteration through the active event list.
*/
WANT_DELETION = 8,
};

/**
* @brief Read ready event
*/
using socket_read_event = std::function<void(dpp::socket fd, const struct socket_events&)>;

/**
* @brief Write ready event
*/
using socket_write_event = std::function<void(dpp::socket fd, const struct socket_events&)>;

/**
* @brief Error event
*/
using socket_error_event = std::function<void(dpp::socket fd, const struct socket_events&, int error_code)>;

/**
* @brief Represents an active socket event set in the socket engine.
*
* An event set contains a file descriptor, a set of event handler callbacks, and
* a set of bitmask flags which indicate which events it wants to receive.
* It is possible to quickly toggle event types on or off, as it is not always necessary
* or desired to receive all events all the time, in fact doing so can cause an event
* storm which will consume 100% CPU (e.g. if you request to receive write events all
* the time).
*/
struct socket_events {
/**
* @brief File descriptor
*
* This should be a valid file descriptor created via ::socket().
*/
dpp::socket fd{INVALID_SOCKET};

/**
* @brief Flag bit mask of values from dpp::socket_event_flags
*/
uint8_t flags{0};

/**
* @brief Read ready event
* @note This function will be called from a different thread to that
* which adds the event set to the socket engine.
*/
socket_read_event on_read{};

/**
* @brief Write ready event
* @note This function will be called from a different thread to that
* which adds the event set to the socket engine.
*/
socket_write_event on_write{};

/**
* @brief Error event
* @note This function will be called from a different thread to that
* which adds the event set to the socket engine.
*/
socket_error_event on_error{};

/**
* @brief Construct a new socket_events
* @param socket_fd file descriptor
* @param _flags initial flags bitmask
* @param read_event read ready event
* @param write_event write ready event
* @param error_event error event
*/
socket_events(dpp::socket socket_fd, uint8_t _flags, const socket_read_event& read_event, const socket_write_event& write_event = {}, const socket_error_event& error_event = {})
: fd(socket_fd), flags(_flags), on_read(read_event), on_write(write_event), on_error(error_event) { }

};

/**
* @brief Container of event sets keyed by socket file descriptor
*/
using socket_container = std::unordered_map<dpp::socket, std::unique_ptr<socket_events>>;

/**
* @brief This is the base class for socket engines.
* The actual implementation is OS specific and the correct implementation is detected by
* CMake. It is then compiled specifically into DPP so only one implementation can exist
* in the implementation. All implementations should behave identically to the user, abstracting
* out implementation-specific behaviours (e.g. difference between edge and level triggered
* event mechanisms etc).
*/
struct socket_engine_base {
/**
* @brief File descriptors, and their states
*/
socket_container fds;

/**
* @brief Thread pool.
* Event calls go into the thread pool and are called as
* and when threads in the pool are available.
*/
std::unique_ptr<thread_pool> pool;

/**
* @brief Number of file descriptors we are waiting to delete
*/
size_t to_delete_count{0};

/**
* @brief Default constructor
*/
socket_engine_base();

/**
* @brief Non-copyable
*/
socket_engine_base(const socket_engine_base&) = delete;

/**
* @brief Non-copyable
*/
socket_engine_base(socket_engine_base&&) = delete;

/**
* @brief Non-movable
*/
socket_engine_base& operator=(const socket_engine_base&) = delete;

/**
* @brief Non-movable
*/
socket_engine_base& operator=(socket_engine_base&&) = delete;

/**
* @brief Default destructor
*/
virtual ~socket_engine_base() = default;

/**
* @brief Should be called repeatedly in a loop.
* Will run for a maximum of 1 second.
*/
virtual void process_events() = 0;

/**
* @brief Register a new socket with the socket engine
* @param e Socket events
* @return true if socket was added
*/
virtual bool register_socket(const socket_events& e);

/**
* @brief Update an existing socket in the socket engine
* @param e Socket events
* @return true if socket was updated
*/
virtual bool update_socket(const socket_events& e);

/**
* @brief Delete a socket from the socket engine
* @note This will not remove the socket immediately. It will set the
* WANT_DELETION flag causing it to be removed as soon as is safe to do so
* (once all events associated with it are completed).
* @param e File descriptor
* @return true if socket was queued for deletion
*/
bool delete_socket(dpp::socket fd);

/**
* @brief Iterate through the list of sockets and remove any
* with WANT_DELETION set. This will also call implementation-specific
* remove_socket() on each entry to be removed.
*/
void prune();
protected:

virtual bool remove_socket(dpp::socket fd);
};

/* This is implemented by whatever derived form socket_engine takes */
/**
* @brief This is implemented by whatever derived form socket_engine takes
*/
std::unique_ptr<socket_engine_base> create_socket_engine();

#ifndef _WIN32
Expand Down
38 changes: 29 additions & 9 deletions src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <csignal>
#include <memory>
#include <sslclient.h>
#include <iostream>

namespace dpp {

Expand All @@ -44,15 +45,6 @@ bool socket_engine_base::update_socket(const socket_events &e) {
return false;
}

bool socket_engine_base::remove_socket(dpp::socket fd) {
auto iter = fds.find(fd);
if (iter != fds.end()) {
fds.erase(iter);
return true;
}
return false;
}

socket_engine_base::socket_engine_base() {
#ifndef WIN32
set_signal_handler(SIGALRM);
Expand All @@ -70,4 +62,32 @@ socket_engine_base::socket_engine_base() {
pool = std::make_unique<thread_pool>();
}

void socket_engine_base::prune() {
if (to_delete_count > 0) {
for (auto it = fds.cbegin(); it != fds.cend();) {
if ((it->second->flags & WANT_DELETION) != 0L) {
remove_socket(it->second->fd);
it = fds.erase(it);
} else {
++it;
}
}
to_delete_count = 0;
}
}

bool socket_engine_base::delete_socket(dpp::socket fd) {
auto iter = fds.find(fd);
if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) {
return false;
}
iter->second->flags |= WANT_DELETION;
to_delete_count++;
return true;
}

bool socket_engine_base::remove_socket(dpp::socket fd) {
return false;
}

}
3 changes: 3 additions & 0 deletions src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ struct socket_engine_epoll : public socket_engine_base {
});
}
}
prune();
}

bool register_socket(const socket_events& e) final {
Expand Down Expand Up @@ -174,6 +175,8 @@ struct socket_engine_epoll : public socket_engine_base {
return r;
}

protected:

bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
Expand Down
2 changes: 2 additions & 0 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ struct socket_engine_kqueue : public socket_engine_base {
return r;
}

protected:

bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
Expand Down
2 changes: 2 additions & 0 deletions src/dpp/socketengines/poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ struct socket_engine_poll : public socket_engine_base {
return r;
}

protected:

bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
Expand Down
9 changes: 7 additions & 2 deletions src/sockettest/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,25 @@ int main() {
dpp::socket_events events(
sfd,
dpp::WANT_READ | dpp::WANT_WRITE | dpp::WANT_ERROR,
[](dpp::socket fd, const struct dpp::socket_events& e) {
[&se](dpp::socket fd, const struct dpp::socket_events& e) {
int r = 0;
do {
char buf[128]{0};
r = ::read(e.fd, buf, 127);
if (r > 0) {
buf[127] = 0;
std::cout << buf;
std::cout.flush();
}
} while (r > 0);
if (r == 0 || (errno && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)) {
dpp::close_socket(fd);
se->delete_socket(fd);
}
},
[](dpp::socket fd, const struct dpp::socket_events& e) {
std::cout << "WANT_WRITE event on socket " << fd << "\n";
::write(e.fd, "GET / HTTP/1.0\r\n\r\n", strlen("GET / HTTP/1.0\r\n\r\n"));
::write(e.fd, "GET / HTTP/1.0\r\nConnection: close\r\n\r\n", strlen("GET / HTTP/1.0\r\nConnection: close\r\n\r\n"));
},
[](dpp::socket fd, const struct dpp::socket_events&, int error_code) {
std::cout << "WANT_ERROR event on socket " << fd << " with code " << error_code << "\n";
Expand Down

0 comments on commit 8ce80b2

Please sign in to comment.