Skip to content

Commit

Permalink
Refactor internal function parse_cluster_nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Oct 6, 2024
1 parent b5feed6 commit 4082ac8
Showing 1 changed file with 119 additions and 160 deletions.
279 changes: 119 additions & 160 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -466,43 +466,24 @@ static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc,
/**
* Return a new node with the "cluster nodes" command reply.
*/
static valkeyClusterNode *node_get_with_nodes(valkeyClusterContext *cc,
sds *node_infos, int info_count,
uint8_t role) {
char *p = NULL;
valkeyClusterNode *node = NULL;

if (info_count < 8) {
return NULL;
}

node = createValkeyClusterNode();
static valkeyClusterNode *node_get_with_nodes(valkeyClusterContext *cc, sds *fields) {
valkeyClusterNode *node = createValkeyClusterNode();
if (node == NULL) {
goto oom;
}

if (role == VALKEY_ROLE_MASTER) {
node->slots = listCreate();
if (node->slots == NULL) {
goto oom;
}

node->slots->free = listClusterSlotDestructor;
}

/* Handle field <id> */
node->name = node_infos[0];
node_infos[0] = NULL; /* Ownership moved */
node->name = fields[0];
fields[0] = NULL; /* Ownership moved */

/* Handle field <ip:port@cport...>
* Remove @cport... since addr is used as a dict key which should be <ip>:<port> */
if ((p = strchr(node_infos[1], PORT_CPORT_SEPARATOR)) != NULL) {
sdsrange(node_infos[1], 0, p - node_infos[1] - 1 /* skip @ */);
char *p;
if ((p = strchr(fields[1], PORT_CPORT_SEPARATOR)) != NULL) {
sdsrange(fields[1], 0, p - fields[1] - 1 /* skip @ */);
}
node->addr = node_infos[1];
node_infos[1] = NULL; /* Ownership moved */

node->role = role;
node->addr = fields[1];
fields[1] = NULL; /* Ownership moved */

/* Get the ip part */
if ((p = strrchr(node->addr, IP_PORT_SEPARATOR)) == NULL) {
Expand Down Expand Up @@ -862,19 +843,11 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
* Parse the "cluster nodes" command reply to nodes dict.
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
int ret;
dict *nodes = NULL;
sds *fields = NULL;
int numfields = 0;
dict *nodes_name = NULL;
valkeyClusterNode *master, *slave;
cluster_slot *slot;
char *pos, *start, *end, *line_start, *line_end;
char *role;
int role_len;
int slot_start, slot_end, slot_ranges_found = 0;
sds *part = NULL, *slot_start_end = NULL;
int count_part = 0, count_slot_start_end = 0;
int k;
int len;
int slot_ranges_found = 0;

if (reply->type != VALKEY_REPLY_STRING) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
Expand All @@ -886,150 +859,137 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
goto oom;
}

start = reply->str;
end = start + reply->len;
char *lines = reply->str; /* NULL-terminated. */
char *p, *line;
while ((p = strstr(lines, "\n")) != NULL) {
*p = '\0';
line = lines;
lines = p + 1; /* Start of next line. */
int len = p - line;

line_start = start;
fields = sdssplitlen(line, len, " ", 1, &numfields);
if (fields == NULL) {
goto oom;
}
if (numfields < 8) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"split cluster nodes error");
goto error;
}

for (pos = start; pos < end; pos++) {
if (*pos == '\n') {
line_end = pos - 1;
len = line_end - line_start;
/* Skip nodes where address starts with ":0", i.e. 'noaddr'. */
if (sdslen(fields[1]) >= 2 && memcmp(fields[1], ":0", 2) == 0) {
sdsfreesplitres(fields, numfields);
fields = NULL;
continue;
}

part = sdssplitlen(line_start, len + 1, " ", 1, &count_part);
if (part == NULL) {
goto oom;
}
char *role;
int role_len;
if (sdslen(fields[2]) >= 7 && memcmp(fields[2], "myself,", 7) == 0) {
role_len = sdslen(fields[2]) - 7;
role = fields[2] + 7;
} else {
role_len = sdslen(fields[2]);
role = fields[2];
}

if (count_part < 8) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"split cluster nodes error");
/* Add master node. */
if (role_len >= 6 && memcmp(role, "master", 6) == 0) {
valkeyClusterNode *master = node_get_with_nodes(cc, fields);
if (master == NULL) {
goto error;
}
master->role = VALKEY_ROLE_MASTER;

// if the address string starts with ":0", skip this node.
if (sdslen(part[1]) >= 2 && memcmp(part[1], ":0", 2) == 0) {
sdsfreesplitres(part, count_part);
count_part = 0;
part = NULL;

start = pos + 1;
line_start = start;
pos = start;

continue;
sds key = sdsnewlen(master->addr, sdslen(master->addr));
if (key == NULL) {
freeValkeyClusterNode(master);
goto oom;
}

if (sdslen(part[2]) >= 7 && memcmp(part[2], "myself,", 7) == 0) {
role_len = sdslen(part[2]) - 7;
role = part[2] + 7;
} else {
role_len = sdslen(part[2]);
role = part[2];
if (dictFind(nodes, key) != NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Duplicate addresses in cluster nodes response");
sdsfree(key);
freeValkeyClusterNode(master);
goto error;
}
if (dictAdd(nodes, key, master) != DICT_OK) {
sdsfree(key);
freeValkeyClusterNode(master);
goto oom;
}

// add master node
if (role_len >= 6 && memcmp(role, "master", 6) == 0) {
master = node_get_with_nodes(cc, part, count_part,
VALKEY_ROLE_MASTER);
if (master == NULL) {
goto error;
}

sds key = sdsnewlen(master->addr, sdslen(master->addr));
if (key == NULL) {
freeValkeyClusterNode(master);
goto oom;
}
if (dictFind(nodes, key) != NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Duplicate addresses in cluster nodes response");
sdsfree(key);
if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, master, master->name) != VALKEY_OK) {
freeValkeyClusterNode(master);
goto error;
}
if (dictAdd(nodes, key, master) != DICT_OK) {
sdsfree(key);
freeValkeyClusterNode(master);
goto oom;
}
}

if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
ret = cluster_master_slave_mapping_with_name(
cc, &nodes_name, master, master->name);
if (ret != VALKEY_OK) {
freeValkeyClusterNode(master);
goto error;
}
/* Add slots ranges */
master->slots = listCreate();
if (master->slots == NULL) {
goto oom;
}
master->slots->free = listClusterSlotDestructor;
for (int k = 8; k < numfields; k++) {
int count_slot_start_end;
sds *slot_start_end = sdssplitlen(fields[k], sdslen(fields[k]), "-",
1, &count_slot_start_end);
if (slot_start_end == NULL) {
goto oom;
}

for (k = 8; k < count_part; k++) {
slot_start_end = sdssplitlen(part[k], sdslen(part[k]), "-",
1, &count_slot_start_end);
if (slot_start_end == NULL) {
goto oom;
}

if (count_slot_start_end == 1) {
slot_start = vk_atoi(slot_start_end[0],
sdslen(slot_start_end[0]));
slot_end = slot_start;
} else if (count_slot_start_end == 2) {
slot_start = vk_atoi(slot_start_end[0],
sdslen(slot_start_end[0]));
slot_end = vk_atoi(slot_start_end[1],
sdslen(slot_start_end[1]));
} else {
slot_start = -1;
slot_end = -1;
}

sdsfreesplitres(slot_start_end, count_slot_start_end);
count_slot_start_end = 0;
slot_start_end = NULL;

if (slot_start < 0 || slot_end < 0 ||
slot_start > slot_end ||
slot_end >= VALKEYCLUSTER_SLOTS) {
continue;
}
slot_ranges_found += 1;

slot = cluster_slot_create(master);
if (slot == NULL) {
goto oom;
}

slot->start = (uint32_t)slot_start;
slot->end = (uint32_t)slot_end;
int slot_start, slot_end;
if (count_slot_start_end == 1) {
slot_start = vk_atoi(slot_start_end[0],
sdslen(slot_start_end[0]));
slot_end = slot_start;
} else if (count_slot_start_end == 2) {
slot_start = vk_atoi(slot_start_end[0],
sdslen(slot_start_end[0]));
slot_end = vk_atoi(slot_start_end[1],
sdslen(slot_start_end[1]));
} else {
slot_start = -1;
slot_end = -1;
}
sdsfreesplitres(slot_start_end, count_slot_start_end);

}
// add slave node
else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) &&
(role_len >= 5 && memcmp(role, "slave", 5) == 0)) {
slave = node_get_with_nodes(cc, part, count_part,
VALKEY_ROLE_SLAVE);
if (slave == NULL) {
goto error;
if (slot_start < 0 || slot_end < 0 ||
slot_start > slot_end ||
slot_end >= VALKEYCLUSTER_SLOTS) {
continue;
}
slot_ranges_found += 1;

ret = cluster_master_slave_mapping_with_name(cc, &nodes_name,
slave, part[3]);
if (ret != VALKEY_OK) {
freeValkeyClusterNode(slave);
goto error;
cluster_slot *slot = cluster_slot_create(master); /* master owns memory */
if (slot == NULL) {
goto oom;
}
slot->start = (uint32_t)slot_start;
slot->end = (uint32_t)slot_end;
}

sdsfreesplitres(part, count_part);
count_part = 0;
part = NULL;
}
// add slave node
else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) &&
(role_len >= 5 && memcmp(role, "slave", 5) == 0)) {
valkeyClusterNode *slave = node_get_with_nodes(cc, fields);
if (slave == NULL) {
goto error;
}
slave->role = VALKEY_ROLE_SLAVE;

start = pos + 1;
line_start = start;
pos = start;
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, slave, fields[3]) != VALKEY_OK) {
freeValkeyClusterNode(slave);
goto error;
}
}

sdsfreesplitres(fields, numfields);
fields = NULL;
}

if (slot_ranges_found == 0) {
Expand All @@ -1048,8 +1008,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
// passthrough

error:
sdsfreesplitres(part, count_part);
sdsfreesplitres(slot_start_end, count_slot_start_end);
sdsfreesplitres(fields, numfields);
if (nodes != NULL) {
dictRelease(nodes);
}
Expand Down

0 comments on commit 4082ac8

Please sign in to comment.