Skip to content
This repository has been archived by the owner on Feb 20, 2021. It is now read-only.

Commit

Permalink
[Fix] Use overlapped sockets for cluster failover communication.
Browse files Browse the repository at this point in the history
  • Loading branch information
enricogior committed Jul 1, 2016
1 parent 2388de8 commit ef8807b
Showing 1 changed file with 54 additions and 7 deletions.
61 changes: 54 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ POSIX_ONLY(#include <sys/socket.h>)
POSIX_ONLY(#include <sys/file.h>)
#include <math.h>

WIN32_ONLY(extern int WSIOCP_QueueAccept(int listenfd);)

/* A global reference to myself is handy to make code more clear.
* Myself always points to server.cluster->myself, that is, the clusterNode
* that represents this node. */
Expand Down Expand Up @@ -598,14 +600,23 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

/* If the server is starting up, don't accept cluster connections:
* UPDATE messages may interact with the database content. */
if (server.masterhost == NULL && server.loading) return;
if (server.masterhost == NULL && server.loading) {
WIN32_ONLY(WSIOCP_QueueAccept(fd);)
return;
}

while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_VERBOSE,
"Error accepting cluster node: %s", server.neterr);
#ifdef _WIN32
if (WSIOCP_QueueAccept(fd) == -1) {
redisLog(REDIS_WARNING,
"acceptTcpHandler: failed to queue another accept.");
}
#endif
return;
}
anetNonBlock(NULL,cfd);
Expand Down Expand Up @@ -1994,6 +2005,42 @@ void handleLinkIOError(clusterLink *link) {
freeClusterLink(link);
}

#ifdef _WIN32
void clusterWriteDone(aeEventLoop *el, int fd, void *privdata, int written) {
WSIOCP_Request *req = (WSIOCP_Request *) privdata;
clusterLink *link = (clusterLink *) req->client;
REDIS_NOTUSED(el);
REDIS_NOTUSED(fd);

if (sdslen(link->sndbuf) == written) {
sdsrange(link->sndbuf, written, -1);
aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
redisLog(REDIS_WARNING, "clusterWriteDone written %d fd %d", written, link->fd);
}
}

void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
clusterLink *link = (clusterLink*) privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);

int result = WSIOCP_SocketSend(fd,
(char*) link->sndbuf,
(int) (sdslen(link->sndbuf)),
el,
link,
NULL,
clusterWriteDone);
if (errno == WSA_IO_PENDING)
redisLog(REDIS_WARNING, "WSA_IO_PENDING writing to socket fd %d", link->fd);

if (result == SOCKET_ERROR && errno != WSA_IO_PENDING) {
redisLog(REDIS_WARNING, "Error writing to socket fd", link->fd);
handleLinkIOError(link);
return;
}
}
#else
/* Send data. This is handled using a trivial send buffer that gets
* consumed by write(). We don't try to optimize this for speed too much
* as this is a very low traffic channel. */
Expand All @@ -2010,10 +2057,11 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
handleLinkIOError(link);
return;
}
sdsrange(link->sndbuf,(int)nwritten,-1); WIN_PORT_FIX /* cast (int) */
sdsrange(link->sndbuf,nwritten,-1);
if (sdslen(link->sndbuf) == 0)
aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
}
#endif

/* Read data. Try to read the first field of the header first to check the
* full length of the packet. When a whole packet is in memory this function
Expand Down Expand Up @@ -2046,22 +2094,22 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
"Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link);
IF_WIN32(goto done,return);
return;
}
}
readlen = ntohl(hdr->totlen) - rcvbuflen;
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}

nread = read(fd,buf,readlen);
if (nread == -1 && errno == EAGAIN) IF_WIN32(goto done, return); /* No more data ready. */
if (nread == -1 && errno == EAGAIN) { WIN32_ONLY(WSIOCP_QueueNextRead(fd);) return; } /* No more data ready. */

if (nread <= 0) {
/* I/O error... */
redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link);
IF_WIN32(goto done, return);
return;
} else {
/* Read data and recast the pointer to the new buffer. */
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
Expand All @@ -2075,11 +2123,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
IF_WIN32(goto done, return); /* Link no longer valid. */
return; /* Link no longer valid. */
}
}
}
WIN32_ONLY(done:)
WIN32_ONLY(WSIOCP_QueueNextRead(fd);)
}

Expand Down

0 comments on commit ef8807b

Please sign in to comment.