Skip to content

Commit

Permalink
Merge pull request #355 from CopernicaMarketingSoftware/connect-timeout
Browse files Browse the repository at this point in the history
Implement connectTimeout option on the TcpConnection initial resolve
  • Loading branch information
EmielBruijntjes authored Oct 5, 2020
2 parents 0686286 + 103fa13 commit 668bf82
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/linux_tcp/tcpconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace AMQP {
*/
TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_handler(handler),
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure())),
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), address.option("connectTimeout", 5))),
_connection(this, address.login(), address.vhost())
{
// tell the handler
Expand Down
68 changes: 54 additions & 14 deletions src/linux_tcp/tcpresolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ class TcpResolver : public TcpExtState
* @var uint16_t
*/
uint16_t _port;

/**
* Timeout for the connect call in seconds.
* @var int
*/
int _timeout;

/**
* A pipe that is used to send back the socket that is connected to RabbitMQ
Expand Down Expand Up @@ -93,6 +99,9 @@ class TcpResolver : public TcpExtState
// get address info
AddressInfo addresses(_hostname.data(), _port);

// an fdset to monitor for writability
fd_set writeset;

// iterate over the addresses
for (size_t i = 0; i < addresses.size(); ++i)
{
Expand All @@ -101,12 +110,47 @@ class TcpResolver : public TcpExtState

// move on on failure
if (_socket < 0) continue;

// connect to the socket

// turn socket into a non-blocking socket and set the close-on-exec bit
fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC);

// try to connect non-blocking
if (connect(_socket, addresses[i]->ai_addr, addresses[i]->ai_addrlen) == 0) break;


// we set the timeout to a timeout, with 5 seconds as the default
struct timeval timeout{_timeout,0};

// reset the fdset
FD_ZERO(&writeset);

// set the fd to monitor for writing
FD_SET(_socket, &writeset);

// perform a select, wait for something to happen on one of the fds
int ret = select(_socket + 1, nullptr, &writeset, nullptr, &timeout);

// log the error for the time being
_error = strerror(errno);
if (ret == 0) _error = "connection timed out";

// otherwise, select might've failed
else if (ret < 0) _error = strerror(errno);

// otherwise the connect failed/succeeded
else
{
// the error
int err = 0;
socklen_t len = 4;

// get the options
getsockopt(_socket, SOL_SOCKET, SO_ERROR, &err, &len);

// if the error is zero, we break, socket is now valid
if (err == 0) break;

// set the error with the value
_error = strerror(err);
}

// close socket because connect failed
::close(_socket);
Expand All @@ -118,9 +162,6 @@ class TcpResolver : public TcpExtState
// connection succeeded, mark socket as non-blocking
if (_socket >= 0)
{
// turn socket into a non-blocking socket and set the close-on-exec bit
fcntl(_socket, F_SETFL, O_NONBLOCK | O_CLOEXEC);

// we want to enable "nodelay" on sockets (otherwise all send operations are s-l-o-w
int optval = 1;

Expand All @@ -138,11 +179,8 @@ class TcpResolver : public TcpExtState
_error = error.what();
}

// notify the master thread by sending a byte over the pipe
if (!_pipe.notify())
{
_error = strerror(errno);
}
// notify the master thread by sending a byte over the pipe, store error if this fails
if (!_pipe.notify()) _error = strerror(errno);
}

public:
Expand All @@ -152,12 +190,14 @@ class TcpResolver : public TcpExtState
* @param hostname The hostname for the lookup
* @param portnumber The portnumber for the lookup
* @param secure Do we need a secure tls connection when ready?
* @param timeout timeout per connection attempt
*/
TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure) :
TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure, int timeout) :
TcpExtState(parent),
_hostname(std::move(hostname)),
_secure(secure),
_port(port)
_port(port),
_timeout(timeout)
{
// tell the event loop to monitor the filedescriptor of the pipe
parent->onIdle(this, _pipe.in(), readable);
Expand Down

0 comments on commit 668bf82

Please sign in to comment.