Skip to content

Commit

Permalink
Refactor replica handling in cluster nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Oct 8, 2024
1 parent d330e42 commit f208ac9
Showing 1 changed file with 87 additions and 110 deletions.
197 changes: 87 additions & 110 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,101 +506,6 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) {
}
}

static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc,
dict **nodes,
valkeyClusterNode *node,
sds master_name) {
int ret;
dictEntry *di;
valkeyClusterNode *node_old;
listNode *lnode;

if (node == NULL || master_name == NULL) {
return VALKEY_ERR;
}

if (*nodes == NULL) {
*nodes = dictCreate(&clusterNodesRefDictType, NULL);
if (*nodes == NULL) {
goto oom;
}
}

di = dictFind(*nodes, master_name);
if (di == NULL) {
sds key = sdsnewlen(master_name, sdslen(master_name));
if (key == NULL) {
goto oom;
}
ret = dictAdd(*nodes, key, node);
if (ret != DICT_OK) {
sdsfree(key);
goto oom;
}

} else {
node_old = dictGetEntryVal(di);
if (node_old == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "dict get value null");
return VALKEY_ERR;
}

if (node->role == VALKEY_ROLE_MASTER &&
node_old->role == VALKEY_ROLE_MASTER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"two masters have the same name");
return VALKEY_ERR;
} else if (node->role == VALKEY_ROLE_MASTER &&
node_old->role == VALKEY_ROLE_SLAVE) {
if (node->slaves == NULL) {
node->slaves = listCreate();
if (node->slaves == NULL) {
goto oom;
}

node->slaves->free = listClusterNodeDestructor;
}

if (node_old->slaves != NULL) {
node_old->slaves->free = NULL;
while (listLength(node_old->slaves) > 0) {
lnode = listFirst(node_old->slaves);
if (listAddNodeHead(node->slaves, lnode->value) == NULL) {
goto oom;
}
listDelNode(node_old->slaves, lnode);
}
listRelease(node_old->slaves);
node_old->slaves = NULL;
}

if (listAddNodeHead(node->slaves, node_old) == NULL) {
goto oom;
}
dictSetHashVal(*nodes, di, node);

} else if (node->role == VALKEY_ROLE_SLAVE) {
if (node_old->slaves == NULL) {
node_old->slaves = listCreate();
if (node_old->slaves == NULL) {
goto oom;
}

node_old->slaves->free = listClusterNodeDestructor;
}
if (listAddNodeTail(node_old->slaves, node) == NULL) {
goto oom;
}
}
}

return VALKEY_OK;

oom:
valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory");
return VALKEY_ERR;
}

/**
* Parse the "cluster slots" command reply to nodes dict.
*/
Expand Down Expand Up @@ -790,6 +695,83 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
return NULL;
}

/* Store a replica node in a dict using the primary_id as key. Additional
* replicas for a primary are stored within the first added replica. */
static int store_replica_node(dict **replicas, valkeyClusterNode *node, char *primary_id) {
if (*replicas == NULL) {
*replicas = dictCreate(&clusterNodesRefDictType, NULL);
if (replicas == NULL)
return VALKEY_ERR;
}

sds key = sdsnew(primary_id);
if (key == NULL)
return VALKEY_ERR;

dictEntry *de = dictFind(*replicas, key);
if (de == NULL) {
if (dictAdd(*replicas, key, node) != DICT_OK) {
sdsfree(key);
return VALKEY_ERR;
}
return VALKEY_OK;
}

/* Store node in existing node. */
sdsfree(key);
valkeyClusterNode *n = dictGetEntryVal(de);
if (n->slaves == NULL) {
n->slaves = listCreate();
if (n->slaves == NULL)
return VALKEY_ERR;
n->slaves->free = listClusterNodeDestructor;
}
if (listAddNodeTail(n->slaves, node) == NULL)
return VALKEY_ERR;

return VALKEY_OK;
}

static int add_replica_nodes(dict *nodes, dict *replicas) {
dictIterator di;
dictInitIterator(&di, nodes);

dictEntry *de;
while ((de = dictNext(&di))) {
valkeyClusterNode *primary = dictGetEntryVal(de);

dictEntry *der = dictFind(replicas, primary->name);
if (der != NULL) {
if (primary->slaves == NULL) {
primary->slaves = listCreate();
if (primary->slaves == NULL) {
return VALKEY_ERR;
}
primary->slaves->free = listClusterNodeDestructor;
}

valkeyClusterNode *replica = dictGetEntryVal(der);
if (replica->slaves != NULL) {
replica->slaves->free = NULL;
while (listLength(replica->slaves) > 0) {
listNode *node = listFirst(replica->slaves);
if (listAddNodeHead(primary->slaves, node->value) == NULL) {
return VALKEY_ERR;
}
listDelNode(replica->slaves, node);
}
listRelease(replica->slaves);
replica->slaves = NULL;
}
if (listAddNodeHead(primary->slaves, replica) == NULL) {
return VALKEY_ERR;
}
}
}

return VALKEY_OK;
}

/* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if
* the 'replica_master_id' argument is NULL, otherwise replicas are parsed and
* its master_id is given via 'replica_master_id'. */
Expand Down Expand Up @@ -950,9 +932,9 @@ static valkeyClusterNode *parse_cluster_nodes_line(valkeyClusterContext *cc, cha
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
dict *nodes = NULL;
dict *nodes_name = NULL;
int slot_ranges_found = 0;
int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE;
dict *replicas = NULL;

if (reply->type != VALKEY_REPLY_STRING) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
Expand Down Expand Up @@ -999,20 +981,12 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
}
slot_ranges_found += listLength(node->slots);

if (add_replicas) {
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) {
goto error;
}
}
} else {
assert(node->role == VALKEY_ROLE_SLAVE);
sds id = sdsnew(master_id);
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) {
if (store_replica_node(&replicas, node, master_id) != VALKEY_OK) {
freeValkeyClusterNode(node);
sdsfree(id);
goto error;
goto oom;
}
sdsfree(id);
}
}

Expand All @@ -1021,8 +995,11 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
goto error;
}

if (nodes_name != NULL) {
dictRelease(nodes_name);
if (replicas) {
if (add_replica_nodes(nodes, replicas) != VALKEY_OK) {
goto oom;
}
dictRelease(replicas);
}

return nodes;
Expand All @@ -1035,8 +1012,8 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
if (nodes != NULL) {
dictRelease(nodes);
}
if (nodes_name != NULL) {
dictRelease(nodes_name);
if (replicas != NULL) {
dictRelease(replicas);
}
return NULL;
}
Expand Down

0 comments on commit f208ac9

Please sign in to comment.