Skip to content

Commit

Permalink
Replace clusterclient_async_sequence with clusterclient_async
Browse files Browse the repository at this point in the history
The clusterclient_async test binary now provides all features that
previously only existed in clusterclient_async_sequence.
clusterclient_async_sequence is therefor removed.

Inherits clusterclient_async_sequence's behaviour by sending commands
sequentially instead of queuing all commands before starting the event engine.
This allows testcases to perform behavioral changes between commands using
action commands. See clusterclient_async.c.
  • Loading branch information
bjosv committed Aug 30, 2023
1 parent e1fde9b commit 4dc5564
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 270 deletions.
18 changes: 8 additions & 10 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,6 @@ add_executable(clusterclient_async clusterclient_async.c)
target_link_libraries(clusterclient_async hiredis_cluster ${SSL_LIBRARY} ${LIBEVENT_LIBRARY})
add_executable(clusterclient_reconnect_async clusterclient_reconnect_async.c)
target_link_libraries(clusterclient_reconnect_async hiredis_cluster ${SSL_LIBRARY} ${LIBEVENT_LIBRARY})
add_executable(clusterclient_async_sequence clusterclient_async_sequence.c)
target_link_libraries(clusterclient_async_sequence hiredis_cluster ${SSL_LIBRARY} ${LIBEVENT_LIBRARY})
add_test(NAME set-get-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/set-get-test.sh"
"$<TARGET_FILE:clusterclient>"
Expand Down Expand Up @@ -193,39 +191,39 @@ add_test(NAME dbsize-to-all-nodes-test
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME dbsize-to-all-nodes-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/dbsize-to-all-nodes-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME dbsize-to-all-nodes-during-scaledown-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/dbsize-to-all-nodes-during-scaledown-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME reconnect-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/reconnect-test.sh"
"$<TARGET_FILE:clusterclient_reconnect_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME timeout-handling-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/timeout-handling-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connect-error-using-cluster-nodes-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-error-using-cluster-nodes-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME command-from-callback-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/command-from-callback-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME ask-redirect-connection-error-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/ask-redirect-connection-error-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME cluster-down-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/cluster-down-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connection-error-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connection-error-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME redirect-with-hostname-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/redirect-with-hostname-test.sh"
Expand All @@ -247,7 +245,7 @@ endif()
# Disabling the testcase if hiredis contains the issue or if the version is unknown.
add_test(NAME redirect-with-hostname-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/redirect-with-hostname-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
if(hiredis_VERSION VERSION_EQUAL "1.1.0" OR hiredis_VERSION VERSION_EQUAL "0")
set_tests_properties(redirect-with-hostname-test-async PROPERTIES DISABLED True)
Expand Down
197 changes: 145 additions & 52 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,33 @@
*
* The behaviour is the same as that of clusterclient.c, but the asynchronous
* API of the library is used rather than the synchronous API.
* The following action commands can alter the default behaviour:
*
* !async - Send multiple commands and then wait for their responses.
* Will send all following commands until EOF or the command `!sync`
*
* !sync - Send a single command and wait for its response before sending next
* command. This is the default behaviour.
*
* !resend - Resend a failed command from its reply callback.
* Will resend all following failed commands until EOF.
*
* !sleep - Sleep a second. Can be used to allow timers to timeout.
* Currently not supported while in !async mode.
*
* !all - Send each command to all nodes in the cluster.
* Will send following commands using the `..ToNode()` API and a
* cluster node iterator to send each command to all known nodes.
*
* An example input of first sending 2 commands and waiting for their responses,
* before sending a single command and waiting for its response:
*
* !async
* SET dual-1 command
* SET dual-2 command
* !sync
* SET single command
*
*/

#include "adapters/libevent.h"
Expand All @@ -14,54 +41,134 @@
#include <stdlib.h>
#include <string.h>

#define CMD_SIZE 256
#define HISTORY_DEPTH 16

char cmd_history[HISTORY_DEPTH][CMD_SIZE];

int num_running = 0;
int resend_failed_cmd = 0;
int send_to_all = 0;

/*
void printReply(redisReply *reply) {
void sendNextCommand(int, short, void *);

void printReply(const redisReply *reply) {
switch (reply->type) {
case REDIS_REPLY_INTEGER: printf("%lld", reply->integer); break;
case REDIS_REPLY_DOUBLE: printf("%s", reply->str); break;
case REDIS_REPLY_ERROR: printf("-%s", reply->str); break;
// TODO: Escape special chars in strings
case REDIS_REPLY_STRING: printf("\"%s\"", reply->str); break;
case REDIS_REPLY_ARRAY:
printf("[");
for (size_t i = 0; i < reply->elements; i++) {
printReply(reply->element[i]);
if (i < reply->elements - 1)
printf(", ");
}
printf("]");
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
case REDIS_REPLY_VERB:
case REDIS_REPLY_BIGNUM:
printf("%s\n", reply->str);
break;
case REDIS_REPLY_INTEGER:
printf("%lld\n", reply->integer);
break;
default:
printf("UNKNOWN TYPE %d", reply->type);
printf("Unhandled reply type: %d\n", reply->type);
}
}
*/

void replyCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
UNUSED(privdata);
redisReply *reply = (redisReply *)r;
ASSERT_MSG(reply != NULL, acc->errstr);
intptr_t cmd_id = (intptr_t)privdata; /* Id to corresponding cmd */

/* printReply(reply); */
/* printf("\n"); */
printf("%s\n", reply->str);
if (reply == NULL) {
if (acc->err) {
printf("error: %s\n", acc->errstr);
} else {
printf("unknown error\n");
}

if (resend_failed_cmd) {
printf("resend '%s'\n", cmd_history[cmd_id]);
if (redisClusterAsyncCommand(acc, replyCallback, (void *)cmd_id,
cmd_history[cmd_id]) != REDIS_OK)
printf("send error\n");
return;
}
} else {
printReply(reply);
}

if (--num_running == 0) {
// Disconnect after receiving all replies
redisClusterAsyncDisconnect(acc);
/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc,
NULL);
}
}

void connectCallback(const redisAsyncContext *ac, int status) {
ASSERT_MSG(status == REDIS_OK, ac->errstr);
// printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}
void sendNextCommand(int fd, short kind, void *arg) {
UNUSED(fd);
UNUSED(kind);
redisClusterAsyncContext *acc = arg;
int async = 0;

char cmd[CMD_SIZE];
while (fgets(cmd, CMD_SIZE, stdin)) {
size_t len = strlen(cmd);
if (cmd[len - 1] == '\n') /* Chop trailing line break */
cmd[len - 1] = '\0';

if (cmd[0] == '\0') /* Skip empty lines */
continue;
if (cmd[0] == '#') /* Skip comments */
continue;
if (cmd[0] == '!') {
if (strcmp(cmd, "!sleep") == 0) {
ASSERT_MSG(async == 0, "!sleep in !async not supported");
struct timeval timeout = {1, 0};
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, &timeout);
return;
}
if (strcmp(cmd, "!async") == 0) /* Enable async send */
async = 1;
if (strcmp(cmd, "!sync") == 0) { /* Disable async send */
if (async)
return; /* We are done sending commands */
}
if (strcmp(cmd, "!resend") == 0) /* Enable resend of failed cmd */
resend_failed_cmd = 1;
if (strcmp(cmd, "!all") == 0) { /* Enable send to all nodes */
ASSERT_MSG(resend_failed_cmd == 0,
"!all in !resend not supported");
send_to_all = 1;
}
continue; /* Skip line */
}

void disconnectCallback(const redisAsyncContext *ac, int status) {
ASSERT_MSG(status == REDIS_OK, ac->errstr);
// printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
/* Copy command string to history buffer */
assert(num_running < HISTORY_DEPTH);
strcpy(cmd_history[num_running], cmd);

if (send_to_all) {
nodeIterator ni;
initNodeIterator(&ni, acc->cc);

redisClusterNode *node;
while ((node = nodeNext(&ni)) != NULL) {
int status = redisClusterAsyncCommandToNode(
acc, node, replyCallback, (void *)((intptr_t)num_running),
cmd);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
}
} else {
int status = redisClusterAsyncCommand(
acc, replyCallback, (void *)((intptr_t)num_running), cmd);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
}

if (async)
continue; /* Send next command as well */

return;
}

/* Disconnect if nothing is left to read from stdin */
redisClusterAsyncDisconnect(acc);
}

void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
Expand Down Expand Up @@ -105,12 +212,14 @@ int main(int argc, char **argv) {
exit(1);
}
const char *initnode = argv[optind];
struct timeval timeout = {0, 500000};

redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, connectCallback);
redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
redisClusterSetOptionAddNodes(acc->cc, initnode);
redisClusterSetOptionTimeout(acc->cc, timeout);
redisClusterSetOptionConnectTimeout(acc->cc, timeout);
redisClusterSetOptionMaxRetry(acc->cc, 1);
if (use_cluster_slots) {
redisClusterSetOptionRouteUseSlots(acc->cc);
}
Expand All @@ -123,28 +232,12 @@ int main(int argc, char **argv) {
exit(2);
}

int status;
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
int status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);

// Forward commands from stdin to redis cluster
char command[256];

// Make sure num_running doesn't reach 0 in replyCallback() before all
// commands have been sent.
num_running++;

while (fgets(command, 256, stdin)) {
size_t len = strlen(command);
if (command[len - 1] == '\n') // Chop trailing line break
command[len - 1] = '\0';
status =
redisClusterAsyncCommand(acc, replyCallback, (char *)"ID", command);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
}
num_running--; // all commands sent
/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);

event_base_dispatch(base);

Expand Down
Loading

0 comments on commit 4dc5564

Please sign in to comment.