Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #286 from MathieuBordere/async-ss
Browse files Browse the repository at this point in the history
Async snapshots
  • Loading branch information
stgraber authored Jul 5, 2022
2 parents 118b450 + b98e31b commit 05a49e0
Show file tree
Hide file tree
Showing 16 changed files with 639 additions and 35 deletions.
9 changes: 6 additions & 3 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ test_integration_core_SOURCES = \
test/integration/test_election.c \
test/integration/test_fixture.c \
test/integration/test_heap.c \
test/integration/test_init.c \
test/integration/test_membership.c \
test/integration/test_recover.c \
test/integration/test_replication.c \
test/integration/test_snapshot.c \
test/integration/test_start.c \
test/integration/test_strerror.c \
test/integration/test_tick.c \
test/integration/test_transfer.c \
test/integration/test_start.c
test/integration/test_transfer.c
test_integration_core_CFLAGS = $(AM_CFLAGS) -Wno-conversion
test_integration_core_LDFLAGS = -no-install
test_integration_core_LDADD = libtest.la libraft.la
Expand Down Expand Up @@ -150,6 +151,7 @@ libraft_la_SOURCES += \
src/uv_tcp_listen.c \
src/uv_tcp_connect.c \
src/uv_truncate.c \
src/uv_work.c \
src/uv_writer.c
libraft_la_LDFLAGS += $(UV_LIBS)

Expand Down Expand Up @@ -198,7 +200,8 @@ test_integration_uv_SOURCES = \
test/integration/test_uv_tcp_connect.c \
test/integration/test_uv_tcp_listen.c \
test/integration/test_uv_snapshot_put.c \
test/integration/test_uv_truncate.c
test/integration/test_uv_truncate.c \
test/integration/test_uv_work.c
test_integration_uv_CFLAGS = $(AM_CFLAGS) -Wno-type-limits -Wno-conversion
test_integration_uv_LDFLAGS = -no-install $(UV_LIBS)
test_integration_uv_LDADD = libtest.la
Expand Down
38 changes: 36 additions & 2 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,20 @@ struct raft_io_snapshot_get
raft_io_snapshot_get_cb cb; /* Request callback */
};

/**
* Asynchronous work request.
*/
struct raft_io_async_work;
typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req);
typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req,
int status);
struct raft_io_async_work
{
void *data; /* User data */
raft_io_async_work_fn work; /* Function to run async from the main loop */
raft_io_async_work_cb cb; /* Request callback */
};

/**
* Customizable tracer, for debugging purposes.
*/
Expand Down Expand Up @@ -458,7 +472,7 @@ typedef void (*raft_io_close_cb)(struct raft_io *io);

struct raft_io
{
int version;
int version; /* 1 or 2 */
void *data;
void *impl;
char errmsg[RAFT_ERRMSG_BUF_SIZE];
Expand Down Expand Up @@ -499,6 +513,10 @@ struct raft_io
raft_io_snapshot_get_cb cb);
raft_time (*time)(struct raft_io *io);
int (*random)(struct raft_io *io, int min, int max);
/* Field(s) below added since version 2. */
int (*async_work)(struct raft_io *io,
struct raft_io_async_work *req,
raft_io_async_work_cb cb);
};

/*
Expand All @@ -511,10 +529,22 @@ struct raft_io
* `snapshot_finalize` can be used to e.g. release a lock that was taken during
* a call to `snapshot`. Until `snapshot_finalize` is called, raft can access
* the data contained in the `raft_buffer`s.
*
* version 3:
* Adds support for async snapshots through the `snapshot_async` function.
* When this method is provided, raft will call `snapshot` in the main loop,
* and when successful, will call `snapshot_async` by means of the `io->async_work` method.
* The callback to `io->async_work` will in turn call `snapshot_finalize`
* in the main loop when the work has completed independent of the return value
* of `snapshot_async`.
* An implementation that does not use asynchronous snapshots MUST set
* `snapshot_async` to NULL.
* All memory allocated by the snapshot routines MUST be freed by the snapshot
* routines themselves.
*/
struct raft_fsm
{
int version;
int version; /* 1, 2 or 3 */
void *data;
int (*apply)(struct raft_fsm *fsm,
const struct raft_buffer *buf,
Expand All @@ -527,6 +557,10 @@ struct raft_fsm
int (*snapshot_finalize)(struct raft_fsm *fsm,
struct raft_buffer *bufs[],
unsigned *n_bufs);
/* Fields below added since version 3. */
int (*snapshot_async)(struct raft_fsm *fsm,
struct raft_buffer *bufs[],
unsigned *n_bufs);
};

/**
Expand Down
3 changes: 2 additions & 1 deletion include/raft/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
enum {
RAFT_FIXTURE_TICK = 1, /* The tick callback has been invoked */
RAFT_FIXTURE_NETWORK, /* A network request has been sent or received */
RAFT_FIXTURE_DISK /* An I/O request has been submitted */
RAFT_FIXTURE_DISK, /* An I/O request has been submitted */
RAFT_FIXTURE_WORK /* A large, CPU and/or memory intensive task */
};

