diff --git a/Makefile.am b/Makefile.am index be69c655a..4be26efde 100644 --- a/Makefile.am +++ b/Makefile.am @@ -105,14 +105,15 @@ test_integration_core_SOURCES = \ test/integration/test_election.c \ test/integration/test_fixture.c \ test/integration/test_heap.c \ + test/integration/test_init.c \ test/integration/test_membership.c \ test/integration/test_recover.c \ test/integration/test_replication.c \ test/integration/test_snapshot.c \ + test/integration/test_start.c \ test/integration/test_strerror.c \ test/integration/test_tick.c \ - test/integration/test_transfer.c \ - test/integration/test_start.c + test/integration/test_transfer.c test_integration_core_CFLAGS = $(AM_CFLAGS) -Wno-conversion test_integration_core_LDFLAGS = -no-install test_integration_core_LDADD = libtest.la libraft.la diff --git a/include/raft.h b/include/raft.h index ac8789908..2cb2ccccc 100644 --- a/include/raft.h +++ b/include/raft.h @@ -423,7 +423,7 @@ struct raft_io_snapshot_get struct raft_io_async_work; typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req); typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req, - int status); + int status); struct raft_io_async_work { void *data; /* User data */ @@ -529,10 +529,22 @@ struct raft_io * `snapshot_finalize` can be used to e.g. release a lock that was taken during * a call to `snapshot`. Until `snapshot_finalize` is called, raft can access * the data contained in the `raft_buffer`s. + * + * version 3: + * Adds support for async snapshots through the `snapshot_async` function. + * When this method is provided, raft will call `snapshot` in the main loop, + * and, when that succeeds, will call `snapshot_async` by means of the `io->async_work` method. + * The callback to `io->async_work` will in turn call `snapshot_finalize` + * in the main loop once the work has completed, regardless of the return value + * of `snapshot_async`. 
+ * An implementation that does not use asynchronous snapshots MUST set + * `snapshot_async` to NULL. + * All memory allocated by the snapshot routines MUST be freed by the snapshot + * routines themselves. */ struct raft_fsm { - int version; + int version; /* 1, 2 or 3 */ void *data; int (*apply)(struct raft_fsm *fsm, const struct raft_buffer *buf, @@ -545,6 +557,10 @@ struct raft_fsm int (*snapshot_finalize)(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs); + /* Fields below added since version 3. snapshot_async MUST be NULL when asynchronous snapshots are not used. */ + int (*snapshot_async)(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs); }; /** diff --git a/src/raft.c b/src/raft.c index dd5c3048d..071b822f2 100644 --- a/src/raft.c +++ b/src/raft.c @@ -24,6 +24,10 @@ #define DEFAULT_MAX_CATCH_UP_ROUNDS 10 #define DEFAULT_MAX_CATCH_UP_ROUND_DURATION (5 * 1000) +static int ioFsmCompat(struct raft *r, + struct raft_io *io, + struct raft_fsm *fsm); + int raft_init(struct raft *r, struct raft_io *io, struct raft_fsm *fsm, @@ -32,6 +36,12 @@ int raft_init(struct raft *r, { int rv; assert(r != NULL); + + rv = ioFsmCompat(r, io, fsm); + if (rv != 0) { + goto err; + } + r->io = io; r->io->data = r; r->fsm = fsm; @@ -227,3 +237,18 @@ unsigned long long raft_digest(const char *text, unsigned long long n) return byteFlip64(digest); } + +static int ioFsmCompat(struct raft *r, + struct raft_io *io, + struct raft_fsm *fsm) +{ + if ((fsm->version > 2 && fsm->snapshot_async != NULL) + && ((io->version < 2) || (io->async_work == NULL))) { + ErrMsgPrintf(r->errmsg, + "async snapshot requires io->version > 1 and async_work method."); + return -1; + } + + return 0; +} + diff --git a/src/replication.c b/src/replication.c index 1ba5d5d62..0d73be2f7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1455,12 +1455,58 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) } logSnapshot(&r->log, snapshot->index, r->snapshot.trailing); - out: - takeSnapshotClose(r, 
&r->snapshot.pending); + takeSnapshotClose(r, snapshot); r->snapshot.pending.term = 0; } +static int putSnapshot(struct raft *r, struct raft_snapshot *snapshot, + raft_io_snapshot_put_cb cb) +{ + int rv; + assert(r->snapshot.put.data == NULL); + r->snapshot.put.data = r; + rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put, + snapshot, cb); + if (rv != 0) { /* submission failed: release the pending snapshot and reset the put request */ + takeSnapshotClose(r, snapshot); + r->snapshot.pending.term = 0; + r->snapshot.put.data = NULL; + } + + return rv; +} + +static void takeSnapshotDoneCb(struct raft_io_async_work *take, + int status) { + struct raft *r = take->data; + struct raft_snapshot *snapshot = &r->snapshot.pending; + int rv; + + raft_free(take); /* the request was heap-allocated by takeSnapshot() */ + + if (status != 0) { + tracef("take snapshot failed %s", raft_strerror(status)); + takeSnapshotClose(r, snapshot); + r->snapshot.pending.term = 0; + r->snapshot.put.data = NULL; + return; + } + + rv = putSnapshot(r, snapshot, takeSnapshotCb); /* on failure putSnapshot() has already cleaned up */ + if (rv != 0) { + tracef("put snapshot failed %d", rv); + } +} + +static int takeSnapshotAsync(struct raft_io_async_work *take) +{ + struct raft *r = take->data; /* take->data is set to the raft instance by takeSnapshot() */ + tracef("take snapshot async at %lld", r->snapshot.pending.index); + struct raft_snapshot *snapshot = &r->snapshot.pending; + return r->fsm->snapshot_async(r->fsm, &snapshot->bufs, &snapshot->n_bufs); +} + static int takeSnapshot(struct raft *r) { struct raft_snapshot *snapshot; @@ -1471,12 +1517,13 @@ static int takeSnapshot(struct raft *r) snapshot = &r->snapshot.pending; snapshot->index = r->last_applied; snapshot->term = logTermOf(&r->log, r->last_applied); + snapshot->bufs = NULL; + snapshot->n_bufs = 0; rv = configurationCopy(&r->configuration, &snapshot->configuration); if (rv != 0) { goto abort; } - snapshot->configuration_index = r->configuration_index; rv = r->fsm->snapshot(r->fsm, &snapshot->bufs, &snapshot->n_bufs); @@ -1489,17 +1536,29 @@ static int takeSnapshot(struct raft *r) goto abort; } - assert(r->snapshot.put.data == NULL); - r->snapshot.put.data = r; - rv 
= r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put, - snapshot, takeSnapshotCb); - if (rv != 0) { - goto abort_after_fsm_snapshot; + bool sync_snapshot = r->fsm->version < 3 || r->fsm->snapshot_async == NULL; /* the async path needs fsm version 3+ and a snapshot_async method */ + if (sync_snapshot) { + /* putSnapshot will clean up config and buffers in case of error */ + return putSnapshot(r, snapshot, takeSnapshotCb); + } else { + struct raft_io_async_work *take = raft_malloc(sizeof(*take)); + if (take == NULL) { + rv = RAFT_NOMEM; + goto abort_after_snapshot; + } + take->data = r; + take->work = takeSnapshotAsync; /* work function that calls fsm->snapshot_async */ + rv = r->io->async_work(r->io, take, takeSnapshotDoneCb); + if (rv != 0) { + raft_free(take); + goto abort_after_snapshot; + } } return 0; -abort_after_fsm_snapshot: +abort_after_snapshot: + /* Closes config and finalizes snapshot */ takeSnapshotClose(r, snapshot); abort: r->snapshot.pending.term = 0; diff --git a/src/uv_snapshot.c b/src/uv_snapshot.c index feff857d7..064397d1d 100644 --- a/src/uv_snapshot.c +++ b/src/uv_snapshot.c @@ -634,7 +634,10 @@ int UvSnapshotPut(struct raft_io *io, raft_index next_index; uv = io->impl; - assert(!uv->closing); + if (uv->closing) { + return RAFT_CANCELED; /* closing: reject the request instead of asserting */ + } + assert(uv->snapshot_put_work.data == NULL); tracef("put snapshot at %lld, keeping %d", snapshot->index, trailing); diff --git a/test/integration/test_init.c b/test/integration/test_init.c new file mode 100644 index 000000000..830d2274d --- /dev/null +++ b/test/integration/test_init.c @@ -0,0 +1,52 @@ +#include "../../include/raft.h" +#include "../lib/runner.h" + +/****************************************************************************** + * + * raft_init + * + *****************************************************************************/ + +SUITE(raft_init) + +/* Incompatible raft->io and raft->fsm wrt async snapshots. 
*/ +TEST(raft_init, incompatIoFsmAsyncSnapshotNotNull, NULL, NULL, 0, NULL) +{ + /* io.version is too low for an fsm that provides a non-NULL snapshot_async fn */ + struct raft r = {0}; + struct raft_io io = {0}; + struct raft_fsm fsm = {0}; + io.version = 1; /* Too low */ + io.async_work = (int (*)(struct raft_io *, struct raft_io_async_work *, + raft_io_async_work_cb))(uintptr_t)0xDEADBEEF; + fsm.version = 3; + fsm.snapshot_async = (int (*)(struct raft_fsm *, + struct raft_buffer **, unsigned int *))(uintptr_t)0xDEADBEEF; + + int rc; + rc = raft_init(&r, &io, &fsm, 1, "1"); + munit_assert_int(rc, ==, -1); + munit_assert_string_equal(r.errmsg, "async snapshot requires io->version > 1 and async_work method."); + return MUNIT_OK; +} + +/* Incompatible raft->io and raft->fsm wrt async snapshots: io->async_work is NULL. */ +TEST(raft_init, incompatIoFsmAsyncSnapshotNull, NULL, NULL, 0, NULL) +{ + /* io.version is high enough, but io.async_work is NULL while snapshot_async is non-NULL */ + struct raft r = {0}; + struct raft_io io = {0}; + struct raft_fsm fsm = {0}; + io.version = 2; /* High enough */ + io.async_work = NULL; + fsm.version = 3; + fsm.snapshot_async = (int (*)(struct raft_fsm *, + struct raft_buffer **, unsigned int *))(uintptr_t)0xDEADBEEF; + + int rc; + rc = raft_init(&r, &io, &fsm, 1, "1"); + munit_assert_int(rc, ==, -1); + munit_assert_string_equal(r.errmsg, "async snapshot requires io->version > 1 and async_work method."); + return MUNIT_OK; +} + diff --git a/test/integration/test_snapshot.c b/test/integration/test_snapshot.c index b6da06226..a55c914a7 100644 --- a/test/integration/test_snapshot.c +++ b/test/integration/test_snapshot.c @@ -84,6 +84,24 @@ static int ioMethodSnapshotPutFail(struct raft_io *raft_io, } \ } +static int ioMethodAsyncWorkFail(struct raft_io *raft_io, + struct raft_io_async_work *req, + raft_io_async_work_cb cb) +{ + (void) raft_io; + (void) req; + (void) cb; + return -1; +} + +#define SET_FAULTY_ASYNC_WORK() \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + 
CLUSTER_RAFT(i)->io->async_work = ioMethodAsyncWorkFail; \ + } \ + } + static int fsmSnapshotFail(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs) @@ -94,12 +112,27 @@ static int fsmSnapshotFail(struct raft_fsm *fsm, return -1; } -#define SET_FAULTY_SNAPSHOT() \ - { \ - unsigned i; \ - for (i = 0; i < CLUSTER_N; i++) { \ - CLUSTER_RAFT(i)->fsm->snapshot = fsmSnapshotFail; \ - } \ +#define SET_FAULTY_SNAPSHOT_ASYNC() /* Make fsm->snapshot_async fail on every server */ \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + CLUSTER_RAFT(i)->fsm->snapshot_async = fsmSnapshotFail; \ + } \ + } + +#define RESET_FSM_ASYNC(I) /* Close server I's FSM and re-create it with FsmInitAsync, keeping its version */ \ + { \ + struct raft_fsm *fsm = CLUSTER_RAFT(I)->fsm; \ + FsmClose(fsm); \ + FsmInitAsync(fsm, fsm->version); \ + } + +#define SET_FAULTY_SNAPSHOT() /* Make fsm->snapshot fail on every server */ \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + CLUSTER_RAFT(i)->fsm->snapshot = fsmSnapshotFail; \ + } \ } /****************************************************************************** @@ -515,14 +548,24 @@ TEST(snapshot, installSnapshotDuringEntriesWrite, setUp, tearDown, 0, NULL) return MUNIT_OK; } -static char *fsm_version[] = {"1", "2", NULL}; -static MunitParameterEnum fsm_version_params[] = { +static char *fsm_version[] = {"1", "2", "3", NULL}; +static char *fsm_snapshot_async[] = {"0", "1", NULL}; +static MunitParameterEnum fsm_snapshot_async_params[] = { + {CLUSTER_SS_ASYNC_PARAM, fsm_snapshot_async}, {CLUSTER_FSM_VERSION_PARAM, fsm_version}, {NULL, NULL}, }; +static char *fsm_snapshot_only_async[] = {"1", NULL}; +static char *fsm_version_only_async[] = {"3", NULL}; +static MunitParameterEnum fsm_snapshot_only_async_params[] = { + {CLUSTER_SS_ASYNC_PARAM, fsm_snapshot_only_async}, + {CLUSTER_FSM_VERSION_PARAM, fsm_version_only_async}, + {NULL, NULL}, +}; + /* Follower receives AppendEntries RPCs while taking a snapshot */ -TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params) +TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_snapshot_async_params) { struct 
fixture *f = data; (void)params; @@ -542,7 +585,7 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params /* Step the cluster until server 1 takes a snapshot */ const struct raft *r = CLUSTER_RAFT(1); - CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 2000); + CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 3000); /* Send AppendEntries RPCs while server 1 is taking a snapshot */ static struct raft_apply reqs[5]; @@ -559,7 +602,7 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params return MUNIT_OK; } -TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_version_params) +TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_snapshot_async_params) { struct fixture *f = data; (void)params; @@ -579,7 +622,83 @@ TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_version_para return MUNIT_OK; } -TEST(snapshot, takeSnapshotFail, setUp, tearDown, 0, fsm_version_params) +TEST(snapshot, takeSnapshotAsyncWorkFail, setUp, tearDown, 0, fsm_snapshot_async_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_ASYNC_WORK(); /* io->async_work now fails on every server */ + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Apply a few entries, to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* No crash or leaks have occurred */ + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotAsyncFail, setUp, tearDown, 0, fsm_snapshot_only_async_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_SNAPSHOT_ASYNC(); /* fsm->snapshot_async now fails on every server */ + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Apply a few entries, to force a snapshot to be taken. 
*/ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* No crash or leaks have occurred */ + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotAsyncFailOnce, setUp, tearDown, 0, fsm_snapshot_only_async_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_SNAPSHOT_ASYNC(); + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + CLUSTER_SATURATE_BOTHWAYS(0, 2); + + /* Apply a few entries, to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + /* Wait for snapshot to fail. */ + CLUSTER_STEP_UNTIL_ELAPSED(200); + /* Snapshot will have failed here. */ + + /* Set the non-faulty fsm->snapshot_async function */ + RESET_FSM_ASYNC(CLUSTER_LEADER); + CLUSTER_MAKE_PROGRESS; + + /* Wait for snapshot to be finished */ + CLUSTER_STEP_UNTIL_ELAPSED(200); + + /* Reconnect the follower and wait for it to catch up */ + CLUSTER_DESATURATE_BOTHWAYS(0, 2); + CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000); + + /* Check that the leader has sent a snapshot */ + munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 1); + munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 1); + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotFail, setUp, tearDown, 0, fsm_snapshot_async_params) { struct fixture *f = data; (void)params; diff --git a/test/lib/cluster.h b/test/lib/cluster.h index 28b74ff9b..be6052d31 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -24,7 +24,8 @@ do { \ unsigned _n = DEFAULT_N; \ bool _pre_vote = false; \ - int _fsm_version = 2; \ + bool _ss_async = false; \ + int _fsm_version = 3; \ unsigned _hb = 0; \ unsigned _i; \ int _rv; \ @@ -39,6 +40,10 @@ _hb = \ atoi(munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM)); \ } \ + if (munit_parameters_get(params, CLUSTER_SS_ASYNC_PARAM) != NULL) { \ + _ss_async = \ + atoi(munit_parameters_get(params, CLUSTER_SS_ASYNC_PARAM)); \ + } \ if 
(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM) != NULL) { \ _fsm_version = \ atoi(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM)); \ @@ -47,9 +52,14 @@ _rv = raft_fixture_initialize(&f->cluster); \ munit_assert_int(_rv, ==, 0); \ for (_i = 0; _i < _n; _i++) { \ - FsmInit(&f->fsms[_i], _fsm_version); \ + /* Initialize the FSM before it is passed to raft_fixture_grow(), so it is never handed over uninitialized. */ \ + if (!_ss_async || _fsm_version < 3) { \ + FsmInit(&f->fsms[_i], _fsm_version); \ + } else { \ + FsmInitAsync(&f->fsms[_i], _fsm_version); \ + } \ _rv = raft_fixture_grow(&f->cluster, &f->fsms[_i]); \ munit_assert_int(_rv, ==, 0); \ } \ for (_i = 0; _i < _n; _i++) { \ raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ @@ -83,6 +93,9 @@ #define CLUSTER_HEARTBEAT_PARAM "cluster-heartbeat" /* Munit parameter for setting snapshot behaviour */ +#define CLUSTER_SS_ASYNC_PARAM "cluster-snapshot-async" + +/* Munit parameter for setting fsm version */ #define CLUSTER_FSM_VERSION_PARAM "fsm-version" /* Get the number of servers in the cluster. */ diff --git a/test/lib/fsm.c b/test/lib/fsm.c index 552eeacdd..acb01248b 100644 --- a/test/lib/fsm.c +++ b/test/lib/fsm.c @@ -98,7 +98,8 @@ static int fsmEncodeSnapshot(int x, return 0; } -static int fsmSnapshot(struct raft_fsm *fsm, +/* For use with fsm->version 1 */ +static int fsmSnapshot_v1(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs) { @@ -106,15 +107,39 @@ static int fsmSnapshot(struct raft_fsm *fsm, return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); } +/* For use with fsmSnapshotFinalize and fsm->version >= 2 */ +static int fsmSnapshot_v2(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + struct fsm *f = fsm->data; + munit_assert_int(f->lock, ==, 0); + f->lock = 1; + f->data = raft_malloc(8); /* Detect proper cleanup in finalize */ + munit_assert_ptr_not_null(f->data); + return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); +} + static int fsmSnapshotInitialize(struct raft_fsm *fsm, struct raft_buffer *bufs[], unsigned *n_bufs) { + (void) bufs; + 
(void) n_bufs; struct fsm *f = fsm->data; munit_assert_int(f->lock, ==, 0); f->lock = 1; + munit_assert_ptr_null(f->data); f->data = raft_malloc(8); /* Detect proper cleanup in finalize */ munit_assert_ptr_not_null(f->data); + return 0; +} + +static int fsmSnapshotAsync(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + struct fsm *f = fsm->data; return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); } @@ -135,7 +160,9 @@ static int fsmSnapshotFinalize(struct raft_fsm *fsm, *n_bufs = 0; munit_assert_int(f->lock, ==, 1); f->lock = 0; + munit_assert_ptr_not_null(f->data); raft_free(f->data); + f->data = NULL; return 0; } @@ -152,14 +179,35 @@ void FsmInit(struct raft_fsm *fsm, int version) fsm->version = version; fsm->data = f; fsm->apply = fsmApply; - fsm->snapshot = fsmSnapshot; + fsm->snapshot = fsmSnapshot_v1; fsm->restore = fsmRestore; if (version > 1) { - fsm->snapshot = fsmSnapshotInitialize; + fsm->snapshot = fsmSnapshot_v2; fsm->snapshot_finalize = fsmSnapshotFinalize; + fsm->snapshot_async = NULL; } } +void FsmInitAsync(struct raft_fsm *fsm, int version) +{ + munit_assert_int(version, >, 2); /* snapshot_async requires fsm version 3 or later */ + struct fsm *f = munit_malloc(sizeof *f); /* size of the test FSM state, not of struct raft_fsm */ + memset(fsm, 'x', sizeof(*fsm)); /* Fill with garbage */ + + f->x = 0; + f->y = 0; + f->lock = 0; + f->data = NULL; + + fsm->version = version; + fsm->data = f; + fsm->apply = fsmApply; + fsm->snapshot = fsmSnapshotInitialize; + fsm->snapshot_async = fsmSnapshotAsync; + fsm->snapshot_finalize = fsmSnapshotFinalize; + fsm->restore = fsmRestore; +} + void FsmClose(struct raft_fsm *fsm) { struct fsm *f = fsm->data; diff --git a/test/lib/fsm.h b/test/lib/fsm.h index ff7c008be..f99ad0acc 100644 --- a/test/lib/fsm.h +++ b/test/lib/fsm.h @@ -9,6 +9,9 @@ void FsmInit(struct raft_fsm *fsm, int version); +/* Same as FsmInit but with asynchronous snapshots */ +void FsmInitAsync(struct raft_fsm *fsm, int version); + void FsmClose(struct raft_fsm *fsm); /* Encode a command to set x to the given value. */