diff --git a/src/cluster.c b/src/cluster.c index 3a7d8e2..6862a19 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -672,8 +672,7 @@ static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc, /** * Parse the "cluster slots" command reply to nodes dict. */ -static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply, - int flags) { +static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { int ret; cluster_slot *slot = NULL; dict *nodes = NULL; @@ -685,8 +684,13 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply, valkeyClusterNode *master = NULL, *slave; uint32_t i, idx; - if (reply == NULL) { - return NULL; + if (reply->type != VALKEY_REPLY_ARRAY) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); + goto error; + } + if (reply->elements == 0) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "No slot information"); + goto error; } nodes = dictCreate(&clusterNodesDictType, NULL); @@ -694,13 +698,6 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply, goto oom; } - if (reply->type != VALKEY_REPLY_ARRAY || reply->elements <= 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster slots) reply error: " - "reply is not an array."); - goto error; - } - for (i = 0; i < reply->elements; i++) { elem_slots = reply->element[i]; if (elem_slots->type != VALKEY_REPLY_ARRAY || @@ -819,7 +816,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply, } slot = NULL; - } else if (flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { + } else if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { slave = node_get_with_slots(cc, elem_ip, elem_port, VALKEY_ROLE_SLAVE); if (slave == NULL) { @@ -864,8 +861,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply, /** * Parse the "cluster nodes" command reply to nodes dict. */ -static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_len, - int flags) { +static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { int ret; dict *nodes = NULL; dict *nodes_name = NULL; @@ -880,13 +876,18 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le int k; int len; + if (reply->type != VALKEY_REPLY_STRING) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); + goto error; + } + nodes = dictCreate(&clusterNodesDictType, NULL); if (nodes == NULL) { goto oom; } - start = str; - end = start + str_len; + start = reply->str; + end = start + reply->len; line_start = start; @@ -940,19 +941,20 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le freeValkeyClusterNode(master); goto oom; } - - ret = dictAdd(nodes, key, master); - if (ret != DICT_OK) { - // Key already exists, but possibly an OOM error - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "The address already exists in the nodes"); + if (dictFind(nodes, key) != NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, + "Duplicate addresses in cluster nodes response"); sdsfree(key); freeValkeyClusterNode(master); goto error; } + if (dictAdd(nodes, key, master) != DICT_OK) { + sdsfree(key); + freeValkeyClusterNode(master); + goto oom; + } - if (flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { + if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { ret = cluster_master_slave_mapping_with_name( cc, &nodes_name, master, master->name); if (ret != VALKEY_OK) { @@ -1004,7 +1006,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le } // add slave node - else if ((flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) && + else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) && (role_len >= 5 && memcmp(role, "slave", 5) == 0)) { slave = node_get_with_nodes(cc, part, count_part, VALKEY_ROLE_SLAVE); @@ -1064,92 +1066,41 @@ static int clusterUpdateRouteSendCommand(valkeyClusterContext *cc, VALKEY_COMMAND_CLUSTER_SLOTS : VALKEY_COMMAND_CLUSTER_NODES); if (valkeyAppendCommand(c, cmd) != VALKEY_OK) { - const char *msg = (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS ? - "Command (cluster slots) send error." : - "Command (cluster nodes) send error."); - valkeyClusterSetError(cc, c->err, msg); + valkeyClusterSetError(cc, c->err, c->errstr); return VALKEY_ERR; } /* Flush buffer to socket. */ - if (valkeyBufferWrite(c, NULL) == VALKEY_ERR) + if (valkeyBufferWrite(c, NULL) == VALKEY_ERR) { + valkeyClusterSetError(cc, c->err, c->errstr); return VALKEY_ERR; + } return VALKEY_OK; } -/* Receives and handles a CLUSTER SLOTS reply from node with context c. */ -static int handleClusterSlotsReply(valkeyClusterContext *cc, valkeyContext *c) { +/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with + * context c. */ +static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc, + valkeyContext *c) { valkeyReply *reply = NULL; - int result = valkeyGetReply(c, (void **)&reply); - if (result != VALKEY_OK) { - if (c->err == VALKEY_ERR_TIMEOUT) { - valkeyClusterSetError( - cc, c->err, - "Command (cluster slots) reply error (socket timeout)"); - } else { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "Command (cluster slots) reply error (NULL)."); - } - return VALKEY_ERR; - } else if (reply->type != VALKEY_REPLY_ARRAY) { - if (reply->type == VALKEY_REPLY_ERROR) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str); - } else { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "Command (cluster slots) reply error: type is not array."); - } - freeReplyObject(reply); + if (valkeyGetReply(c, (void **)&reply) != VALKEY_OK) { + valkeyClusterSetError(cc, c->err, c->errstr); return VALKEY_ERR; } - - dict *nodes = parse_cluster_slots(cc, reply, cc->flags); - freeReplyObject(reply); - return updateNodesAndSlotmap(cc, nodes); -} - -/* Receives and handles a CLUSTER NODES reply from node with context c. */ -static int handleClusterNodesReply(valkeyClusterContext *cc, valkeyContext *c) { - valkeyReply *reply = NULL; - int result = valkeyGetReply(c, (void **)&reply); - if (result != VALKEY_OK) { - if (c->err == VALKEY_ERR_TIMEOUT) { - valkeyClusterSetError(cc, c->err, - "Command (cluster nodes) reply error " - "(socket timeout)"); - } else { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command (cluster nodes) reply error " - "(NULL)."); - } - return VALKEY_ERR; - } else if (reply->type != VALKEY_REPLY_STRING) { - if (reply->type == VALKEY_REPLY_ERROR) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str); - } else { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Command(cluster nodes) reply error: " - "type is not string."); - } + if (reply->type == VALKEY_REPLY_ERROR) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str); freeReplyObject(reply); return VALKEY_ERR; } - dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags); - freeReplyObject(reply); - return updateNodesAndSlotmap(cc, nodes); -} - -/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with - * context c. */ -static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc, - valkeyContext *c) { + dict *nodes; if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) { - return handleClusterSlotsReply(cc, c); + nodes = parse_cluster_slots(cc, reply); } else { - return handleClusterNodesReply(cc, c); + nodes = parse_cluster_nodes(cc, reply); } + freeReplyObject(reply); + return updateNodesAndSlotmap(cc, nodes); } /** @@ -3025,7 +2976,7 @@ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r, } valkeyClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_slots(cc, reply, cc->flags); + dict *nodes = parse_cluster_slots(cc, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { /* Ignore failures for now */ } @@ -3046,7 +2997,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r, } valkeyClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags); + dict *nodes = parse_cluster_nodes(cc, reply); if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) { /* Ignore failures for now */ } diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 6fba31e..5158b07 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -173,6 +173,7 @@ void test_alloc_failure_handling(void) { prepare_allocation_test(cc, i); result = valkeyClusterConnect2(cc); assert(result == VALKEY_ERR); + ASSERT_STR_EQ(cc->errstr, "Out of memory"); } prepare_allocation_test(cc, 128); @@ -521,6 +522,7 @@ void test_alloc_failure_handling_async(void) { prepare_allocation_test(acc->cc, i); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_ERR); + ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); } prepare_allocation_test(acc->cc, 126);