Skip to content

Commit

Permalink
Update async cluster api
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Oct 1, 2024
1 parent 27820d5 commit b22341a
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 132 deletions.
22 changes: 10 additions & 12 deletions examples/cluster-clientside-caching-async.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,24 +140,22 @@ void modifyKey(const char *key, const char *value) {
int main(int argc, char **argv) {
(void)argc;
(void)argv;
valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit();
struct event_base *base = event_base_new();

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.onConnectNC = connectCallbackNC;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options);
assert(acc);

int status;
status = valkeyClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == VALKEY_OK);
status = valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == VALKEY_OK);
status = valkeyClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == VALKEY_OK);
status = valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == VALKEY_OK);

struct event_base *base = event_base_new();
status = valkeyClusterLibeventAttach(acc, base);
assert(status == VALKEY_OK);

status = valkeyClusterAsyncConnect2(acc);
status = valkeyClusterAsyncConnect(acc);
assert(status == VALKEY_OK);

event_base_dispatch(base);
Expand Down
14 changes: 5 additions & 9 deletions include/valkey/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ typedef struct {
* Synchronous API
*/

valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions *options);
valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options);
valkeyClusterContext *valkeyClusterConnect(const char *addrs);
valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs,
const struct timeval tv);
int valkeyClusterConnect2(valkeyClusterContext *cc);

valkeyClusterContext *valkeyClusterContextInit(void);
void valkeyClusterFree(valkeyClusterContext *cc);

/* Configuration options */
Expand Down Expand Up @@ -293,7 +293,10 @@ valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
* Asynchronous API
*/

valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(void);
valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options);
valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options);
int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc); /* Connect an initiated context. */
void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc);
void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc);

int valkeyClusterAsyncSetConnectCallback(valkeyClusterAsyncContext *acc,
Expand All @@ -303,13 +306,6 @@ int valkeyClusterAsyncSetConnectCallbackNC(valkeyClusterAsyncContext *acc,
int valkeyClusterAsyncSetDisconnectCallback(valkeyClusterAsyncContext *acc,
valkeyDisconnectCallback *fn);

valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options);
/* Connect and update slotmap, will block until complete. */
valkeyClusterAsyncContext *valkeyClusterAsyncConnect(const char *addrs);
/* Connect and update slotmap asynchronously using configured event engine. */
int valkeyClusterAsyncConnect2(valkeyClusterAsyncContext *acc);
void valkeyClusterAsyncDisconnect(valkeyClusterAsyncContext *acc);

/* Commands */
int valkeyClusterAsyncCommand(valkeyClusterAsyncContext *acc,
valkeyClusterCallbackFn *fn, void *privdata,
Expand Down
108 changes: 49 additions & 59 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1346,50 +1346,14 @@ int valkeyClusterUpdateSlotmap(valkeyClusterContext *cc) {
return VALKEY_ERR;
}

valkeyClusterContext *valkeyClusterContextInit(void) {
valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions *options) {
valkeyClusterContext *cc;

cc = vk_calloc(1, sizeof(valkeyClusterContext));
if (cc == NULL)
return NULL;

cc->max_retry_count = CLUSTER_DEFAULT_MAX_RETRY_COUNT;
return cc;
}

void valkeyClusterFree(valkeyClusterContext *cc) {
if (cc == NULL)
return;

if (cc->event_callback) {
cc->event_callback(cc, VALKEYCLUSTER_EVENT_FREE_CONTEXT,
cc->event_privdata);
}

vk_free(cc->connect_timeout);
vk_free(cc->command_timeout);
vk_free(cc->username);
vk_free(cc->password);
vk_free(cc->table);

if (cc->nodes != NULL) {
dictRelease(cc->nodes);
}

if (cc->requests != NULL) {
listRelease(cc->requests);
}

memset(cc, 0xff, sizeof(*cc));
vk_free(cc);
}

valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options) {
valkeyClusterContext *cc = valkeyClusterContextInit();
if (cc == NULL) {
return NULL;
}

cc->flags = options->flags;
if (options->max_retry_count > 0) {
cc->max_retry_count = options->max_retry_count;
Expand Down Expand Up @@ -1425,6 +1389,42 @@ valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions
cc->tls_init_fn = options->tls_init_fn;
}

return cc;
}

void valkeyClusterFree(valkeyClusterContext *cc) {
if (cc == NULL)
return;

if (cc->event_callback) {
cc->event_callback(cc, VALKEYCLUSTER_EVENT_FREE_CONTEXT,
cc->event_privdata);
}

vk_free(cc->connect_timeout);
vk_free(cc->command_timeout);
vk_free(cc->username);
vk_free(cc->password);
vk_free(cc->table);

if (cc->nodes != NULL) {
dictRelease(cc->nodes);
}

if (cc->requests != NULL) {
listRelease(cc->requests);
}

memset(cc, 0xff, sizeof(*cc));
vk_free(cc);
}

valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options) {
valkeyClusterContext *cc = valkeyClusterContextInit(options);
if (cc == NULL) {
return NULL;
}

valkeyClusterUpdateSlotmap(cc);
return cc;
}
Expand Down Expand Up @@ -2949,11 +2949,11 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
return ac;
}

valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(void) {
valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(const valkeyClusterOptions *options) {
valkeyClusterContext *cc;
valkeyClusterAsyncContext *acc;

cc = valkeyClusterContextInit();
cc = valkeyClusterContextInit(options);
if (cc == NULL) {
return NULL;
}
Expand All @@ -2964,21 +2964,6 @@ valkeyClusterAsyncContext *valkeyClusterAsyncContextInit(void) {
return NULL;
}

return acc;
}

valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options) {

valkeyClusterContext *cc = valkeyClusterConnectWithOptions(options);
if (cc == NULL) {
return NULL;
}

valkeyClusterAsyncContext *acc = valkeyClusterAsyncInitialize(cc);
if (acc == NULL) {
valkeyClusterFree(cc);
return NULL;
}
if (options->onConnect != NULL) {
acc->onConnect = options->onConnect;
}
Expand All @@ -2996,18 +2981,23 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClus
return acc;
}

valkeyClusterAsyncContext *valkeyClusterAsyncConnect(const char *addrs) {
valkeyClusterOptions options = {0};
options.initial_nodes = addrs;
valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClusterOptions *options) {
valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(options);
if (acc == NULL) {
return NULL;
}

return valkeyClusterAsyncConnectWithOptions(&options);
//TODO: valkeyClusterAsyncConnect(acc);
valkeyClusterUpdateSlotmap(acc->cc);
return acc;
}

int valkeyClusterAsyncConnect2(valkeyClusterAsyncContext *acc) {
int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) {
/* An attach function for an async event library is required. */
if (acc->attach_fn == NULL) {
return VALKEY_ERR;
}
/* TODO: add options to use: valkeyClusterUpdateSlotmap(acc->cc); */
return updateSlotMapAsync(acc, NULL /*any node*/);
}

Expand Down
14 changes: 7 additions & 7 deletions tests/clusterclient_reconnect_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ int main(int argc, char **argv) {
exit(1);
}
const char *initnode = argv[1];
struct event_base *base = event_base_new();

valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit();
assert(acc);
valkeyClusterSetOptionAddNodes(acc->cc, initnode);
valkeyClusterSetOptionRouteUseSlots(acc->cc);
valkeyClusterOptions options = {0};
options.initial_nodes = initnode;
options.flags = VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

struct event_base *base = event_base_new();
int status = valkeyClusterLibeventAttach(acc, base);
assert(status == VALKEY_OK);
valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options);
assert(acc);

connectToValkey(acc);
// schedule reading from stdin and sending next command
Expand Down
34 changes: 11 additions & 23 deletions tests/ct_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,34 +68,22 @@ void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) {
}

int main(void) {
struct event_base *base = event_base_new();

valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit();
assert(acc);

int status;
status = valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == VALKEY_OK);
status = valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == VALKEY_ERR); /* Re-registration not accepted */
status = valkeyClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == VALKEY_ERR); /* Re-registration not accepted */

status = valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == VALKEY_OK);
status = valkeyClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == VALKEY_OK);
status = valkeyClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == VALKEY_OK);
valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

/* Expect error when connecting without an attached event library. */
status = valkeyClusterAsyncConnect2(acc);
assert(status == VALKEY_ERR);
valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options);
assert(acc);

struct event_base *base = event_base_new();
status = valkeyClusterLibeventAttach(acc, base);
/* Set an event callback that uses acc as privdata */
int status = valkeyClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == VALKEY_OK);

status = valkeyClusterAsyncConnect2(acc);
status = valkeyClusterAsyncConnect(acc);
assert(status == VALKEY_OK);

event_base_dispatch(base);
Expand Down
15 changes: 8 additions & 7 deletions tests/ct_async_glib.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ int main(int argc, char **argv) {
GMainContext *context = NULL;
mainloop = g_main_loop_new(context, FALSE);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnect(CLUSTER_NODE);
valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_GLIB(&options, context);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);
assert(acc);
ASSERT_MSG(acc->err == 0, acc->errstr);

int status = valkeyClusterGlibAttach(acc, context);
assert(status == VALKEY_OK);

valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

int status;
status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"id", "SET key value");
ASSERT_MSG(status == VALKEY_OK, acc->errstr);

Expand Down
14 changes: 7 additions & 7 deletions tests/ct_async_libev.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ int main(int argc, char **argv) {
UNUSED(argc);
UNUSED(argv);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnect(CLUSTER_NODE);
valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEV(&options, EV_DEFAULT);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);
assert(acc);
ASSERT_MSG(acc->err == 0, acc->errstr);

int status;
status = valkeyClusterLibevAttach(acc, EV_DEFAULT);
assert(status == VALKEY_OK);

valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key value");
ASSERT_MSG(status == VALKEY_OK, acc->errstr);
Expand Down
17 changes: 9 additions & 8 deletions tests/ct_async_libuv.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ int main(int argc, char **argv) {
UNUSED(argc);
UNUSED(argv);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnect(CLUSTER_NODE);
uv_loop_t *loop = uv_default_loop();

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBUV(&options, loop);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);
assert(acc);
ASSERT_MSG(acc->err == 0, acc->errstr);

int status;
uv_loop_t *loop = uv_default_loop();
status = valkeyClusterLibuvAttach(acc, loop);
assert(status == VALKEY_OK);

valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

status = valkeyClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key value");
ASSERT_MSG(status == VALKEY_OK, acc->errstr);
Expand Down

0 comments on commit b22341a

Please sign in to comment.