Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
raft_fsm: version 3 introducing async snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Borderé committed Jun 29, 2022
1 parent 9e12b06 commit b98e31b
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 32 deletions.
5 changes: 3 additions & 2 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ test_integration_core_SOURCES = \
test/integration/test_election.c \
test/integration/test_fixture.c \
test/integration/test_heap.c \
test/integration/test_init.c \
test/integration/test_membership.c \
test/integration/test_recover.c \
test/integration/test_replication.c \
test/integration/test_snapshot.c \
test/integration/test_start.c \
test/integration/test_strerror.c \
test/integration/test_tick.c \
test/integration/test_transfer.c \
test/integration/test_start.c
test/integration/test_transfer.c
test_integration_core_CFLAGS = $(AM_CFLAGS) -Wno-conversion
test_integration_core_LDFLAGS = -no-install
test_integration_core_LDADD = libtest.la libraft.la
Expand Down
20 changes: 18 additions & 2 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ struct raft_io_snapshot_get
struct raft_io_async_work;
typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req);
typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req,
int status);
int status);
struct raft_io_async_work
{
void *data; /* User data */
Expand Down Expand Up @@ -529,10 +529,22 @@ struct raft_io
* `snapshot_finalize` can be used to e.g. release a lock that was taken during
* a call to `snapshot`. Until `snapshot_finalize` is called, raft can access
* the data contained in the `raft_buffer`s.
*
* version 3:
* Adds support for async snapshots through the `snapshot_async` function.
* When this method is provided, raft will call `snapshot` in the main loop,
* and when successful, will call `snapshot_async` by means of the `io->async_work` method.
* The callback to `io->async_work` will in turn call `snapshot_finalize`
* in the main loop when the work has completed independent of the return value
* of `snapshot_async`.
* An implementation that does not use asynchronous snapshots MUST set
* `snapshot_async` to NULL.
* All memory allocated by the snapshot routines MUST be freed by the snapshot
* routines themselves.
*/
struct raft_fsm
{
int version;
int version; /* 1, 2 or 3 */
void *data;
int (*apply)(struct raft_fsm *fsm,
const struct raft_buffer *buf,
Expand All @@ -545,6 +557,10 @@ struct raft_fsm
int (*snapshot_finalize)(struct raft_fsm *fsm,
struct raft_buffer *bufs[],
unsigned *n_bufs);
/* Fields below added since version 3. */
int (*snapshot_async)(struct raft_fsm *fsm,
struct raft_buffer *bufs[],
unsigned *n_bufs);
};

/**
Expand Down
25 changes: 25 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#define DEFAULT_MAX_CATCH_UP_ROUNDS 10
#define DEFAULT_MAX_CATCH_UP_ROUND_DURATION (5 * 1000)

static int ioFsmCompat(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm);

int raft_init(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm,
Expand All @@ -32,6 +36,12 @@ int raft_init(struct raft *r,
{
int rv;
assert(r != NULL);

rv = ioFsmCompat(r, io, fsm);
if (rv != 0) {
goto err;
}

r->io = io;
r->io->data = r;
r->fsm = fsm;
Expand Down Expand Up @@ -227,3 +237,18 @@ unsigned long long raft_digest(const char *text, unsigned long long n)

return byteFlip64(digest);
}

static int ioFsmCompat(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm)
{
if ((fsm->version > 2 && fsm->snapshot_async != NULL)
&& ((io->version < 2) || (io->async_work == NULL))) {
ErrMsgPrintf(r->errmsg,
"async snapshot requires io->version > 1 and async_work method.");
return -1;
}

return 0;
}

79 changes: 69 additions & 10 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1455,12 +1455,58 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status)
}

logSnapshot(&r->log, snapshot->index, r->snapshot.trailing);

out:
takeSnapshotClose(r, &r->snapshot.pending);
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
}

static int putSnapshot(struct raft *r, struct raft_snapshot *snapshot,
raft_io_snapshot_put_cb cb)
{
int rv;
assert(r->snapshot.put.data == NULL);
r->snapshot.put.data = r;
rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put,
snapshot, cb);
if (rv != 0) {
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
r->snapshot.put.data = NULL;
}

return rv;
}

static void takeSnapshotDoneCb(struct raft_io_async_work *take,
int status) {
struct raft *r = take->data;
struct raft_snapshot *snapshot = &r->snapshot.pending;
int rv;

raft_free(take);

if (status != 0) {
tracef("take snapshot failed %s", raft_strerror(status));
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
r->snapshot.put.data = NULL;
return;
}

rv = putSnapshot(r, snapshot, takeSnapshotCb);
if (rv != 0) {
tracef("put snapshot failed %d", rv);
}
}

static int takeSnapshotAsync(struct raft_io_async_work *take)
{
struct raft *r = take->data;
tracef("take snapshot async at %lld", r->snapshot.pending.index);
struct raft_snapshot *snapshot = &r->snapshot.pending;
return r->fsm->snapshot_async(r->fsm, &snapshot->bufs, &snapshot->n_bufs);
}

static int takeSnapshot(struct raft *r)
{
struct raft_snapshot *snapshot;
Expand All @@ -1471,12 +1517,13 @@ static int takeSnapshot(struct raft *r)
snapshot = &r->snapshot.pending;
snapshot->index = r->last_applied;
snapshot->term = logTermOf(&r->log, r->last_applied);
snapshot->bufs = NULL;
snapshot->n_bufs = 0;

rv = configurationCopy(&r->configuration, &snapshot->configuration);
if (rv != 0) {
goto abort;
}

snapshot->configuration_index = r->configuration_index;

rv = r->fsm->snapshot(r->fsm, &snapshot->bufs, &snapshot->n_bufs);
Expand All @@ -1489,17 +1536,29 @@ static int takeSnapshot(struct raft *r)
goto abort;
}

assert(r->snapshot.put.data == NULL);
r->snapshot.put.data = r;
rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put,
snapshot, takeSnapshotCb);
if (rv != 0) {
goto abort_after_fsm_snapshot;
bool sync_snapshot = r->fsm->version < 3 || r->fsm->snapshot_async == NULL;
if (sync_snapshot) {
/* putSnapshot will clean up config and buffers in case of error */
return putSnapshot(r, snapshot, takeSnapshotCb);
} else {
struct raft_io_async_work *take = raft_malloc(sizeof(*take));
if (take == NULL) {
rv = RAFT_NOMEM;
goto abort_after_snapshot;
}
take->data = r;
take->work = takeSnapshotAsync;
rv = r->io->async_work(r->io, take, takeSnapshotDoneCb);
if (rv != 0) {
raft_free(take);
goto abort_after_snapshot;
}
}

return 0;

abort_after_fsm_snapshot:
abort_after_snapshot:
/* Closes config and finalizes snapshot */
takeSnapshotClose(r, snapshot);
abort:
r->snapshot.pending.term = 0;
Expand Down
5 changes: 4 additions & 1 deletion src/uv_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,10 @@ int UvSnapshotPut(struct raft_io *io,
raft_index next_index;

uv = io->impl;
assert(!uv->closing);
if (uv->closing) {
return RAFT_CANCELED;
}

assert(uv->snapshot_put_work.data == NULL);

tracef("put snapshot at %lld, keeping %d", snapshot->index, trailing);
Expand Down
52 changes: 52 additions & 0 deletions test/integration/test_init.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include "../../include/raft.h"
#include "../lib/runner.h"

/******************************************************************************
*
* raft_init
*
*****************************************************************************/

SUITE(raft_init)

/* Incompatible raft->io and raft->fsm wrt async snapshots. */
TEST(raft_init, incompatIoFsmAsyncSnapshotNotNull, NULL, NULL, 0, NULL)
{
/* Set incompatible io and fsm versions and non-NULL snapshot_async fn */
struct raft r = {0};
struct raft_io io = {0};
struct raft_fsm fsm = {0};
io.version = 1; /* Too low */
io.async_work = (int (*)(struct raft_io *, struct raft_io_async_work *,
raft_io_async_work_cb))(uintptr_t)0xDEADBEEF;
fsm.version = 3;
fsm.snapshot_async = (int (*)(struct raft_fsm *,
struct raft_buffer **, unsigned int *))(uintptr_t)0xDEADBEEF;

int rc;
rc = raft_init(&r, &io, &fsm, 1, "1");
munit_assert_int(rc, ==, -1);
munit_assert_string_equal(r.errmsg, "async snapshot requires io->version > 1 and async_work method.");
return MUNIT_OK;
}

/* Incompatible raft->io and raft->fsm wrt async snapshots. */
TEST(raft_init, incompatIoFsmAsyncSnapshotNull, NULL, NULL, 0, NULL)
{
/* Set incompatible io and fsm versions and NULL snapshot_async fn */
struct raft r = {0};
struct raft_io io = {0};
struct raft_fsm fsm = {0};
io.version = 2;
io.async_work = NULL;
fsm.version = 3;
fsm.snapshot_async = (int (*)(struct raft_fsm *,
struct raft_buffer **, unsigned int *))(uintptr_t)0xDEADBEEF;

int rc;
rc = raft_init(&r, &io, &fsm, 1, "1");
munit_assert_int(rc, ==, -1);
munit_assert_string_equal(r.errmsg, "async snapshot requires io->version > 1 and async_work method.");
return MUNIT_OK;
}

Loading

0 comments on commit b98e31b

Please sign in to comment.