Skip to content

Commit

Permalink
Refactor replica handling in cluster nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Oct 14, 2024
1 parent 87ddc82 commit e4c1371
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 130 deletions.
215 changes: 93 additions & 122 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,101 +504,6 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) {
}
}

static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc,
dict **nodes,
valkeyClusterNode *node,
sds master_name) {
int ret;
dictEntry *di;
valkeyClusterNode *node_old;
listNode *lnode;

if (node == NULL || master_name == NULL) {
return VALKEY_ERR;
}

if (*nodes == NULL) {
*nodes = dictCreate(&clusterNodesRefDictType, NULL);
if (*nodes == NULL) {
goto oom;
}
}

di = dictFind(*nodes, master_name);
if (di == NULL) {
sds key = sdsnewlen(master_name, sdslen(master_name));
if (key == NULL) {
goto oom;
}
ret = dictAdd(*nodes, key, node);
if (ret != DICT_OK) {
sdsfree(key);
goto oom;
}

} else {
node_old = dictGetEntryVal(di);
if (node_old == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "dict get value null");
return VALKEY_ERR;
}

if (node->role == VALKEY_ROLE_MASTER &&
node_old->role == VALKEY_ROLE_MASTER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"two masters have the same name");
return VALKEY_ERR;
} else if (node->role == VALKEY_ROLE_MASTER &&
node_old->role == VALKEY_ROLE_SLAVE) {
if (node->slaves == NULL) {
node->slaves = listCreate();
if (node->slaves == NULL) {
goto oom;
}

node->slaves->free = listClusterNodeDestructor;
}

if (node_old->slaves != NULL) {
while (listLength(node_old->slaves) > 0) {
lnode = listFirst(node_old->slaves);
if (listAddNodeHead(node->slaves, lnode->value) == NULL) {
goto oom;
}
node_old->slaves->free = NULL;
listDelNode(node_old->slaves, lnode);
}
listRelease(node_old->slaves);
node_old->slaves = NULL;
}

if (listAddNodeHead(node->slaves, node_old) == NULL) {
goto oom;
}
dictSetHashVal(*nodes, di, node);

} else if (node->role == VALKEY_ROLE_SLAVE) {
if (node_old->slaves == NULL) {
node_old->slaves = listCreate();
if (node_old->slaves == NULL) {
goto oom;
}

node_old->slaves->free = listClusterNodeDestructor;
}
if (listAddNodeTail(node_old->slaves, node) == NULL) {
goto oom;
}
}
}

return VALKEY_OK;

oom:
valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory");
return VALKEY_ERR;
}

/**
* Parse the "cluster slots" command reply to nodes dict.
*/
Expand Down Expand Up @@ -784,6 +689,92 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
return NULL;
}

/* Store a parsed replica node in a dict using the primary_id as key.
* Additional replicas for a primary are stored within the first replica. */
static int store_replica_node(dict **replicas, char *primary_id, valkeyClusterNode *node) {
if (*replicas == NULL) {
*replicas = dictCreate(&clusterNodesDictType, NULL);
if (replicas == NULL)
return VALKEY_ERR;
}

sds key = sdsnew(primary_id);
if (key == NULL)
return VALKEY_ERR;

dictEntry *de = dictFind(*replicas, key);
if (de == NULL) {
if (dictAdd(*replicas, key, node) != DICT_OK) {
sdsfree(key);
return VALKEY_ERR;
}
return VALKEY_OK;
}

/* Store replica node in the existing replica node. */
sdsfree(key);
valkeyClusterNode *n = dictGetEntryVal(de);
if (n->slaves == NULL) {
n->slaves = listCreate();
if (n->slaves == NULL)
return VALKEY_ERR;
n->slaves->free = listClusterNodeDestructor;
}
if (listAddNodeTail(n->slaves, node) == NULL)
return VALKEY_ERR;

return VALKEY_OK;
}

