diff --git a/Makefile.am b/Makefile.am index 6e8293a87..17dede867 100644 --- a/Makefile.am +++ b/Makefile.am @@ -8,7 +8,7 @@ raftinclude_HEADERS = lib_LTLIBRARIES = libraft.la libraft_la_CFLAGS = $(AM_CFLAGS) -fvisibility=hidden -libraft_la_LDFLAGS = -version-info 1:0:0 +libraft_la_LDFLAGS = -version-info 0:7:0 libraft_la_SOURCES = \ src/byte.c \ src/client.c \ @@ -150,7 +150,6 @@ libraft_la_SOURCES += \ src/uv_tcp_listen.c \ src/uv_tcp_connect.c \ src/uv_truncate.c \ - src/uv_work.c \ src/uv_writer.c libraft_la_LDFLAGS += $(UV_LIBS) @@ -199,8 +198,7 @@ test_integration_uv_SOURCES = \ test/integration/test_uv_tcp_connect.c \ test/integration/test_uv_tcp_listen.c \ test/integration/test_uv_snapshot_put.c \ - test/integration/test_uv_truncate.c \ - test/integration/test_uv_work.c + test/integration/test_uv_truncate.c test_integration_uv_CFLAGS = $(AM_CFLAGS) -Wno-type-limits -Wno-conversion test_integration_uv_LDFLAGS = -no-install $(UV_LIBS) test_integration_uv_LDADD = libtest.la diff --git a/configure.ac b/configure.ac index 8b3b8f5fb..df7bea9b0 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,5 @@ AC_PREREQ(2.60) -AC_INIT([raft], [0.12.0]) +AC_INIT([raft], [0.11.2]) AC_LANG([C]) AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_AUX_DIR([ac]) diff --git a/include/raft.h b/include/raft.h index bb2c5886b..9ce4aacb1 100644 --- a/include/raft.h +++ b/include/raft.h @@ -417,20 +417,6 @@ struct raft_io_snapshot_get raft_io_snapshot_get_cb cb; /* Request callback */ }; -/** - * Asynchronous work request. - */ -struct raft_io_async_work; -typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req); -typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req, - int status); -struct raft_io_async_work -{ - void *data; /* User data */ - raft_io_async_work_fn work; /* Function to run async from the main loop */ - raft_io_async_work_cb cb; /* Request callback */ -}; - /** * Customizable tracer, for debugging purposes. */ @@ -513,46 +499,11 @@ struct raft_io raft_io_snapshot_get_cb cb); raft_time (*time)(struct raft_io *io); int (*random)(struct raft_io *io, int min, int max); - int (*async_work)(struct raft_io *io, - struct raft_io_async_work *req, - raft_io_async_work_cb cb); }; -/* - * version 1: - * struct raft_fsm - * { - * int version; - * void *data; - * int (*apply)(struct raft_fsm *fsm, - * const struct raft_buffer *buf, - * void **result); - * int (*snapshot)(struct raft_fsm *fsm, - * struct raft_buffer *bufs[], - * unsigned *n_bufs); - * int (*restore)(struct raft_fsm *fsm, struct raft_buffer *buf); - * }; - * - * version 2: - * Adds support for async snapshots through the `snapshot_async` - * and `snapshot_finalize` functions. - * An implementation must provide either the `snapshot` method or all 3 - * snapshot methods. When only the `snapshot` method is provided, the behavior - * is identical to version 1, `snapshot` will be called in the main loop and the - * allocated buffers will be released by raft. - * When all 3 methods are provided, raft will call `snapshot` in the main loop, - * and when successful, will call `snapshot_async` by means of the `io->async_work` method. - * The callback to `io->async_work` will in turn call `snapshot_finalize` - * in the main loop when the work has completed independent of the return value - * of `snapshot_async`. - * An implementation that does not use asynchronous snapshots MUST set - * `snapshot_async` and `snapshot_finalize` to NULL. - * All memory allocated by the snapshot routines MUST be freed by the snapshot - * routines themselves. - */ struct raft_fsm { - int version; /* 1 or 2 */ + int version; void *data; int (*apply)(struct raft_fsm *fsm, const struct raft_buffer *buf, @@ -561,12 +512,6 @@ struct raft_fsm struct raft_buffer *bufs[], unsigned *n_bufs); int (*restore)(struct raft_fsm *fsm, struct raft_buffer *buf); - int (*snapshot_async)(struct raft_fsm *fsm, - struct raft_buffer *bufs[], - unsigned *n_bufs); - int (*snapshot_finalize)(struct raft_fsm *fsm, - struct raft_buffer *bufs[], - unsigned *n_bufs); }; /** diff --git a/include/raft/fixture.h b/include/raft/fixture.h index 731e8dbe8..c27c82f68 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -16,8 +16,7 @@ enum { RAFT_FIXTURE_TICK = 1, /* The tick callback has been invoked */ RAFT_FIXTURE_NETWORK, /* A network request has been sent or received */ - RAFT_FIXTURE_DISK, /* An I/O request has been submitted */ - RAFT_FIXTURE_WORK /* A large, CPU and/or memory intensive task */ + RAFT_FIXTURE_DISK /* An I/O request has been submitted */ }; /** diff --git a/src/fixture.c b/src/fixture.c index 237614a39..4cfbbbbf5 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -21,7 +21,6 @@ #define ELECTION_TIMEOUT 1000 #define NETWORK_LATENCY 15 #define DISK_LATENCY 10 -#define WORK_DURATION 200 /* To keep in sync with raft.h */ #define N_MESSAGE_TYPES 6 @@ -37,7 +36,7 @@ queue queue /* Link the I/O pending requests queue. */ /* Request type codes. */ -enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET, ASYNC_WORK }; +enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET }; /* Abstract base type for an asynchronous request submitted to the stub I/o * implementation. */ @@ -73,13 +72,6 @@ struct snapshot_put const struct raft_snapshot *snapshot; }; -/* Pending request to perform general work. */ -struct async_work -{ - REQUEST; - struct raft_io_async_work *req; -}; - /* Pending request to load a snapshot. */ struct snapshot_get { @@ -139,7 +131,6 @@ struct io unsigned randomized_election_timeout; /* Value returned by io->random() */ unsigned network_latency; /* Milliseconds to deliver RPCs */ unsigned disk_latency; /* Milliseconds to perform disk I/O */ - unsigned work_duration; /* Milliseconds to long running work */ struct { @@ -286,16 +277,6 @@ static void ioFlushSnapshotGet(struct io *s, struct snapshot_get *r) raft_free(r); } -/* Flush an async work request */ -static void ioFlushAsyncWork(struct io *s, struct async_work *r) -{ - (void) s; - int rv; - rv = r->req->work(r->req); - r->req->cb(r->req, rv); - raft_free(r); -} - /* Search for the peer with the given ID. */ static struct peer *ioGetPeer(struct io *io, raft_id id) { @@ -429,9 +410,6 @@ static void ioFlushAll(struct io *io) case SNAPSHOT_GET: ioFlushSnapshotGet(io, (struct snapshot_get *)r); break; - case ASYNC_WORK: - ioFlushAsyncWork(io, (struct async_work *)r); - break; default: assert(0); } @@ -668,28 +646,6 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io, return 0; } -static int ioMethodAsyncWork(struct raft_io *raft_io, - struct raft_io_async_work *req, - raft_io_async_work_cb cb) -{ - struct io *io = raft_io->impl; - struct raft *raft = raft_io->data; - struct async_work *r; - - r = raft_malloc(sizeof *r); - assert(r != NULL); - - r->type = ASYNC_WORK; - r->req = req; - r->req->cb = cb; - r->req->data = raft; - r->completion_time = *io->time + io->work_duration; - - QUEUE_PUSH(&io->requests, &r->queue); - return 0; -} - - static int ioMethodSnapshotGet(struct raft_io *raft_io, struct raft_io_snapshot_get *req, raft_io_snapshot_get_cb cb) @@ -892,7 +848,6 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100; io->network_latency = NETWORK_LATENCY; io->disk_latency = DISK_LATENCY; - io->work_duration = WORK_DURATION; io->fault.countdown = -1; io->fault.n = -1; memset(io->drop, 0, sizeof io->drop); @@ -913,7 +868,6 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) raft_io->truncate = ioMethodTruncate; raft_io->send = ioMethodSend; raft_io->snapshot_put = ioMethodSnapshotPut; - raft_io->async_work = ioMethodAsyncWork; raft_io->snapshot_get = ioMethodSnapshotGet; raft_io->time = ioMethodTime; raft_io->random = ioMethodRandom; @@ -1413,10 +1367,6 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) ioFlushSnapshotGet(io, (struct snapshot_get *)r); f->event.type = RAFT_FIXTURE_DISK; break; - case ASYNC_WORK: - ioFlushAsyncWork(io, (struct async_work *)r); - f->event.type = RAFT_FIXTURE_WORK; - break; default: assert(0); } diff --git a/src/replication.c b/src/replication.c index bc3629a53..ca65ecf0b 100644 --- a/src/replication.c +++ b/src/replication.c @@ -137,18 +137,6 @@ static int sendAppendEntries(struct raft *r, return rv; } -static void takeSnapshotClose(struct raft *r, struct raft_snapshot *s) -{ - if (r->fsm->version == 1 || - (r->fsm->version > 1 && r->fsm->snapshot_async == NULL)) { - snapshotClose(s); - return; - } - - configurationClose(&s->configuration); - r->fsm->snapshot_finalize(r->fsm, &s->bufs, &s->n_bufs); -} - /* Context of a RAFT_IO_INSTALL_SNAPSHOT request that was submitted with * raft_io_>send(). */ struct sendInstallSnapshot @@ -543,7 +531,7 @@ static void appendLeaderCb(struct raft_io_append *req, int status) /* Submit a disk write for all entries from the given index onward. */ static int appendLeader(struct raft *r, raft_index index) { - struct raft_entry *entries = NULL; + struct raft_entry *entries; unsigned n; struct appendLeader *request; int rv; @@ -1451,59 +1439,16 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) } logSnapshot(&r->log, snapshot->index, r->snapshot.trailing); + out: - takeSnapshotClose(r, snapshot); + snapshotClose(&r->snapshot.pending); r->snapshot.pending.term = 0; } -static int putSnapshot(struct raft *r, struct raft_snapshot *snapshot, - raft_io_snapshot_put_cb cb) -{ - int rv; - assert(r->snapshot.put.data == NULL); - r->snapshot.put.data = r; - rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put, - snapshot, cb); - if (rv != 0) { - takeSnapshotClose(r, snapshot); - r->snapshot.pending.term = 0; - r->snapshot.put.data = NULL; - } - - return rv; -} - -static void takeSnapshotDoneCb(struct raft_io_async_work *take, - int status) { - struct raft *r = take->data; - struct raft_snapshot *snapshot = &r->snapshot.pending; - int rv; - - raft_free(take); - - if (status != 0) { - tracef("take snapshot failed %s", raft_strerror(status)); - takeSnapshotClose(r, snapshot); - return; - } - - rv = putSnapshot(r, snapshot, takeSnapshotCb); - if (rv != 0) { - tracef("put snapshot failed %d", rv); - } -} - -static int takeSnapshotAsync(struct raft_io_async_work *take) -{ - struct raft *r = take->data; - tracef("take snapshot async at %lld", r->snapshot.pending.index); - struct raft_snapshot *snapshot = &r->snapshot.pending; - return r->fsm->snapshot_async(r->fsm, &snapshot->bufs, &snapshot->n_bufs); -} - static int takeSnapshot(struct raft *r) { struct raft_snapshot *snapshot; + unsigned i; int rv; tracef("take snapshot at %lld", r->last_applied); @@ -1511,13 +1456,12 @@ static int takeSnapshot(struct raft *r) snapshot = &r->snapshot.pending; snapshot->index = r->last_applied; snapshot->term = logTermOf(&r->log, r->last_applied); - snapshot->bufs = NULL; - snapshot->n_bufs = 0; rv = configurationCopy(&r->configuration, &snapshot->configuration); if (rv != 0) { goto abort; } + snapshot->configuration_index = r->configuration_index; rv = r->fsm->snapshot(r->fsm, &snapshot->bufs, &snapshot->n_bufs); @@ -1526,35 +1470,26 @@ static int takeSnapshot(struct raft *r) if (rv == RAFT_BUSY) { rv = 0; } - raft_configuration_close(&snapshot->configuration); - goto abort; + goto abort_after_config_copy; } - bool sync_snapshot = r->fsm->version < 2 || r->fsm->snapshot_async == NULL; - if (sync_snapshot) { - /* putSnapshot will clean up config and buffers case of error */ - return putSnapshot(r, snapshot, takeSnapshotCb); - } else { - struct raft_io_async_work *take = raft_malloc(sizeof(*take)); - if (take == NULL) { - rv = RAFT_NOMEM; - goto abort_after_snapshot; - } - take->cb = NULL; - take->data = r; - take->work = takeSnapshotAsync; - rv = r->io->async_work(r->io, take, takeSnapshotDoneCb); - if (rv != 0) { - raft_free(take); - goto abort_after_snapshot; - } + assert(r->snapshot.put.data == NULL); + r->snapshot.put.data = r; + rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put, + snapshot, takeSnapshotCb); + if (rv != 0) { + goto abort_after_fsm_snapshot; } return 0; -abort_after_snapshot: - /* Closes config and finalizes snapshot */ - takeSnapshotClose(r, snapshot); +abort_after_fsm_snapshot: + for (i = 0; i < snapshot->n_bufs; i++) { + raft_free(snapshot->bufs[i].base); + } + raft_free(snapshot->bufs); +abort_after_config_copy: + raft_configuration_close(&snapshot->configuration); abort: r->snapshot.pending.term = 0; return rv; diff --git a/src/uv.c b/src/uv.c index 0f138b04c..49aaed364 100644 --- a/src/uv.c +++ b/src/uv.c @@ -185,9 +185,6 @@ void uvMaybeFireCloseCb(struct uv *uv) if (!QUEUE_IS_EMPTY(&uv->snapshot_get_reqs)) { return; } - if (!QUEUE_IS_EMPTY(&uv->async_work_reqs)) { - return; - } if (!QUEUE_IS_EMPTY(&uv->aborting)) { return; } @@ -668,7 +665,6 @@ int raft_uv_init(struct raft_io *io, uv->finalize_work.data = NULL; uv->truncate_work.data = NULL; QUEUE_INIT(&uv->snapshot_get_reqs); - QUEUE_INIT(&uv->async_work_reqs); uv->snapshot_put_work.data = NULL; uv->timer.data = NULL; uv->tick_cb = NULL; /* Set by raft_io->start() */ @@ -693,7 +689,6 @@ int raft_uv_init(struct raft_io *io, io->send = UvSend; io->snapshot_put = UvSnapshotPut; io->snapshot_get = UvSnapshotGet; - io->async_work = UvAsyncWork; io->time = uvTime; io->random = uvRandom; diff --git a/src/uv.h b/src/uv.h index 662cff2e1..43159fe1c 100644 --- a/src/uv.h +++ b/src/uv.h @@ -83,7 +83,6 @@ struct uv struct uv_work_s finalize_work; /* Resize and rename segments */ struct uv_work_s truncate_work; /* Execute truncate log requests */ queue snapshot_get_reqs; /* Inflight get snapshot requests */ - queue async_work_reqs; /* Inflight async work requests */ struct uv_work_s snapshot_put_work; /* Execute snapshot put requests */ struct uvMetadata metadata; /* Cache of metadata on disk */ struct uv_timer_s timer; /* Timer for periodic ticks */ @@ -273,12 +272,6 @@ int UvSnapshotGet(struct raft_io *io, struct raft_io_snapshot_get *req, raft_io_snapshot_get_cb cb); - -/* Implementation of raft_io->async_work (defined in uv_work.c). */ -int UvAsyncWork(struct raft_io *io, - struct raft_io_async_work *req, - raft_io_async_work_cb cb); - /* Return a list of all snapshots and segments found in the data directory. Both * snapshots and segments are ordered by filename (closed segments come before * open ones). */ diff --git a/src/uv_append.c b/src/uv_append.c index c10c01ff4..22b763602 100644 --- a/src/uv_append.c +++ b/src/uv_append.c @@ -635,7 +635,7 @@ int UvAppend(struct raft_io *io, uv = io->impl; assert(!uv->closing); - append = RaftHeapCalloc(1, sizeof *append); + append = RaftHeapMalloc(sizeof *append); if (append == NULL) { rv = RAFT_NOMEM; goto err; diff --git a/src/uv_segment.c b/src/uv_segment.c index b625a6322..c4d4bba93 100644 --- a/src/uv_segment.c +++ b/src/uv_segment.c @@ -901,9 +901,9 @@ static int uvWriteClosedSegment(struct uv *uv, const struct raft_buffer *conf) { char filename[UV__FILENAME_LEN]; - struct uvSegmentBuffer buf = {0}; + struct uvSegmentBuffer buf; struct raft_buffer data; - struct raft_entry entry = {0}; + struct raft_entry entry; size_t cap; char errmsg[RAFT_ERRMSG_BUF_SIZE]; int rv; diff --git a/src/uv_snapshot.c b/src/uv_snapshot.c index 064397d1d..feff857d7 100644 --- a/src/uv_snapshot.c +++ b/src/uv_snapshot.c @@ -634,10 +634,7 @@ int UvSnapshotPut(struct raft_io *io, raft_index next_index; uv = io->impl; - if (uv->closing) { - return RAFT_CANCELED; - } - + assert(!uv->closing); assert(uv->snapshot_put_work.data == NULL); tracef("put snapshot at %lld, keeping %d", snapshot->index, trailing); diff --git a/src/uv_work.c b/src/uv_work.c deleted file mode 100644 index 1cb6e4fe2..000000000 --- a/src/uv_work.c +++ /dev/null @@ -1,80 +0,0 @@ -#include "assert.h" -#include "heap.h" -#include "uv.h" - -#define tracef(...) Tracef(uv->tracer, __VA_ARGS__) - -struct uvAsyncWork -{ - struct uv *uv; - struct raft_io_async_work *req; - struct uv_work_s work; - int status; - queue queue; -}; - -static void uvAsyncWorkCb(uv_work_t *work) -{ - struct uvAsyncWork *w = work->data; - assert(w != NULL); - int rv; - rv = w->req->work(w->req); - w->status = rv; -} - -static void uvAsyncAfterWorkCb(uv_work_t *work, int status) -{ - struct uvAsyncWork *w = work->data; - struct raft_io_async_work *req = w->req; - int req_status = w->status; - struct uv *uv = w->uv; - assert(status == 0); - - QUEUE_REMOVE(&w->queue); - RaftHeapFree(w); - req->cb(req, req_status); - uvMaybeFireCloseCb(uv); -} - -int UvAsyncWork(struct raft_io *io, - struct raft_io_async_work *req, - raft_io_async_work_cb cb) -{ - struct uv *uv; - struct uvAsyncWork *async_work; - int rv; - - uv = io->impl; - assert(!uv->closing); - - async_work = RaftHeapMalloc(sizeof *async_work); - if (async_work == NULL) { - rv = RAFT_NOMEM; - goto err; - } - - async_work->uv = uv; - async_work->req = req; - async_work->work.data = async_work; - req->cb = cb; - - QUEUE_PUSH(&uv->async_work_reqs, &async_work->queue); - rv = uv_queue_work(uv->loop, &async_work->work, uvAsyncWorkCb, - uvAsyncAfterWorkCb); - if (rv != 0) { - QUEUE_REMOVE(&async_work->queue); - tracef("async work: %s", uv_strerror(rv)); - rv = RAFT_IOERR; - goto err_after_req_alloc; - } - - return 0; - -err_after_req_alloc: - RaftHeapFree(async_work); -err: - assert(rv != 0); - return rv; -} - -#undef tracef diff --git a/test/integration/test_fixture.c b/test/integration/test_fixture.c index 443229a68..03f49d554 100644 --- a/test/integration/test_fixture.c +++ b/test/integration/test_fixture.c @@ -26,7 +26,7 @@ static void *setUp(const MunitParameter params[], MUNIT_UNUSED void *user_data) int rc; SET_UP_HEAP; for (i = 0; i < N_SERVERS; i++) { - FsmInit(&f->fsms[i], 2); + FsmInit(&f->fsms[i]); } rc = raft_fixture_init(&f->fixture, N_SERVERS, f->fsms); diff --git a/test/integration/test_snapshot.c b/test/integration/test_snapshot.c index d0b2aa5b6..0587c2fbc 100644 --- a/test/integration/test_snapshot.c +++ b/test/integration/test_snapshot.c @@ -62,72 +62,6 @@ static void tearDown(void *data) } \ } -static int ioMethodSnapshotPutFail(struct raft_io *raft_io, - unsigned trailing, - struct raft_io_snapshot_put *req, - const struct raft_snapshot *snapshot, - raft_io_snapshot_put_cb cb) -{ - (void) raft_io; - (void) trailing; - (void) req; - (void) snapshot; - (void) cb; - return -1; -} - -#define SET_FAULTY_SNAPSHOT_PUT() \ - { \ - unsigned i; \ - for (i = 0; i < CLUSTER_N; i++) { \ - CLUSTER_RAFT(i)->io->snapshot_put = ioMethodSnapshotPutFail; \ - } \ - } - -static int ioMethodAsyncWorkFail(struct raft_io *raft_io, - struct raft_io_async_work *req, - raft_io_async_work_cb cb) -{ - (void) raft_io; - (void) req; - (void) cb; - return -1; -} - -#define SET_FAULTY_ASYNC_WORK() \ - { \ - unsigned i; \ - for (i = 0; i < CLUSTER_N; i++) { \ - CLUSTER_RAFT(i)->io->async_work = ioMethodAsyncWorkFail; \ - } \ - } - -static int fsmSnapshotFail(struct raft_fsm *fsm, - struct raft_buffer *bufs[], - unsigned *n_bufs) -{ - (void) fsm; - (void) bufs; - (void) n_bufs; - return -1; -} - -#define SET_FAULTY_SNAPSHOT_ASYNC() \ - { \ - unsigned i; \ - for (i = 0; i < CLUSTER_N; i++) { \ - CLUSTER_RAFT(i)->fsm->snapshot_async = fsmSnapshotFail; \ - } \ - } - -#define SET_FAULTY_SNAPSHOT() \ - { \ - unsigned i; \ - for (i = 0; i < CLUSTER_N; i++) { \ - CLUSTER_RAFT(i)->fsm->snapshot = fsmSnapshotFail; \ - } \ - } - /****************************************************************************** * * Successfully install a snapshot @@ -541,24 +475,8 @@ TEST(snapshot, installSnapshotDuringEntriesWrite, setUp, tearDown, 0, NULL) return MUNIT_OK; } -static char *fsm_snapshot_async[] = {"0", "1", NULL}; -static char *fsm_version[] = {"1", "2", NULL}; -static MunitParameterEnum fsm_snapshot_async_params[] = { - {CLUSTER_SS_ASYNC_PARAM, fsm_snapshot_async}, - {CLUSTER_FSM_VERSION_PARAM, fsm_version}, - {NULL, NULL}, -}; - -static char *fsm_snapshot_only_async[] = {"1", NULL}; -static char *fsm_version_only_async[] = {"2", NULL}; -static MunitParameterEnum fsm_snapshot_only_async_params[] = { - {CLUSTER_SS_ASYNC_PARAM, fsm_snapshot_only_async}, - {CLUSTER_FSM_VERSION_PARAM, fsm_version_only_async}, - {NULL, NULL}, -}; - /* Follower receives AppendEntries RPCs while taking a snapshot */ -TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_snapshot_async_params) +TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, NULL) { struct fixture *f = data; (void)params; @@ -578,7 +496,7 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_snapshot_async /* Step the cluster until server 1 takes a snapshot */ const struct raft *r = CLUSTER_RAFT(1); - CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 3000); + CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 2000); /* Send AppendEntries RPCs while server 1 is taking a snapshot */ static struct raft_apply reqs[5]; @@ -594,83 +512,3 @@ TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, fsm_snapshot_async CLUSTER_STEP_UNTIL_APPLIED(1, 11, 5000); return MUNIT_OK; } - -TEST(snapshot, takeSnapshotSnapshotPutFail, setUp, tearDown, 0, fsm_snapshot_async_params) -{ - struct fixture *f = data; - (void)params; - - SET_FAULTY_SNAPSHOT_PUT(); - - /* Set very low threshold and trailing entries number */ - SET_SNAPSHOT_THRESHOLD(3); - SET_SNAPSHOT_TRAILING(1); - - /* Apply a few of entries, to force a snapshot to be taken. */ - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - - /* No crash or leaks have occurred */ - return MUNIT_OK; -} - -TEST(snapshot, takeSnapshotAsyncWorkFail, setUp, tearDown, 0, fsm_snapshot_async_params) -{ - struct fixture *f = data; - (void)params; - - SET_FAULTY_ASYNC_WORK(); - - /* Set very low threshold and trailing entries number */ - SET_SNAPSHOT_THRESHOLD(3); - SET_SNAPSHOT_TRAILING(1); - - /* Apply a few of entries, to force a snapshot to be taken. */ - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - - /* No crash or leaks have occurred */ - return MUNIT_OK; -} - -TEST(snapshot, takeSnapshotAsyncFail, setUp, tearDown, 0, fsm_snapshot_only_async_params) -{ - struct fixture *f = data; - (void)params; - - SET_FAULTY_SNAPSHOT_ASYNC(); - - /* Set very low threshold and trailing entries number */ - SET_SNAPSHOT_THRESHOLD(3); - SET_SNAPSHOT_TRAILING(1); - - /* Apply a few of entries, to force a snapshot to be taken. */ - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - - /* No crash or leaks have occurred */ - return MUNIT_OK; -} - -TEST(snapshot, takeSnapshotFail, setUp, tearDown, 0, fsm_snapshot_async_params) -{ - struct fixture *f = data; - (void)params; - - SET_FAULTY_SNAPSHOT(); - - /* Set very low threshold and trailing entries number */ - SET_SNAPSHOT_THRESHOLD(3); - SET_SNAPSHOT_TRAILING(1); - - /* Apply a few of entries, to force a snapshot to be taken. */ - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - CLUSTER_MAKE_PROGRESS; - - /* No crash or leaks have occurred */ - return MUNIT_OK; -} diff --git a/test/integration/test_uv_work.c b/test/integration/test_uv_work.c deleted file mode 100644 index 6c7bd181d..000000000 --- a/test/integration/test_uv_work.c +++ /dev/null @@ -1,103 +0,0 @@ -#include - -#include "../lib/dir.h" -#include "../lib/loop.h" -#include "../lib/runner.h" -#include "../lib/uv.h" -#include "../../src/uv.h" - -/****************************************************************************** - * - * Fixture - * - *****************************************************************************/ - -struct fixture -{ - FIXTURE_UV_DEPS; - FIXTURE_UV; -}; - -struct result -{ - int rv; /* Indicate success or failure of the work */ - int counter; /* Proof that work was performed */ - bool done; /* To check test termination */ -}; - -/****************************************************************************** - * - * Set up and tear down. - * - *****************************************************************************/ - -static void *setUp(const MunitParameter params[], void *user_data) -{ - struct fixture *f = munit_malloc(sizeof *f); - SETUP_UV_DEPS; - SETUP_UV; - return f; -} - -static void tearDownDeps(void *data) -{ - struct fixture *f = data; - if (f == NULL) { - return; - } - TEAR_DOWN_UV_DEPS; - free(f); -} - -static void tearDown(void *data) -{ - struct fixture *f = data; - if (f == NULL) { - return; - } - TEAR_DOWN_UV; - tearDownDeps(f); -} - -/****************************************************************************** - * - * UvAsyncWork - * - *****************************************************************************/ - -static void asyncWorkCbAssertResult(struct raft_io_async_work *req, int status) -{ - struct result *r = req->data; - munit_assert_int(status, ==, r->rv); - munit_assert_int(r->counter, ==, 1); - r->done = true; -} - -static int asyncWorkFn(struct raft_io_async_work *req) -{ - struct result *r = req->data; - sleep(1); - r->counter = 1; - return r->rv; -} - -SUITE(UvAsyncWork) - -static char* rvs[] = { "-1", "0", "1", "37", NULL }; -static MunitParameterEnum rvs_params[] = { - { "rv", rvs }, - { NULL, NULL }, -}; - -TEST(UvAsyncWork, work, setUp, tearDown, 0, rvs_params) -{ - struct fixture *f = data; - struct result res = {0}; - struct raft_io_async_work req = {0}; - res.rv = (int)strtol(munit_parameters_get(params, "rv"), NULL, 0); - req.data = &res; - req.work = asyncWorkFn; - UvAsyncWork(&f->io, &req, asyncWorkCbAssertResult); - LOOP_RUN_UNTIL(&res.done); - return MUNIT_OK; -} diff --git a/test/lib/cluster.h b/test/lib/cluster.h index bf77d4352..48d149e54 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -19,52 +19,38 @@ /* N is the default number of servers, but can be tweaked with the cluster-n * parameter. */ -#define SETUP_CLUSTER(DEFAULT_N) \ - SET_UP_HEAP; \ - do { \ - unsigned _n = DEFAULT_N; \ - bool _pre_vote = false; \ - bool _ss_async = false; \ - int _fsm_version = 2; \ - unsigned _hb = 0; \ - unsigned _i; \ - int _rv; \ - if (munit_parameters_get(params, CLUSTER_N_PARAM) != NULL) { \ - _n = atoi(munit_parameters_get(params, CLUSTER_N_PARAM)); \ - } \ - if (munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM) != NULL) { \ - _pre_vote = \ - atoi(munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM)); \ - } \ - if (munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM) != NULL) { \ - _hb = \ - atoi(munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM)); \ - } \ - if (munit_parameters_get(params, CLUSTER_SS_ASYNC_PARAM) != NULL) { \ - _ss_async = \ - atoi(munit_parameters_get(params, CLUSTER_SS_ASYNC_PARAM)); \ - } \ - if (munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM) != NULL) { \ - _fsm_version = \ - atoi(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM)); \ - } \ - munit_assert_int(_n, >, 0); \ - for (_i = 0; _i < _n; _i++) { \ - if (_ss_async && _fsm_version > 1) { \ - FsmInitAsync(&f->fsms[_i], _fsm_version); \ - } else { \ - FsmInit(&f->fsms[_i], _fsm_version); \ - } \ - } \ - _rv = raft_fixture_init(&f->cluster, _n, f->fsms); \ - munit_assert_int(_rv, ==, 0); \ - for (_i = 0; _i < _n; _i++) { \ - raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ - if (_hb) { \ - raft_set_heartbeat_timeout(raft_fixture_get(&f->cluster, _i), \ - _hb); \ - } \ - } \ +#define SETUP_CLUSTER(DEFAULT_N) \ + SET_UP_HEAP; \ + do { \ + unsigned _n = DEFAULT_N; \ + bool _pre_vote = false; \ + unsigned _hb = 0; \ + unsigned _i; \ + int _rv; \ + if (munit_parameters_get(params, CLUSTER_N_PARAM) != NULL) { \ + _n = atoi(munit_parameters_get(params, CLUSTER_N_PARAM)); \ + } \ + if (munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM) != NULL) { \ + _pre_vote = \ + atoi(munit_parameters_get(params, CLUSTER_PRE_VOTE_PARAM)); \ + } \ + if (munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM) != NULL) { \ + _hb = \ + atoi(munit_parameters_get(params, CLUSTER_HEARTBEAT_PARAM)); \ + } \ + munit_assert_int(_n, >, 0); \ + for (_i = 0; _i < _n; _i++) { \ + FsmInit(&f->fsms[_i]); \ + } \ + _rv = raft_fixture_init(&f->cluster, _n, f->fsms); \ + munit_assert_int(_rv, ==, 0); \ + for (_i = 0; _i < _n; _i++) { \ + raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ + if (_hb) { \ + raft_set_heartbeat_timeout(raft_fixture_get(&f->cluster, _i),\ + _hb); \ + } \ + } \ } while (0) #define TEAR_DOWN_CLUSTER \ @@ -89,12 +75,6 @@ /* Munit parameter for setting HeartBeat timeout */ #define CLUSTER_HEARTBEAT_PARAM "cluster-heartbeat" -/* Munit parameter for setting snapshot behaviour */ -#define CLUSTER_SS_ASYNC_PARAM "cluster-snapshot-async" - -/* Munit parameter for setting snapshot behaviour */ -#define CLUSTER_FSM_VERSION_PARAM "fsm-version" - /* Get the number of servers in the cluster. */ #define CLUSTER_N raft_fixture_n(&f->cluster) @@ -304,7 +284,7 @@ #define CLUSTER_GROW \ { \ int rv_; \ - FsmInit(&f->fsms[CLUSTER_N], 2); \ + FsmInit(&f->fsms[CLUSTER_N]); \ rv_ = raft_fixture_grow(&f->cluster, &f->fsms[CLUSTER_N]); \ munit_assert_int(rv_, ==, 0); \ } diff --git a/test/lib/fsm.c b/test/lib/fsm.c index 242fffa49..36a96592b 100644 --- a/test/lib/fsm.c +++ b/test/lib/fsm.c @@ -8,8 +8,6 @@ struct fsm { int x; int y; - int lock; - void *data; }; /* Command codes */ @@ -106,85 +104,18 @@ static int fsmSnapshot(struct raft_fsm *fsm, return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); } -static int fsmSnapshotInitialize(struct raft_fsm *fsm, - struct raft_buffer *bufs[], - unsigned *n_bufs) -{ - (void) bufs; - (void) n_bufs; - struct fsm *f = fsm->data; - munit_assert_int(f->lock, ==, 0); - f->lock = 1; - f->data = raft_malloc(8); /* Detect proper cleanup in finalize */ - munit_assert_ptr_not_null(f->data); - return 0; -} - -static int fsmSnapshotAsync(struct raft_fsm *fsm, - struct raft_buffer *bufs[], - unsigned *n_bufs) -{ - struct fsm *f = fsm->data; - return fsmEncodeSnapshot(f->x, f->y, bufs, n_bufs); -} - -static int fsmSnapshotFinalize(struct raft_fsm *fsm, - struct raft_buffer *bufs[], - unsigned *n_bufs) -{ - (void) bufs; - (void) n_bufs; - struct fsm *f = fsm->data; - if (*bufs != NULL) { - for (unsigned i = 0; i < *n_bufs; ++i) { - raft_free((*bufs)[i].base); - } - raft_free(*bufs); - } - *bufs = NULL; - *n_bufs = 0; - munit_assert_int(f->lock, ==, 1); - f->lock = 0; - raft_free(f->data); - return 0; -} - -void FsmInit(struct raft_fsm *fsm, int version) +void FsmInit(struct raft_fsm *fsm) { struct fsm *f = munit_malloc(sizeof *fsm); - memset(fsm, 'x', sizeof(*fsm)); /* Fill with garbage */ f->x = 0; f->y = 0; - f->lock = 0; - fsm->version = version; + fsm->version = 1; fsm->data = f; fsm->apply = fsmApply; fsm->snapshot = fsmSnapshot; fsm->restore = fsmRestore; - if (version > 1) { - fsm->snapshot_async = NULL; - fsm->snapshot_finalize = NULL; - } -} - -void FsmInitAsync(struct raft_fsm *fsm, int version) -{ - munit_assert_int(version, >, 1); - struct fsm *f = munit_malloc(sizeof *fsm); - - f->x = 0; - f->y = 0; - f->lock = 0; - - fsm->version = version; - fsm->data = f; - fsm->apply = fsmApply; - fsm->snapshot = fsmSnapshotInitialize; - fsm->snapshot_async = fsmSnapshotAsync; - fsm->snapshot_finalize = fsmSnapshotFinalize; - fsm->restore = fsmRestore; } void FsmClose(struct raft_fsm *fsm) diff --git a/test/lib/fsm.h b/test/lib/fsm.h index f99ad0acc..76eddbf9d 100644 --- a/test/lib/fsm.h +++ b/test/lib/fsm.h @@ -7,10 +7,7 @@ #include "../../include/raft.h" -void FsmInit(struct raft_fsm *fsm, int version); - -/* Same as FsmInit but with asynchronous snapshots */ -void FsmInitAsync(struct raft_fsm *fsm, int version); +void FsmInit(struct raft_fsm *fsm); void FsmClose(struct raft_fsm *fsm);