Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

replication: Try to reinstate entries that have been truncated away #483

Merged
merged 6 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions include/raft/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ RAFT_API void raft_fixture_set_disk_latency(struct raft_fixture *f,
unsigned i,
unsigned msecs);

/**
* Send the send latency in milliseconds. Each message send will take this many
* milliseconds before the send callback is invoked.
*/
RAFT_API void raft_fixture_set_send_latency(struct raft_fixture *f,
unsigned i,
unsigned j,
unsigned msecs);

/**
* Set the persisted term of the @i'th server.
*/
Expand All @@ -415,14 +424,13 @@ RAFT_API void raft_fixture_add_entry(struct raft_fixture *f,
unsigned i,
struct raft_entry *entry);

/**
* Inject an I/O failure that will be triggered on the @i'th server after @delay
* I/O requests and occur @repeat times.
*/
RAFT_API void raft_fixture_io_fault(struct raft_fixture *f,
unsigned i,
int delay,
int repeat);
RAFT_API void raft_fixture_append_fault(struct raft_fixture *f, unsigned i, int delay);

RAFT_API void raft_fixture_vote_fault(struct raft_fixture *f, unsigned i, int delay);

RAFT_API void raft_fixture_term_fault(struct raft_fixture *f, unsigned i, int delay);

RAFT_API void raft_fixture_send_fault(struct raft_fixture *f, unsigned i, int delay);

/**
* Return the number of messages of the given type that the @i'th server has
Expand Down
145 changes: 62 additions & 83 deletions src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define NETWORK_LATENCY 15
#define DISK_LATENCY 10
#define WORK_DURATION 200
#define SEND_LATENCY 0

/* To keep in sync with raft.h */
#define N_MESSAGE_TYPES 6
Expand Down Expand Up @@ -132,6 +133,7 @@
struct io *io; /* The peer's I/O backend. */
bool connected; /* Whether a connection is established. */
bool saturated; /* Whether the established connection is saturated. */
unsigned send_latency;
};

/* Stub I/O implementation implementing all operations in-memory. */
Expand Down Expand Up @@ -171,11 +173,10 @@
unsigned disk_latency; /* Milliseconds to perform disk I/O */
unsigned work_duration; /* Milliseconds to run async work */

struct
{
int countdown; /* Trigger the fault when this counter gets to zero. */
int n; /* Repeat the fault this many times. Default is -1. */
} fault;
int append_fault_countdown;
int vote_fault_countdown;
int term_fault_countdown;
int send_fault_countdown;

/* If flag i is true, messages of type i will be silently dropped. */
bool drop[N_MESSAGE_TYPES];
Expand All @@ -186,40 +187,13 @@
unsigned n_append;
};

