Skip to content

Commit

Permalink
Refactor slotmap update functions (valkey-io#107)
Browse files Browse the repository at this point in the history
- Merge `handleClusterSlotsReply` and `handleClusterNodesReply` into
`clusterUpdateRouteHandleReply`.
- Validate the reply in `parse_cluster_slots` and `parse_cluster_nodes`
- Remove redundant argument `flags` in `parse_cluster_slots` and
`parse_cluster_nodes`
- Use error strings from the standalone context in both the send
function (`clusterUpdateRouteSendCommand`) and the response function
(`handleClusterSlotsReply`); making sure that specific error strings are
used (like OOM).

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv authored Oct 4, 2024
1 parent 952b59b commit 1d9e49c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 95 deletions.
141 changes: 46 additions & 95 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,7 @@ static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc,
/**
* Parse the "cluster slots" command reply to nodes dict.
*/
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
int flags) {
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
int ret;
cluster_slot *slot = NULL;
dict *nodes = NULL;
Expand All @@ -685,22 +684,20 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
valkeyClusterNode *master = NULL, *slave;
uint32_t i, idx;

if (reply == NULL) {
return NULL;
if (reply->type != VALKEY_REPLY_ARRAY) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
goto error;
}
if (reply->elements == 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "No slot information");
goto error;
}

nodes = dictCreate(&clusterNodesDictType, NULL);
if (nodes == NULL) {
goto oom;
}

if (reply->type != VALKEY_REPLY_ARRAY || reply->elements <= 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"reply is not an array.");
goto error;
}

for (i = 0; i < reply->elements; i++) {
elem_slots = reply->element[i];
if (elem_slots->type != VALKEY_REPLY_ARRAY ||
Expand Down Expand Up @@ -819,7 +816,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
}

slot = NULL;
} else if (flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
} else if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
slave = node_get_with_slots(cc, elem_ip, elem_port,
VALKEY_ROLE_SLAVE);
if (slave == NULL) {
Expand Down Expand Up @@ -864,8 +861,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
/**
* Parse the "cluster nodes" command reply to nodes dict.
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_len,
int flags) {
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
int ret;
dict *nodes = NULL;
dict *nodes_name = NULL;
Expand All @@ -880,13 +876,18 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le
int k;
int len;

if (reply->type != VALKEY_REPLY_STRING) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
goto error;
}

nodes = dictCreate(&clusterNodesDictType, NULL);
if (nodes == NULL) {
goto oom;
}

start = str;
end = start + str_len;
start = reply->str;
end = start + reply->len;

line_start = start;

Expand Down Expand Up @@ -940,19 +941,20 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le
freeValkeyClusterNode(master);
goto oom;
}

ret = dictAdd(nodes, key, master);
if (ret != DICT_OK) {
// Key already exists, but possibly an OOM error
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"The address already exists in the nodes");
if (dictFind(nodes, key) != NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Duplicate addresses in cluster nodes response");
sdsfree(key);
freeValkeyClusterNode(master);
goto error;
}
if (dictAdd(nodes, key, master) != DICT_OK) {
sdsfree(key);
freeValkeyClusterNode(master);
goto oom;
}

if (flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
ret = cluster_master_slave_mapping_with_name(
cc, &nodes_name, master, master->name);
if (ret != VALKEY_OK) {
Expand Down Expand Up @@ -1004,7 +1006,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le

}
// add slave node
else if ((flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) &&
else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) &&
(role_len >= 5 && memcmp(role, "slave", 5) == 0)) {
slave = node_get_with_nodes(cc, part, count_part,
VALKEY_ROLE_SLAVE);
Expand Down Expand Up @@ -1064,92 +1066,41 @@ static int clusterUpdateRouteSendCommand(valkeyClusterContext *cc,
VALKEY_COMMAND_CLUSTER_SLOTS :
VALKEY_COMMAND_CLUSTER_NODES);
if (valkeyAppendCommand(c, cmd) != VALKEY_OK) {
const char *msg = (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS ?
"Command (cluster slots) send error." :
"Command (cluster nodes) send error.");
valkeyClusterSetError(cc, c->err, msg);
valkeyClusterSetError(cc, c->err, c->errstr);
return VALKEY_ERR;
}
/* Flush buffer to socket. */
if (valkeyBufferWrite(c, NULL) == VALKEY_ERR)
if (valkeyBufferWrite(c, NULL) == VALKEY_ERR) {
valkeyClusterSetError(cc, c->err, c->errstr);
return VALKEY_ERR;
}

return VALKEY_OK;
}

/* Receives and handles a CLUSTER SLOTS reply from node with context c. */
static int handleClusterSlotsReply(valkeyClusterContext *cc, valkeyContext *c) {
/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with
* context c. */
static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
valkeyContext *c) {
valkeyReply *reply = NULL;
int result = valkeyGetReply(c, (void **)&reply);
if (result != VALKEY_OK) {
if (c->err == VALKEY_ERR_TIMEOUT) {
valkeyClusterSetError(
cc, c->err,
"Command (cluster slots) reply error (socket timeout)");
} else {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"Command (cluster slots) reply error (NULL).");
}
return VALKEY_ERR;
} else if (reply->type != VALKEY_REPLY_ARRAY) {
if (reply->type == VALKEY_REPLY_ERROR) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
} else {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"Command (cluster slots) reply error: type is not array.");
}
freeReplyObject(reply);
if (valkeyGetReply(c, (void **)&reply) != VALKEY_OK) {
valkeyClusterSetError(cc, c->err, c->errstr);
return VALKEY_ERR;
}

dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}

/* Receives and handles a CLUSTER NODES reply from node with context c. */
static int handleClusterNodesReply(valkeyClusterContext *cc, valkeyContext *c) {
valkeyReply *reply = NULL;
int result = valkeyGetReply(c, (void **)&reply);
if (result != VALKEY_OK) {
if (c->err == VALKEY_ERR_TIMEOUT) {
valkeyClusterSetError(cc, c->err,
"Command (cluster nodes) reply error "
"(socket timeout)");
} else {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command (cluster nodes) reply error "
"(NULL).");
}
return VALKEY_ERR;
} else if (reply->type != VALKEY_REPLY_STRING) {
if (reply->type == VALKEY_REPLY_ERROR) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
} else {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster nodes) reply error: "
"type is not string.");
}
if (reply->type == VALKEY_REPLY_ERROR) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
freeReplyObject(reply);
return VALKEY_ERR;
}

dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}

/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with
* context c. */
static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
valkeyContext *c) {
dict *nodes;
if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) {
return handleClusterSlotsReply(cc, c);
nodes = parse_cluster_slots(cc, reply);
} else {
return handleClusterNodesReply(cc, c);
nodes = parse_cluster_nodes(cc, reply);
}
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}

/**
Expand Down Expand Up @@ -3025,7 +2976,7 @@ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
dict *nodes = parse_cluster_slots(cc, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand All @@ -3046,7 +2997,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
dict *nodes = parse_cluster_nodes(cc, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand Down
2 changes: 2 additions & 0 deletions tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ void test_alloc_failure_handling(void) {
prepare_allocation_test(cc, i);
result = valkeyClusterConnect2(cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

prepare_allocation_test(cc, 128);
Expand Down Expand Up @@ -521,6 +522,7 @@ void test_alloc_failure_handling_async(void) {
prepare_allocation_test(acc->cc, i);
result = valkeyClusterConnect2(acc->cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(acc->cc->errstr, "Out of memory");
}

prepare_allocation_test(acc->cc, 126);
Expand Down

0 comments on commit 1d9e49c

Please sign in to comment.