Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Revert "Async snapshot"
Browse files Browse the repository at this point in the history
  • Loading branch information
MathieuBordere authored Mar 24, 2022
1 parent b37dacd commit 569c323
Show file tree
Hide file tree
Showing 18 changed files with 68 additions and 693 deletions.
6 changes: 2 additions & 4 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ raftinclude_HEADERS =

lib_LTLIBRARIES = libraft.la
libraft_la_CFLAGS = $(AM_CFLAGS) -fvisibility=hidden
libraft_la_LDFLAGS = -version-info 1:0:0
libraft_la_LDFLAGS = -version-info 0:7:0
libraft_la_SOURCES = \
src/byte.c \
src/client.c \
Expand Down Expand Up @@ -150,7 +150,6 @@ libraft_la_SOURCES += \
src/uv_tcp_listen.c \
src/uv_tcp_connect.c \
src/uv_truncate.c \
src/uv_work.c \
src/uv_writer.c
libraft_la_LDFLAGS += $(UV_LIBS)

Expand Down Expand Up @@ -199,8 +198,7 @@ test_integration_uv_SOURCES = \
test/integration/test_uv_tcp_connect.c \
test/integration/test_uv_tcp_listen.c \
test/integration/test_uv_snapshot_put.c \
test/integration/test_uv_truncate.c \
test/integration/test_uv_work.c
test/integration/test_uv_truncate.c
test_integration_uv_CFLAGS = $(AM_CFLAGS) -Wno-type-limits -Wno-conversion
test_integration_uv_LDFLAGS = -no-install $(UV_LIBS)
test_integration_uv_LDADD = libtest.la
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
AC_PREREQ(2.60)
AC_INIT([raft], [0.12.0])
AC_INIT([raft], [0.11.2])
AC_LANG([C])
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_AUX_DIR([ac])
Expand Down
57 changes: 1 addition & 56 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,20 +417,6 @@ struct raft_io_snapshot_get
raft_io_snapshot_get_cb cb; /* Request callback */
};

/**
* Asynchronous work request.
*/
struct raft_io_async_work;
typedef int (*raft_io_async_work_fn)(struct raft_io_async_work *req);
typedef void (*raft_io_async_work_cb)(struct raft_io_async_work *req,
int status);
struct raft_io_async_work
{
void *data; /* User data */
raft_io_async_work_fn work; /* Function to run async from the main loop */
raft_io_async_work_cb cb; /* Request callback */
};

/**
* Customizable tracer, for debugging purposes.
*/
Expand Down Expand Up @@ -513,46 +499,11 @@ struct raft_io
raft_io_snapshot_get_cb cb);
raft_time (*time)(struct raft_io *io);
int (*random)(struct raft_io *io, int min, int max);
int (*async_work)(struct raft_io *io,
struct raft_io_async_work *req,
raft_io_async_work_cb cb);
};

/*
* version 1:
* struct raft_fsm
* {
* int version;
* void *data;
* int (*apply)(struct raft_fsm *fsm,
* const struct raft_buffer *buf,
* void **result);
* int (*snapshot)(struct raft_fsm *fsm,
* struct raft_buffer *bufs[],
* unsigned *n_bufs);
* int (*restore)(struct raft_fsm *fsm, struct raft_buffer *buf);
* };
*
* version 2:
* Adds support for async snapshots through the `snapshot_async`
* and `snapshot_finalize` functions.
* An implementation must provide either the `snapshot` method or all 3
* snapshot methods. When only the `snapshot` method is provided, the behavior
* is identical to version 1, `snapshot` will be called in the main loop and the
* allocated buffers will be released by raft.
* When all 3 methods are provided, raft will call `snapshot` in the main loop,
* and when successful, will call `snapshot_async` by means of the `io->async_work` method.
* The callback to `io->async_work` will in turn call `snapshot_finalize`
* in the main loop when the work has completed independent of the return value
* of `snapshot_async`.
* An implementation that does not use asynchronous snapshots MUST set
* `snapshot_async` and `snapshot_finalize` to NULL.
* All memory allocated by the snapshot routines MUST be freed by the snapshot
* routines themselves.
*/
struct raft_fsm
{
int version; /* 1 or 2 */
int version;
void *data;
int (*apply)(struct raft_fsm *fsm,
const struct raft_buffer *buf,
Expand All @@ -561,12 +512,6 @@ struct raft_fsm
struct raft_buffer *bufs[],
unsigned *n_bufs);
int (*restore)(struct raft_fsm *fsm, struct raft_buffer *buf);
int (*snapshot_async)(struct raft_fsm *fsm,
struct raft_buffer *bufs[],
unsigned *n_bufs);
int (*snapshot_finalize)(struct raft_fsm *fsm,
struct raft_buffer *bufs[],
unsigned *n_bufs);
};

