Skip to content

Commit

Permalink
Event adapter corrections (valkey-io#106)
Browse files Browse the repository at this point in the history
Attach a cluster context to an event library when an attach function has
been registered. Check the `attach_fn` instead of `data` since the attach
data might not be required for some event engines, an can be NULL.

Add missing cluster event adapters: ivykis, libhv, libsdevent, MacOS, poll

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv authored Oct 1, 2024
1 parent ef25c7e commit 952b59b
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 49 deletions.
6 changes: 4 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ example-async-libhv: async-libhv.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< -lhv $(STLIBNAME)

example-async-libsdevent: async-libsdevent.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< -lsdevent $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< -lsystemd $(STLIBNAME)

example-async-glib: async-glib.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< $(shell pkg-config --cflags --libs glib-2.0) $(STLIBNAME)
Expand All @@ -69,7 +69,9 @@ example-async-ae:
@false
else
example-async-ae: async-ae.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $(LDFLAGS) -I$(AE_DIR) $< $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o $(AE_DIR)/../deps/jemalloc/lib/libjemalloc.a -pthread $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $(LDFLAGS) -I$(AE_DIR) $< $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o \
$(AE_DIR)/monotonic.o $(AE_DIR)/anet.o $(AE_DIR)/serverassert.o $(AE_DIR)/../deps/jemalloc/lib/libjemalloc.a \
-pthread $(STLIBNAME)
endif

ifndef LIBUV_DIR
Expand Down
13 changes: 7 additions & 6 deletions include/valkey/adapters/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,20 @@ static int valkeyAeAttach(aeEventLoop *loop, valkeyAsyncContext *ac) {
return VALKEY_OK;
}

static int valkeyAeAttach_link(valkeyAsyncContext *ac, void *base) {
return valkeyAeAttach((aeEventLoop *)base, ac);
/* Internal adapter function with correct function signature. */
static int valkeyAeAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyAeAttach((aeEventLoop *)loop, ac);
}

VALKEY_UNUSED
static int valkeyClusterAeAttach(aeEventLoop *loop,
valkeyClusterAsyncContext *acc) {
static int valkeyClusterAeAttach(valkeyClusterAsyncContext *acc,
aeEventLoop *loop) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->adapter = loop;
acc->attach_fn = valkeyAeAttach_link;
acc->attach_fn = valkeyAeAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}
#endif /* VALKEY_ADAPTERS_AE_H */
18 changes: 7 additions & 11 deletions include/valkey/adapters/glib.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,23 @@ valkey_source_new(valkeyAsyncContext *ac) {
return (GSource *)source;
}

typedef struct valkeyClusterGlibAdapter {
GMainContext *context;
} valkeyClusterGlibAdapter;

static int valkeyGlibAttach_link(valkeyAsyncContext *ac, void *adapter) {
GMainContext *context = ((valkeyClusterGlibAdapter *)adapter)->context;
if (g_source_attach(valkey_source_new(ac), context) > 0) {
/* Internal adapter function with correct function signature. */
static int valkeyGlibAttachAdapter(valkeyAsyncContext *ac, void *context) {
if (g_source_attach(valkey_source_new(ac), (GMainContext *)context) > 0) {
return VALKEY_OK;
}
return VALKEY_ERR;
}

VALKEY_UNUSED
static int valkeyClusterGlibAttach(valkeyClusterAsyncContext *acc,
valkeyClusterGlibAdapter *adapter) {
if (acc == NULL || adapter == NULL) {
GMainContext *context) {
if (acc == NULL) { // A NULL context is accepted.
return VALKEY_ERR;
}

acc->adapter = adapter;
acc->attach_fn = valkeyGlibAttach_link;
acc->attach_fn = valkeyGlibAttachAdapter;
acc->attach_data = context;
return VALKEY_OK;
}

Expand Down
17 changes: 17 additions & 0 deletions include/valkey/adapters/ivykis.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef VALKEY_ADAPTERS_IVYKIS_H
#define VALKEY_ADAPTERS_IVYKIS_H
#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <iv.h>
Expand Down Expand Up @@ -82,4 +83,20 @@ static int valkeyIvykisAttach(valkeyAsyncContext *ac) {

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyClusterIvykisAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *) {
return valkeyIvykisAttach(ac);
}

VALKEY_UNUSED
static int valkeyClusterIvykisAttach(valkeyClusterAsyncContext *acc) {
if (acc == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyClusterIvykisAttachAdapter;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_IVYKIS_H */
9 changes: 5 additions & 4 deletions include/valkey/adapters/libev.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,20 @@ static int valkeyLibevAttach(EV_P_ valkeyAsyncContext *ac) {
return VALKEY_OK;
}

static int valkeyLibevAttach_link(valkeyAsyncContext *ac, void *loop) {
/* Internal adapter function with correct function signature. */
static int valkeyLibevAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyLibevAttach((struct ev_loop *)loop, ac);
}

VALKEY_UNUSED
static int valkeyClusterLibevAttach(valkeyClusterAsyncContext *acc,
struct ev_loop *loop) {
if (loop == NULL || acc == NULL) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->adapter = loop;
acc->attach_fn = valkeyLibevAttach_link;
acc->attach_fn = valkeyLibevAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}

Expand Down
8 changes: 4 additions & 4 deletions include/valkey/adapters/libevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ static int valkeyLibeventAttach(valkeyAsyncContext *ac, struct event_base *base)
return VALKEY_OK;
}

VALKEY_UNUSED
static int valkeyLibeventAttach_link(valkeyAsyncContext *ac, void *base) {
/* Internal adapter function with correct function signature. */
static int valkeyLibeventAttachAdapter(valkeyAsyncContext *ac, void *base) {
return valkeyLibeventAttach(ac, (struct event_base *)base);
}

Expand All @@ -188,8 +188,8 @@ static int valkeyClusterLibeventAttach(valkeyClusterAsyncContext *acc,
return VALKEY_ERR;
}

acc->adapter = base;
acc->attach_fn = valkeyLibeventAttach_link;
acc->attach_fn = valkeyLibeventAttachAdapter;
acc->attach_data = base;
return VALKEY_OK;
}
#endif /* VALKEY_ADAPTERS_LIBEVENT_H */
19 changes: 19 additions & 0 deletions include/valkey/adapters/libhv.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define VALKEY_ADAPTERS_LIBHV_H

#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <hv/hloop.h>
Expand Down Expand Up @@ -121,4 +122,22 @@ static int valkeyLibhvAttach(valkeyAsyncContext *ac, hloop_t *loop) {

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyLibhvAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyLibhvAttach(ac, (hloop_t *)loop);
}

VALKEY_UNUSED
static int valkeyClusterLibhvAttach(valkeyClusterAsyncContext *acc,
hloop_t *loop) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyLibhvAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_LIBHV_H */
19 changes: 19 additions & 0 deletions include/valkey/adapters/libsdevent.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef VALKEY_ADAPTERS_LIBSDEVENT_H
#define VALKEY_ADAPTERS_LIBSDEVENT_H
#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <systemd/sd-event.h>
Expand Down Expand Up @@ -176,4 +177,22 @@ static int valkeyLibsdeventAttach(valkeyAsyncContext *ac, struct sd_event *event

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyLibsdeventAttachAdapter(valkeyAsyncContext *ac, void *event) {
return valkeyLibsdeventAttach(ac, (struct sd_event *)event);
}

VALKEY_UNUSED
static int valkeyClusterLibsdeventAttach(valkeyClusterAsyncContext *acc,
struct sd_event *event) {
if (acc == NULL || event == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyLibsdeventAttachAdapter;
acc->attach_data = event;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_LIBSDEVENT_H */
7 changes: 4 additions & 3 deletions include/valkey/adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ static int valkeyLibuvAttach(valkeyAsyncContext *ac, uv_loop_t *loop) {
return VALKEY_OK;
}

static int valkeyLibuvAttach_link(valkeyAsyncContext *ac, void *loop) {
/* Internal adapter function with correct function signature. */
static int valkeyLibuvAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyLibuvAttach(ac, (uv_loop_t *)loop);
}

Expand All @@ -208,8 +209,8 @@ static int valkeyClusterLibuvAttach(valkeyClusterAsyncContext *acc,
return VALKEY_ERR;
}

acc->adapter = loop;
acc->attach_fn = valkeyLibuvAttach_link;
acc->attach_fn = valkeyLibuvAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}
#endif /* VALKEY_ADAPTERS_LIBUV_H */
18 changes: 18 additions & 0 deletions include/valkey/adapters/macosx.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#define VALKEY_ADAPTERS_MACOSX_H

#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <CoreFoundation/CoreFoundation.h>
Expand Down Expand Up @@ -142,4 +143,21 @@ static int valkeyMacOSAttach(valkeyAsyncContext *valkeyAsyncCtx, CFRunLoopRef ru
return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyMacOSAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyMacOSAttach(ac, (CFRunLoopRef)loop);
}

VALKEY_UNUSED
static int valkeyClusterMacOSAttach(valkeyClusterAsyncContext *acc,
CFRunLoopRef loop) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyMacOSAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_MACOSX_H */
17 changes: 17 additions & 0 deletions include/valkey/adapters/poll.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#define VALKEY_ADAPTERS_POLL_H

#include "../async.h"
#include "../cluster.h"
#include "../sockcompat.h"

#include <errno.h>
Expand Down Expand Up @@ -194,4 +195,20 @@ static int valkeyPollAttach(valkeyAsyncContext *ac) {

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyPollAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *unused) {
return valkeyPollAttach(ac);
}

VALKEY_UNUSED
static int valkeyClusterPollAttach(valkeyClusterAsyncContext *acc) {
if (acc == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyPollAttachAdapter;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_POLL_H */
6 changes: 3 additions & 3 deletions include/valkey/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ struct hilist;
struct valkeyClusterAsyncContext;
struct valkeyTLSContext;

typedef int(adapterAttachFn)(valkeyAsyncContext *, void *);
typedef void(valkeyClusterCallbackFn)(struct valkeyClusterAsyncContext *,
void *, void *);
typedef struct valkeyClusterNode {
Expand Down Expand Up @@ -135,8 +134,9 @@ typedef struct valkeyClusterAsyncContext {

int64_t lastSlotmapUpdateAttempt; /* Timestamp */

void *adapter; /* Adapter to the async event library */
adapterAttachFn *attach_fn; /* Func ptr for attaching the async library */
/* Attach function for an async library. */
int (*attach_fn)(valkeyAsyncContext *ac, void *attach_data);
void *attach_data;

/* Called when either the connection is terminated due to an error or per
* user request. The status is set accordingly (VALKEY_OK, VALKEY_ERR). */
Expand Down
8 changes: 4 additions & 4 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2909,8 +2909,8 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
}
}

if (acc->adapter) {
ret = acc->attach_fn(ac, acc->adapter);
if (acc->attach_fn) {
ret = acc->attach_fn(ac, acc->attach_data);
if (ret != VALKEY_OK) {
valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER,
"Failed to attach event adapter");
Expand Down Expand Up @@ -2975,8 +2975,8 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnect(const char *addrs,
}

int valkeyClusterAsyncConnect2(valkeyClusterAsyncContext *acc) {
/* An adapter to an async event library is required. */
if (acc->adapter == NULL) {
/* An attach function for an async event library is required. */
if (acc->attach_fn == NULL) {
return VALKEY_ERR;
}
return updateSlotMapAsync(acc, NULL /*any node*/);
Expand Down
14 changes: 7 additions & 7 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void replyCallback(valkeyClusterAsyncContext *acc, void *r, void *privdata) {

if (--num_running == 0) {
/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc,
NULL);
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
}
}

Expand All @@ -125,8 +125,8 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
if (strcmp(cmd, "!sleep") == 0) {
ASSERT_MSG(async == 0, "!sleep in !async not supported");
struct timeval timeout = {1, 0};
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, &timeout);
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout);
return;
}
if (strcmp(cmd, "!async") == 0) /* Enable async send */
Expand Down Expand Up @@ -172,8 +172,8 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
printf("error: %s\n", acc->errstr);

/* Schedule a read from stdin and handle next command. */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, NULL);
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
}
}

Expand Down Expand Up @@ -275,7 +275,7 @@ int main(int argc, char **argv) {
assert(status == VALKEY_OK);

/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);

event_base_dispatch(base);

Expand Down
Loading

0 comments on commit 952b59b

Please sign in to comment.