Skip to content

Commit

Permalink
Update slotmap on error (#182)
Browse files Browse the repository at this point in the history
* When connect failed, update slotmap instead of sending command to random node
* When a command fails (timeout, etc.), schedule a slotmap update for the next command
* Improve some of the error messages from node_get_by_table() 
* Refactoring: Set error message inside node_get_by_table() instead of outside

Co-authored-by: Bjorn Svensson <[email protected]>
  • Loading branch information
zuiderkwast and bjosv authored Aug 29, 2023
1 parent 2680113 commit e1fde9b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 72 deletions.
82 changes: 30 additions & 52 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@

#define REDIS_COMMAND_CLUSTER_NODES "CLUSTER NODES"
#define REDIS_COMMAND_CLUSTER_SLOTS "CLUSTER SLOTS"

#define REDIS_COMMAND_ASKING "ASKING"
#define REDIS_COMMAND_PING "PING"

#define IP_PORT_SEPARATOR ':'

Expand Down Expand Up @@ -1425,6 +1423,7 @@ static int updateNodesAndSlotmap(redisClusterContext *cc, dict *nodes) {
cc->event_callback(cc, HIRCLUSTER_EVENT_READY, cc->event_privdata);
}
}
cc->need_update_route = 0;
return REDIS_OK;

oom:
Expand Down Expand Up @@ -2018,50 +2017,27 @@ redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node) {

static redisClusterNode *node_get_by_table(redisClusterContext *cc,
uint32_t slot_num) {
if (cc == NULL || cc->table == NULL) {
if (cc == NULL) {
return NULL;
}

if (slot_num >= REDIS_CLUSTER_SLOTS) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "invalid slot");
return NULL;
}

return cc->table[slot_num];
}

static redisClusterNode *node_get_which_connected(redisClusterContext *cc) {
dictEntry *de;
redisClusterNode *node;
redisContext *c = NULL;

if (cc == NULL || cc->nodes == NULL) {
if (cc->table == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "slotmap not available");
return NULL;
}

dictIterator di;
dictInitIterator(&di, cc->nodes);

while ((de = dictNext(&di)) != NULL) {
node = dictGetEntryVal(de);
if (node == NULL) {
continue;
}

c = ctx_get_by_node(cc, node);
if (c == NULL || c->err) {
continue;
}

redisReply *reply = redisCommand(c, REDIS_COMMAND_PING);
if (reply != NULL && reply->type == REDIS_REPLY_STATUS &&
reply->str != NULL && strcmp(reply->str, "PONG") == 0) {
freeReplyObject(reply);
return node;
}
freeReplyObject(reply);
if (cc->table[slot_num] == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"slot not served by any node");
return NULL;
}

return NULL;
return cc->table[slot_num];
}

/* Helper function for the redisClusterAppendCommand* family of functions.
Expand All @@ -2082,7 +2058,6 @@ static int __redisClusterAppendCommand(redisClusterContext *cc,

node = node_get_by_table(cc, (uint32_t)command->slot_num);
if (node == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "node get by slot error");
return REDIS_ERR;
}

Expand Down Expand Up @@ -2148,8 +2123,6 @@ static int __redisClusterGetReply(redisClusterContext *cc, int slot_num,

node = node_get_by_table(cc, (uint32_t)slot_num);
if (node == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"slot not served by any node");
return REDIS_ERR;
}

Expand Down Expand Up @@ -2257,28 +2230,21 @@ static void *redis_cluster_command_execute(redisClusterContext *cc,

node = node_get_by_table(cc, (uint32_t)command->slot_num);
if (node == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "node get by table error");
goto error;
}

c = ctx_get_by_node(cc, node);
if (c == NULL) {
goto error;
} else if (c->err) {
node = node_get_which_connected(cc);
if (node == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"no reachable node in cluster");
if (c == NULL || c->err) {
/* Failed to connect. Maybe there was a failover and this node is gone.
* Update slotmap to find out. */
if (cluster_update_route(cc) != REDIS_OK) {
goto error;
}

cc->retry_count++;
if (cc->retry_count > cc->max_retry_count) {
__redisClusterSetError(cc, REDIS_ERR_CLUSTER_TOO_MANY_RETRIES,
"too many cluster retries");
node = node_get_by_table(cc, (uint32_t)command->slot_num);
if (node == NULL) {
goto error;
}

c = ctx_get_by_node(cc, node);
if (c == NULL) {
goto error;
Expand All @@ -2297,8 +2263,20 @@ static void *redis_cluster_command_execute(redisClusterContext *cc,
goto error;
}

/* If update slotmap has been scheduled, do that in the same pipeline. */
if (cc->need_update_route && c_updating_route == NULL) {
if (clusterUpdateRouteSendCommand(cc, c) == REDIS_OK) {
c_updating_route = c;
}
}

if (redisGetReply(c, &reply) != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
/* We may need to update the slotmap if this node is removed from the
* cluster, but the current request may have already timed out so we
* schedule it for later. */
if (c->err != REDIS_ERR_OOM)
cc->need_update_route = 1;
goto error;
}

Expand Down Expand Up @@ -4133,8 +4111,8 @@ int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc,

node = node_get_by_table(cc, (uint32_t)slot_num);
if (node == NULL) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
"node get by table error");
/* node_get_by_table() has set the error on cc. */
__redisClusterAsyncSetError(acc, cc->err, cc->errstr);
goto error;
}

