diff --git a/src/cluster.c b/src/cluster.c index 44f0c9c..261bdd4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -504,101 +504,6 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { } } -static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc, - dict **nodes, - valkeyClusterNode *node, - sds master_name) { - int ret; - dictEntry *di; - valkeyClusterNode *node_old; - listNode *lnode; - - if (node == NULL || master_name == NULL) { - return VALKEY_ERR; - } - - if (*nodes == NULL) { - *nodes = dictCreate(&clusterNodesRefDictType, NULL); - if (*nodes == NULL) { - goto oom; - } - } - - di = dictFind(*nodes, master_name); - if (di == NULL) { - sds key = sdsnewlen(master_name, sdslen(master_name)); - if (key == NULL) { - goto oom; - } - ret = dictAdd(*nodes, key, node); - if (ret != DICT_OK) { - sdsfree(key); - goto oom; - } - - } else { - node_old = dictGetEntryVal(di); - if (node_old == NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "dict get value null"); - return VALKEY_ERR; - } - - if (node->role == VALKEY_ROLE_MASTER && - node_old->role == VALKEY_ROLE_MASTER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "two masters have the same name"); - return VALKEY_ERR; - } else if (node->role == VALKEY_ROLE_MASTER && - node_old->role == VALKEY_ROLE_SLAVE) { - if (node->slaves == NULL) { - node->slaves = listCreate(); - if (node->slaves == NULL) { - goto oom; - } - - node->slaves->free = listClusterNodeDestructor; - } - - if (node_old->slaves != NULL) { - while (listLength(node_old->slaves) > 0) { - lnode = listFirst(node_old->slaves); - if (listAddNodeHead(node->slaves, lnode->value) == NULL) { - goto oom; - } - node_old->slaves->free = NULL; - listDelNode(node_old->slaves, lnode); - } - listRelease(node_old->slaves); - node_old->slaves = NULL; - } - - if (listAddNodeHead(node->slaves, node_old) == NULL) { - goto oom; - } - dictSetHashVal(*nodes, di, node); - - } else if (node->role == VALKEY_ROLE_SLAVE) { - if (node_old->slaves == NULL) { - node_old->slaves = listCreate(); - if (node_old->slaves == NULL) { - goto oom; - } - - node_old->slaves->free = listClusterNodeDestructor; - } - if (listAddNodeTail(node_old->slaves, node) == NULL) { - goto oom; - } - } - } - - return VALKEY_OK; - -oom: - valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - return VALKEY_ERR; -} - /** * Parse the "cluster slots" command reply to nodes dict. */ @@ -784,6 +689,92 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { return NULL; } +/* Store a parsed replica node in a dict using the primary_id as key. + * Additional replicas for a primary are stored within the first replica. */ +static int store_replica_node(dict **replicas, char *primary_id, valkeyClusterNode *node) { + if (*replicas == NULL) { + *replicas = dictCreate(&clusterNodesDictType, NULL); + if (replicas == NULL) + return VALKEY_ERR; + } + + sds key = sdsnew(primary_id); + if (key == NULL) + return VALKEY_ERR; + + dictEntry *de = dictFind(*replicas, key); + if (de == NULL) { + if (dictAdd(*replicas, key, node) != DICT_OK) { + sdsfree(key); + return VALKEY_ERR; + } + return VALKEY_OK; + } + + /* Store replica node in the existing replica node. */ + sdsfree(key); + valkeyClusterNode *n = dictGetEntryVal(de); + if (n->slaves == NULL) { + n->slaves = listCreate(); + if (n->slaves == NULL) + return VALKEY_ERR; + n->slaves->free = listClusterNodeDestructor; + } + if (listAddNodeTail(n->slaves, node) == NULL) + return VALKEY_ERR; + + return VALKEY_OK; +} + +/* Move parsed replica nodes from the collection to related primary. */ +static int move_replica_nodes(dict *replicas, dict *nodes) { + if (replicas == NULL) + return VALKEY_OK; + + dictIterator di; + dictInitIterator(&di, nodes); + dictEntry *de; + while ((de = dictNext(&di))) { + valkeyClusterNode *primary = dictGetEntryVal(de); + + /* Move all replica nodes related to this primary. */ + dictEntry *der = dictFind(replicas, primary->name); + if (der != NULL) { + if (primary->slaves == NULL) { + primary->slaves = listCreate(); + if (primary->slaves == NULL) { + return VALKEY_ERR; + } + primary->slaves->free = listClusterNodeDestructor; + } + + /* Move all replicas stored in the first parsed replica. */ + valkeyClusterNode *replica = dictGetEntryVal(der); + if (replica->slaves != NULL) { + while (listLength(replica->slaves) > 0) { + listNode *node = listFirst(replica->slaves); + if (listAddNodeTail(primary->slaves, node->value) == NULL) { + return VALKEY_ERR; + } + /* Delete element without freeing the moved cluster node. */ + replica->slaves->free = NULL; + listDelNode(replica->slaves, node); + replica->slaves->free = listClusterNodeDestructor; + } + listRelease(replica->slaves); + replica->slaves = NULL; + } + /* Move replica that was parsed first. */ + if (listAddNodeHead(primary->slaves, replica) == NULL) { + return VALKEY_ERR; + } + /* Replicas moved, reset the dict data to avoid freeing. */ + dictSetHashVal(replicas, der, NULL); + } + } + return VALKEY_OK; +} + /* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and * its master_id is given via 'replica_master_id'. */ @@ -944,9 +935,9 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line, */ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { dict *nodes = NULL; - dict *nodes_name = NULL; int slot_ranges_found = 0; int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; + dict *replicas = NULL; if (reply->type != VALKEY_REPLY_STRING) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -991,24 +982,12 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { } slot_ranges_found += listLength(node->slots); - if (add_replicas) { - if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) { - goto error; - } - } } else { assert(node->role == VALKEY_ROLE_SLAVE); - sds id = sdsnew(master_id); - if (id == NULL) { + if (store_replica_node(&replicas, master_id, node) != VALKEY_OK) { freeValkeyClusterNode(node); goto oom; } - if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) { - freeValkeyClusterNode(node); - sdsfree(id); - goto error; - } - sdsfree(id); } } @@ -1017,7 +996,10 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { goto error; } - dictRelease(nodes_name); + if (move_replica_nodes(replicas, nodes) != VALKEY_OK) { + goto oom; + } + dictRelease(replicas); return nodes; @@ -1026,18 +1008,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { // passthrough error: - if (nodes_name != NULL) { - /* Only free parsed replicas since the `nodes` dict owns primary nodes. */ - dictIterator di; - dictInitIterator(&di, nodes_name); - dictEntry *de; - while ((de = dictNext(&di))) { - valkeyClusterNode *node = dictGetEntryVal(de); - if (node->role == VALKEY_ROLE_SLAVE) - freeValkeyClusterNode(node); - } - dictRelease(nodes_name); - } + dictRelease(replicas); dictRelease(nodes); return NULL; } diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 6877c8d..201599f 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -171,14 +171,14 @@ void test_alloc_failure_handling(void) { // Connect { - for (int i = 0; i < 91; ++i) { + for (int i = 0; i < 88; ++i) { prepare_allocation_test(cc, i); result = valkeyClusterConnect2(cc); assert(result == VALKEY_ERR); ASSERT_STR_EQ(cc->errstr, "Out of memory"); } - prepare_allocation_test(cc, 91); + prepare_allocation_test(cc, 88); result = valkeyClusterConnect2(cc); assert(result == VALKEY_OK); } @@ -521,14 +521,14 @@ void test_alloc_failure_handling_async(void) { // Connect { - for (int i = 0; i < 89; ++i) { + for (int i = 0; i < 86; ++i) { prepare_allocation_test(acc->cc, i); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_ERR); ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); } - prepare_allocation_test(acc->cc, 89); + prepare_allocation_test(acc->cc, 86); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_OK); } diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index b1cc848..5d80f12 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -289,14 +289,14 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) { assert(strcmp(node->addr, "127.0.0.1:30004") == 0); assert(node->role == VALKEY_ROLE_SLAVE); node = listNodeValue(listNext(&li)); - assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0); - assert(strcmp(node->addr, "127.0.0.1:30006") == 0); - assert(node->role == VALKEY_ROLE_SLAVE); - node = listNodeValue(listNext(&li)); assert(strcmp(node->name, "6ec23923021cf3ffec47632106199cb7f496ce01") == 0); assert(strcmp(node->addr, "127.0.0.1:30005") == 0); assert(node->role == VALKEY_ROLE_SLAVE); node = listNodeValue(listNext(&li)); + assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0); + assert(strcmp(node->addr, "127.0.0.1:30006") == 0); + assert(node->role == VALKEY_ROLE_SLAVE); + node = listNodeValue(listNext(&li)); assert(strcmp(node->name, "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1") == 0); assert(strcmp(node->addr, "127.0.0.1:30002") == 0); assert(node->role == VALKEY_ROLE_SLAVE);