/**
Expand Down
3 changes: 1 addition & 2 deletions include/raft/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
enum {
RAFT_FIXTURE_TICK = 1, /* The tick callback has been invoked */
RAFT_FIXTURE_NETWORK, /* A network request has been sent or received */
RAFT_FIXTURE_DISK, /* An I/O request has been submitted */
RAFT_FIXTURE_WORK /* A large, CPU and/or memory intensive task */
RAFT_FIXTURE_DISK /* An I/O request has been submitted */
};

/**
Expand Down
52 changes: 1 addition & 51 deletions src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#define ELECTION_TIMEOUT 1000
#define NETWORK_LATENCY 15
#define DISK_LATENCY 10
#define WORK_DURATION 200

/* To keep in sync with raft.h */
#define N_MESSAGE_TYPES 6
Expand All @@ -37,7 +36,7 @@
queue queue /* Link the I/O pending requests queue. */

/* Request type codes. */
enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET, ASYNC_WORK };
enum { APPEND = 1, SEND, TRANSMIT, SNAPSHOT_PUT, SNAPSHOT_GET };

/* Abstract base type for an asynchronous request submitted to the stub I/o
* implementation. */
Expand Down Expand Up @@ -73,13 +72,6 @@ struct snapshot_put
const struct raft_snapshot *snapshot;
};

/* Pending request to perform general work. */
struct async_work
{
REQUEST;
struct raft_io_async_work *req;
};

/* Pending request to load a snapshot. */
struct snapshot_get
{
Expand Down Expand Up @@ -139,7 +131,6 @@ struct io
unsigned randomized_election_timeout; /* Value returned by io->random() */
unsigned network_latency; /* Milliseconds to deliver RPCs */
unsigned disk_latency; /* Milliseconds to perform disk I/O */
unsigned work_duration; /* Milliseconds to long running work */

struct
{
Expand Down Expand Up @@ -286,16 +277,6 @@ static void ioFlushSnapshotGet(struct io *s, struct snapshot_get *r)
raft_free(r);
}

/* Flush an async work request */
static void ioFlushAsyncWork(struct io *s, struct async_work *r)
{
(void) s;
int rv;
rv = r->req->work(r->req);
r->req->cb(r->req, rv);
raft_free(r);
}

/* Search for the peer with the given ID. */
static struct peer *ioGetPeer(struct io *io, raft_id id)
{
Expand Down Expand Up @@ -429,9 +410,6 @@ static void ioFlushAll(struct io *io)
case SNAPSHOT_GET:
ioFlushSnapshotGet(io, (struct snapshot_get *)r);
break;
case ASYNC_WORK:
ioFlushAsyncWork(io, (struct async_work *)r);
break;
default:
assert(0);
}
Expand Down Expand Up @@ -668,28 +646,6 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io,
return 0;
}

static int ioMethodAsyncWork(struct raft_io *raft_io,
struct raft_io_async_work *req,
raft_io_async_work_cb cb)
{
struct io *io = raft_io->impl;
struct raft *raft = raft_io->data;
struct async_work *r;

r = raft_malloc(sizeof *r);
assert(r != NULL);

r->type = ASYNC_WORK;
r->req = req;
r->req->cb = cb;
r->req->data = raft;
r->completion_time = *io->time + io->work_duration;

QUEUE_PUSH(&io->requests, &r->queue);
return 0;
}


static int ioMethodSnapshotGet(struct raft_io *raft_io,
struct raft_io_snapshot_get *req,
raft_io_snapshot_get_cb cb)
Expand Down Expand Up @@ -892,7 +848,6 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100;
io->network_latency = NETWORK_LATENCY;
io->disk_latency = DISK_LATENCY;
io->work_duration = WORK_DURATION;
io->fault.countdown = -1;
io->fault.n = -1;
memset(io->drop, 0, sizeof io->drop);
Expand All @@ -913,7 +868,6 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
raft_io->truncate = ioMethodTruncate;
raft_io->send = ioMethodSend;
raft_io->snapshot_put = ioMethodSnapshotPut;
raft_io->async_work = ioMethodAsyncWork;
raft_io->snapshot_get = ioMethodSnapshotGet;
raft_io->time = ioMethodTime;
raft_io->random = ioMethodRandom;
Expand Down Expand Up @@ -1413,10 +1367,6 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t)
ioFlushSnapshotGet(io, (struct snapshot_get *)r);
f->event.type = RAFT_FIXTURE_DISK;
break;
case ASYNC_WORK:
ioFlushAsyncWork(io, (struct async_work *)r);
f->event.type = RAFT_FIXTURE_WORK;
break;
default:
assert(0);
}
Expand Down
Loading

0 comments on commit 569c323

Please sign in to comment.