diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index d7aaf30d..7d9e8cce 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -26,7 +26,7 @@ namespace AMQP { */ TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : _handler(handler), - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure())), + _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5))), _connection(this, address.login(), address.vhost()) { // tell the handler diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index 0d3bb618..6ad9f9a8 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -53,6 +53,12 @@ class TcpResolver : public TcpExtState * @var uint16_t */ uint16_t _port; + + /** + * Timeout for the connect call in seconds. + * @var int + */ + int _timeout; /** * A pipe that is used to send back the socket that is connected to RabbitMQ @@ -93,6 +99,9 @@ class TcpResolver : public TcpExtState // get address info AddressInfo addresses(_hostname.data(), _port); + // an fdset to monitor for writability + fd_set writeset; + // iterate over the addresses for (size_t i = 0; i < addresses.size(); ++i) { @@ -101,12 +110,47 @@ class TcpResolver : public TcpExtState // move on on failure if (_socket < 0) continue; - - // connect to the socket + + // turn socket into a non-blocking socket and set the close-on-exec bit + fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC); + + // try to connect non-blocking if (connect(_socket, addresses[i]->ai_addr, addresses[i]->ai_addrlen) == 0) break; - + + // we set the timeout to a timeout, with 5 seconds as the default + struct timeval timeout{_timeout,0}; + + // reset the fdset + FD_ZERO(&writeset); + + // set the fd to monitor for writing + FD_SET(_socket, &writeset); + + // perform a select, wait for something to happen on one of the fds + int ret = select(_socket + 1, nullptr, &writeset, nullptr, &timeout); + // log the error for the time being - _error = strerror(errno); + if (ret == 0) _error = "connection timed out"; + + // otherwise, select might've failed + else if (ret < 0) _error = strerror(errno); + + // otherwise the connect failed/succeeded + else + { + // the error + int err = 0; + socklen_t len = 4; + + // get the options + getsockopt(_socket, SOL_SOCKET, SO_ERROR, &err, &len); + + // if the error is zero, we break, socket is now valid + if (err == 0) break; + + // set the error with the value + _error = strerror(err); + } // close socket because connect failed ::close(_socket); @@ -118,9 +162,6 @@ class TcpResolver : public TcpExtState // connection succeeded, mark socket as non-blocking if (_socket >= 0) { - // turn socket into a non-blocking socket and set the close-on-exec bit - fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC); - // we want to enable "nodelay" on sockets (otherwise all send operations are s-l-o-w int optval = 1; @@ -138,11 +179,8 @@ class TcpResolver : public TcpExtState _error = error.what(); } - // notify the master thread by sending a byte over the pipe - if (!_pipe.notify()) - { - _error = strerror(errno); - } + // notify the master thread by sending a byte over the pipe, store error if this fails + if (!_pipe.notify()) _error = strerror(errno); } public: @@ -152,12 +190,14 @@ class TcpResolver : public TcpExtState * @param hostname The hostname for the lookup * @param portnumber The portnumber for the lookup * @param secure Do we need a secure tls connection when ready? + * @param timeout timeout per connection attempt */ - TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure) : + TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout) : TcpExtState(parent), _hostname(std::move(hostname)), _secure(secure), - _port(port) + _port(port), + _timeout(timeout) { // tell the event loop to monitor the filedescriptor of the pipe parent->onIdle(this, _pipe.in(), readable);