Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #277 from MathieuBordere/startup-recovery
Browse files Browse the repository at this point in the history
Startup recovery
  • Loading branch information
stgraber authored Jun 10, 2022
2 parents ca47af7 + 846b0bb commit 78b7937
Show file tree
Hide file tree
Showing 8 changed files with 769 additions and 45 deletions.
6 changes: 6 additions & 0 deletions include/raft/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ RAFT_API void raft_uv_set_connect_retry_delay(struct raft_io *io, unsigned msecs
RAFT_API void raft_uv_set_tracer(struct raft_io *io,
struct raft_tracer *tracer);

/**
 * Enable or disable auto-recovery on startup. Default enabled.
 *
 * When enabled and loading the segments of the data directory reports
 * corruption, the backend attempts to rename the offending segment files and
 * then retries the load exactly once. When disabled, the corruption error is
 * returned to the caller without any attempt to recover.
 */
RAFT_API void raft_uv_set_auto_recovery(struct raft_io *io,
bool flag);

/**
* Callback invoked by the transport implementation when a new incoming
* connection has been established.
Expand Down
30 changes: 22 additions & 8 deletions src/uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,14 @@ static int uvFilterSegments(struct uv *uv,
}

/* Load the last snapshot (if any) and all entries contained in all segment
* files of the data directory. */
* files of the data directory. This function can be called recursively, `depth`
* is there to ensure we don't get stuck in a recursive loop. */
static int uvLoadSnapshotAndEntries(struct uv *uv,
struct raft_snapshot **snapshot,
raft_index *start_index,
struct raft_entry *entries[],
size_t *n)
size_t *n,
int depth)
{
struct uvSnapshotInfo *snapshots;
struct uvSegmentInfo *segments;
Expand Down Expand Up @@ -441,6 +443,13 @@ static int uvLoadSnapshotAndEntries(struct uv *uv,
*entries = NULL;
*n = 0;
}
/* Try to recover exactly once when corruption is detected, the first pass
* might have cleaned up corrupt data. Most of the arguments are already
* reset after the `err` label, except for `start_index`. */
if (rv == RAFT_CORRUPT && uv->auto_recovery && depth == 0) {
*start_index = 1;
return uvLoadSnapshotAndEntries(uv, snapshot, start_index, entries, n, depth + 1);
}
return rv;
}

Expand All @@ -454,7 +463,6 @@ static int uvLoad(struct raft_io *io,
size_t *n_entries)
{
struct uv *uv;
raft_index last_index;
int rv;
uv = io->impl;

Expand All @@ -463,7 +471,7 @@ static int uvLoad(struct raft_io *io,
*snapshot = NULL;

rv =
uvLoadSnapshotAndEntries(uv, snapshot, start_index, entries, n_entries);
uvLoadSnapshotAndEntries(uv, snapshot, start_index, entries, n_entries, 0);
if (rv != 0) {
return rv;
}
Expand All @@ -472,10 +480,8 @@ static int uvLoad(struct raft_io *io,
tracef("no snapshot");
}

last_index = *start_index + *n_entries - 1;

/* Set the index of the next entry that will be appended. */
uv->append_next_index = last_index + 1;
uv->append_next_index = *start_index + *n_entries;

return 0;
}
Expand Down Expand Up @@ -554,7 +560,7 @@ static int uvRecover(struct raft_io *io, const struct raft_configuration *conf)

/* Load the current state. This also closes any leftover open segment. */
rv = uvLoadSnapshotAndEntries(uv, &snapshot, &start_index, &entries,
&n_entries);
&n_entries, 0);
if (rv != 0) {
return rv;
}
Expand Down Expand Up @@ -672,6 +678,7 @@ int raft_uv_init(struct raft_io *io,
QUEUE_INIT(&uv->aborting);
uv->closing = false;
uv->close_cb = NULL;
uv->auto_recovery = true;

/* Set the raft_io implementation. */
io->version = 1; /* future-proof'ing */
Expand Down Expand Up @@ -750,4 +757,11 @@ void raft_uv_set_tracer(struct raft_io *io, struct raft_tracer *tracer)
uv->tracer = tracer;
}