/**
Expand Down
52 changes: 51 additions & 1 deletion src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define ELECTION_TIMEOUT 1000
#define NETWORK_LATENCY 15
#define DISK_LATENCY 10
#define WORK_DURATION 200

/* To keep in sync with raft.h */
#define N_MESSAGE_TYPES 6
Expand All @@ -36,7 +37,7 @@
queue queue /* Link the I/O pending requests queue. */

/* Request type codes. */
enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET };
enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET, ASYNC_WORK };

/* Abstract base type for an asynchronous request submitted to the stub I/o
* implementation. */
Expand Down Expand Up @@ -72,6 +73,13 @@ struct snapshot_put
const struct raft_snapshot *snapshot;
};

/* Pending request to perform general work. */
struct async_work
{
REQUEST;
struct raft_io_async_work *req;
};

/* Pending request to load a snapshot. */
struct snapshot_get
{
Expand Down Expand Up @@ -131,6 +139,7 @@ struct io
unsigned randomized_election_timeout; /* Value returned by io->random() */
unsigned network_latency; /* Milliseconds to deliver RPCs */
unsigned disk_latency; /* Milliseconds to perform disk I/O */
unsigned work_duration; /* Milliseconds to long running work */

struct
{
Expand Down Expand Up @@ -277,6 +286,16 @@ static void ioFlushSnapshotGet(struct io *s, struct snapshot_get *r)
raft_free(r);
}

/* Flush an async work request */
static void ioFlushAsyncWork(struct io *s, struct async_work *r)
{
(void) s;
int rv;
rv = r->req->work(r->req);
r->req->cb(r->req, rv);
raft_free(r);
}

/* Search for the peer with the given ID. */
static struct peer *ioGetPeer(struct io *io, raft_id id)
{
Expand Down Expand Up @@ -410,6 +429,9 @@ static void ioFlushAll(struct io *io)
case SNAPSHOT_GET:
ioFlushSnapshotGet(io, (struct snapshot_get *)r);
break;
case ASYNC_WORK:
ioFlushAsyncWork(io, (struct async_work *)r);
break;
default:
assert(0);
}
Expand Down Expand Up @@ -646,6 +668,28 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io,
return 0;
}

static int ioMethodAsyncWork(struct raft_io *raft_io,
struct raft_io_async_work *req,
raft_io_async_work_cb cb)
{
struct io *io = raft_io->impl;
struct raft *raft = raft_io->data;
struct async_work *r;

r = raft_malloc(sizeof *r);
assert(r != NULL);

r->type = ASYNC_WORK;
r->req = req;
r->req->cb = cb;
r->req->data = raft;
r->completion_time = *io->time + io->work_duration;

QUEUE_PUSH(&io->requests, &r->queue);
return 0;
}


static int ioMethodSnapshotGet(struct raft_io *raft_io,
struct raft_io_snapshot_get *req,
raft_io_snapshot_get_cb cb)
Expand Down Expand Up @@ -848,6 +892,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100;
io->network_latency = NETWORK_LATENCY;
io->disk_latency = DISK_LATENCY;
io->work_duration = WORK_DURATION;
io->fault.countdown = -1;
io->fault.n = -1;
memset(io->drop, 0, sizeof io->drop);
Expand All @@ -868,6 +913,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
raft_io->truncate = ioMethodTruncate;
raft_io->send = ioMethodSend;
raft_io->snapshot_put = ioMethodSnapshotPut;
raft_io->async_work = ioMethodAsyncWork;
raft_io->snapshot_get = ioMethodSnapshotGet;
raft_io->time = ioMethodTime;
raft_io->random = ioMethodRandom;
Expand Down Expand Up @@ -1378,6 +1424,10 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t)
ioFlushSnapshotGet(io, (struct snapshot_get *)r);
f->event.type = RAFT_FIXTURE_DISK;
break;
case ASYNC_WORK:
ioFlushAsyncWork(io, (struct async_work *)r);
f->event.type = RAFT_FIXTURE_WORK;
break;
default:
assert(0);
}
Expand Down
25 changes: 25 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#define DEFAULT_MAX_CATCH_UP_ROUNDS 10
#define DEFAULT_MAX_CATCH_UP_ROUND_DURATION (5 * 1000)

