Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace clusterclient_async_sequence with clusterclient_async #184

Merged
merged 1 commit into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,6 @@ add_executable(clusterclient_async clusterclient_async.c)
target_link_libraries(clusterclient_async hiredis_cluster ${SSL_LIBRARY} ${LIBEVENT_LIBRARY})
add_executable(clusterclient_reconnect_async clusterclient_reconnect_async.c)
target_link_libraries(clusterclient_reconnect_async hiredis_cluster ${SSL_LIBRARY} ${LIBEVENT_LIBRARY})
add_executable(clusterclient_async_sequence clusterclient_async_sequence.c)
target_link_libraries(clusterclient_async_sequence hiredis_cluster ${SSL_LIBRARY} ${LIBEVENT_LIBRARY})
add_test(NAME set-get-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/set-get-test.sh"
"$<TARGET_FILE:clusterclient>"
Expand Down Expand Up @@ -193,39 +191,39 @@ add_test(NAME dbsize-to-all-nodes-test
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME dbsize-to-all-nodes-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/dbsize-to-all-nodes-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME dbsize-to-all-nodes-during-scaledown-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/dbsize-to-all-nodes-during-scaledown-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME reconnect-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/reconnect-test.sh"
"$<TARGET_FILE:clusterclient_reconnect_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME timeout-handling-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/timeout-handling-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connect-error-using-cluster-nodes-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-error-using-cluster-nodes-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME command-from-callback-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/command-from-callback-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME ask-redirect-connection-error-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/ask-redirect-connection-error-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME cluster-down-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/cluster-down-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connection-error-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connection-error-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME redirect-with-hostname-test
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/redirect-with-hostname-test.sh"
Expand All @@ -247,7 +245,7 @@ endif()
# Disabling the testcase if hiredis contains the issue or if the version is unknown.
add_test(NAME redirect-with-hostname-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/redirect-with-hostname-test.sh"
"$<TARGET_FILE:clusterclient_async_sequence>"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
if(hiredis_VERSION VERSION_EQUAL "1.1.0" OR hiredis_VERSION VERSION_EQUAL "0")
set_tests_properties(redirect-with-hostname-test-async PROPERTIES DISABLED True)
Expand Down
197 changes: 145 additions & 52 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,33 @@
*
* The behaviour is the same as that of clusterclient.c, but the asynchronous
* API of the library is used rather than the synchronous API.
* The following action commands can alter the default behaviour:
*
* !async - Send multiple commands and then wait for their responses.
* Will send all following commands until EOF or the command `!sync`
*
* !sync - Send a single command and wait for its response before sending next
* command. This is the default behaviour.
*
* !resend - Resend a failed command from its reply callback.
* Will resend all following failed commands until EOF.
*
* !sleep - Sleep a second. Can be used to allow timers to timeout.
* Currently not supported while in !async mode.
*
* !all - Send each command to all nodes in the cluster.
* Will send following commands using the `..ToNode()` API and a
* cluster node iterator to send each command to all known nodes.
*
* An example input of first sending 2 commands and waiting for their responses,
* before sending a single command and waiting for its response:
*
* !async
* SET dual-1 command
* SET dual-2 command
* !sync
* SET single command
*
*/

#include "adapters/libevent.h"
Expand All @@ -14,54 +41,134 @@
#include <stdlib.h>
#include <string.h>

#define CMD_SIZE 256
#define HISTORY_DEPTH 16

char cmd_history[HISTORY_DEPTH][CMD_SIZE];

int num_running = 0;
int resend_failed_cmd = 0;
int send_to_all = 0;

/*
void printReply(redisReply *reply) {
void sendNextCommand(int, short, void *);

void printReply(const redisReply *reply) {
switch (reply->type) {
case REDIS_REPLY_INTEGER: printf("%lld", reply->integer); break;
case REDIS_REPLY_DOUBLE: printf("%s", reply->str); break;
case REDIS_REPLY_ERROR: printf("-%s", reply->str); break;
// TODO: Escape special chars in strings
case REDIS_REPLY_STRING: printf("\"%s\"", reply->str); break;
case REDIS_REPLY_ARRAY:
printf("[");
for (size_t i = 0; i < reply->elements; i++) {
printReply(reply->element[i]);
if (i < reply->elements - 1)
printf(", ");
}
printf("]");
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
case REDIS_REPLY_VERB:
case REDIS_REPLY_BIGNUM:
printf("%s\n", reply->str);
break;
case REDIS_REPLY_INTEGER:
printf("%lld\n", reply->integer);
break;
default:
printf("UNKNOWN TYPE %d", reply->type);
printf("Unhandled reply type: %d\n", reply->type);
}
}
*/

void replyCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
UNUSED(privdata);
redisReply *reply = (redisReply *)r;
ASSERT_MSG(reply != NULL, acc->errstr);
intptr_t cmd_id = (intptr_t)privdata; /* Id to corresponding cmd */

/* printReply(reply); */
/* printf("\n"); */
printf("%s\n", reply->str);
if (reply == NULL) {
if (acc->err) {
printf("error: %s\n", acc->errstr);
} else {
printf("unknown error\n");
}

if (resend_failed_cmd) {
printf("resend '%s'\n", cmd_history[cmd_id]);
if (redisClusterAsyncCommand(acc, replyCallback, (void *)cmd_id,
cmd_history[cmd_id]) != REDIS_OK)
printf("send error\n");
return;
}
} else {
printReply(reply);
}

if (--num_running == 0) {
// Disconnect after receiving all replies
redisClusterAsyncDisconnect(acc);
/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc,
NULL);
}
}

void connectCallback(const redisAsyncContext *ac, int status) {
ASSERT_MSG(status == REDIS_OK, ac->errstr);
// printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}
void sendNextCommand(int fd, short kind, void *arg) {
UNUSED(fd);
UNUSED(kind);
redisClusterAsyncContext *acc = arg;
int async = 0;

char cmd[CMD_SIZE];
while (fgets(cmd, CMD_SIZE, stdin)) {
size_t len = strlen(cmd);
if (cmd[len - 1] == '\n') /* Chop trailing line break */
cmd[len - 1] = '\0';

if (cmd[0] == '\0') /* Skip empty lines */
continue;
if (cmd[0] == '#') /* Skip comments */
continue;
if (cmd[0] == '!') {
if (strcmp(cmd, "!sleep") == 0) {
ASSERT_MSG(async == 0, "!sleep in !async not supported");
struct timeval timeout = {1, 0};
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, &timeout);
return;
}
if (strcmp(cmd, "!async") == 0) /* Enable async send */
async = 1;
if (strcmp(cmd, "!sync") == 0) { /* Disable async send */
if (async)
return; /* We are done sending commands */
}
if (strcmp(cmd, "!resend") == 0) /* Enable resend of failed cmd */
resend_failed_cmd = 1;
if (strcmp(cmd, "!all") == 0) { /* Enable send to all nodes */
ASSERT_MSG(resend_failed_cmd == 0,
"!all in !resend not supported");
send_to_all = 1;
}
continue; /* Skip line */
}

void disconnectCallback(const redisAsyncContext *ac, int status) {
ASSERT_MSG(status == REDIS_OK, ac->errstr);
// printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
/* Copy command string to history buffer */
assert(num_running < HISTORY_DEPTH);
strcpy(cmd_history[num_running], cmd);

if (send_to_all) {
nodeIterator ni;
initNodeIterator(&ni, acc->cc);

redisClusterNode *node;
while ((node = nodeNext(&ni)) != NULL) {
int status = redisClusterAsyncCommandToNode(
acc, node, replyCallback, (void *)((intptr_t)num_running),
cmd);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
}
} else {
int status = redisClusterAsyncCommand(
acc, replyCallback, (void *)((intptr_t)num_running), cmd);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
}

if (async)
continue; /* Send next command as well */

return;
}

/* Disconnect if nothing is left to read from stdin */
redisClusterAsyncDisconnect(acc);
}

void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
Expand Down Expand Up @@ -105,12 +212,14 @@ int main(int argc, char **argv) {
exit(1);
}
const char *initnode = argv[optind];
struct timeval timeout = {0, 500000};

redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, connectCallback);
redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
redisClusterSetOptionAddNodes(acc->cc, initnode);
redisClusterSetOptionTimeout(acc->cc, timeout);
redisClusterSetOptionConnectTimeout(acc->cc, timeout);
redisClusterSetOptionMaxRetry(acc->cc, 1);
if (use_cluster_slots) {
redisClusterSetOptionRouteUseSlots(acc->cc);
}
Expand All @@ -123,28 +232,12 @@ int main(int argc, char **argv) {
exit(2);
}

int status;
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
int status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);

// Forward commands from stdin to redis cluster
char command[256];

// Make sure num_running doesn't reach 0 in replyCallback() before all
// commands have been sent.
num_running++;

while (fgets(command, 256, stdin)) {
size_t len = strlen(command);
if (command[len - 1] == '\n') // Chop trailing line break
command[len - 1] = '\0';
status =
redisClusterAsyncCommand(acc, replyCallback, (char *)"ID", command);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
num_running++;
}
num_running--; // all commands sent
/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);

event_base_dispatch(base);

Expand Down
Loading