/* Turn automatic recovery from corrupt segments on or off for the libuv
 * backend attached to the given raft_io instance. */
void raft_uv_set_auto_recovery(struct raft_io *io, bool flag)
{
    struct uv *uv = io->impl;

    uv->auto_recovery = flag;
}

#undef tracef
1 change: 1 addition & 0 deletions src/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct uv
queue aborting; /* Cleanups upon errors or shutdown */
bool closing; /* True if we are closing */
raft_io_close_cb close_cb; /* Invoked when finishing closing */
bool auto_recovery; /* Try to recover from corrupt segments */
};

/* Implementation of raft_io->truncate. */
Expand Down
27 changes: 27 additions & 0 deletions src/uv_fs.c
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,33 @@ int UvFsRemoveFile(const char *dir, const char *filename, char *errmsg)
return 0;
}

/* Rename filename1 to filename2, both taken relative to dir.
 *
 * Returns 0 on success and RAFT_INVALID if either full path cannot be built.
 * If the rename itself fails, UvOsErrMsg is invoked on errmsg and the non-zero
 * code returned by UvOsRename is passed back to the caller unchanged. */
int UvFsRenameFile(const char *dir,
                   const char *filename1,
                   const char *filename2,
                   char *errmsg)
{
    char src_path[UV__PATH_SZ];
    char dst_path[UV__PATH_SZ];
    int rv;

    /* Build both absolute paths first; the second join is only attempted when
     * the first one succeeded. */
    if (UvOsJoin(dir, filename1, src_path) != 0 ||
        UvOsJoin(dir, filename2, dst_path) != 0) {
        return RAFT_INVALID;
    }

    rv = UvOsRename(src_path, dst_path);
    if (rv == 0) {
        return 0;
    }

    UvOsErrMsg(errmsg, "rename", rv);
    return rv;
}

