diff --git a/Makefile.am b/Makefile.am index 17dede867..ac990a569 100644 --- a/Makefile.am +++ b/Makefile.am @@ -8,7 +8,7 @@ raftinclude_HEADERS = lib_LTLIBRARIES = libraft.la libraft_la_CFLAGS = $(AM_CFLAGS) -fvisibility=hidden -libraft_la_LDFLAGS = -version-info 0:7:0 +libraft_la_LDFLAGS = -version-info 2:0:0 libraft_la_SOURCES = \ src/byte.c \ src/client.c \ diff --git a/configure.ac b/configure.ac index df7bea9b0..a2bc1ebed 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,5 @@ AC_PREREQ(2.60) -AC_INIT([raft], [0.11.2]) +AC_INIT([raft], [0.13.0]) AC_LANG([C]) AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_AUX_DIR([ac]) diff --git a/example/server.c b/example/server.c index 604c99487..e6cb5195e 100644 --- a/example/server.c +++ b/example/server.c @@ -74,10 +74,11 @@ static int FsmInit(struct raft_fsm *fsm) return RAFT_NOMEM; } f->count = 0; - fsm->version = 1; + fsm->version = 2; fsm->data = f; fsm->apply = FsmApply; fsm->snapshot = FsmSnapshot; + fsm->snapshot_finalize = NULL; fsm->restore = FsmRestore; return 0; } diff --git a/include/raft.h b/include/raft.h index 9ce4aacb1..d63710d11 100644 --- a/include/raft.h +++ b/include/raft.h @@ -501,6 +501,31 @@ struct raft_io int (*random)(struct raft_io *io, int min, int max); }; +/* + * version 1: + * struct raft_fsm + * { + * int version; + * void *data; + * int (*apply)(struct raft_fsm *fsm, + * const struct raft_buffer *buf, + * void **result); + * int (*snapshot)(struct raft_fsm *fsm, + * struct raft_buffer *bufs[], + * unsigned *n_bufs); + * int (*restore)(struct raft_fsm *fsm, struct raft_buffer *buf); + * }; + * + * version 2: + * introduces `snapshot_finalize`, when this method is not NULL, it will + * always run after a successful call to `snapshot`, whether the snapshot has + * been successfully written to disk or not. If it is set, raft will + * assume no ownership of any of the `raft_buffer`s and the responsibility to + * clean up lies with the user of raft. + * `snapshot_finalize` can be used to e.g. 
release a lock that was taken during + * a call to `snapshot`. Until `snapshot_finalize` is called, raft can access + * the data contained in the `raft_buffer`s. + */ struct raft_fsm { int version; @@ -512,6 +537,9 @@ struct raft_fsm struct raft_buffer *bufs[], unsigned *n_bufs); int (*restore)(struct raft_fsm *fsm, struct raft_buffer *buf); + int (*snapshot_finalize)(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs); }; /** diff --git a/src/replication.c b/src/replication.c index ca65ecf0b..1ba5d5d62 100644 --- a/src/replication.c +++ b/src/replication.c @@ -531,7 +531,7 @@ static void appendLeaderCb(struct raft_io_append *req, int status) /* Submit a disk write for all entries from the given index onward. */ static int appendLeader(struct raft *r, raft_index index) { - struct raft_entry *entries; + struct raft_entry *entries = NULL; unsigned n; struct appendLeader *request; int rv; @@ -1424,6 +1424,22 @@ static bool shouldTakeSnapshot(struct raft *r) return true; } +/* + * When taking a snapshot, ownership of the snapshot data is with raft if + * `snapshot_finalize` is NULL. 
+ */ +static void takeSnapshotClose(struct raft *r, struct raft_snapshot *s) +{ + if (r->fsm->version == 1 || + (r->fsm->version > 1 && r->fsm->snapshot_finalize == NULL)) { + snapshotClose(s); + return; + } + + configurationClose(&s->configuration); + r->fsm->snapshot_finalize(r->fsm, &s->bufs, &s->n_bufs); +} + static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) { struct raft *r = req->data; @@ -1441,14 +1457,13 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) logSnapshot(&r->log, snapshot->index, r->snapshot.trailing); out: - snapshotClose(&r->snapshot.pending); + takeSnapshotClose(r, &r->snapshot.pending); r->snapshot.pending.term = 0; } static int takeSnapshot(struct raft *r) { struct raft_snapshot *snapshot; - unsigned i; int rv; tracef("take snapshot at %lld", r->last_applied); @@ -1470,7 +1485,8 @@ static int takeSnapshot(struct raft *r) if (rv == RAFT_BUSY) { rv = 0; } - goto abort_after_config_copy; + raft_configuration_close(&snapshot->configuration); + goto abort; } assert(r->snapshot.put.data == NULL); @@ -1484,12 +1500,7 @@ static int takeSnapshot(struct raft *r) return 0; abort_after_fsm_snapshot: - for (i = 0; i < snapshot->n_bufs; i++) { - raft_free(snapshot->bufs[i].base); - } - raft_free(snapshot->bufs); -abort_after_config_copy: - raft_configuration_close(&snapshot->configuration); + takeSnapshotClose(r, snapshot); abort: r->snapshot.pending.term = 0; return rv; diff --git a/src/uv_append.c b/src/uv_append.c index 22b763602..c10c01ff4 100644 --- a/src/uv_append.c +++ b/src/uv_append.c @@ -635,7 +635,7 @@ int UvAppend(struct raft_io *io, uv = io->impl; assert(!uv->closing); - append = RaftHeapMalloc(sizeof *append); + append = RaftHeapCalloc(1, sizeof *append); if (append == NULL) { rv = RAFT_NOMEM; goto err; diff --git a/src/uv_segment.c b/src/uv_segment.c index c4d4bba93..b625a6322 100644 --- a/src/uv_segment.c +++ b/src/uv_segment.c @@ -901,9 +901,9 @@ static int uvWriteClosedSegment(struct uv 
*uv, const struct raft_buffer *conf) { char filename[UV__FILENAME_LEN]; - struct uvSegmentBuffer buf; + struct uvSegmentBuffer buf = {0}; struct raft_buffer data; - struct raft_entry entry; + struct raft_entry entry = {0}; size_t cap; char errmsg[RAFT_ERRMSG_BUF_SIZE]; int rv; diff --git a/test/integration/test_fixture.c b/test/integration/test_fixture.c index 03f49d554..443229a68 100644 --- a/test/integration/test_fixture.c +++ b/test/integration/test_fixture.c @@ -26,7 +26,7 @@ static void *setUp(const MunitParameter params[], MUNIT_UNUSED void *user_data) int rc; SET_UP_HEAP; for (i = 0; i < N_SERVERS; i++) { - FsmInit(&f->fsms[i]); + FsmInit(&f->fsms[i], 2); } rc = raft_fixture_init(&f->fixture, N_SERVERS, f->fsms); diff --git a/test/integration/test_snapshot.c b/test/integration/test_snapshot.c index 0587c2fbc..b6da06226 100644 --- a/test/integration/test_snapshot.c +++ b/test/integration/test_snapshot.c @@ -62,6 +62,46 @@ static void tearDown(void *data) } \ } +static int ioMethodSnapshotPutFail(struct raft_io *raft_io, + unsigned trailing, + struct raft_io_snapshot_put *req, + const struct raft_snapshot *snapshot, + raft_io_snapshot_put_cb cb) +{ + (void) raft_io; + (void) trailing; + (void) req; + (void) snapshot; + (void) cb; + return -1; +} + +#define SET_FAULTY_SNAPSHOT_PUT() \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + CLUSTER_RAFT(i)->io->snapshot_put = ioMethodSnapshotPutFail; \ + } \ + } + +static int fsmSnapshotFail(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + (void) fsm; + (void) bufs; + (void) n_bufs; + return -1; +} + +#define SET_FAULTY_SNAPSHOT() \ + { \ + unsigned i; \ + for (i = 0; i < CLUSTER_N; i++) { \ + CLUSTER_RAFT(i)->fsm->snapshot = fsmSnapshotFail; \ + } \ + } + /****************************************************************************** * * Successfully install a snapshot @@ -475,8 +515,14 @@ TEST(snapshot, installSnapshotDuringEntriesWrite, setUp, tearDown, 0, NULL) return 
MUNIT_OK; } +static char *fsm_version[] = {"1", "2", NULL}; +static MunitParameterEnum fsm_version_params[] = { + {CLUSTER_FSM_VERSION_PARAM, fsm_version}, + {NULL, NULL}, +}; + /* Follower receives AppendEntries RPCs while taking a snapshot */ -TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, NULL) +TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_version_params) { struct fixture *f = data; (void)params; @@ -512,3 +558,43 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, NULL) CLUSTER_STEP_UNTIL_APPLIED(1, 11, 5000); return MUNIT_OK; } + +TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_version_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_SNAPSHOT_PUT(); + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Apply a few entries to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* No crash or leaks have occurred */ + return MUNIT_OK; +} + +TEST(snapshot, takeSnapshotFail, setUp, tearDown, 0, fsm_version_params) +{ + struct fixture *f = data; + (void)params; + + SET_FAULTY_SNAPSHOT(); + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Apply a few entries to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* No crash or leaks have occurred */ + return MUNIT_OK; +} diff --git a/test/lib/cluster.h b/test/lib/cluster.h index 48d149e54..620bb91ce 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -19,38 +19,43 @@ /* N is the default number of servers, but can be tweaked with the cluster-n * parameter. 
*/ -#define SETUP_CLUSTER(DEFAULT_N) \ - SET_UP_HEAP; \ - do { \ - unsigned _n = DEFAULT_N; \ - bool _pre_vote = false; \ - unsigned _hb = 0; \ - unsigned _i; \ - int _rv; \ - if (munit_parameters_get(params, CLUSTER_N_PARAM) != NULL) { \ - _n = atoi(munit_parameters_get(params, CLUSTER_N_PARAM)); \ - } \ - if (munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM) != NULL) { \ - _pre_vote = \ - atoi(munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM)); \ - } \ - if (munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM) != NULL) { \ - _hb = \ - atoi(munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM)); \ - } \ - munit_assert_int(_n, >, 0); \ - for (_i = 0; _i < _n; _i++) { \ - FsmInit(&f->fsms[_i]); \ - } \ - _rv = raft_fixture_init(&f->cluster, _n, f->fsms); \ - munit_assert_int(_rv, ==, 0); \ - for (_i = 0; _i < _n; _i++) { \ - raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ - if (_hb) { \ - raft_set_heartbeat_timeout(raft_fixture_get(&f->cluster, _i),\ - _hb); \ - } \ - } \ +#define SETUP_CLUSTER(DEFAULT_N) \ + SET_UP_HEAP; \ + do { \ + unsigned _n = DEFAULT_N; \ + bool _pre_vote = false; \ + int _fsm_version = 2; \ + unsigned _hb = 0; \ + unsigned _i; \ + int _rv; \ + if (munit_parameters_get(params, CLUSTER_N_PARAM) != NULL) { \ + _n = atoi(munit_parameters_get(params, CLUSTER_N_PARAM)); \ + } \ + if (munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM) != NULL) { \ + _pre_vote = \ + atoi(munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM)); \ + } \ + if (munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM) != NULL) { \ + _hb = \ + atoi(munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM)); \ + } \ + if (munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM) != NULL) { \ + _fsm_version = \ + atoi(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM)); \ + } \ + munit_assert_int(_n, >, 0); \ + for (_i = 0; _i < _n; _i++) { \ + FsmInit(&f->fsms[_i], _fsm_version); \ + } \ + _rv = raft_fixture_init(&f->cluster, _n, f->fsms); \ + 
munit_assert_int(_rv, ==, 0); \ + for (_i = 0; _i < _n; _i++) { \ + raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ + if (_hb) { \ + raft_set_heartbeat_timeout(raft_fixture_get(&f->cluster, _i), \ + _hb); \ + } \ + } \ } while (0) #define TEAR_DOWN_CLUSTER \ @@ -75,6 +80,9 @@ /* Munit parameter for setting HeartBeat timeout */ #define CLUSTER_HEARTBEAT_PARAM "cluster-heartbeat" +/* Munit parameter for setting the raft_fsm version passed to FsmInit */ +#define CLUSTER_FSM_VERSION_PARAM "fsm-version" + /* Get the number of servers in the cluster. */ #define CLUSTER_N raft_fixture_n(&f->cluster) @@ -284,7 +292,7 @@ #define CLUSTER_GROW \ { \ int rv_; \ - FsmInit(&f->fsms[CLUSTER_N]); \ + FsmInit(&f->fsms[CLUSTER_N], 2); \ rv_ = raft_fixture_grow(&f->cluster, &f->fsms[CLUSTER_N]); \ munit_assert_int(rv_, ==, 0); \ } diff --git a/test/lib/fsm.c b/test/lib/fsm.c index 36a96592b..552eeacdd 100644 --- a/test/lib/fsm.c +++ b/test/lib/fsm.c @@ -8,6 +8,8 @@ struct fsm { int x; int y; + int lock; + void *data; }; /* Command codes */ @@ -104,18 +106,58 @@ static int fsmSnapshot(struct raft_fsm *fsm, return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); } -void FsmInit(struct raft_fsm *fsm) +static int fsmSnapshotInitialize(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + struct fsm *f = fsm->data; + munit_assert_int(f->lock, ==, 0); + f->lock = 1; + f->data = raft_malloc(8); /* Detect proper cleanup in finalize */ + munit_assert_ptr_not_null(f->data); + return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); +} + +static int fsmSnapshotFinalize(struct raft_fsm *fsm, + struct raft_buffer *bufs[], + unsigned *n_bufs) +{ + (void) bufs; + (void) n_bufs; + struct fsm *f = fsm->data; + if (*bufs != NULL) { + for (unsigned i = 0; i < *n_bufs; ++i) { + raft_free((*bufs)[i].base); + } + raft_free(*bufs); + } + *bufs = NULL; + *n_bufs = 0; + munit_assert_int(f->lock, ==, 1); + f->lock = 0; + raft_free(f->data); + return 0; +} + +void FsmInit(struct raft_fsm *fsm, int 
version) { struct fsm *f = munit_malloc(sizeof *fsm); + memset(fsm, 'x', sizeof(*fsm)); /* Fill with garbage */ f->x = 0; f->y = 0; + f->lock = 0; + f->data = NULL; - fsm->version = 1; + fsm->version = version; fsm->data = f; fsm->apply = fsmApply; fsm->snapshot = fsmSnapshot; fsm->restore = fsmRestore; + if (version > 1) { + fsm->snapshot = fsmSnapshotInitialize; + fsm->snapshot_finalize = fsmSnapshotFinalize; + } } void FsmClose(struct raft_fsm *fsm) diff --git a/test/lib/fsm.h b/test/lib/fsm.h index 76eddbf9d..ff7c008be 100644 --- a/test/lib/fsm.h +++ b/test/lib/fsm.h @@ -7,7 +7,7 @@ #include "../../include/raft.h" -void FsmInit(struct raft_fsm *fsm); +void FsmInit(struct raft_fsm *fsm, int version); void FsmClose(struct raft_fsm *fsm);