/* Advance the fault counters and return @true if an error should occur. */
static bool ioFaultTick(struct io *io)
static bool faultTick(int *countdown)
{
/* If the countdown is negative, faults are disabled. */
if (io->fault.countdown < 0) {
return false;
bool trigger = *countdown == 0;
if (*countdown >= 0) {
*countdown -= 1;
}

/* If the countdown didn't reach zero, it's still not come the time to
* trigger faults. */
if (io->fault.countdown > 0) {
io->fault.countdown--;
return false;
}

assert(io->fault.countdown == 0);

/* If n is negative we keep triggering the fault forever. */
if (io->fault.n < 0) {
return true;
}

/* If n is positive we need to trigger the fault at least this time. */
if (io->fault.n > 0) {
io->fault.n--;
return true;
}

assert(io->fault.n == 0);

/* We reached 'n', let's disable faults. */
io->fault.countdown--;

return false;
return trigger;
}

static int ioMethodInit(struct raft_io *raft_io,
Expand All @@ -238,9 +212,6 @@
raft_io_recv_cb recv_cb)
{
struct io *io = raft_io->impl;
if (ioFaultTick(io)) {
return RAFT_IOERR;
}
io->tick_interval = msecs;
io->tick_cb = tick_cb;
io->recv_cb = recv_cb;
Expand All @@ -257,7 +228,7 @@
int status = 0;

/* Simulates a disk write failure. */
if (ioFaultTick(s)) {
if (faultTick(&s->append_fault_countdown)) {
status = RAFT_IOERR;
goto done;
}
Expand Down Expand Up @@ -408,7 +379,6 @@
break;
}

/* tracef("io: flush: %s", describeMessage(&send->message)); */
io->n_send[send->message.type]++;
status = 0;

Expand Down Expand Up @@ -496,10 +466,6 @@

s = io->impl;

if (ioFaultTick(s)) {
return RAFT_IOERR;
}

*term = s->term;
*voted_for = s->voted_for;
*start_index = 1;
Expand Down Expand Up @@ -532,10 +498,6 @@
struct raft_entry *entries;
int rv;

if (ioFaultTick(io)) {
return RAFT_IOERR;
}

if (io->term != 0) {
return RAFT_CANTBOOTSTRAP;
}
Expand Down Expand Up @@ -582,7 +544,7 @@
{
struct io *io = raft_io->impl;

if (ioFaultTick(io)) {
if (faultTick(&io->term_fault_countdown)) {
return RAFT_IOERR;
}

Expand All @@ -596,11 +558,10 @@
{
struct io *io = raft_io->impl;

if (ioFaultTick(io)) {
if (faultTick(&io->vote_fault_countdown)) {
return RAFT_IOERR;
}

/* tracef("io: set vote: %d %d", server_id, io->index); */
io->voted_for = server_id;

return 0;
Expand All @@ -615,10 +576,6 @@
struct io *io = raft_io->impl;
struct append *r;

if (ioFaultTick(io)) {
return RAFT_IOERR;
}

r = raft_malloc(sizeof *r);
assert(r != NULL);

Expand All @@ -640,10 +597,6 @@
struct io *io = raft_io->impl;
size_t n;

if (ioFaultTick(io)) {
return RAFT_IOERR;
}

n = (size_t)(index - 1); /* Number of entries left after truncation */

if (n > 0) {
Expand Down Expand Up @@ -753,11 +706,15 @@

static int ioMethodRandom(struct raft_io *raft_io, int min, int max)
{
struct io *io;
(void)min;
(void)max;
io = raft_io->impl;
return (int)io->randomized_election_timeout;
struct io *io = raft_io->impl;
int t = (int)io->randomized_election_timeout;
if (t < min) {
return min;

Check warning on line 712 in src/fixture.c

View check run for this annotation

Codecov / codecov/patch

src/fixture.c#L712

Added line #L712 was not covered by tests
} else if (t > max) {
return max;
} else {
return t;
}
}

/* Queue up a request which will be processed later, when io_stub_flush()
Expand All @@ -769,14 +726,12 @@
{
struct io *io = raft_io->impl;
struct send *r;
struct peer *peer;

if (ioFaultTick(io)) {
if (faultTick(&io->send_fault_countdown)) {
return RAFT_IOERR;
}

/* tracef("io: send: %s to server %d", describeMessage(message),
message->server_id); */

r = raft_malloc(sizeof *r);
assert(r != NULL);

Expand All @@ -785,9 +740,8 @@
r->message = *message;
r->req->cb = cb;

/* TODO: simulate the presence of an OS send buffer, whose available size
* might delay the completion of send requests */
r->completion_time = *io->time;
peer = ioGetPeer(io, message->server_id);
r->completion_time = *io->time + peer->send_latency;

QUEUE_PUSH(&io->requests, &r->queue);

Expand All @@ -796,8 +750,6 @@

static void ioReceive(struct io *io, struct raft_message *message)
{
/* tracef("io: recv: %s from server %d", describeMessage(message),
message->server_id); */
io->recv_cb(io->io, message);
io->n_recv[message->type]++;
}
Expand Down Expand Up @@ -841,6 +793,7 @@
io->peers[io->n_peers].io = io_other;
io->peers[io->n_peers].connected = true;
io->peers[io->n_peers].saturated = false;
io->peers[io->n_peers].send_latency = SEND_LATENCY;
io->n_peers++;
}

Expand Down Expand Up @@ -928,8 +881,10 @@
io->network_latency = NETWORK_LATENCY;
io->disk_latency = DISK_LATENCY;
io->work_duration = WORK_DURATION;
io->fault.countdown = -1;
io->fault.n = -1;
io->append_fault_countdown = -1;
io->vote_fault_countdown = -1;
io->term_fault_countdown = -1;
io->send_fault_countdown = -1;
memset(io->drop, 0, sizeof io->drop);
memset(io->n_send, 0, sizeof io->n_send);
memset(io->n_recv, 0, sizeof io->n_recv);
Expand Down Expand Up @@ -1940,6 +1895,16 @@
io->disk_latency = msecs;
}

void raft_fixture_set_send_latency(struct raft_fixture *f,
unsigned i,
unsigned j,
unsigned msecs)
{
struct io *io = f->servers[i]->io.impl;
struct peer *peer = ioGetPeer(io, f->servers[j]->id);
peer->send_latency = msecs;
}

void raft_fixture_set_term(struct raft_fixture *f, unsigned i, raft_term term)
{
struct io *io = f->servers[i]->io.impl;
Expand Down Expand Up @@ -1967,14 +1932,28 @@
io->n++;
}

void raft_fixture_io_fault(struct raft_fixture *f,
unsigned i,
int delay,
int repeat)
void raft_fixture_append_fault(struct raft_fixture *f, unsigned i, int delay)
{
struct io *io = f->servers[i]->io.impl;
io->append_fault_countdown = delay;
}

void raft_fixture_vote_fault(struct raft_fixture *f, unsigned i, int delay)
{
struct io *io = f->servers[i]->io.impl;
io->vote_fault_countdown = delay;
}

void raft_fixture_term_fault(struct raft_fixture *f, unsigned i, int delay)
{
struct io *io = f->servers[i]->io.impl;
io->term_fault_countdown = delay;
}

void raft_fixture_send_fault(struct raft_fixture *f, unsigned i, int delay)
{
struct io *io = f->servers[i]->io.impl;
io->fault.countdown = delay;
io->fault.n = repeat;
io->send_fault_countdown = delay;
}

unsigned raft_fixture_n_send(struct raft_fixture *f, unsigned i, int type)
Expand Down
Loading
Loading