From f208ac9083603155b9da81e44f2ef14ac5ada0cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 8 Oct 2024 19:04:03 +0200 Subject: [PATCH] Refactor replica handling in cluster nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- src/cluster.c | 197 ++++++++++++++++++++++---------------------------- 1 file changed, 87 insertions(+), 110 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 3241109..2f1d992 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -506,101 +506,6 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { } } -static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc, - dict **nodes, - valkeyClusterNode *node, - sds master_name) { - int ret; - dictEntry *di; - valkeyClusterNode *node_old; - listNode *lnode; - - if (node == NULL || master_name == NULL) { - return VALKEY_ERR; - } - - if (*nodes == NULL) { - *nodes = dictCreate(&clusterNodesRefDictType, NULL); - if (*nodes == NULL) { - goto oom; - } - } - - di = dictFind(*nodes, master_name); - if (di == NULL) { - sds key = sdsnewlen(master_name, sdslen(master_name)); - if (key == NULL) { - goto oom; - } - ret = dictAdd(*nodes, key, node); - if (ret != DICT_OK) { - sdsfree(key); - goto oom; - } - - } else { - node_old = dictGetEntryVal(di); - if (node_old == NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "dict get value null"); - return VALKEY_ERR; - } - - if (node->role == VALKEY_ROLE_MASTER && - node_old->role == VALKEY_ROLE_MASTER) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "two masters have the same name"); - return VALKEY_ERR; - } else if (node->role == VALKEY_ROLE_MASTER && - node_old->role == VALKEY_ROLE_SLAVE) { - if (node->slaves == NULL) { - node->slaves = listCreate(); - if (node->slaves == NULL) { - goto oom; - } - - node->slaves->free = listClusterNodeDestructor; - } - - if (node_old->slaves != NULL) { - node_old->slaves->free = NULL; - while (listLength(node_old->slaves) > 0) { - lnode = listFirst(node_old->slaves); - if (listAddNodeHead(node->slaves, lnode->value) == NULL) { - goto oom; - } - listDelNode(node_old->slaves, lnode); - } - listRelease(node_old->slaves); - node_old->slaves = NULL; - } - - if (listAddNodeHead(node->slaves, node_old) == NULL) { - goto oom; - } - dictSetHashVal(*nodes, di, node); - - } else if (node->role == VALKEY_ROLE_SLAVE) { - if (node_old->slaves == NULL) { - node_old->slaves = listCreate(); - if (node_old->slaves == NULL) { - goto oom; - } - - node_old->slaves->free = listClusterNodeDestructor; - } - if (listAddNodeTail(node_old->slaves, node) == NULL) { - goto oom; - } - } - } - - return VALKEY_OK; - -oom: - valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - return VALKEY_ERR; -} - /** * Parse the "cluster slots" command reply to nodes dict. */ @@ -790,6 +695,83 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { return NULL; } +/* Store a replica node in a dict using the primary_id as key. Additional + * replicas for a primary are stored within the first added replica. */ +static int store_replica_node(dict **replicas, valkeyClusterNode *node, char *primary_id) { + if (*replicas == NULL) { + *replicas = dictCreate(&clusterNodesRefDictType, NULL); + if (replicas == NULL) + return VALKEY_ERR; + } + + sds key = sdsnew(primary_id); + if (key == NULL) + return VALKEY_ERR; + + dictEntry *de = dictFind(*replicas, key); + if (de == NULL) { + if (dictAdd(*replicas, key, node) != DICT_OK) { + sdsfree(key); + return VALKEY_ERR; + } + return VALKEY_OK; + } + + /* Store node in existing node. */ + sdsfree(key); + valkeyClusterNode *n = dictGetEntryVal(de); + if (n->slaves == NULL) { + n->slaves = listCreate(); + if (n->slaves == NULL) + return VALKEY_ERR; + n->slaves->free = listClusterNodeDestructor; + } + if (listAddNodeTail(n->slaves, node) == NULL) + return VALKEY_ERR; + + return VALKEY_OK; +} + +static int add_replica_nodes(dict *nodes, dict *replicas) { + dictIterator di; + dictInitIterator(&di, nodes); + + dictEntry *de; + while ((de = dictNext(&di))) { + valkeyClusterNode *primary = dictGetEntryVal(de); + + dictEntry *der = dictFind(replicas, primary->name); + if (der != NULL) { + if (primary->slaves == NULL) { + primary->slaves = listCreate(); + if (primary->slaves == NULL) { + return VALKEY_ERR; + } + primary->slaves->free = listClusterNodeDestructor; + } + + valkeyClusterNode *replica = dictGetEntryVal(der); + if (replica->slaves != NULL) { + replica->slaves->free = NULL; + while (listLength(replica->slaves) > 0) { + listNode *node = listFirst(replica->slaves); + if (listAddNodeHead(primary->slaves, node->value) == NULL) { + return VALKEY_ERR; + } + listDelNode(replica->slaves, node); + } + listRelease(replica->slaves); + replica->slaves = NULL; + } + if (listAddNodeHead(primary->slaves, replica) == NULL) { + return VALKEY_ERR; + } + } + } + + return VALKEY_OK; +} + /* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and * its master_id is given via 'replica_master_id'. */ @@ -950,9 +932,9 @@ static valkeyClusterNode *parse_cluster_nodes_line(valkeyClusterContext *cc, cha */ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { dict *nodes = NULL; - dict *nodes_name = NULL; int slot_ranges_found = 0; int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; + dict *replicas = NULL; if (reply->type != VALKEY_REPLY_STRING) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -999,20 +981,12 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { } slot_ranges_found += listLength(node->slots); - if (add_replicas) { - if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) { - goto error; - } - } } else { assert(node->role == VALKEY_ROLE_SLAVE); - sds id = sdsnew(master_id); - if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) { + if (store_replica_node(&replicas, node, master_id) != VALKEY_OK) { freeValkeyClusterNode(node); - sdsfree(id); - goto error; + goto oom; } - sdsfree(id); } } @@ -1021,8 +995,11 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { goto error; } - if (nodes_name != NULL) { - dictRelease(nodes_name); + if (replicas) { + if (add_replica_nodes(nodes, replicas) != VALKEY_OK) { + goto oom; + } + dictRelease(replicas); } return nodes; @@ -1035,8 +1012,8 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { if (nodes != NULL) { dictRelease(nodes); } - if (nodes_name != NULL) { - dictRelease(nodes_name); + if (replicas != NULL) { + dictRelease(replicas); } return NULL; }