diff --git a/Makefile.am b/Makefile.am index ac990a569..be69c655a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -150,6 +150,7 @@ libraft_la_SOURCES += \ src/uv_tcp_listen.c \ src/uv_tcp_connect.c \ src/uv_truncate.c \ + src/uv_work.c \ src/uv_writer.c libraft_la_LDFLAGS += $(UV_LIBS) @@ -198,7 +199,8 @@ test_integration_uv_SOURCES = \ test/integration/test_uv_tcp_connect.c \ test/integration/test_uv_tcp_listen.c \ test/integration/test_uv_snapshot_put.c \ - test/integration/test_uv_truncate.c + test/integration/test_uv_truncate.c \ + test/integration/test_uv_work.c test_integration_uv_CFLAGS = $(AM_CFLAGS) -Wno-type-limits -Wno-conversion test_integration_uv_LDFLAGS = -no-install $(UV_LIBS) test_integration_uv_LDADD = libtest.la diff --git a/include/raft.h b/include/raft.h index 3d7960d87..ac8789908 100644 --- a/include/raft.h +++ b/include/raft.h @@ -417,6 +417,20 @@ struct raft_io_snapshot_get raft_io_snapshot_get_cb cb; /* Request callback */ }; +/** + * Asynchronous work request. + */ +struct raft_io_async_work; +typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req); +typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req, + int status); +struct raft_io_async_work +{ + void *data; /* User data */ + raft_io_async_work_fn work; /* Function to run async from the main loop */ + raft_io_async_work_cb cb; /* Request callback */ +}; + /** * Customizable tracer, for debugging purposes. */ @@ -458,7 +472,7 @@ typedef void (*raft_io_close_cb)(struct raft_io *io); struct raft_io { - int version; + int version; /* 1 or 2 */ void *data; void *impl; char errmsg[RAFT_ERRMSG_BUF_SIZE]; @@ -499,6 +513,10 @@ struct raft_io raft_io_snapshot_get_cb cb); raft_time (*time)(struct raft_io *io); int (*random)(struct raft_io *io, int min, int max); + /* Field(s) below added since version 2. */ + int (*async_work)(struct raft_io *io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb); }; /* diff --git a/include/raft/fixture.h b/include/raft/fixture.h index e28251830..de62bd648 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -16,7 +16,8 @@ enum { RAFT_FIXTURE_TICK = 1, /* The tick callback has been invoked */ RAFT_FIXTURE_NETWORK, /* A network request has been sent or received */ - RAFT_FIXTURE_DISK /* An I/O request has been submitted */ + RAFT_FIXTURE_DISK, /* An I/O request has been submitted */ + RAFT_FIXTURE_WORK /* A large, CPU and/or memory intensive task */ }; /** diff --git a/src/fixture.c b/src/fixture.c index f324b8541..9757f6126 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -21,6 +21,7 @@ #define ELECTION_TIMEOUT 1000 #define NETWORK_LATENCY 15 #define DISK_LATENCY 10 +#define WORK_DURATION 200 /* To keep in sync with raft.h */ #define N_MESSAGE_TYPES 6 @@ -36,7 +37,7 @@ queue queue /* Link the I/O pending requests queue. */ /* Request type codes. */ -enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET }; +enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET, ASYNC_WORK }; /* Abstract base type for an asynchronous request submitted to the stub I/o * implementation. */ @@ -72,6 +73,13 @@ struct snapshot_put const struct raft_snapshot *snapshot; }; +/* Pending request to perform general work. */ +struct async_work +{ + REQUEST; + struct raft_io_async_work *req; +}; + /* Pending request to load a snapshot. */ struct snapshot_get { @@ -131,6 +139,7 @@ struct io unsigned randomized_election_timeout; /* Value returned by io->random() */ unsigned network_latency; /* Milliseconds to deliver RPCs */ unsigned disk_latency; /* Milliseconds to perform disk I/O */ + unsigned work_duration; /* Milliseconds to long running work */ struct { @@ -277,6 +286,16 @@ static void ioFlushSnapshotGet(struct io *s, struct snapshot_get *r) raft_free(r); } +/* Flush an async work request */ +static void ioFlushAsyncWork(struct io *s, struct async_work *r) +{ + (void) s; + int rv; + rv = r->req->work(r->req); + r->req->cb(r->req, rv); + raft_free(r); +} + /* Search for the peer with the given ID. */ static struct peer *ioGetPeer(struct io *io, raft_id id) { @@ -410,6 +429,9 @@ static void ioFlushAll(struct io *io) case SNAPSHOT_GET: ioFlushSnapshotGet(io, (struct snapshot_get *)r); break; + case ASYNC_WORK: + ioFlushAsyncWork(io, (struct async_work *)r); + break; default: assert(0); } @@ -646,6 +668,28 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io, return 0; } +static int ioMethodAsyncWork(struct raft_io *raft_io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb) +{ + struct io *io = raft_io->impl; + struct raft *raft = raft_io->data; + struct async_work *r; + + r = raft_malloc(sizeof *r); + assert(r != NULL); + + r->type = ASYNC_WORK; + r->req = req; + r->req->cb = cb; + r->req->data = raft; + r->completion_time = *io->time + io->work_duration; + + QUEUE_PUSH(&io->requests, &r->queue); + return 0; +} + + static int ioMethodSnapshotGet(struct raft_io *raft_io, struct raft_io_snapshot_get *req, raft_io_snapshot_get_cb cb) @@ -848,6 +892,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100; io->network_latency = NETWORK_LATENCY; io->disk_latency = DISK_LATENCY; + io->work_duration = WORK_DURATION; io->fault.countdown = -1; io->fault.n = -1; memset(io->drop, 0, sizeof io->drop); @@ -868,6 +913,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) raft_io->truncate = ioMethodTruncate; raft_io->send = ioMethodSend; raft_io->snapshot_put = ioMethodSnapshotPut; + raft_io->async_work = ioMethodAsyncWork; raft_io->snapshot_get = ioMethodSnapshotGet; raft_io->time = ioMethodTime; raft_io->random = ioMethodRandom; @@ -1378,6 +1424,10 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) ioFlushSnapshotGet(io, (struct snapshot_get *)r); f->event.type = RAFT_FIXTURE_DISK; break; + case ASYNC_WORK: + ioFlushAsyncWork(io, (struct async_work *)r); + f->event.type = RAFT_FIXTURE_WORK; + break; default: assert(0); } diff --git a/src/uv.c b/src/uv.c index 31744977d..23a04f0d1 100644 --- a/src/uv.c +++ b/src/uv.c @@ -185,6 +185,9 @@ void uvMaybeFireCloseCb(struct uv *uv) if (!QUEUE_IS_EMPTY(&uv->snapshot_get_reqs)) { return; } + if (!QUEUE_IS_EMPTY(&uv->async_work_reqs)) { + return; + } if (!QUEUE_IS_EMPTY(&uv->aborting)) { return; } @@ -671,6 +674,7 @@ int raft_uv_init(struct raft_io *io, uv->finalize_work.data = NULL; uv->truncate_work.data = NULL; QUEUE_INIT(&uv->snapshot_get_reqs); + QUEUE_INIT(&uv->async_work_reqs); uv->snapshot_put_work.data = NULL; uv->timer.data = NULL; uv->tick_cb = NULL; /* Set by raft_io->start() */ @@ -696,6 +700,7 @@ int raft_uv_init(struct raft_io *io, io->send = UvSend; io->snapshot_put = UvSnapshotPut; io->snapshot_get = UvSnapshotGet; + io->async_work = UvAsyncWork; io->time = uvTime; io->random = uvRandom; diff --git a/src/uv.h b/src/uv.h index b0f79efb7..edb2de973 100644 --- a/src/uv.h +++ b/src/uv.h @@ -83,6 +83,7 @@ struct uv struct uv_work_s finalize_work; /* Resize and rename segments */ struct uv_work_s truncate_work; /* Execute truncate log requests */ queue snapshot_get_reqs; /* Inflight get snapshot requests */ + queue async_work_reqs; /* Inflight async work requests */ struct uv_work_s snapshot_put_work; /* Execute snapshot put requests */ struct uvMetadata metadata; /* Cache of metadata on disk */ struct uv_timer_s timer; /* Timer for periodic ticks */ @@ -273,6 +274,12 @@ int UvSnapshotGet(struct raft_io *io, struct raft_io_snapshot_get *req, raft_io_snapshot_get_cb cb); + +/* Implementation of raft_io->async_work (defined in uv_work.c). */ +int UvAsyncWork(struct raft_io *io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb); + /* Return a list of all snapshots and segments found in the data directory. Both * snapshots and segments are ordered by filename (closed segments come before * open ones). */ diff --git a/src/uv_work.c b/src/uv_work.c new file mode 100644 index 000000000..1cb6e4fe2 --- /dev/null +++ b/src/uv_work.c @@ -0,0 +1,80 @@ +#include "assert.h" +#include "heap.h" +#include "uv.h" + +#define tracef(...) Tracef(uv->tracer, __VA_ARGS__) + +struct uvAsyncWork +{ + struct uv *uv; + struct raft_io_async_work *req; + struct uv_work_s work; + int status; + queue queue; +}; + +static void uvAsyncWorkCb(uv_work_t *work) +{ + struct uvAsyncWork *w = work->data; + assert(w != NULL); + int rv; + rv = w->req->work(w->req); + w->status = rv; +} + +static void uvAsyncAfterWorkCb(uv_work_t *work, int status) +{ + struct uvAsyncWork *w = work->data; + struct raft_io_async_work *req = w->req; + int req_status = w->status; + struct uv *uv = w->uv; + assert(status == 0); + + QUEUE_REMOVE(&w->queue); + RaftHeapFree(w); + req->cb(req, req_status); + uvMaybeFireCloseCb(uv); +} + +int UvAsyncWork(struct raft_io *io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb) +{ + struct uv *uv; + struct uvAsyncWork *async_work; + int rv; + + uv = io->impl; + assert(!uv->closing); + + async_work = RaftHeapMalloc(sizeof *async_work); + if (async_work == NULL) { + rv = RAFT_NOMEM; + goto err; + } + + async_work->uv = uv; + async_work->req = req; + async_work->work.data = async_work; + req->cb = cb; + + QUEUE_PUSH(&uv->async_work_reqs, &async_work->queue); + rv = uv_queue_work(uv->loop, &async_work->work, uvAsyncWorkCb, + uvAsyncAfterWorkCb); + if (rv != 0) { + QUEUE_REMOVE(&async_work->queue); + tracef("async work: %s", uv_strerror(rv)); + rv = RAFT_IOERR; + goto err_after_req_alloc; + } + + return 0; + +err_after_req_alloc: + RaftHeapFree(async_work); +err: + assert(rv != 0); + return rv; +} + +#undef tracef diff --git a/test/integration/test_uv_work.c b/test/integration/test_uv_work.c new file mode 100644 index 000000000..6c7bd181d --- /dev/null +++ b/test/integration/test_uv_work.c @@ -0,0 +1,103 @@ +#include + +#include "../lib/dir.h" +#include "../lib/loop.h" +#include "../lib/runner.h" +#include "../lib/uv.h" +#include "../../src/uv.h" + +/****************************************************************************** + * + * Fixture + * + *****************************************************************************/ + +struct fixture +{ + FIXTURE_UV_DEPS; + FIXTURE_UV; +}; + +struct result +{ + int rv; /* Indicate success or failure of the work */ + int counter; /* Proof that work was performed */ + bool done; /* To check test termination */ +}; + +/****************************************************************************** + * + * Set up and tear down. + * + *****************************************************************************/ + +static void *setUp(const MunitParameter params[], void *user_data) +{ + struct fixture *f = munit_malloc(sizeof *f); + SETUP_UV_DEPS; + SETUP_UV; + return f; +} + +static void tearDownDeps(void *data) +{ + struct fixture *f = data; + if (f == NULL) { + return; + } + TEAR_DOWN_UV_DEPS; + free(f); +} + +static void tearDown(void *data) +{ + struct fixture *f = data; + if (f == NULL) { + return; + } + TEAR_DOWN_UV; + tearDownDeps(f); +} + +/****************************************************************************** + * + * UvAsyncWork + * + *****************************************************************************/ + +static void asyncWorkCbAssertResult(struct raft_io_async_work *req, int status) +{ + struct result *r = req->data; + munit_assert_int(status, ==, r->rv); + munit_assert_int(r->counter, ==, 1); + r->done = true; +} + +static int asyncWorkFn(struct raft_io_async_work *req) +{ + struct result *r = req->data; + sleep(1); + r->counter = 1; + return r->rv; +} + +SUITE(UvAsyncWork) + +static char* rvs[] = { "-1", "0", "1", "37", NULL }; +static MunitParameterEnum rvs_params[] = { + { "rv", rvs }, + { NULL, NULL }, +}; + +TEST(UvAsyncWork, work, setUp, tearDown, 0, rvs_params) +{ + struct fixture *f = data; + struct result res = {0}; + struct raft_io_async_work req = {0}; + res.rv = (int)strtol(munit_parameters_get(params, "rv"), NULL, 0); + req.data = &res; + req.work = asyncWorkFn; + UvAsyncWork(&f->io, &req, asyncWorkCbAssertResult); + LOOP_RUN_UNTIL(&res.done); + return MUNIT_OK; +}