diff --git a/src/cluster.c b/src/cluster.c index c906d19..b0c6f09 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -463,74 +463,6 @@ static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc, return NULL; } -/** - * Return a new node with the "cluster nodes" command reply. - */ -static valkeyClusterNode *node_get_with_nodes(valkeyClusterContext *cc, - sds *node_infos, int info_count, - uint8_t role) { - char *p = NULL; - valkeyClusterNode *node = NULL; - - if (info_count < 8) { - return NULL; - } - - node = createValkeyClusterNode(); - if (node == NULL) { - goto oom; - } - - if (role == VALKEY_ROLE_MASTER) { - node->slots = listCreate(); - if (node->slots == NULL) { - goto oom; - } - - node->slots->free = listClusterSlotDestructor; - } - - /* Handle field */ - node->name = node_infos[0]; - node_infos[0] = NULL; /* Ownership moved */ - - /* Handle field - * Remove @cport... since addr is used as a dict key which should be : */ - if ((p = strchr(node_infos[1], PORT_CPORT_SEPARATOR)) != NULL) { - sdsrange(node_infos[1], 0, p - node_infos[1] - 1 /* skip @ */); - } - node->addr = node_infos[1]; - node_infos[1] = NULL; /* Ownership moved */ - - node->role = role; - - /* Get the ip part */ - if ((p = strrchr(node->addr, IP_PORT_SEPARATOR)) == NULL) { - valkeyClusterSetError( - cc, VALKEY_ERR_OTHER, - "server address is incorrect, port separator missing."); - goto error; - } - node->host = sdsnewlen(node->addr, p - node->addr); - if (node->host == NULL) { - goto oom; - } - p++; // remove found separator character - - /* Get the port part */ - node->port = vk_atoi(p, strlen(p)); - - return node; - -oom: - valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - // passthrough - -error: - freeValkeyClusterNode(node); - return NULL; -} - static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) { dictEntry *de_f, *de_t; valkeyClusterNode *node_f, *node_t; @@ -858,23 +790,154 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) { return NULL; } +/* Parse a node from a single CLUSTER NODES line. Only parse master nodes unless + * the 'replica_master_id' argument is not NULL. */ +static valkeyClusterNode *parse_cluster_nodes_line(char *line, char **replica_master_id) { + char *p, *id = NULL, *addr = NULL, *flags = NULL, *master_id = NULL, + *ping_sent = NULL, *pong_recv = NULL, *config_epoch = NULL, + *link_state = NULL, *slots = NULL; + // Start by finding all fields. + // clang-format off + int i = 0; + while ((p = strchr(line, ' ')) != NULL) { + *p = '\0'; + switch(i++){ + case 0: id = line; break; + case 1: addr = line; break; + case 2: flags = line; break; + case 3: master_id = line; break; + case 4: ping_sent = line; break; + case 5: pong_recv = line; break; + case 6: config_epoch = line; break; + case 7: link_state = line; break; + } + line = p + 1; /* Start of next field. */ + if (i == 8) { slots = line; break; } + } + if (i == 7 && line[0] != '\0') link_state = line; + // clang-format on + + UNUSED(ping_sent); + UNUSED(pong_recv); + UNUSED(config_epoch); + + if (link_state == NULL) /* Mandatory field missing */ + return NULL; + + /* Parse flags. */ + uint8_t role = VALKEY_ROLE_NULL; + while (*flags != '\0') { + if ((p = strchr(flags, ',')) != NULL) + *p = '\0'; + if (memcmp(flags, "master", 6) == 0) { + role = VALKEY_ROLE_MASTER; + break; + } + if (memcmp(flags, "slave", 5) == 0) { + role = VALKEY_ROLE_SLAVE; + break; + } + if (p == NULL) /* No more flags. */ + break; + flags = p + 1; /* Start of next flag. */ + } + if (role == VALKEY_ROLE_NULL) /* Role missing. */ + return NULL; + + /* Only parse slaves when requested. */ + if (role == VALKEY_ROLE_SLAVE && replica_master_id == NULL) + return NULL; + + valkeyClusterNode *node = createValkeyClusterNode(); + if (node == NULL) { + return NULL; // goto oom; TODO: return errorcode + } + node->name = sdsnew(id); + node->role = role; + + /* Handle field + * Remove @cport... since addr is used as a dict key which should be : */ + if ((p = strchr(addr, PORT_CPORT_SEPARATOR)) != NULL) { + *p = '\0'; + } + node->addr = sdsnew(addr); + /* Get the ip part */ + if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) { + // handle error VALKEY_ERR_OTHER + /* "server address is incorrect, port separator missing."); */ + freeValkeyClusterNode(node); + return NULL; + } + *p = '\0'; + /* Skip nodes where address starts with ":0", i.e. 'noaddr'. */ + if (strlen(addr) == 0) { + freeValkeyClusterNode(node); + return NULL; // NO ERROR, just skip + } + node->host = sdsnew(addr); + p++; // Skip found ip-port separator character. + node->port = vk_atoi(p, strlen(p)); + + /* No slot information in replicas but return master id */ + if (node->role == VALKEY_ROLE_SLAVE) { + *replica_master_id = master_id; + return node; + } + + node->slots = listCreate(); + if (node->slots == NULL) { + freeValkeyClusterNode(node); + return NULL; /// TODO goto oom; + } + node->slots->free = listClusterSlotDestructor; + + /* Parse slots when available. */ + if (slots == NULL) + return node; + while (*slots != '\0') { + if ((p = strchr(slots, ' ')) != NULL) + *p = '\0'; + char *entry = slots; + if (entry[0] == '[') + break; // Skip importing/migrating slots at string end. + + int slot_start, slot_end; + char *sp = strchr(entry, '-'); + if (sp == NULL) { + slot_start = vk_atoi(entry, strlen(entry)); + slot_end = slot_start; + } else { + *sp = '\0'; + slot_start = vk_atoi(entry, strlen(entry)); + entry = sp + 1; // Skip '-' + slot_end = vk_atoi(entry, strlen(entry)); + } + + /* Create a slot entry owned by the node. */ + cluster_slot *slot = cluster_slot_create(node); + if (slot == NULL) { + freeValkeyClusterNode(node); + return NULL; /// TODO goto oom; + } + slot->start = (uint32_t)slot_start; + slot->end = (uint32_t)slot_end; + + if (p == NULL) /* No more entries. */ + break; + slots = p + 1; /* Start of next entry. */ + } + + return node; +} + /** * Parse the "cluster nodes" command reply to nodes dict. */ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { - int ret; dict *nodes = NULL; dict *nodes_name = NULL; - valkeyClusterNode *master, *slave; - cluster_slot *slot; - char *pos, *start, *end, *line_start, *line_end; - char *role; - int role_len; - int slot_start, slot_end, slot_ranges_found = 0; - sds *part = NULL, *slot_start_end = NULL; - int count_part = 0, count_slot_start_end = 0; - int k; - int len; + int slot_ranges_found = 0; + int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE; if (reply->type != VALKEY_REPLY_STRING) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type"); @@ -886,149 +949,53 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { goto oom; } - start = reply->str; - end = start + reply->len; - - line_start = start; + char *lines = reply->str; /* NULL terminated string. */ + char *p, *line; + while ((p = strchr(lines, '\n')) != NULL) { + *p = '\0'; + line = lines; + lines = p + 1; /* Start of next line. */ - for (pos = start; pos < end; pos++) { - if (*pos == '\n') { - line_end = pos - 1; - len = line_end - line_start; - - part = sdssplitlen(line_start, len + 1, " ", 1, &count_part); - if (part == NULL) { + char *master_id; + valkeyClusterNode *node = parse_cluster_nodes_line(line, add_replicas ? &master_id : NULL); + if (node == NULL) { + // TODO; handle error + continue; + } + if (node->role == VALKEY_ROLE_MASTER) { + sds key = sdsnew(node->addr); + if (key == NULL) { + freeValkeyClusterNode(node); goto oom; } - - if (count_part < 8) { + if (dictFind(nodes, key) != NULL) { valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "split cluster nodes error"); + "Duplicate addresses in cluster nodes response"); + sdsfree(key); + freeValkeyClusterNode(node); goto error; } - - // if the address string starts with ":0", skip this node. - if (sdslen(part[1]) >= 2 && memcmp(part[1], ":0", 2) == 0) { - sdsfreesplitres(part, count_part); - count_part = 0; - part = NULL; - - start = pos + 1; - line_start = start; - pos = start; - - continue; - } - - if (sdslen(part[2]) >= 7 && memcmp(part[2], "myself,", 7) == 0) { - role_len = sdslen(part[2]) - 7; - role = part[2] + 7; - } else { - role_len = sdslen(part[2]); - role = part[2]; + if (dictAdd(nodes, key, node) != DICT_OK) { + sdsfree(key); + freeValkeyClusterNode(node); + goto oom; } + slot_ranges_found += listLength(node->slots); - // add master node - if (role_len >= 6 && memcmp(role, "master", 6) == 0) { - master = node_get_with_nodes(cc, part, count_part, - VALKEY_ROLE_MASTER); - if (master == NULL) { - goto error; - } - - sds key = sdsnewlen(master->addr, sdslen(master->addr)); - if (key == NULL) { - freeValkeyClusterNode(master); - goto oom; - } - if (dictFind(nodes, key) != NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "Duplicate addresses in cluster nodes response"); - sdsfree(key); - freeValkeyClusterNode(master); + if (add_replicas) { + if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) { goto error; } - if (dictAdd(nodes, key, master) != DICT_OK) { - sdsfree(key); - freeValkeyClusterNode(master); - goto oom; - } - - if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) { - ret = cluster_master_slave_mapping_with_name( - cc, &nodes_name, master, master->name); - if (ret != VALKEY_OK) { - freeValkeyClusterNode(master); - goto error; - } - } - - for (k = 8; k < count_part; k++) { - slot_start_end = sdssplitlen(part[k], sdslen(part[k]), "-", - 1, &count_slot_start_end); - if (slot_start_end == NULL) { - goto oom; - } - - if (count_slot_start_end == 1) { - slot_start = vk_atoi(slot_start_end[0], - sdslen(slot_start_end[0])); - slot_end = slot_start; - } else if (count_slot_start_end == 2) { - slot_start = vk_atoi(slot_start_end[0], - sdslen(slot_start_end[0])); - slot_end = vk_atoi(slot_start_end[1], - sdslen(slot_start_end[1])); - } else { - slot_start = -1; - slot_end = -1; - } - - sdsfreesplitres(slot_start_end, count_slot_start_end); - count_slot_start_end = 0; - slot_start_end = NULL; - - if (slot_start < 0 || slot_end < 0 || - slot_start > slot_end || - slot_end >= VALKEYCLUSTER_SLOTS) { - continue; - } - slot_ranges_found += 1; - - slot = cluster_slot_create(master); - if (slot == NULL) { - goto oom; - } - - slot->start = (uint32_t)slot_start; - slot->end = (uint32_t)slot_end; - } - } - // add slave node - else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) && - (role_len >= 5 && memcmp(role, "slave", 5) == 0)) { - slave = node_get_with_nodes(cc, part, count_part, - VALKEY_ROLE_SLAVE); - if (slave == NULL) { - goto error; - } - - ret = cluster_master_slave_mapping_with_name(cc, &nodes_name, - slave, part[3]); - if (ret != VALKEY_OK) { - freeValkeyClusterNode(slave); - goto error; - } + } else { + assert(node->role == VALKEY_ROLE_SLAVE); + sds id = sdsnew(master_id); + if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) { + freeValkeyClusterNode(node); + sdsfree(id); + goto error; } - - sdsfreesplitres(part, count_part); - count_part = 0; - part = NULL; - - start = pos + 1; - line_start = start; - pos = start; + sdsfree(id); } } @@ -1048,8 +1015,6 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) { // passthrough error: - sdsfreesplitres(part, count_part); - sdsfreesplitres(slot_start_end, count_slot_start_end); if (nodes != NULL) { dictRelease(nodes); } diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 86c023e..4a87f43 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -170,14 +170,18 @@ void test_alloc_failure_handling(void) { // Connect { - for (int i = 0; i < 128; ++i) { + for (int i = 0; i < 62; ++i) { prepare_allocation_test(cc, i); result = valkeyClusterConnect2(cc); assert(result == VALKEY_ERR); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); + if (i > 32 && i < 40) { + ASSERT_STR_EQ(cc->errstr, "No slot information"); + } else { + ASSERT_STR_EQ(cc->errstr, "Out of memory"); + } } - prepare_allocation_test(cc, 128); + prepare_allocation_test(cc, 62); result = valkeyClusterConnect2(cc); assert(result == VALKEY_OK); } @@ -519,14 +523,18 @@ void test_alloc_failure_handling_async(void) { // Connect { - for (int i = 0; i < 126; ++i) { + for (int i = 0; i < 60; ++i) { prepare_allocation_test(acc->cc, i); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_ERR); - ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); + if (i > 30 && i < 38) { + ASSERT_STR_EQ(acc->cc->errstr, "No slot information"); + } else { + ASSERT_STR_EQ(acc->cc->errstr, "Out of memory"); + } } - prepare_allocation_test(acc->cc, 126); + prepare_allocation_test(acc->cc, 60); result = valkeyClusterConnect2(acc->cc); assert(result == VALKEY_OK); }