int UvFsTruncateAndRenameFile(const char *dir,
size_t size,
const char *filename1,
Expand Down
6 changes: 6 additions & 0 deletions src/uv_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ int UvFsTruncateAndRenameFile(const char *dir,
const char *filename2,
char *errmsg);

/* Synchronously rename a file.
 *
 * Both filename1 (the current name) and filename2 (the new name) are joined
 * with dir to form the paths that get renamed.
 *
 * Returns 0 on success, RAFT_INVALID if one of the two paths cannot be built,
 * or the non-zero code of the underlying rename call, in which case a
 * description of the failure is written to errmsg. */
int UvFsRenameFile(const char *dir,
const char *filename1,
const char *filename2,
char *errmsg);

/* Return information about the I/O capabilities of the underlying file
* system.
*
Expand Down
74 changes: 72 additions & 2 deletions src/uv_segment.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -466,7 +468,7 @@ static bool uvContentHasOnlyTrailingZeros(const struct raft_buffer *buf,
}

/* Load all entries contained in an open segment. */
static int uvLoadOpenSegment(struct uv *uv,
static int uvSegmentLoadOpen(struct uv *uv,
struct uvSegmentInfo *info,
struct raft_entry *entries[],
size_t *n,
Expand Down Expand Up @@ -801,6 +803,68 @@ void uvSegmentBufferReset(struct uvSegmentBuffer *b, unsigned retain)
b->n = b->n % b->block_size;
}

/* When a corrupted segment is detected, the segment is renamed.
 * Upon a restart, raft will not detect the segment anymore and will try
 * to start without it.
 *
 * The new name has the form "corrupt-<nanosecond timestamp>-<old filename>".
 * The operation is best-effort: if the new name does not fit in the filename
 * buffer or the rename fails, the problem is traced and the function returns
 * without reporting an error to the caller.
 * */
#define CORRUPT_FILE_FMT "corrupt-%" PRId64 "-%s"
static void uvMoveCorruptSegment(struct uv *uv, struct uvSegmentInfo *info)
{
    char errmsg[RAFT_ERRMSG_BUF_SIZE] = {0};
    char new_filename[UV__FILENAME_LEN + 1] = {0};
    size_t sz = sizeof(new_filename);
    struct timespec ts = {0};
    int64_t ns;
    int rv;

    /* Ignore errors: ts stays zero-initialized if the clock can't be read. */
    clock_gettime(CLOCK_REALTIME, &ts);
    /* Widen to 64 bits before multiplying, so the computation can't overflow
     * on platforms where time_t is narrower than 64 bits. */
    ns = (int64_t)ts.tv_sec * 1000000000 + (int64_t)ts.tv_nsec;

    rv = snprintf(new_filename, sz, CORRUPT_FILE_FMT, ns, info->filename);
    if (rv < 0 || (size_t)rv >= sz) {
        tracef("snprintf %d", rv);
        return;
    }

    /* Capture the result of the rename itself. Without the assignment the
     * check below would look at the (always positive) snprintf length. */
    rv = UvFsRenameFile(uv->dir, info->filename, new_filename, errmsg);
    if (rv != 0) {
        tracef("%s", errmsg);
        return;
    }
}

/*
 * Startup recovery policy for a corrupt segment found at index i_corrupt of
 * the infos array (which holds n_infos segments).
 *
 * - Corrupt open segment: it and every segment after it are renamed. Leaving
 *   newer, possibly non-corrupt, open segments in place could lead to loading
 *   inconsistent data.
 *
 * - Corrupt closed segment: it is renamed only if it is the last closed
 *   segment, i.e. it is either the very last segment or it is directly
 *   followed by an open segment. In that case all the open segments that
 *   follow it are renamed too. Otherwise nothing is touched.
 */
static void uvRecoverFromCorruptSegment(struct uv *uv,
                                        size_t i_corrupt,
                                        struct uvSegmentInfo *infos,
                                        size_t n_infos)
{
    size_t i;

    if (!infos[i_corrupt].is_open) {
        size_t i_next = i_corrupt + 1;
        /* A closed segment followed by another closed segment is left
         * alone. */
        if (i_next != n_infos && !infos[i_next].is_open) {
            return;
        }
    }

    /* Move the corrupt segment and everything after it out of the way. */
    for (i = i_corrupt; i < n_infos; i++) {
        uvMoveCorruptSegment(uv, &infos[i]);
    }
}

int uvSegmentLoadAll(struct uv *uv,
const raft_index start_index,
struct uvSegmentInfo *infos,
Expand Down Expand Up @@ -828,9 +892,12 @@ int uvSegmentLoadAll(struct uv *uv,
tracef("load segment %s", info->filename);

if (info->is_open) {
rv = uvLoadOpenSegment(uv, info, entries, n_entries, &next_index);
rv = uvSegmentLoadOpen(uv, info, entries, n_entries, &next_index);
ErrMsgWrapf(uv->io->errmsg, "load open segment %s", info->filename);
if (rv != 0) {
if (rv == RAFT_CORRUPT && uv->auto_recovery) {
uvRecoverFromCorruptSegment(uv, i, infos, n_infos);
}
goto err;
}
} else {
Expand All @@ -852,6 +919,9 @@ int uvSegmentLoadAll(struct uv *uv,
if (rv != 0) {
ErrMsgWrapf(uv->io->errmsg, "load closed segment %s",
info->filename);
if (rv == RAFT_CORRUPT && uv->auto_recovery) {
uvRecoverFromCorruptSegment(uv, i, infos, n_infos);
}
goto err;
}

Expand Down
Loading

0 comments on commit 78b7937

Please sign in to comment.