/* Move parsed replica nodes from the collection to related primary. */
static int move_replica_nodes(dict *replicas, dict *nodes) {
if (replicas == NULL)
return VALKEY_OK;

dictIterator di;
dictInitIterator(&di, nodes);
dictEntry *de;
while ((de = dictNext(&di))) {
valkeyClusterNode *primary = dictGetEntryVal(de);

/* Move all replica nodes related to this primary. */
dictEntry *der = dictFind(replicas, primary->name);
if (der != NULL) {
if (primary->slaves == NULL) {
primary->slaves = listCreate();
if (primary->slaves == NULL) {
return VALKEY_ERR;
}
primary->slaves->free = listClusterNodeDestructor;
}

/* Move all replicas stored in the first parsed replica. */
valkeyClusterNode *replica = dictGetEntryVal(der);
if (replica->slaves != NULL) {
while (listLength(replica->slaves) > 0) {
listNode *node = listFirst(replica->slaves);
if (listAddNodeTail(primary->slaves, node->value) == NULL) {
return VALKEY_ERR;
}
/* Delete element without freeing the moved cluster node. */
replica->slaves->free = NULL;
listDelNode(replica->slaves, node);
replica->slaves->free = listClusterNodeDestructor;
}
listRelease(replica->slaves);
replica->slaves = NULL;
}
/* Move replica that was parsed first. */
if (listAddNodeHead(primary->slaves, replica) == NULL) {
return VALKEY_ERR;
}
/* Replicas moved, reset the dict data to avoid freeing. */
dictSetHashVal(replicas, der, NULL);
}
}
return VALKEY_OK;
}

/* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if
* the 'replica_master_id' argument is NULL, otherwise replicas are parsed and
* its master_id is given via 'replica_master_id'. */
Expand Down Expand Up @@ -944,9 +935,9 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
dict *nodes = NULL;
dict *nodes_name = NULL;
int slot_ranges_found = 0;
int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE;
dict *replicas = NULL;

if (reply->type != VALKEY_REPLY_STRING) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
Expand Down Expand Up @@ -991,24 +982,12 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
}
slot_ranges_found += listLength(node->slots);

if (add_replicas) {
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) {
goto error;
}
}
} else {
assert(node->role == VALKEY_ROLE_SLAVE);
sds id = sdsnew(master_id);
if (id == NULL) {
if (store_replica_node(&replicas, master_id, node) != VALKEY_OK) {
freeValkeyClusterNode(node);
goto oom;
}
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) {
freeValkeyClusterNode(node);
sdsfree(id);
goto error;
}
sdsfree(id);
}
}

Expand All @@ -1017,7 +996,10 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
goto error;
}

dictRelease(nodes_name);
if (move_replica_nodes(replicas, nodes) != VALKEY_OK) {
goto oom;
}
dictRelease(replicas);

return nodes;

Expand All @@ -1026,18 +1008,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
// passthrough

error:
if (nodes_name != NULL) {
/* Only free parsed replicas since the `nodes` dict owns primary nodes. */
dictIterator di;
dictInitIterator(&di, nodes_name);
dictEntry *de;
while ((de = dictNext(&di))) {
valkeyClusterNode *node = dictGetEntryVal(de);
if (node->role == VALKEY_ROLE_SLAVE)
freeValkeyClusterNode(node);
}
dictRelease(nodes_name);
}
dictRelease(replicas);
dictRelease(nodes);
return NULL;
}
Expand Down
8 changes: 4 additions & 4 deletions tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ void test_alloc_failure_handling(void) {

// Connect
{
for (int i = 0; i < 91; ++i) {
for (int i = 0; i < 88; ++i) {
prepare_allocation_test(cc, i);
result = valkeyClusterConnect2(cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

prepare_allocation_test(cc, 91);
prepare_allocation_test(cc, 88);
result = valkeyClusterConnect2(cc);
assert(result == VALKEY_OK);
}
Expand Down Expand Up @@ -521,14 +521,14 @@ void test_alloc_failure_handling_async(void) {

// Connect
{
for (int i = 0; i < 89; ++i) {
for (int i = 0; i < 86; ++i) {
prepare_allocation_test(acc->cc, i);
result = valkeyClusterConnect2(acc->cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(acc->cc->errstr, "Out of memory");
}

prepare_allocation_test(acc->cc, 89);
prepare_allocation_test(acc->cc, 86);
result = valkeyClusterConnect2(acc->cc);
assert(result == VALKEY_OK);
}
Expand Down
8 changes: 4 additions & 4 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
assert(strcmp(node->addr, "127.0.0.1:30004") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0);
assert(strcmp(node->addr, "127.0.0.1:30006") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "6ec23923021cf3ffec47632106199cb7f496ce01") == 0);
assert(strcmp(node->addr, "127.0.0.1:30005") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0);
assert(strcmp(node->addr, "127.0.0.1:30006") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1") == 0);
assert(strcmp(node->addr, "127.0.0.1:30002") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
Expand Down

0 comments on commit e4c1371

Please sign in to comment.