From 9e12b0627b75d930656fd249b52f5912906909af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathieu=20Border=C3=A9?= Date: Tue, 22 Mar 2022 11:11:20 +0100 Subject: [PATCH 1/2] raft_io: add async_work method Allows raft to perform a potentially long running task asynchronously from the main loop. Users of this function should take the necessary thread-safety measures when making use of this function as the work can run in parallel to the main thread. --- Makefile.am | 4 +- include/raft.h | 20 ++++++- include/raft/fixture.h | 3 +- src/fixture.c | 52 +++++++++++++++- src/uv.c | 5 ++ src/uv.h | 7 +++ src/uv_work.c | 80 +++++++++++++++++++++++++ test/integration/test_uv_work.c | 103 ++++++++++++++++++++++++++++++++ 8 files changed, 270 insertions(+), 4 deletions(-) create mode 100644 src/uv_work.c create mode 100644 test/integration/test_uv_work.c diff --git a/Makefile.am b/Makefile.am index ac990a569..be69c655a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -150,6 +150,7 @@ libraft_la_SOURCES += \ src/uv_tcp_listen.c \ src/uv_tcp_connect.c \ src/uv_truncate.c \ + src/uv_work.c \ src/uv_writer.c libraft_la_LDFLAGS += $(UV_LIBS) @@ -198,7 +199,8 @@ test_integration_uv_SOURCES = \ test/integration/test_uv_tcp_connect.c \ test/integration/test_uv_tcp_listen.c \ test/integration/test_uv_snapshot_put.c \ - test/integration/test_uv_truncate.c + test/integration/test_uv_truncate.c \ + test/integration/test_uv_work.c test_integration_uv_CFLAGS = $(AM_CFLAGS) -Wno-type-limits -Wno-conversion test_integration_uv_LDFLAGS = -no-install $(UV_LIBS) test_integration_uv_LDADD = libtest.la diff --git a/include/raft.h b/include/raft.h index 3d7960d87..ac8789908 100644 --- a/include/raft.h +++ b/include/raft.h @@ -417,6 +417,20 @@ struct raft_io_snapshot_get raft_io_snapshot_get_cb cb; /* Request callback */ }; +/** + * Asynchronous work request. 
+ */ +struct raft_io_async_work; +typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req); +typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req, + int status); +struct raft_io_async_work +{ + void *data; /* User data */ + raft_io_async_work_fn work; /* Function to run async from the main loop */ + raft_io_async_work_cb cb; /* Request callback */ +}; + /** * Customizable tracer, for debugging purposes. */ @@ -458,7 +472,7 @@ typedef void (*raft_io_close_cb)(struct raft_io *io); struct raft_io { - int version; + int version; /* 1 or 2 */ void *data; void *impl; char errmsg[RAFT_ERRMSG_BUF_SIZE]; @@ -499,6 +513,10 @@ struct raft_io raft_io_snapshot_get_cb cb); raft_time (*time)(struct raft_io *io); int (*random)(struct raft_io *io, int min, int max); + /* Field(s) below added since version 2. */ + int (*async_work)(struct raft_io *io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb); }; /* diff --git a/include/raft/fixture.h b/include/raft/fixture.h index e28251830..de62bd648 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -16,7 +16,8 @@ enum { RAFT_FIXTURE_TICK = 1, /* The tick callback has been invoked */ RAFT_FIXTURE_NETWORK, /* A network request has been sent or received */ - RAFT_FIXTURE_DISK /* An I/O request has been submitted */ + RAFT_FIXTURE_DISK, /* An I/O request has been submitted */ + RAFT_FIXTURE_WORK /* A large, CPU and/or memory intensive task */ }; /** diff --git a/src/fixture.c b/src/fixture.c index f324b8541..9757f6126 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -21,6 +21,7 @@ #define ELECTION_TIMEOUT 1000 #define NETWORK_LATENCY 15 #define DISK_LATENCY 10 +#define WORK_DURATION 200 /* To keep in sync with raft.h */ #define N_MESSAGE_TYPES 6 @@ -36,7 +37,7 @@ queue queue /* Link the I/O pending requests queue. */ /* Request type codes. 
*/ -enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET }; +enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET, ASYNC_WORK }; /* Abstract base type for an asynchronous request submitted to the stub I/o * implementation. */ @@ -72,6 +73,13 @@ struct snapshot_put const struct raft_snapshot *snapshot; }; +/* Pending request to perform general work. */ +struct async_work +{ + REQUEST; + struct raft_io_async_work *req; +}; + /* Pending request to load a snapshot. */ struct snapshot_get { @@ -131,6 +139,7 @@ struct io unsigned randomized_election_timeout; /* Value returned by io->random() */ unsigned network_latency; /* Milliseconds to deliver RPCs */ unsigned disk_latency; /* Milliseconds to perform disk I/O */ + unsigned work_duration; /* Milliseconds to long running work */ struct { @@ -277,6 +286,16 @@ static void ioFlushSnapshotGet(struct io *s, struct snapshot_get *r) raft_free(r); } +/* Flush an async work request */ +static void ioFlushAsyncWork(struct io *s, struct async_work *r) +{ + (void) s; + int rv; + rv = r->req->work(r->req); + r->req->cb(r->req, rv); + raft_free(r); +} + /* Search for the peer with the given ID. 
*/ static struct peer *ioGetPeer(struct io *io, raft_id id) { @@ -410,6 +429,9 @@ static void ioFlushAll(struct io *io) case SNAPSHOT_GET: ioFlushSnapshotGet(io, (struct snapshot_get *)r); break; + case ASYNC_WORK: + ioFlushAsyncWork(io, (struct async_work *)r); + break; default: assert(0); } @@ -646,6 +668,28 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io, return 0; } +static int ioMethodAsyncWork(struct raft_io *raft_io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb) +{ + struct io *io = raft_io->impl; + struct raft *raft = raft_io->data; + struct async_work *r; + + r = raft_malloc(sizeof *r); + assert(r != NULL); + + r->type = ASYNC_WORK; + r->req = req; + r->req->cb = cb; + r->req->data = raft; + r->completion_time = *io->time + io->work_duration; + + QUEUE_PUSH(&io->requests, &r->queue); + return 0; +} + + static int ioMethodSnapshotGet(struct raft_io *raft_io, struct raft_io_snapshot_get *req, raft_io_snapshot_get_cb cb) @@ -848,6 +892,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100; io->network_latency = NETWORK_LATENCY; io->disk_latency = DISK_LATENCY; + io->work_duration = WORK_DURATION; io->fault.countdown = -1; io->fault.n = -1; memset(io->drop, 0, sizeof io->drop); @@ -868,6 +913,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) raft_io->truncate = ioMethodTruncate; raft_io->send = ioMethodSend; raft_io->snapshot_put = ioMethodSnapshotPut; + raft_io->async_work = ioMethodAsyncWork; raft_io->snapshot_get = ioMethodSnapshotGet; raft_io->time = ioMethodTime; raft_io->random = ioMethodRandom; @@ -1378,6 +1424,10 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) ioFlushSnapshotGet(io, (struct snapshot_get *)r); f->event.type = RAFT_FIXTURE_DISK; break; + case ASYNC_WORK: + ioFlushAsyncWork(io, (struct async_work *)r); + f->event.type = RAFT_FIXTURE_WORK; + break; default: 
assert(0); } diff --git a/src/uv.c b/src/uv.c index 31744977d..23a04f0d1 100644 --- a/src/uv.c +++ b/src/uv.c @@ -185,6 +185,9 @@ void uvMaybeFireCloseCb(struct uv *uv) if (!QUEUE_IS_EMPTY(&uv->snapshot_get_reqs)) { return; } + if (!QUEUE_IS_EMPTY(&uv->async_work_reqs)) { + return; + } if (!QUEUE_IS_EMPTY(&uv->aborting)) { return; } @@ -671,6 +674,7 @@ int raft_uv_init(struct raft_io *io, uv->finalize_work.data = NULL; uv->truncate_work.data = NULL; QUEUE_INIT(&uv->snapshot_get_reqs); + QUEUE_INIT(&uv->async_work_reqs); uv->snapshot_put_work.data = NULL; uv->timer.data = NULL; uv->tick_cb = NULL; /* Set by raft_io->start() */ @@ -696,6 +700,7 @@ int raft_uv_init(struct raft_io *io, io->send = UvSend; io->snapshot_put = UvSnapshotPut; io->snapshot_get = UvSnapshotGet; + io->async_work = UvAsyncWork; io->time = uvTime; io->random = uvRandom; diff --git a/src/uv.h b/src/uv.h index b0f79efb7..edb2de973 100644 --- a/src/uv.h +++ b/src/uv.h @@ -83,6 +83,7 @@ struct uv struct uv_work_s finalize_work; /* Resize and rename segments */ struct uv_work_s truncate_work; /* Execute truncate log requests */ queue snapshot_get_reqs; /* Inflight get snapshot requests */ + queue async_work_reqs; /* Inflight async work requests */ struct uv_work_s snapshot_put_work; /* Execute snapshot put requests */ struct uvMetadata metadata; /* Cache of metadata on disk */ struct uv_timer_s timer; /* Timer for periodic ticks */ @@ -273,6 +274,12 @@ int UvSnapshotGet(struct raft_io *io, struct raft_io_snapshot_get *req, raft_io_snapshot_get_cb cb); + +/* Implementation of raft_io->async_work (defined in uv_work.c). */ +int UvAsyncWork(struct raft_io *io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb); + /* Return a list of all snapshots and segments found in the data directory. Both * snapshots and segments are ordered by filename (closed segments come before * open ones). 
*/ diff --git a/src/uv_work.c b/src/uv_work.c new file mode 100644 index 000000000..1cb6e4fe2 --- /dev/null +++ b/src/uv_work.c @@ -0,0 +1,80 @@ +#include "assert.h" +#include "heap.h" +#include "uv.h" + +#define tracef(...) Tracef(uv->tracer, __VA_ARGS__) + +struct uvAsyncWork +{ + struct uv *uv; + struct raft_io_async_work *req; + struct uv_work_s work; + int status; + queue queue; +}; + +static void uvAsyncWorkCb(uv_work_t *work) +{ + struct uvAsyncWork *w = work->data; + assert(w != NULL); + int rv; + rv = w->req->work(w->req); + w->status = rv; +} + +static void uvAsyncAfterWorkCb(uv_work_t *work, int status) +{ + struct uvAsyncWork *w = work->data; + struct raft_io_async_work *req = w->req; + int req_status = w->status; + struct uv *uv = w->uv; + assert(status == 0); + + QUEUE_REMOVE(&w->queue); + RaftHeapFree(w); + req->cb(req, req_status); + uvMaybeFireCloseCb(uv); +} + +int UvAsyncWork(struct raft_io *io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb) +{ + struct uv *uv; + struct uvAsyncWork *async_work; + int rv; + + uv = io->impl; + assert(!uv->closing); + + async_work = RaftHeapMalloc(sizeof *async_work); + if (async_work == NULL) { + rv = RAFT_NOMEM; + goto err; + } + + async_work->uv = uv; + async_work->req = req; + async_work->work.data = async_work; + req->cb = cb; + + QUEUE_PUSH(&uv->async_work_reqs, &async_work->queue); + rv = uv_queue_work(uv->loop, &async_work->work, uvAsyncWorkCb, + uvAsyncAfterWorkCb); + if (rv != 0) { + QUEUE_REMOVE(&async_work->queue); + tracef("async work: %s", uv_strerror(rv)); + rv = RAFT_IOERR; + goto err_after_req_alloc; + } + + return 0; + +err_after_req_alloc: + RaftHeapFree(async_work); +err: + assert(rv != 0); + return rv; +} + +#undef tracef diff --git a/test/integration/test_uv_work.c b/test/integration/test_uv_work.c new file mode 100644 index 000000000..6c7bd181d --- /dev/null +++ b/test/integration/test_uv_work.c @@ -0,0 +1,103 @@ +#include + +#include "../lib/dir.h" +#include "../lib/loop.h" 
+#include "../lib/runner.h" +#include "../lib/uv.h" +#include "../../src/uv.h" + +/****************************************************************************** + * + * Fixture + * + *****************************************************************************/ + +struct fixture +{ + FIXTURE_UV_DEPS; + FIXTURE_UV; +}; + +struct result +{ + int rv; /* Indicate success or failure of the work */ + int counter; /* Proof that work was performed */ + bool done; /* To check test termination */ +}; + +/****************************************************************************** + * + * Set up and tear down. + * + *****************************************************************************/ + +static void *setUp(const MunitParameter params[], void *user_data) +{ + struct fixture *f = munit_malloc(sizeof *f); + SETUP_UV_DEPS; + SETUP_UV; + return f; +} + +static void tearDownDeps(void *data) +{ + struct fixture *f = data; + if (f == NULL) { + return; + } + TEAR_DOWN_UV_DEPS; + free(f); +} + +static void tearDown(void *data) +{ + struct fixture *f = data; + if (f == NULL) { + return; + } + TEAR_DOWN_UV; + tearDownDeps(f); +} + +/****************************************************************************** + * + * UvAsyncWork + * + *****************************************************************************/ + +static void asyncWorkCbAssertResult(struct raft_io_async_work *req, int status) +{ + struct result *r = req->data; + munit_assert_int(status, ==, r->rv); + munit_assert_int(r->counter, ==, 1); + r->done = true; +} + +static int asyncWorkFn(struct raft_io_async_work *req) +{ + struct result *r = req->data; + sleep(1); + r->counter = 1; + return r->rv; +} + +SUITE(UvAsyncWork) + +static char* rvs[] = { "-1", "0", "1", "37", NULL }; +static MunitParameterEnum rvs_params[] = { + { "rv", rvs }, + { NULL, NULL }, +}; + +TEST(UvAsyncWork, work, setUp, tearDown, 0, rvs_params) +{ + struct fixture *f = data; + struct result res = {0}; + struct raft_io_async_work req = {0}; 
+ res.rv = (int)strtol(munit_parameters_get(params, "rv"), NULL, 0); + req.data = &res; + req.work = asyncWorkFn; + UvAsyncWork(&f->io, &req, asyncWorkCbAssertResult); + LOOP_RUN_UNTIL(&res.done); + return MUNIT_OK; +} From b98e31b7913a9853b1baa1325b256ff2d2e2d483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathieu=20Border=C3=A9?= Date: Tue, 22 Mar 2022 14:17:45 +0100 Subject: [PATCH 2/2] raft_fsm: version 3 introducing async snapshot --- Makefile.am | 5 +- include/raft.h | 20 ++++- src/raft.c | 25 ++++++ src/replication.c | 79 ++++++++++++++--- src/uv_snapshot.c | 5 +- test/integration/test_init.c | 52 +++++++++++ test/integration/test_snapshot.c | 143 ++++++++++++++++++++++++++++--- test/lib/cluster.h | 16 +++- test/lib/fsm.c | 54 +++++++++++- test/lib/fsm.h | 3 + 10 files changed, 370 insertions(+), 32 deletions(-) create mode 100644 test/integration/test_init.c diff --git a/Makefile.am b/Makefile.am index be69c655a..4be26efde 100644 --- a/Makefile.am +++ b/Makefile.am @@ -105,14 +105,15 @@ test_integration_core_SOURCES = \ test/integration/test_election.c \ test/integration/test_fixture.c \ test/integration/test_heap.c \ + test/integration/test_init.c \ test/integration/test_membership.c \ test/integration/test_recover.c \ test/integration/test_replication.c \ test/integration/test_snapshot.c \ + test/integration/test_start.c \ test/integration/test_strerror.c \ test/integration/test_tick.c \ - test/integration/test_transfer.c \ - test/integration/test_start.c + test/integration/test_transfer.c test_integration_core_CFLAGS = $(AM_CFLAGS) -Wno-conversion test_integration_core_LDFLAGS = -no-install test_integration_core_LDADD = libtest.la libraft.la diff --git a/include/raft.h b/include/raft.h index ac8789908..2cb2ccccc 100644 --- a/include/raft.h +++ b/include/raft.h @@ -423,7 +423,7 @@ struct raft_io_snapshot_get struct raft_io_async_work; typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req); typedef void (*raft_io_async_work_cb)(struct 
raft_io_async_work *req, - int status); + int status); struct raft_io_async_work { void *data; /* User data */ @@ -529,10 +529,22 @@ struct raft_io * `snapshot_finalize` can be used to e.g. release a lock that was taken during * a call to `snapshot`. Until `snapshot_finalize` is called, raft can access * the data contained in the `raft_buffer`s. + * + * version 3: + * Adds support for async snapshots through the `snapshot_async` function. + * When this method is provided, raft will call `snapshot` in the main loop, + * and when successful, will call `snapshot_async` by means of the `io->async_work` method. + * The callback to `io->async_work` will in turn call `snapshot_finalize` + * in the main loop when the work has completed independent of the return value + * of `snapshot_async`. + * An implementation that does not use asynchronous snapshots MUST set + * `snapshot_async` to NULL. + * All memory allocated by the snapshot routines MUST be freed by the snapshot + * routines themselves. */ struct raft_fsm { - int version; + int version; /* 1, 2 or 3 */ void *data; int (*apply)(struct raft_fsm *fsm, const struct raft_buffer *buf, @@ -545,6 +557,10 @@ struct raft_fsm int (*snapshot_finalize)(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs); + /* Fields below added since version 3. 
*/ + int (*snapshot_async)(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs); }; /** diff --git a/src/raft.c b/src/raft.c index dd5c3048d..071b822f2 100644 --- a/src/raft.c +++ b/src/raft.c @@ -24,6 +24,10 @@ #define DEFAULT_MAX_CATCH_UP_ROUNDS 10 #define DEFAULT_MAX_CATCH_UP_ROUND_DURATION (5 * 1000) +static int ioFsmCompat(struct raft *r, + struct raft_io *io, + struct raft_fsm *fsm); + int raft_init(struct raft *r, struct raft_io *io, struct raft_fsm *fsm, @@ -32,6 +36,12 @@ int raft_init(struct raft *r, { int rv; assert(r != NULL); + + rv = ioFsmCompat(r, io, fsm); + if (rv != 0) { + goto err; + } + r->io = io; r->io->data = r; r->fsm = fsm; @@ -227,3 +237,18 @@ unsigned long long raft_digest(const char *text, unsigned long long n) return byteFlip64(digest); } + +static int ioFsmCompat(struct raft *r, + struct raft_io *io, + struct raft_fsm *fsm) +{ + if ((fsm->version > 2 && fsm->snapshot_async != NULL) + && ((io->version < 2) || (io->async_work == NULL))) { + ErrMsgPrintf(r->errmsg, + "async snapshot requires io->version > 1 and async_work method."); + return -1; + } + + return 0; +} + diff --git a/src/replication.c b/src/replication.c index 1ba5d5d62..0d73be2f7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1455,12 +1455,58 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) } logSnapshot(&r->log, snapshot->index, r->snapshot.trailing); - out: - takeSnapshotClose(r, &r->snapshot.pending); + takeSnapshotClose(r, snapshot); r->snapshot.pending.term = 0; } +static int putSnapshot(struct raft *r, struct raft_snapshot *snapshot, + raft_io_snapshot_put_cb cb) +{ + int rv; + assert(r->snapshot.put.data == NULL); + r->snapshot.put.data = r; + rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put, + snapshot, cb); + if (rv != 0) { + takeSnapshotClose(r, snapshot); + r->snapshot.pending.term = 0; + r->snapshot.put.data = NULL; + } + + return rv; +} + +static void takeSnapshotDoneCb(struct 
raft_io_async_work *take, + int status) { + struct raft *r = take->data; + struct raft_snapshot *snapshot = &r->snapshot.pending; + int rv; + + raft_free(take); + + if (status != 0) { + tracef("take snapshot failed %s", raft_strerror(status)); + takeSnapshotClose(r, snapshot); + r->snapshot.pending.term = 0; + r->snapshot.put.data = NULL; + return; + } + + rv = putSnapshot(r, snapshot, takeSnapshotCb); + if (rv != 0) { + tracef("put snapshot failed %d", rv); + } +} + +static int takeSnapshotAsync(struct raft_io_async_work *take) +{ + struct raft *r = take->data; + tracef("take snapshot async at %lld", r->snapshot.pending.index); + struct raft_snapshot *snapshot = &r->snapshot.pending; + return r->fsm->snapshot_async(r->fsm, &snapshot->bufs, &snapshot->n_bufs); +} + static int takeSnapshot(struct raft *r) { struct raft_snapshot *snapshot; @@ -1471,12 +1517,13 @@ static int takeSnapshot(struct raft *r) snapshot = &r->snapshot.pending; snapshot->index = r->last_applied; snapshot->term = logTermOf(&r->log, r->last_applied); + snapshot->bufs = NULL; + snapshot->n_bufs = 0; rv = configurationCopy(&r->configuration, &snapshot->configuration); if (rv != 0) { goto abort; } - snapshot->configuration_index = r->configuration_index; rv = r->fsm->snapshot(r->fsm, &snapshot->bufs, &snapshot->n_bufs); @@ -1489,17 +1536,29 @@ static int takeSnapshot(struct raft *r) goto abort; } - assert(r->snapshot.put.data == NULL); - r->snapshot.put.data = r; - rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put, - snapshot, takeSnapshotCb); - if (rv != 0) { - goto abort_after_fsm_snapshot; + bool sync_snapshot = r->fsm->version < 3 || r->fsm->snapshot_async == NULL; + if (sync_snapshot) { + /* putSnapshot will clean up config and buffers in case of error */ + return putSnapshot(r, snapshot, takeSnapshotCb); + } else { + struct raft_io_async_work *take = raft_malloc(sizeof(*take)); + if (take == NULL) { + rv = RAFT_NOMEM; + goto abort_after_snapshot; + } + take->data = r; + 
take->work = takeSnapshotAsync; + rv = r->io->async_work(r->io, take, takeSnapshotDoneCb); + if (rv != 0) { + raft_free(take); + goto abort_after_snapshot; + } } return 0; -abort_after_fsm_snapshot: +abort_after_snapshot: + /* Closes config and finalizes snapshot */ takeSnapshotClose(r, snapshot); abort: r->snapshot.pending.term = 0; diff --git a/src/uv_snapshot.c b/src/uv_snapshot.c index feff857d7..064397d1d 100644 --- a/src/uv_snapshot.c +++ b/src/uv_snapshot.c @@ -634,7 +634,10 @@ int UvSnapshotPut(struct raft_io *io, raft_index next_index; uv = io->impl; - assert(!uv->closing); + if (uv->closing) { + return RAFT_CANCELED; + } + assert(uv->snapshot_put_work.data == NULL); tracef("put snapshot at %lld, keeping %d", snapshot->index, trailing); diff --git a/test/integration/test_init.c b/test/integration/test_init.c new file mode 100644 index 000000000..830d2274d --- /dev/null +++ b/test/integration/test_init.c @@ -0,0 +1,52 @@ +#include "../../include/raft.h" +#include "../lib/runner.h" + +/****************************************************************************** + * + * raft_init + * + *****************************************************************************/ + +SUITE(raft_init) + +/* Incompatible raft->io and raft->fsm wrt async snapshots. 
*/ +TEST(raft_init, incompatIoFsmAsyncSnapshotNotNull, NULL, NULL, 0, NULL) +{ + /* Set incompatible io and fsm versions and non-NULL snapshot_async fn */ + struct raft r = {0}; + struct raft_io io = {0}; + struct raft_fsm fsm = {0}; + io.version = 1; /* Too low */ + io.async_work = (int (*)(struct raft_io *, struct raft_io_async_work *, + raft_io_async_work_cb))(uintptr_t)0xDEADBEEF; + fsm.version = 3; + fsm.snapshot_async = (int (*)(struct raft_fsm *, + struct raft_buffer **, unsigned int *))(uintptr_t)0xDEADBEEF; + + int rc; + rc = raft_init(&r, &io, &fsm, 1, "1"); + munit_assert_int(rc, ==, -1); + munit_assert_string_equal(r.errmsg, "async snapshot requires io->version > 1 and async_work method."); + return MUNIT_OK; +} + +/* Incompatible raft->io and raft->fsm wrt async snapshots. */ +TEST(raft_init, incompatIoFsmAsyncSnapshotNull, NULL, NULL, 0, NULL) +{ + /* Set a sufficient io version but a NULL async_work fn, together with a non-NULL snapshot_async fn */ + struct raft r = {0}; + struct raft_io io = {0}; + struct raft_fsm fsm = {0}; + io.version = 2; + io.async_work = NULL; + fsm.version = 3; + fsm.snapshot_async = (int (*)(struct raft_fsm *, + struct raft_buffer **, unsigned int *))(uintptr_t)0xDEADBEEF; + + int rc; + rc = raft_init(&r, &io, &fsm, 1, "1"); + munit_assert_int(rc, ==, -1); + munit_assert_string_equal(r.errmsg, "async snapshot requires io->version > 1 and async_work method."); + return MUNIT_OK; +} + diff --git a/test/integration/test_snapshot.c b/test/integration/test_snapshot.c index b6da06226..a55c914a7 100644 --- a/test/integration/test_snapshot.c +++ b/test/integration/test_snapshot.c @@ -84,6 +84,24 @@ static int ioMethodSnapshotPutFail(struct raft_io *raft_io, } \ } +static int ioMethodAsyncWorkFail(struct raft_io *raft_io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb) +{ + (void) raft_io; + (void) req; + (void) cb; + return -1; +} + +#define SET_FAULTY_ASYNC_WORK() \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + 
CLUSTER_RAFT(i)->io->async_work = ioMethodAsyncWorkFail; \ + } \ + } + static int fsmSnapshotFail(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs) @@ -94,12 +112,27 @@ static int fsmSnapshotFail(struct raft_fsm *fsm, return -1; } -#define SET_FAULTY_SNAPSHOT() \ - { \ - unsigned i; \ - for (i = 0; i < CLUSTER_N; i++) { \ - CLUSTER_RAFT(i)->fsm->snapshot = fsmSnapshotFail; \ - } \ +#define SET_FAULTY_SNAPSHOT_ASYNC() \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + CLUSTER_RAFT(i)->fsm->snapshot_async = fsmSnapshotFail; \ + } \ + } + +#define RESET_FSM_ASYNC(I) \ + { \ + struct raft_fsm *fsm = CLUSTER_RAFT(I)->fsm; \ + FsmClose(fsm); \ + FsmInitAsync(fsm, fsm->version); \ + } + +#define SET_FAULTY_SNAPSHOT() \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + CLUSTER_RAFT(i)->fsm->snapshot = fsmSnapshotFail; \ + } \ } /****************************************************************************** @@ -515,14 +548,24 @@ TEST(snapshot, installSnapshotDuringEntriesWrite, setUp, tearDown, 0, NULL) return MUNIT_OK; } -static char *fsm_version[] = {"1", "2", NULL}; -static MunitParameterEnum fsm_version_params[] = { +static char *fsm_version[] = {"1", "2", "3", NULL}; +static char *fsm_snapshot_async[] = {"0", "1", NULL}; +static MunitParameterEnum fsm_snapshot_async_params[] = { + {CLUSTER_SS_ASYNC_PARAM, fsm_snapshot_async}, {CLUSTER_FSM_VERSION_PARAM, fsm_version}, {NULL, NULL}, }; +static char *fsm_snapshot_only_async[] = {"1", NULL}; +static char *fsm_version_only_async[] = {"3", NULL}; +static MunitParameterEnum fsm_snapshot_only_async_params[] = { + {CLUSTER_SS_ASYNC_PARAM, fsm_snapshot_only_async}, + {CLUSTER_FSM_VERSION_PARAM, fsm_version_only_async}, + {NULL, NULL}, +}; + /* Follower receives AppendEntries RPCs while taking a snapshot */ -TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params) +TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_snapshot_async_params) { struct 
fixture *f = data; (void)params; @@ -542,7 +585,7 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params /* Step the cluster until server 1 takes a snapshot */ const struct raft *r = CLUSTER_RAFT(1); - CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 2000); + CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 3000); /* Send AppendEntries RPCs while server 1 is taking a snapshot */ static struct raft_apply reqs[5]; @@ -559,7 +602,7 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params return MUNIT_OK; } -TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_version_params) +TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_snapshot_async_params) { struct fixture *f = data; (void)params; @@ -579,7 +622,83 @@ TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_version_para return MUNIT_OK; } -TEST(snapshot, takeSnapshotFail, setUp, tearDown, 0, fsm_version_params) +TEST(snapshot, takeSnapshotAsyncWorkFail, setUp, tearDown, 0, fsm_snapshot_async_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_ASYNC_WORK(); + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Apply a few of entries, to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* No crash or leaks have occurred */ + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotAsyncFail, setUp, tearDown, 0, fsm_snapshot_only_async_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_SNAPSHOT_ASYNC(); + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Apply a few of entries, to force a snapshot to be taken. 
*/ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* No crash or leaks have occurred */ + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotAsyncFailOnce, setUp, tearDown, 0, fsm_snapshot_only_async_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_SNAPSHOT_ASYNC(); + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + CLUSTER_SATURATE_BOTHWAYS(0, 2); + + /* Apply a few of entries, to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + /* Wait for snapshot to fail. */ + CLUSTER_STEP_UNTIL_ELAPSED(200); + /* Snapshot will have failed here. */ + + /* Set the non-faulty fsm->snapshot_async function */ + RESET_FSM_ASYNC(CLUSTER_LEADER); + CLUSTER_MAKE_PROGRESS; + + /* Wait for snapshot to be finished */ + CLUSTER_STEP_UNTIL_ELAPSED(200); + + /* Reconnect the follower and wait for it to catch up */ + CLUSTER_DESATURATE_BOTHWAYS(0, 2); + CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000); + + /* Check that the leader has sent a snapshot */ + munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 1); + munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 1); + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotFail, setUp, tearDown, 0, fsm_snapshot_async_params) { struct fixture *f = data; (void)params; diff --git a/test/lib/cluster.h b/test/lib/cluster.h index 28b74ff9b..be6052d31 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -24,7 +24,8 @@ do { \ unsigned _n = DEFAULT_N; \ bool _pre_vote = false; \ - int _fsm_version = 2; \ + bool _ss_async = false; \ + int _fsm_version = 3; \ unsigned _hb = 0; \ unsigned _i; \ int _rv; \ @@ -39,6 +40,10 @@ _hb = \ atoi(munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM)); \ } \ + if (munit_parameters_get(params, CLUSTER_SS_ASYNC_PARAM) != NULL) { \ + _ss_async = \ + atoi(munit_parameters_get(params, CLUSTER_SS_ASYNC_PARAM)); \ + } \ if 
(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM) != NULL) { \ _fsm_version = \ atoi(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM)); \ @@ -47,9 +52,13 @@ _rv = raft_fixture_initialize(&f->cluster); \ munit_assert_int(_rv, ==, 0); \ for (_i = 0; _i < _n; _i++) { \ - FsmInit(&f->fsms[_i], _fsm_version); \ _rv = raft_fixture_grow(&f->cluster, &f->fsms[_i]); \ munit_assert_int(_rv, ==, 0); \ + if (!_ss_async || _fsm_version < 3) { \ + FsmInit(&f->fsms[_i], _fsm_version); \ + } else { \ + FsmInitAsync(&f->fsms[_i], _fsm_version); \ + } \ } \ for (_i = 0; _i < _n; _i++) { \ raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ @@ -83,6 +92,9 @@ #define CLUSTER_HEARTBEAT_PARAM "cluster-heartbeat" /* Munit parameter for setting snapshot behaviour */ +#define CLUSTER_SS_ASYNC_PARAM "cluster-snapshot-async" + +/* Munit parameter for setting fsm version */ #define CLUSTER_FSM_VERSION_PARAM "fsm-version" /* Get the number of servers in the cluster. */ diff --git a/test/lib/fsm.c b/test/lib/fsm.c index 552eeacdd..acb01248b 100644 --- a/test/lib/fsm.c +++ b/test/lib/fsm.c @@ -98,7 +98,8 @@ static int fsmEncodeSnapshot(int x, return 0; } -static int fsmSnapshot(struct raft_fsm *fsm, +/* For use with fsm->version 1 */ +static int fsmSnapshot_v1(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs) { @@ -106,15 +107,39 @@ static int fsmSnapshot(struct raft_fsm *fsm, return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); } +/* For use with fsmSnapshotFinalize and fsm->version >= 2 */ +static int fsmSnapshot_v2(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + struct fsm *f = fsm->data; + munit_assert_int(f->lock, ==, 0); + f->lock = 1; + f->data = raft_malloc(8); /* Detect proper cleanup in finalize */ + munit_assert_ptr_not_null(f->data); + return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); +} + static int fsmSnapshotInitialize(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs) { + (void) bufs; + 
(void) n_bufs; struct fsm *f = fsm->data; munit_assert_int(f->lock, ==, 0); f->lock = 1; + munit_assert_ptr_null(f->data); f->data = raft_malloc(8); /* Detect proper cleanup in finalize */ munit_assert_ptr_not_null(f->data); + return 0; +} + +static int fsmSnapshotAsync(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + struct fsm *f = fsm->data; return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); } @@ -135,7 +160,9 @@ static int fsmSnapshotFinalize(struct raft_fsm *fsm, *n_bufs = 0; munit_assert_int(f->lock, ==, 1); f->lock = 0; + munit_assert_ptr_not_null(f->data); raft_free(f->data); + f->data = NULL; return 0; } @@ -152,14 +179,35 @@ void FsmInit(struct raft_fsm *fsm, int version) fsm->version = version; fsm->data = f; fsm->apply = fsmApply; - fsm->snapshot = fsmSnapshot; + fsm->snapshot = fsmSnapshot_v1; fsm->restore = fsmRestore; if (version > 1) { - fsm->snapshot = fsmSnapshotInitialize; + fsm->snapshot = fsmSnapshot_v2; fsm->snapshot_finalize = fsmSnapshotFinalize; + fsm->snapshot_async = NULL; } } +void FsmInitAsync(struct raft_fsm *fsm, int version) +{ + munit_assert_int(version, >, 2); + struct fsm *f = munit_malloc(sizeof *fsm); + memset(fsm, 'x', sizeof(*fsm)); /* Fill with garbage */ + + f->x = 0; + f->y = 0; + f->lock = 0; + f->data = NULL; + + fsm->version = version; + fsm->data = f; + fsm->apply = fsmApply; + fsm->snapshot = fsmSnapshotInitialize; + fsm->snapshot_async = fsmSnapshotAsync; + fsm->snapshot_finalize = fsmSnapshotFinalize; + fsm->restore = fsmRestore; +} + void FsmClose(struct raft_fsm *fsm) { struct fsm *f = fsm->data; diff --git a/test/lib/fsm.h b/test/lib/fsm.h index ff7c008be..f99ad0acc 100644 --- a/test/lib/fsm.h +++ b/test/lib/fsm.h @@ -9,6 +9,9 @@ void FsmInit(struct raft_fsm *fsm, int version); +/* Same as FsmInit but with asynchronous snapshots */ +void FsmInitAsync(struct raft_fsm *fsm, int version); + void FsmClose(struct raft_fsm *fsm); /* Encode a command to set x to the given value. */