static int ioFsmCompat(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm);

int raft_init(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm,
Expand All @@ -32,6 +36,12 @@ int raft_init(struct raft *r,
{
int rv;
assert(r != NULL);

rv = ioFsmCompat(r, io, fsm);
if (rv != 0) {
goto err;
}

r->io = io;
r->io->data = r;
r->fsm = fsm;
Expand Down Expand Up @@ -227,3 +237,18 @@ unsigned long long raft_digest(const char *text, unsigned long long n)

return byteFlip64(digest);
}

static int ioFsmCompat(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm)
{
if ((fsm->version > 2 && fsm->snapshot_async != NULL)
&& ((io->version < 2) || (io->async_work == NULL))) {
ErrMsgPrintf(r->errmsg,
"async snapshot requires io->version > 1 and async_work method.");
return -1;
}

return 0;
}

79 changes: 69 additions & 10 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1455,12 +1455,58 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status)
}

logSnapshot(&r->log, snapshot->index, r->snapshot.trailing);

out:
takeSnapshotClose(r, &r->snapshot.pending);
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
}

static int putSnapshot(struct raft *r, struct raft_snapshot *snapshot,
raft_io_snapshot_put_cb cb)
{
int rv;
assert(r->snapshot.put.data == NULL);
r->snapshot.put.data = r;
rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put,
snapshot, cb);
if (rv != 0) {
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
r->snapshot.put.data = NULL;
}

return rv;
}

static void takeSnapshotDoneCb(struct raft_io_async_work *take,
int status) {
struct raft *r = take->data;
struct raft_snapshot *snapshot = &r->snapshot.pending;
int rv;

raft_free(take);

if (status != 0) {
tracef("take snapshot failed %s", raft_strerror(status));
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
r->snapshot.put.data = NULL;
return;
}

rv = putSnapshot(r, snapshot, takeSnapshotCb);
if (rv != 0) {
tracef("put snapshot failed %d", rv);
}
}

static int takeSnapshotAsync(struct raft_io_async_work *take)
{
struct raft *r = take->data;
tracef("take snapshot async at %lld", r->snapshot.pending.index);
struct raft_snapshot *snapshot = &r->snapshot.pending;
return r->fsm->snapshot_async(r->fsm, &snapshot->bufs, &snapshot->n_bufs);
}

static int takeSnapshot(struct raft *r)
{
struct raft_snapshot *snapshot;
Expand All @@ -1471,12 +1517,13 @@ static int takeSnapshot(struct raft *r)
snapshot = &r->snapshot.pending;
snapshot->index = r->last_applied;
snapshot->term = logTermOf(&r->log, r->last_applied);
snapshot->bufs = NULL;
snapshot->n_bufs = 0;

rv = configurationCopy(&r->configuration, &snapshot->configuration);
if (rv != 0) {
goto abort;
}

snapshot->configuration_index = r->configuration_index;

rv = r->fsm->snapshot(r->fsm, &snapshot->bufs, &snapshot->n_bufs);
Expand All @@ -1489,17 +1536,29 @@ static int takeSnapshot(struct raft *r)
goto abort;
}

assert(r->snapshot.put.data == NULL);
r->snapshot.put.data = r;
rv = r->io->snapshot_put(r->io, r->snapshot.trailing, &r->snapshot.put,
snapshot, takeSnapshotCb);
if (rv != 0) {
goto abort_after_fsm_snapshot;
bool sync_snapshot = r->fsm->version < 3 || r->fsm->snapshot_async == NULL;
if (sync_snapshot) {
/* putSnapshot will clean up config and buffers in case of error */
return putSnapshot(r, snapshot, takeSnapshotCb);
} else {
struct raft_io_async_work *take = raft_malloc(sizeof(*take));
if (take == NULL) {
rv = RAFT_NOMEM;
goto abort_after_snapshot;
}
take->data = r;
take->work = takeSnapshotAsync;
rv = r->io->async_work(r->io, take, takeSnapshotDoneCb);
if (rv != 0) {
raft_free(take);
goto abort_after_snapshot;
}
}

return 0;

abort_after_fsm_snapshot:
abort_after_snapshot:
/* Closes config and finalizes snapshot */
takeSnapshotClose(r, snapshot);
abort:
r->snapshot.pending.term = 0;
Expand Down
Loading

0 comments on commit 05a49e0

Please sign in to comment.