diff --git a/include/dpp/socketengine.h b/include/dpp/socketengine.h index 96c5c4216e..9c3be2ac47 100644 --- a/include/dpp/socketengine.h +++ b/include/dpp/socketengine.h @@ -29,48 +29,212 @@ namespace dpp { +/** + * @brief Types of IO events a socket may subscribe to. + */ enum socket_event_flags : uint8_t { + /** + * @brief Socket wants to receive events when it can be read from. + * This is provided by the underlying implementation. + */ WANT_READ = 1, + /** + * @brief Socket wants to receive events when it can be written to. + * This is provided by the underlying implementation, and will be + * a one-off event. If you want to receive ongoing write events you + * must re-request this event type each time. + */ WANT_WRITE = 2, + /** + * @brief Socket wants to receive events that indicate an error condition. + * Note that EOF (graceful close) is not an error condition and is indicated + * by errno being 0 and ::read() returning 0. + */ WANT_ERROR = 4, + /** + * @brief Socket should be removed as soon as is safe to do so. Generally, this is + * after the current iteration through the active event list. + */ + WANT_DELETION = 8, }; +/** + * @brief Read ready event + */ using socket_read_event = std::function; + +/** + * @brief Write ready event + */ using socket_write_event = std::function; + +/** + * @brief Error event + */ using socket_error_event = std::function; +/** + * @brief Represents an active socket event set in the socket engine. + * + * An event set contains a file descriptor, a set of event handler callbacks, and + * a set of bitmask flags which indicate which events it wants to receive. + * It is possible to quickly toggle event types on or off, as it is not always necessary + * or desired to receive all events all the time, in fact doing so can cause an event + * storm which will consume 100% CPU (e.g. if you request to receive write events all + * the time). + */ struct socket_events { + /** + * @brief File descriptor + * + * This should be a valid file descriptor created via ::socket(). + */ dpp::socket fd{INVALID_SOCKET}; + + /** + * @brief Flag bit mask of values from dpp::socket_event_flags + */ uint8_t flags{0}; + + /** + * @brief Read ready event + * @note This function will be called from a different thread to that + * which adds the event set to the socket engine. + */ socket_read_event on_read{}; + + /** + * @brief Write ready event + * @note This function will be called from a different thread to that + * which adds the event set to the socket engine. + */ socket_write_event on_write{}; + + /** + * @brief Error event + * @note This function will be called from a different thread to that + * which adds the event set to the socket engine. + */ socket_error_event on_error{}; + + /** + * @brief Construct a new socket_events + * @param socket_fd file descriptor + * @param _flags initial flags bitmask + * @param read_event read ready event + * @param write_event write ready event + * @param error_event error event + */ socket_events(dpp::socket socket_fd, uint8_t _flags, const socket_read_event& read_event, const socket_write_event& write_event = {}, const socket_error_event& error_event = {}) : fd(socket_fd), flags(_flags), on_read(read_event), on_write(write_event), on_error(error_event) { } }; +/** + * @brief Container of event sets keyed by socket file descriptor + */ using socket_container = std::unordered_map>; +/** + * @brief This is the base class for socket engines. + * The actual implementation is OS specific and the correct implementation is detected by + * CMake. It is then compiled specifically into DPP so only one implementation can exist + * in the implementation. All implementations should behave identically to the user, abstracting + * out implementation-specific behaviours (e.g. difference between edge and level triggered + * event mechanisms etc). + */ struct socket_engine_base { + /** + * @brief File descriptors, and their states + */ socket_container fds; + + /** + * @brief Thread pool. + * Event calls go into the thread pool and are called as + * and when threads in the pool are available. + */ std::unique_ptr pool; + /** + * @brief Number of file descriptors we are waiting to delete + */ + size_t to_delete_count{0}; + + /** + * @brief Default constructor + */ socket_engine_base(); + + /** + * @brief Non-copyable + */ socket_engine_base(const socket_engine_base&) = delete; + + /** + * @brief Non-copyable + */ socket_engine_base(socket_engine_base&&) = delete; + + /** + * @brief Non-movable + */ socket_engine_base& operator=(const socket_engine_base&) = delete; + + /** + * @brief Non-movable + */ socket_engine_base& operator=(socket_engine_base&&) = delete; + /** + * @brief Default destructor + */ virtual ~socket_engine_base() = default; + /** + * @brief Should be called repeatedly in a loop. + * Will run for a maximum of 1 second. + */ virtual void process_events() = 0; + + /** + * @brief Register a new socket with the socket engine + * @param e Socket events + * @return true if socket was added + */ virtual bool register_socket(const socket_events& e); + + /** + * @brief Update an existing socket in the socket engine + * @param e Socket events + * @return true if socket was updated + */ virtual bool update_socket(const socket_events& e); + + /** + * @brief Delete a socket from the socket engine + * @note This will not remove the socket immediately. It will set the + * WANT_DELETION flag causing it to be removed as soon as is safe to do so + * (once all events associated with it are completed). + * @param e File descriptor + * @return true if socket was queued for deletion + */ + bool delete_socket(dpp::socket fd); + + /** + * @brief Iterate through the list of sockets and remove any + * with WANT_DELETION set. This will also call implementation-specific + * remove_socket() on each entry to be removed. + */ + void prune(); +protected: + virtual bool remove_socket(dpp::socket fd); }; -/* This is implemented by whatever derived form socket_engine takes */ +/** + * @brief This is implemented by whatever derived form socket_engine takes + */ std::unique_ptr create_socket_engine(); #ifndef _WIN32 diff --git a/src/dpp/socketengine.cpp b/src/dpp/socketengine.cpp index ed632f3522..ef16d12e3b 100644 --- a/src/dpp/socketengine.cpp +++ b/src/dpp/socketengine.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace dpp { @@ -44,15 +45,6 @@ bool socket_engine_base::update_socket(const socket_events &e) { return false; } -bool socket_engine_base::remove_socket(dpp::socket fd) { - auto iter = fds.find(fd); - if (iter != fds.end()) { - fds.erase(iter); - return true; - } - return false; -} - socket_engine_base::socket_engine_base() { #ifndef WIN32 set_signal_handler(SIGALRM); @@ -70,4 +62,32 @@ socket_engine_base::socket_engine_base() { pool = std::make_unique(); } +void socket_engine_base::prune() { + if (to_delete_count > 0) { + for (auto it = fds.cbegin(); it != fds.cend();) { + if ((it->second->flags & WANT_DELETION) != 0L) { + remove_socket(it->second->fd); + it = fds.erase(it); + } else { + ++it; + } + } + to_delete_count = 0; + } +} + +bool socket_engine_base::delete_socket(dpp::socket fd) { + auto iter = fds.find(fd); + if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) { + return false; + } + iter->second->flags |= WANT_DELETION; + to_delete_count++; + return true; +} + +bool socket_engine_base::remove_socket(dpp::socket fd) { + return false; +} + } diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index e98cd405ea..06ec5c4224 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -123,6 +123,7 @@ struct socket_engine_epoll : public socket_engine_base { }); } } + prune(); } bool register_socket(const socket_events& e) final { @@ -174,6 +175,8 @@ struct socket_engine_epoll : public socket_engine_base { return r; } +protected: + bool remove_socket(dpp::socket fd) final { bool r = socket_engine_base::remove_socket(fd); if (r) { diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp index fe07206df4..13b841f2bd 100644 --- a/src/dpp/socketengines/kqueue.cpp +++ b/src/dpp/socketengines/kqueue.cpp @@ -150,6 +150,8 @@ struct socket_engine_kqueue : public socket_engine_base { return r; } +protected: + bool remove_socket(dpp::socket fd) final { bool r = socket_engine_base::remove_socket(fd); if (r) { diff --git a/src/dpp/socketengines/poll.cpp b/src/dpp/socketengines/poll.cpp index 11734eab15..82a02cf20a 100644 --- a/src/dpp/socketengines/poll.cpp +++ b/src/dpp/socketengines/poll.cpp @@ -145,6 +145,8 @@ struct socket_engine_poll : public socket_engine_base { return r; } +protected: + bool remove_socket(dpp::socket fd) final { bool r = socket_engine_base::remove_socket(fd); if (r) { diff --git a/src/sockettest/socket.cpp b/src/sockettest/socket.cpp index 6177f389b9..8087f55b79 100644 --- a/src/sockettest/socket.cpp +++ b/src/sockettest/socket.cpp @@ -46,7 +46,7 @@ int main() { dpp::socket_events events( sfd, dpp::WANT_READ | dpp::WANT_WRITE | dpp::WANT_ERROR, - [](dpp::socket fd, const struct dpp::socket_events& e) { + [&se](dpp::socket fd, const struct dpp::socket_events& e) { int r = 0; do { char buf[128]{0}; @@ -54,12 +54,17 @@ int main() { if (r > 0) { buf[127] = 0; std::cout << buf; + std::cout.flush(); } } while (r > 0); + if (r == 0 || (errno && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)) { + dpp::close_socket(fd); + se->delete_socket(fd); + } }, [](dpp::socket fd, const struct dpp::socket_events& e) { std::cout << "WANT_WRITE event on socket " << fd << "\n"; - ::write(e.fd, "GET / HTTP/1.0\r\n\r\n", strlen("GET / HTTP/1.0\r\n\r\n")); + ::write(e.fd, "GET / HTTP/1.0\r\nConnection: close\r\n\r\n", strlen("GET / HTTP/1.0\r\nConnection: close\r\n\r\n")); }, [](dpp::socket fd, const struct dpp::socket_events&, int error_code) { std::cout << "WANT_ERROR event on socket " << fd << " with code " << error_code << "\n";