Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Add a test for logReinstate
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Miller <[email protected]>
  • Loading branch information
cole-miller committed Nov 21, 2023
1 parent b2e715f commit 29ed0a2
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 8 deletions.
11 changes: 11 additions & 0 deletions include/raft/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ RAFT_API void raft_fixture_set_disk_latency(struct raft_fixture *f,
unsigned i,
unsigned msecs);

/**
* Send the send latency in milliseconds. Each message send will take this many
* milliseconds before the send callback is invoked.
*/
RAFT_API void raft_fixture_set_send_latency(struct raft_fixture *f,
unsigned i,
unsigned j,
unsigned msecs);

/**
* Set the persisted term of the @i'th server.
*/
Expand Down Expand Up @@ -424,6 +433,8 @@ RAFT_API void raft_fixture_io_fault(struct raft_fixture *f,
int delay,
int repeat);

RAFT_API void raft_fixture_disk_write_fault(struct raft_fixture *f, unsigned i, int delay);

/**
* Return the number of messages of the given type that the @i'th server has
* successfully sent so far.
Expand Down
57 changes: 49 additions & 8 deletions src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define NETWORK_LATENCY 15
#define DISK_LATENCY 10
#define WORK_DURATION 200
#define SEND_LATENCY 0

/* To keep in sync with raft.h */
#define N_MESSAGE_TYPES 6
Expand Down Expand Up @@ -132,6 +133,7 @@ struct peer
struct io *io; /* The peer's I/O backend. */
bool connected; /* Whether a connection is established. */
bool saturated; /* Whether the established connection is saturated. */
unsigned send_latency;
};

/* Stub I/O implementation implementing all operations in-memory. */
Expand Down Expand Up @@ -176,6 +178,7 @@ struct io
int countdown; /* Trigger the fault when this counter gets to zero. */
int n; /* Repeat the fault this many times. Default is -1. */
} fault;
int disk_fault_countdown;

/* If flag i is true, messages of type i will be silently dropped. */
bool drop[N_MESSAGE_TYPES];
Expand Down Expand Up @@ -222,6 +225,15 @@ static bool ioFaultTick(struct io *io)
return false;
}

static bool diskFaultTick(struct io *io)
{
int countdown = io->disk_fault_countdown;
if (countdown >= 0) {
io->disk_fault_countdown -= 1;
}
return countdown == 0;
}

static int ioMethodInit(struct raft_io *raft_io,
raft_id id,
const char *address)
Expand Down Expand Up @@ -262,6 +274,11 @@ static void ioFlushAppend(struct io *s, struct append *append)
goto done;
}

if (diskFaultTick(s)) {
status = RAFT_IOERR;
goto done;
}

/* Allocate an array for the old entries plus the new ones. */
entries = raft_realloc(s->entries, (s->n + append->n) * sizeof *s->entries);
assert(entries != NULL);
Expand Down Expand Up @@ -753,11 +770,15 @@ static raft_time ioMethodTime(struct raft_io *raft_io)

static int ioMethodRandom(struct raft_io *raft_io, int min, int max)
{
struct io *io;
(void)min;
(void)max;
io = raft_io->impl;
return (int)io->randomized_election_timeout;
struct io *io = raft_io->impl;
int t = (int)io->randomized_election_timeout;
if (t < min) {
return min;

Check warning on line 776 in src/fixture.c

View check run for this annotation

Codecov / codecov/patch

src/fixture.c#L776

Added line #L776 was not covered by tests
} else if (t > max) {
return max;
} else {
return t;
}
}

/* Queue up a request which will be processed later, when io_stub_flush()
Expand All @@ -769,6 +790,7 @@ static int ioMethodSend(struct raft_io *raft_io,
{
struct io *io = raft_io->impl;
struct send *r;
struct peer *peer;

if (ioFaultTick(io)) {
return RAFT_IOERR;
Expand All @@ -785,9 +807,8 @@ static int ioMethodSend(struct raft_io *raft_io,
r->message = *message;
r->req->cb = cb;

/* TODO: simulate the presence of an OS send buffer, whose available size
* might delay the completion of send requests */
r->completion_time = *io->time;
peer = ioGetPeer(io, message->server_id);
r->completion_time = *io->time + peer->send_latency;

QUEUE_PUSH(&io->requests, &r->queue);