Expand Down
4 changes: 2 additions & 2 deletions tests/ct_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ void test_async_password_wrong(void) {
ret = redisClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello");
assert(ret == REDIS_ERR);
assert(acc->err == REDIS_ERR_OTHER);
assert(strcmp(acc->errstr, "node get by table error") == 0);
assert(strcmp(acc->errstr, "slotmap not available") == 0);

event_base_dispatch(base);

Expand Down Expand Up @@ -432,7 +432,7 @@ void test_async_password_missing(void) {
ret = redisClusterAsyncCommand(acc, commandCallback, &r, "SET key1 Hello");
assert(ret == REDIS_ERR);
assert(acc->err == REDIS_ERR_OTHER);
assert(strcmp(acc->errstr, "node get by table error") == 0);
assert(strcmp(acc->errstr, "slotmap not available") == 0);

event_base_dispatch(base);

Expand Down
38 changes: 20 additions & 18 deletions tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ void prepare_allocation_test_async(redisClusterAsyncContext *acc,
memset(acc->errstr, '\0', strlen(acc->errstr));
}

/* Helper: find the cluster node whose port equals `port`.
 *
 * Iterates over the nodes of `cc` using the node iterator and returns the
 * first match. A missing node is treated as a test failure via assert(0);
 * NULL is returned only if that assertion is compiled out. */
redisClusterNode *getNodeByPort(redisClusterContext *cc, int port) {
    redisClusterNodeIterator iter;
    redisClusterNode *candidate;

    redisClusterInitNodeIterator(&iter, cc);
    for (candidate = redisClusterNodeNext(&iter); candidate != NULL;
         candidate = redisClusterNodeNext(&iter)) {
        if (candidate->port == port) {
            return candidate;
        }
    }

    /* No node listens on the requested port. */
    assert(0);
    return NULL;
}

/* Test of allocation handling in the blocking API */
void test_alloc_failure_handling(void) {
int result;
Expand Down Expand Up @@ -403,11 +416,7 @@ void test_alloc_failure_handling(void) {
prepare_allocation_test(cc, i);
reply = redisClusterCommand(cc, "GET foo");
assert(reply == NULL);
if (i < 14 || i > 26) {
ASSERT_STR_EQ(cc->errstr, "Out of memory");
} else {
ASSERT_STR_EQ(cc->errstr, "no reachable node in cluster");
}
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

/* Test ASK reply handling without OOM */
Expand All @@ -419,6 +428,9 @@ void test_alloc_failure_handling(void) {
/* Finalize the migration. Skip OOM testing during these steps by
* allowing a high number of allocations. */
prepare_allocation_test(cc, 1000);
/* Fetch the nodes again, in case the slotmap has been reloaded. */
srcNode = redisClusterGetNodeByKey(cc, "foo");
dstNode = getNodeByPort(cc, dstPort);
reply = redisClusterCommandToNode(
cc, srcNode, "CLUSTER SETSLOT %d NODE %s", slot, replyDstId->str);
CHECK_REPLY_OK(cc, reply);
Expand All @@ -433,11 +445,7 @@ void test_alloc_failure_handling(void) {
prepare_allocation_test(cc, i);
reply = redisClusterCommand(cc, "GET foo");
assert(reply == NULL);
if (i < 14 || i > 26) {
ASSERT_STR_EQ(cc->errstr, "Out of memory");
} else {
ASSERT_STR_EQ(cc->errstr, "no reachable node in cluster");
}
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

/* Test MOVED reply handling without OOM */
Expand All @@ -449,14 +457,8 @@ void test_alloc_failure_handling(void) {
/* MOVED triggers a slotmap update which currently replaces all cluster_node
* objects. We can get the new objects by searching for its server ports.
* This enables us to migrate the slot back to the original node. */
redisClusterInitNodeIterator(&ni, cc);
redisClusterNode *node;
while ((node = redisClusterNodeNext(&ni)) != NULL) {
if (node->port == srcPort)
srcNode = node;
if (node->port == dstPort)
dstNode = node;
}
srcNode = getNodeByPort(cc, srcPort);
dstNode = getNodeByPort(cc, dstPort);

/* Migrate back slot, required by the next testcase. Skip OOM testing
* during these final steps by allowing a high number of allocations. */
Expand Down

0 comments on commit e1fde9b

Please sign in to comment.