From 1c345ddabbd61b1ff8f052c7e1a70a9209ff8c39 Mon Sep 17 00:00:00 2001 From: thidas1290 Date: Fri, 25 Aug 2023 04:40:55 +0530 Subject: [PATCH 1/2] implemented heartbeat timer for LibUV example --- include/amqpcpp/libuv.h | 55 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/include/amqpcpp/libuv.h b/include/amqpcpp/libuv.h index eebf2043..a81b1d8b 100644 --- a/include/amqpcpp/libuv.h +++ b/include/amqpcpp/libuv.h @@ -166,12 +166,30 @@ class LibUvHandler : public TcpHandler */ uv_loop_t *_loop; + /** + * timer + * @var uv_timer_t* + */ + uv_timer_t *_timer; + /** * All I/O watchers that are active, indexed by their filedescriptor * @var std::map */ std::map> _watchers; + /** + * timer callback + * @param handle Internal timer handle + */ + static void timer_cb(uv_timer_t* handle) + { + // retrieving the connection + TcpConnection* conn = static_cast(handle->data); + + // telliing the connection to send a heartbeat to the broker + conn->heartbeat(); + } /** * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability @@ -205,6 +223,30 @@ class LibUvHandler : public TcpHandler } } + /** + * Method that is called when the server sends a heartbeat to the client + * @param connection The connection over which the heartbeat was received + * @param interval agreed interval by the broker + * @see ConnectionHandler::onHeartbeat + */ + uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override + { + if(interval < 60) interval = 60; + + // initialization of the timer + _timer = new uv_timer_t(); + uv_timer_init(_loop, _timer); + + // passing connection to be recieved at the callback + _timer->data = connection; + + // starting the timer with callback + uv_timer_start(_timer, timer_cb, 0, interval*1000); + + // returning the agreed heartbeat interval to the broker + return interval; + } + public: /** * Constructor @@ -215,7 +257,18 @@ class LibUvHandler : public TcpHandler /** * Destructor */ - virtual ~LibUvHandler() = default; + ~LibUvHandler() + { + // stopping the timer + uv_timer_stop(_timer); + + // closing the timer handle + uv_close(reinterpret_cast(_timer), [](uv_handle_t* handle){ + + // freeing the memory after callback + delete reinterpret_cast(handle); + }); + }; }; /** From 49fe76c49507b670fec838a9ddb021356e9c77d5 Mon Sep 17 00:00:00 2001 From: thidas1290 Date: Sun, 3 Sep 2023 00:55:56 +0530 Subject: [PATCH 2/2] updated heartbeat timer referencing LibEV example --- include/amqpcpp/libuv.h | 400 ++++++++++++++++++++++++++++++---------- 1 file changed, 306 insertions(+), 94 deletions(-) diff --git a/include/amqpcpp/libuv.h b/include/amqpcpp/libuv.h index a81b1d8b..87aeeea1 100644 --- a/include/amqpcpp/libuv.h +++ b/include/amqpcpp/libuv.h @@ -20,7 +20,7 @@ * Dependencies */ #include - +#include #include "amqpcpp/linux_tcp.h" /** @@ -34,6 +34,25 @@ namespace AMQP { class LibUvHandler : public TcpHandler { private: + /** + * Internal interface for the object that is being watched + */ + class Watchable + { + public: + /** + * The method that is called when a filedescriptor becomes active + * @param fd + * @param events + */ + virtual void onActive(int fd, int events) = 0; + + /** + * Method that is called when the timer expires + */ + virtual void onExpired() = 0; + }; + /** * Helper class that wraps a libev I/O watcher */ @@ -52,6 +71,12 @@ class LibUvHandler : public TcpHandler */ uv_poll_t *_poll; + + /** + * monitoring file descriptor + */ + int _fd; + /** * Callback method that is called by libuv when a filedescriptor becomes active * @param handle Internal poll handle @@ -61,23 +86,59 @@ class LibUvHandler : public TcpHandler static void callback(uv_poll_t *handle, int status, int events) { // retrieve the connection - TcpConnection *connection = static_cast(handle->data); + Watchable *obj = static_cast(handle->data); // tell the connection that its filedescriptor is active int fd = -1; uv_fileno(reinterpret_cast(handle), &fd); - connection->process(fd, uv_to_amqp_events(status, events)); + obj->onActive(fd, uv_to_amqp_events(status, events)); + + } + + + /** + * Convert event flags from UV format to AMQP format + */ + static int uv_to_amqp_events(int status, int events) + { + // if the socket is closed report both so we pick up the error + if (status != 0) + return AMQP::readable | AMQP::writable; + + // map read or write + int amqp_events = 0; + if (events & UV_READABLE) + amqp_events |= AMQP::readable; + if (events & UV_WRITABLE) + amqp_events |= AMQP::writable; + return amqp_events; } + /** + * Convert event flags from AMQP format to UV format + */ + static int amqp_to_uv_events(int events) + { + int uv_events = 0; + if (events & AMQP::readable) + uv_events |= UV_READABLE; + if (events & AMQP::writable) + uv_events |= UV_WRITABLE; + return uv_events; + } + + public: /** * Constructor * @param loop The current event loop - * @param connection The connection being watched + * @param obj The watchable object being watched * @param fd The filedescriptor being watched * @param events The events that should be monitored */ - Watcher(uv_loop_t *loop, TcpConnection *connection, int fd, int events) : _loop(loop) + Watcher(uv_loop_t *loop, Watchable *obj, int fd, int events) : + _loop(loop), + _fd(fd) { // create a new poll _poll = new uv_poll_t(); @@ -86,7 +147,7 @@ class LibUvHandler : public TcpHandler uv_poll_init(_loop, _poll, fd); // store the connection in the data "void*" - _poll->data = connection; + _poll->data = obj; // start the watcher uv_poll_start(_poll, amqp_to_uv_events(events), callback); @@ -125,38 +186,228 @@ class LibUvHandler : public TcpHandler uv_poll_start(_poll, amqp_to_uv_events(events), callback); } + /** + * Check if a filedescriptor is covered by the watcher + * @param fd + * @return bool + */ + bool contains (int fd) const { + return fd == _fd; + } + }; + + + class UvConnectionWrapper : private Watchable + { private: + uv_loop_t* _loop; + uv_timer_t _timer; + AMQP::TcpConnection* _conn; + std::list _watchers; + uint64_t _timeout; + uint64_t _expire; + uint64_t _next; + /** - * Convert event flags from UV format to AMQP format + * timer callback + * @param handle Internal timer handle */ - static int uv_to_amqp_events(int status, int events) + static void timer_cb(uv_timer_t* handle) { - // if the socket is closed report both so we pick up the error - if (status != 0) - return AMQP::readable | AMQP::writable; + // retrieving the connection + Watchable* obj = static_cast(handle->data); - // map read or write - int amqp_events = 0; - if (events & UV_READABLE) - amqp_events |= AMQP::readable; - if (events & UV_WRITABLE) - amqp_events |= AMQP::writable; - return amqp_events; + // telliing the connection to send a heartbeat to the broker + obj->onExpired(); + } + + bool timed() const + { + // if neither timers are set + return _expire > 0.0 || _next > 0.0; + } + + + virtual void onExpired() override + { + // get the current time + auto now = uv_now(_loop); + + // timer is no longer active, so the refcounter in the loop is restored + // uv_ref(_loop); + + // if the onNegotiate method was not yet called, and no heartbeat timeout was negotiated + if (_timeout == 0) + { + // this can happen in three situations: 1. a connect-timeout, 2. user space has + // told us that we're not interested in heartbeats, 3. rabbitmq does not want heartbeats, + // in either case we're no longer going to run further timers. + _next = _expire = 0.0; + + // if we have an initialized connection, user-space must have overridden the onNegotiate + // method, so we keep using the connection + if (_conn->initialized()) return; + + // this is a connection timeout, close the connection from our side too + return (void)_conn->close(true); + } + else if (now >= _expire) + { + // the server was inactive for a too long period of time, reset state + _next = _expire = 0.0; _timeout = 0; + + // close the connection because server was inactive (we close it with immediate effect, + // because it was inactive so we cannot trust it to respect the AMQP close handshake) + return (void)_conn->close(true); + } + else if (now >= _next) + { + // it's time for the next heartbeat + _conn->heartbeat(); + + // remember when we should send out the next one, so the next one should be + // sent only after _timout/2 seconds again _from now_ (no catching up) + _next = now + std::max(_timeout / 2, static_cast(1000)); + } + + // reset the timer to trigger again later + uv_timer_start(&_timer, timer_cb, std::min(_next, _expire) - now, 0.0); + + // and start it again + // ev_timer_start(_loop, &_timer); + + // and because the timer is running again, we restore the refcounter + uv_unref((uv_handle_t*)_loop); } /** - * Convert event flags from AMQP format to UV format + * Method that is called when a filedescriptor becomes active + * @param fd the filedescriptor that is active + * @param events the events that are active (readable/writable) */ - static int amqp_to_uv_events(int events) + virtual void onActive(int fd, int events) override { - int uv_events = 0; - if (events & AMQP::readable) - uv_events |= UV_READABLE; - if (events & AMQP::writable) - uv_events |= UV_WRITABLE; - return uv_events; + // if the server is readable, we have some extra time before it expires, the expire time + // is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent + if (_timeout != 0 && (events & UV_READABLE)) _expire = uv_now(_loop) + _timeout * 1.5; + + // pass on to the connection + _conn->process(fd, events); + } + + uint64_t sec_to_ms(uint64_t sec) + { + if(sec > 0) return sec * 1000; + return 0; + } + + uint16_t ms_to_sec(uint64_t millis) + { + if(millis > 0) return millis / 1000; + return 0; } + + public: + UvConnectionWrapper(uv_loop_t* loop, AMQP::TcpConnection* connection, uint64_t timeout) : + _loop(loop), + _conn(connection), + _timeout(sec_to_ms(timeout)), + _next(0), + _expire(uv_now(_loop) + sec_to_ms(timeout)) + { + _timer.data = this; + uv_timer_init(_loop, &_timer); + uv_timer_start(&_timer, timer_cb, _timeout, 0.0); + // uv_unref((uv_handle_t*)_loop); + } + + UvConnectionWrapper(UvConnectionWrapper &&that) = delete; + UvConnectionWrapper(const UvConnectionWrapper &&that) = delete; + + virtual ~UvConnectionWrapper() + { + if(!timed()) return; + + // stop the timer + uv_timer_stop(&_timer); + + // restore loop refcount + // uv_ref((uv_handle_t*)_loop); + } + + bool contains(AMQP::TcpConnection* connection) const + { + return _conn == connection; + } + + + uint16_t start(uint16_t timeout) + { + // we now know for sure that the connection was set up + _timeout = sec_to_ms(timeout); + + // if heartbeats are disabled we do not have to set it + if (_timeout == 0) return 0; + + // calculate current time + auto now = uv_now(_loop); + + // we also know when the next heartbeat should be sent + _next = now + std::max(_timeout / 2, static_cast(1000)); + + // because the server has just sent us some data, we will update the expire time too + _expire = now + _timeout * 1.5; + + // stop the existing timer (we have to stop it and restart it, because ev_timer_set() + // on its own does not change the running timer) (note that we assume that the timer + // is already running and keeps on running, so no calls to ev_ref()/en_unref() here) + uv_timer_stop(&_timer); + + // find the earliest thing that expires + uv_timer_start(&_timer, timer_cb, std::min(_next, _expire) - now, 0.0); + + // and start it again + // ev_timer_start(_loop, &_timer); + + // expose the accepted interval + return ms_to_sec(_timeout); + } + + + /** + * Monitor a filedescriptor + * @param fd specific file discriptor + * @param events updated events + */ + void monitor(int fd, int events) + { + // should we remove? + if (events == 0) + { + // remove the io-watcher + _watchers.remove_if([fd](const Watcher &watcher) -> bool { + + // compare filedescriptors + return watcher.contains(fd); + }); + } + else + { + // look in the array for this filedescriptor + for (auto &watcher : _watchers) + { + // do we have a match? + if (watcher.contains(fd)) return watcher.events(events); + } + + // if not found we need to register a new filedescriptor + Watchable *watchable = this; + + // we should monitor a new filedescriptor + _watchers.emplace_back(_loop, watchable, fd, events); + } + } }; @@ -166,31 +417,26 @@ class LibUvHandler : public TcpHandler */ uv_loop_t *_loop; - /** - * timer - * @var uv_timer_t* - */ - uv_timer_t *_timer; - /** - * All I/O watchers that are active, indexed by their filedescriptor - * @var std::map - */ - std::map> _watchers; + std::list _wrappers; - /** - * timer callback - * @param handle Internal timer handle - */ - static void timer_cb(uv_timer_t* handle) + UvConnectionWrapper &lookup(TcpConnection *connection) { - // retrieving the connection - TcpConnection* conn = static_cast(handle->data); - - // telliing the connection to send a heartbeat to the broker - conn->heartbeat(); + // look for the appropriate connection + for (auto &wrapper : _wrappers) + { + // do we have a match? + if (wrapper.contains(connection)) return wrapper; + } + + // add to the wrappers + _wrappers.emplace_back(_loop, connection, 60); + + // done + return _wrappers.back(); } +protected: /** * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * @param connection The TCP connection object that is reporting @@ -199,28 +445,8 @@ class LibUvHandler : public TcpHandler */ virtual void monitor(TcpConnection *connection, int fd, int flags) override { - // do we already have this filedescriptor - auto iter = _watchers.find(fd); - - // was it found? - if (iter == _watchers.end()) - { - // we did not yet have this watcher - but that is ok if no filedescriptor was registered - if (flags == 0) return; - - // construct a new watcher, and put it in the map - _watchers[fd] = std::unique_ptr(new Watcher(_loop, connection, fd, flags)); - } - else if (flags == 0) - { - // the watcher does already exist, but we no longer have to watch this watcher - _watchers.erase(iter); - } - else - { - // change the events - iter->second->events(flags); - } + // lookup the appropriate wrapper and start monitoring + lookup(connection).monitor(fd, flags); } /** @@ -229,23 +455,20 @@ class LibUvHandler : public TcpHandler * @param interval agreed interval by the broker * @see ConnectionHandler::onHeartbeat */ - uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override + virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override { - if(interval < 60) interval = 60; + // lookup the wrapper, and start the timer to check for activity and send heartbeats + return lookup(connection).start(interval); - // initialization of the timer - _timer = new uv_timer_t(); - uv_timer_init(_loop, _timer); + } - // passing connection to be recieved at the callback - _timer->data = connection; - - // starting the timer with callback - uv_timer_start(_timer, timer_cb, 0, interval*1000); - - // returning the agreed heartbeat interval to the broker - return interval; - } + virtual void onDetached(TcpConnection *connection) override + { + // remove from the array + _wrappers.remove_if([connection](const UvConnectionWrapper &wrapper) -> bool { + return wrapper.contains(connection); + }); + } public: /** @@ -257,18 +480,7 @@ class LibUvHandler : public TcpHandler /** * Destructor */ - ~LibUvHandler() - { - // stopping the timer - uv_timer_stop(_timer); - - // closing the timer handle - uv_close(reinterpret_cast(_timer), [](uv_handle_t* handle){ - - // freeing the memory after callback - delete reinterpret_cast(handle); - }); - }; + ~LibUvHandler() = default; }; /**