Expand Down Expand Up @@ -841,6 +862,7 @@ static void ioConnect(struct raft_io *raft_io, struct raft_io *other)
io->peers[io->n_peers].io = io_other;
io->peers[io->n_peers].connected = true;
io->peers[io->n_peers].saturated = false;
io->peers[io->n_peers].send_latency = SEND_LATENCY;
io->n_peers++;
}

Expand Down Expand Up @@ -930,6 +952,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
io->work_duration = WORK_DURATION;
io->fault.countdown = -1;
io->fault.n = -1;
io->disk_fault_countdown = -1;
memset(io->drop, 0, sizeof io->drop);
memset(io->n_send, 0, sizeof io->n_send);
memset(io->n_recv, 0, sizeof io->n_recv);
Expand Down Expand Up @@ -1940,6 +1963,16 @@ void raft_fixture_set_disk_latency(struct raft_fixture *f,
io->disk_latency = msecs;
}

void raft_fixture_set_send_latency(struct raft_fixture *f,
unsigned i,
unsigned j,
unsigned msecs)
{
struct io *io = f->servers[i]->io.impl;
struct peer *peer = ioGetPeer(io, f->servers[j]->id);
peer->send_latency = msecs;
}

void raft_fixture_set_term(struct raft_fixture *f, unsigned i, raft_term term)
{
struct io *io = f->servers[i]->io.impl;
Expand Down Expand Up @@ -1977,6 +2010,14 @@ void raft_fixture_io_fault(struct raft_fixture *f,
io->fault.n = repeat;
}

void raft_fixture_disk_write_fault(struct raft_fixture *f,
unsigned i,
int delay)
{
struct io *io = f->servers[i]->io.impl;
io->disk_fault_countdown = delay;
}

unsigned raft_fixture_n_send(struct raft_fixture *f, unsigned i, int type)
{
struct io *io = f->servers[i]->io.impl;
Expand Down
45 changes: 45 additions & 0 deletions test/integration/test_replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1234,3 +1234,48 @@ TEST(replication, failPersistBarrierFollower, setUp, tearDown, 0, NULL)

return MUNIT_OK;
}

/* A leader originates a log entry, fails to persist it, and steps down.
* A follower that received the entry wins the ensuing election and sends
* the same entry back to the original leader, while the original leader
* still has an outgoing pending message that references its copy of the
* entry. This triggers the original leader to reinstate the entry in its
* log. */
TEST(replication, receiveSameWithPendingSend, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct raft_apply req;

/* Three voters. */
CLUSTER_GROW;
/* Server 0 is the leader. */
BOOTSTRAP_START_AND_ELECT;

/* Server 1 never gets the entry. */
raft_fixture_set_send_latency(&f->cluster, 0, 1, 10000);

/* Disk write fails, but not before the entry gets to server 2. */
CLUSTER_SET_DISK_LATENCY(0, 1000);
raft_fixture_disk_write_fault(&f->cluster, 0, 0);
req.data = (void *)(intptr_t)RAFT_IOERR;
CLUSTER_APPLY_ADD_X(0, &req, 1, NULL);
/* Server 0 steps down. */
CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_FOLLOWER, 1500);
munit_assert_ullong(CLUSTER_RAFT(0)->current_term, ==, 2);
ASSERT_FOLLOWER(1);
ASSERT_FOLLOWER(2);
/* Only server 2 has the new entry. */
munit_assert_ullong(CLUSTER_RAFT(0)->last_stored, ==, 2);
munit_assert_ullong(CLUSTER_RAFT(1)->last_stored, ==, 2);
munit_assert_ullong(CLUSTER_RAFT(2)->last_stored, ==, 3);

/* Server 2 times out first and wins the election. */
raft_set_election_timeout(CLUSTER_RAFT(2), 500);
raft_fixture_start_elect(&f->cluster, 2);
CLUSTER_STEP_UNTIL_STATE_IS(2, RAFT_LEADER, 1000);
munit_assert_ullong(CLUSTER_RAFT(2)->current_term, ==, 3);

/* Server 0 gets the same entry back from server 2. */
CLUSTER_STEP_UNTIL_APPLIED(2, 3, 1000);
return MUNIT_OK;
}

0 comments on commit 29ed0a2

Please sign in to comment.