From 29ed0a214f4ff28bac1261316d35059934ead823 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Tue, 21 Nov 2023 16:25:59 -0500 Subject: [PATCH] Add a test for logReinstate Signed-off-by: Cole Miller --- include/raft/fixture.h | 11 ++++++ src/fixture.c | 57 +++++++++++++++++++++++++---- test/integration/test_replication.c | 45 +++++++++++++++++++++++ 3 files changed, 105 insertions(+), 8 deletions(-) diff --git a/include/raft/fixture.h b/include/raft/fixture.h index 635a9c4b0..4e82e69c4 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -394,6 +394,15 @@ RAFT_API void raft_fixture_set_disk_latency(struct raft_fixture *f, unsigned i, unsigned msecs); +/** + * Set the send latency in milliseconds for messages from the @i'th server to + * the @j'th server: the send callback is invoked only after this delay. + */ +RAFT_API void raft_fixture_set_send_latency(struct raft_fixture *f, + unsigned i, + unsigned j, + unsigned msecs); + /** * Set the persisted term of the @i'th server. */ @@ -424,6 +433,8 @@ RAFT_API void raft_fixture_io_fault(struct raft_fixture *f, int delay, int repeat); +RAFT_API void raft_fixture_disk_write_fault(struct raft_fixture *f, unsigned i, int delay); + /** * Return the number of messages of the given type that the @i'th server has * successfully sent so far. diff --git a/src/fixture.c b/src/fixture.c index f1ad21dc4..a07c4b5e3 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -23,6 +23,7 @@ #define NETWORK_LATENCY 15 #define DISK_LATENCY 10 #define WORK_DURATION 200 +#define SEND_LATENCY 0 /* To keep in sync with raft.h */ #define N_MESSAGE_TYPES 6 @@ -132,6 +133,7 @@ struct peer struct io *io; /* The peer's I/O backend. */ bool connected; /* Whether a connection is established. */ bool saturated; /* Whether the established connection is saturated. */ + unsigned send_latency; }; /* Stub I/O implementation implementing all operations in-memory. */ @@ -176,6 +178,7 @@ struct io int countdown; /* Trigger the fault when this counter gets to zero. 
*/ int n; /* Repeat the fault this many times. Default is -1. */ } fault; + int disk_fault_countdown; /* If flag i is true, messages of type i will be silently dropped. */ bool drop[N_MESSAGE_TYPES]; @@ -222,6 +225,15 @@ static bool ioFaultTick(struct io *io) return false; } +static bool diskFaultTick(struct io *io) +{ + int countdown = io->disk_fault_countdown; + if (countdown >= 0) { + io->disk_fault_countdown -= 1; + } + return countdown == 0; +} + static int ioMethodInit(struct raft_io *raft_io, raft_id id, const char *address) @@ -262,6 +274,11 @@ static void ioFlushAppend(struct io *s, struct append *append) goto done; } + if (diskFaultTick(s)) { + status = RAFT_IOERR; + goto done; + } + /* Allocate an array for the old entries plus the new ones. */ entries = raft_realloc(s->entries, (s->n + append->n) * sizeof *s->entries); assert(entries != NULL); @@ -753,11 +770,15 @@ static raft_time ioMethodTime(struct raft_io *raft_io) static int ioMethodRandom(struct raft_io *raft_io, int min, int max) { - struct io *io; - (void)min; - (void)max; - io = raft_io->impl; - return (int)io->randomized_election_timeout; + struct io *io = raft_io->impl; + int t = (int)io->randomized_election_timeout; + if (t < min) { + return min; + } else if (t > max) { + return max; + } else { + return t; + } } /* Queue up a request which will be processed later, when io_stub_flush() @@ -769,6 +790,7 @@ static int ioMethodSend(struct raft_io *raft_io, { struct io *io = raft_io->impl; struct send *r; + struct peer *peer; if (ioFaultTick(io)) { return RAFT_IOERR; @@ -785,9 +807,8 @@ static int ioMethodSend(struct raft_io *raft_io, r->message = *message; r->req->cb = cb; - /* TODO: simulate the presence of an OS send buffer, whose available size - * might delay the completion of send requests */ - r->completion_time = *io->time; + peer = ioGetPeer(io, message->server_id); + r->completion_time = *io->time + peer->send_latency; QUEUE_PUSH(&io->requests, &r->queue); @@ -841,6 +862,7 @@ static 
void ioConnect(struct raft_io *raft_io, struct raft_io *other) io->peers[io->n_peers].io = io_other; io->peers[io->n_peers].connected = true; io->peers[io->n_peers].saturated = false; + io->peers[io->n_peers].send_latency = SEND_LATENCY; io->n_peers++; } @@ -930,6 +952,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->work_duration = WORK_DURATION; io->fault.countdown = -1; io->fault.n = -1; + io->disk_fault_countdown = -1; memset(io->drop, 0, sizeof io->drop); memset(io->n_send, 0, sizeof io->n_send); memset(io->n_recv, 0, sizeof io->n_recv); @@ -1940,6 +1963,16 @@ void raft_fixture_set_disk_latency(struct raft_fixture *f, io->disk_latency = msecs; } +void raft_fixture_set_send_latency(struct raft_fixture *f, + unsigned i, + unsigned j, + unsigned msecs) +{ + struct io *io = f->servers[i]->io.impl; + struct peer *peer = ioGetPeer(io, f->servers[j]->id); + peer->send_latency = msecs; +} + void raft_fixture_set_term(struct raft_fixture *f, unsigned i, raft_term term) { struct io *io = f->servers[i]->io.impl; @@ -1977,6 +2010,14 @@ void raft_fixture_io_fault(struct raft_fixture *f, io->fault.n = repeat; } +void raft_fixture_disk_write_fault(struct raft_fixture *f, + unsigned i, + int delay) +{ + struct io *io = f->servers[i]->io.impl; + io->disk_fault_countdown = delay; +} + unsigned raft_fixture_n_send(struct raft_fixture *f, unsigned i, int type) { struct io *io = f->servers[i]->io.impl; diff --git a/test/integration/test_replication.c b/test/integration/test_replication.c index f4081ad02..3d285e621 100644 --- a/test/integration/test_replication.c +++ b/test/integration/test_replication.c @@ -1234,3 +1234,48 @@ TEST(replication, failPersistBarrierFollower, setUp, tearDown, 0, NULL) return MUNIT_OK; } + +/* A leader originates a log entry, fails to persist it, and steps down. 
+ * A follower that received the entry wins the ensuing election and sends + * the same entry back to the original leader, while the original leader + * still has an outgoing pending message that references its copy of the + * entry. This triggers the original leader to reinstate the entry in its + * log. */ +TEST(replication, receiveSameWithPendingSend, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + struct raft_apply req; + + /* Three voters. */ + CLUSTER_GROW; + /* Server 0 is the leader. */ + BOOTSTRAP_START_AND_ELECT; + + /* Server 1 never gets the entry. */ + raft_fixture_set_send_latency(&f->cluster, 0, 1, 10000); + + /* Disk write fails, but not before the entry gets to server 2. */ + CLUSTER_SET_DISK_LATENCY(0, 1000); + raft_fixture_disk_write_fault(&f->cluster, 0, 0); + req.data = (void *)(intptr_t)RAFT_IOERR; + CLUSTER_APPLY_ADD_X(0, &req, 1, NULL); + /* Server 0 steps down. */ + CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_FOLLOWER, 1500); + munit_assert_ullong(CLUSTER_RAFT(0)->current_term, ==, 2); + ASSERT_FOLLOWER(1); + ASSERT_FOLLOWER(2); + /* Only server 2 has the new entry. */ + munit_assert_ullong(CLUSTER_RAFT(0)->last_stored, ==, 2); + munit_assert_ullong(CLUSTER_RAFT(1)->last_stored, ==, 2); + munit_assert_ullong(CLUSTER_RAFT(2)->last_stored, ==, 3); + + /* Server 2 times out first and wins the election. */ + raft_set_election_timeout(CLUSTER_RAFT(2), 500); + raft_fixture_start_elect(&f->cluster, 2); + CLUSTER_STEP_UNTIL_STATE_IS(2, RAFT_LEADER, 1000); + munit_assert_ullong(CLUSTER_RAFT(2)->current_term, ==, 3); + + /* Server 0 gets the same entry back from server 2. */ + CLUSTER_STEP_UNTIL_APPLIED(2, 3, 1000); + return MUNIT_OK; +}