diff --git a/include/raft/fixture.h b/include/raft/fixture.h index 635a9c4b0..1fb757703 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -394,6 +394,15 @@ RAFT_API void raft_fixture_set_disk_latency(struct raft_fixture *f, unsigned i, unsigned msecs); +/** + * Send the send latency in milliseconds. Each message send will take this many + * milliseconds before the send callback is invoked. + */ +RAFT_API void raft_fixture_set_send_latency(struct raft_fixture *f, + unsigned i, + unsigned j, + unsigned msecs); + /** * Set the persisted term of the @i'th server. */ @@ -415,14 +424,13 @@ RAFT_API void raft_fixture_add_entry(struct raft_fixture *f, unsigned i, struct raft_entry *entry); -/** - * Inject an I/O failure that will be triggered on the @i'th server after @delay - * I/O requests and occur @repeat times. - */ -RAFT_API void raft_fixture_io_fault(struct raft_fixture *f, - unsigned i, - int delay, - int repeat); +RAFT_API void raft_fixture_append_fault(struct raft_fixture *f, unsigned i, int delay); + +RAFT_API void raft_fixture_vote_fault(struct raft_fixture *f, unsigned i, int delay); + +RAFT_API void raft_fixture_term_fault(struct raft_fixture *f, unsigned i, int delay); + +RAFT_API void raft_fixture_send_fault(struct raft_fixture *f, unsigned i, int delay); /** * Return the number of messages of the given type that the @i'th server has diff --git a/src/fixture.c b/src/fixture.c index f1ad21dc4..344e8ae61 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -23,6 +23,7 @@ #define NETWORK_LATENCY 15 #define DISK_LATENCY 10 #define WORK_DURATION 200 +#define SEND_LATENCY 0 /* To keep in sync with raft.h */ #define N_MESSAGE_TYPES 6 @@ -132,6 +133,7 @@ struct peer struct io *io; /* The peer's I/O backend. */ bool connected; /* Whether a connection is established. */ bool saturated; /* Whether the established connection is saturated. */ + unsigned send_latency; }; /* Stub I/O implementation implementing all operations in-memory. */ @@ -171,11 +173,10 @@ struct io unsigned disk_latency; /* Milliseconds to perform disk I/O */ unsigned work_duration; /* Milliseconds to run async work */ - struct - { - int countdown; /* Trigger the fault when this counter gets to zero. */ - int n; /* Repeat the fault this many times. Default is -1. */ - } fault; + int append_fault_countdown; + int vote_fault_countdown; + int term_fault_countdown; + int send_fault_countdown; /* If flag i is true, messages of type i will be silently dropped. */ bool drop[N_MESSAGE_TYPES]; @@ -186,40 +187,13 @@ struct io unsigned n_append; }; -/* Advance the fault counters and return @true if an error should occur. */ -static bool ioFaultTick(struct io *io) +static bool faultTick(int *countdown) { - /* If the countdown is negative, faults are disabled. */ - if (io->fault.countdown < 0) { - return false; + bool trigger = *countdown == 0; + if (*countdown >= 0) { + *countdown -= 1; } - - /* If the countdown didn't reach zero, it's still not come the time to - * trigger faults. */ - if (io->fault.countdown > 0) { - io->fault.countdown--; - return false; - } - - assert(io->fault.countdown == 0); - - /* If n is negative we keep triggering the fault forever. */ - if (io->fault.n < 0) { - return true; - } - - /* If n is positive we need to trigger the fault at least this time. */ - if (io->fault.n > 0) { - io->fault.n--; - return true; - } - - assert(io->fault.n == 0); - - /* We reached 'n', let's disable faults. */ - io->fault.countdown--; - - return false; + return trigger; } static int ioMethodInit(struct raft_io *raft_io, @@ -238,9 +212,6 @@ static int ioMethodStart(struct raft_io *raft_io, raft_io_recv_cb recv_cb) { struct io *io = raft_io->impl; - if (ioFaultTick(io)) { - return RAFT_IOERR; - } io->tick_interval = msecs; io->tick_cb = tick_cb; io->recv_cb = recv_cb; @@ -257,7 +228,7 @@ static void ioFlushAppend(struct io *s, struct append *append) int status = 0; /* Simulates a disk write failure. */ - if (ioFaultTick(s)) { + if (faultTick(&s->append_fault_countdown)) { status = RAFT_IOERR; goto done; } @@ -408,7 +379,6 @@ static void ioFlushSend(struct io *io, struct send *send) break; } - /* tracef("io: flush: %s", describeMessage(&send->message)); */ io->n_send[send->message.type]++; status = 0; @@ -496,10 +466,6 @@ static int ioMethodLoad(struct raft_io *io, s = io->impl; - if (ioFaultTick(s)) { - return RAFT_IOERR; - } - *term = s->term; *voted_for = s->voted_for; *start_index = 1; @@ -532,10 +498,6 @@ static int ioMethodBootstrap(struct raft_io *raft_io, struct raft_entry *entries; int rv; - if (ioFaultTick(io)) { - return RAFT_IOERR; - } - if (io->term != 0) { return RAFT_CANTBOOTSTRAP; } @@ -582,7 +544,7 @@ static int ioMethodSetTerm(struct raft_io *raft_io, const raft_term term) { struct io *io = raft_io->impl; - if (ioFaultTick(io)) { + if (faultTick(&io->term_fault_countdown)) { return RAFT_IOERR; } @@ -596,11 +558,10 @@ static int ioMethodSetVote(struct raft_io *raft_io, const raft_id server_id) { struct io *io = raft_io->impl; - if (ioFaultTick(io)) { + if (faultTick(&io->vote_fault_countdown)) { return RAFT_IOERR; } - /* tracef("io: set vote: %d %d", server_id, io->index); */ io->voted_for = server_id; return 0; @@ -615,10 +576,6 @@ static int ioMethodAppend(struct raft_io *raft_io, struct io *io = raft_io->impl; struct append *r; - if (ioFaultTick(io)) { - return RAFT_IOERR; - } - r = raft_malloc(sizeof *r); assert(r != NULL); @@ -640,10 +597,6 @@ static int ioMethodTruncate(struct raft_io *raft_io, raft_index index) struct io *io = raft_io->impl; size_t n; - if (ioFaultTick(io)) { - return RAFT_IOERR; - } - n = (size_t)(index - 1); /* Number of entries left after truncation */ if (n > 0) { @@ -753,11 +706,15 @@ static raft_time ioMethodTime(struct raft_io *raft_io) static int ioMethodRandom(struct raft_io *raft_io, int min, int max) { - struct io *io; - (void)min; - (void)max; - io = raft_io->impl; - return (int)io->randomized_election_timeout; + struct io *io = raft_io->impl; + int t = (int)io->randomized_election_timeout; + if (t < min) { + return min; + } else if (t > max) { + return max; + } else { + return t; + } } /* Queue up a request which will be processed later, when io_stub_flush() @@ -769,14 +726,12 @@ static int ioMethodSend(struct raft_io *raft_io, { struct io *io = raft_io->impl; struct send *r; + struct peer *peer; - if (ioFaultTick(io)) { + if (faultTick(&io->send_fault_countdown)) { return RAFT_IOERR; } - /* tracef("io: send: %s to server %d", describeMessage(message), - message->server_id); */ - r = raft_malloc(sizeof *r); assert(r != NULL); @@ -785,9 +740,8 @@ static int ioMethodSend(struct raft_io *raft_io, r->message = *message; r->req->cb = cb; - /* TODO: simulate the presence of an OS send buffer, whose available size - * might delay the completion of send requests */ - r->completion_time = *io->time; + peer = ioGetPeer(io, message->server_id); + r->completion_time = *io->time + peer->send_latency; QUEUE_PUSH(&io->requests, &r->queue); @@ -796,8 +750,6 @@ static int ioMethodSend(struct raft_io *raft_io, static void ioReceive(struct io *io, struct raft_message *message) { - /* tracef("io: recv: %s from server %d", describeMessage(message), - message->server_id); */ io->recv_cb(io->io, message); io->n_recv[message->type]++; } @@ -841,6 +793,7 @@ static void ioConnect(struct raft_io *raft_io, struct raft_io *other) io->peers[io->n_peers].io = io_other; io->peers[io->n_peers].connected = true; io->peers[io->n_peers].saturated = false; + io->peers[io->n_peers].send_latency = SEND_LATENCY; io->n_peers++; } @@ -928,8 +881,10 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->network_latency = NETWORK_LATENCY; io->disk_latency = DISK_LATENCY; io->work_duration = WORK_DURATION; - io->fault.countdown = -1; - io->fault.n = -1; + io->append_fault_countdown = -1; + io->vote_fault_countdown = -1; + io->term_fault_countdown = -1; + io->send_fault_countdown = -1; memset(io->drop, 0, sizeof io->drop); memset(io->n_send, 0, sizeof io->n_send); memset(io->n_recv, 0, sizeof io->n_recv); @@ -1940,6 +1895,16 @@ void raft_fixture_set_disk_latency(struct raft_fixture *f, io->disk_latency = msecs; } +void raft_fixture_set_send_latency(struct raft_fixture *f, + unsigned i, + unsigned j, + unsigned msecs) +{ + struct io *io = f->servers[i]->io.impl; + struct peer *peer = ioGetPeer(io, f->servers[j]->id); + peer->send_latency = msecs; +} + void raft_fixture_set_term(struct raft_fixture *f, unsigned i, raft_term term) { struct io *io = f->servers[i]->io.impl; @@ -1967,14 +1932,28 @@ void raft_fixture_add_entry(struct raft_fixture *f, io->n++; } -void raft_fixture_io_fault(struct raft_fixture *f, - unsigned i, - int delay, - int repeat) +void raft_fixture_append_fault(struct raft_fixture *f, unsigned i, int delay) +{ + struct io *io = f->servers[i]->io.impl; + io->append_fault_countdown = delay; +} + +void raft_fixture_vote_fault(struct raft_fixture *f, unsigned i, int delay) +{ + struct io *io = f->servers[i]->io.impl; + io->vote_fault_countdown = delay; +} + +void raft_fixture_term_fault(struct raft_fixture *f, unsigned i, int delay) +{ + struct io *io = f->servers[i]->io.impl; + io->term_fault_countdown = delay; +} + +void raft_fixture_send_fault(struct raft_fixture *f, unsigned i, int delay) { struct io *io = f->servers[i]->io.impl; - io->fault.countdown = delay; - io->fault.n = repeat; + io->send_fault_countdown = delay; } unsigned raft_fixture_n_send(struct raft_fixture *f, unsigned i, int type) diff --git a/src/log.c b/src/log.c index 1fcb8449a..a22198881 100644 --- a/src/log.c +++ b/src/log.c @@ -36,6 +36,8 @@ static int refsTryInsert(struct raft_entry_ref *table, const raft_term term, const raft_index index, const unsigned short count, + struct raft_buffer buf, + void *batch, bool *collision) { struct raft_entry_ref *bucket; /* Bucket associated with this index. */ @@ -103,6 +105,8 @@ static int refsTryInsert(struct raft_entry_ref *table, slot->term = term; slot->index = index; slot->count = count; + slot->buf = buf; + slot->batch = batch; slot->next = NULL; *collision = false; @@ -138,7 +142,7 @@ static int refsMove(struct raft_entry_ref *bucket, /* Insert the reference count for this entry into the new table. */ rv = refsTryInsert(table, size, slot->term, slot->index, slot->count, - &collision); + slot->buf, slot->batch, &collision); next_slot = slot->next; @@ -205,7 +209,9 @@ static int refsGrow(struct raft_log *l) * to 1. */ static int refsInit(struct raft_log *l, const raft_term term, - const raft_index index) + const raft_index index, + struct raft_buffer buf, + void *batch) { int i; @@ -233,7 +239,8 @@ static int refsInit(struct raft_log *l, bool collision; int rc; - rc = refsTryInsert(l->refs, l->refs_size, term, index, 1, &collision); + rc = refsTryInsert(l->refs, l->refs_size, term, index, 1, buf, batch, + &collision); if (rc != 0) { return RAFT_NOMEM; } @@ -480,6 +487,53 @@ static int ensureCapacity(struct raft_log *l) return 0; } +int logReinstate(struct raft_log *l, + raft_term term, + unsigned short type, + bool *reinstated) +{ + raft_index index; + size_t key; + struct raft_entry_ref *bucket; + struct raft_entry_ref *slot; + struct raft_entry *entry; + int rv; + + *reinstated = false; + + if (l->refs_size == 0) { + return 0; + } + + index = logLastIndex(l) + 1; + key = refsKey(index, l->refs_size); + bucket = &l->refs[key]; + if (bucket->count == 0 || bucket->index != index) { + return 0; + } + + for (slot = bucket; slot != NULL; slot = slot->next) { + if (slot->term == term) { + rv = ensureCapacity(l); + if (rv != 0) { + return rv; + } + slot->count++; + l->back++; + l->back %= l->size; + entry = &l->entries[l->back]; + entry->term = term; + entry->type = type; + entry->buf = slot->buf; + entry->batch = slot->batch; + *reinstated = true; + break; + } + } + + return 0; +} + int logAppend(struct raft_log *l, const raft_term term, const unsigned short type, @@ -502,7 +556,7 @@ int logAppend(struct raft_log *l, index = logLastIndex(l) + 1; - rv = refsInit(l, term, index); + rv = refsInit(l, term, index, *buf, batch); if (rv != 0) { return rv; } diff --git a/src/log.h b/src/log.h index 9a33aa694..65e196a52 100644 --- a/src/log.h +++ b/src/log.h @@ -24,9 +24,16 @@ */ struct raft_entry_ref { - raft_term term; /* Term of the entry being ref-counted. */ - raft_index index; /* Index of the entry being ref-counted. */ - unsigned short count; /* Number of references. */ + raft_term term; /* Term of the entry being ref-counted. */ + raft_index index; /* Index of the entry being ref-counted. */ + unsigned short count; /* Number of references. */ + /* The next two fields are copied from the corresponding fields of the + * raft_entry pointed to by this reference. We store them here as well, + * so that logReinstate can retrieve them when it finds a raft_entry_ref + * with the same index and term as it was passed, and create a full + * raft_entry using them. */ + struct raft_buffer buf; + void *batch; struct raft_entry_ref *next; /* Next item in the bucket (for collisions). */ }; @@ -85,11 +92,20 @@ raft_term logTermOf(struct raft_log *l, raft_index index); * there are no snapshots. */ raft_index logSnapshotIndex(struct raft_log *l); -/* Get the entry with the given index. * The returned pointer remains valid only +/* Get the entry with the given index. The returned pointer remains valid only * as long as no API that might delete the entry with the given index is * invoked. Return #NULL if there is no such entry. */ const struct raft_entry *logGet(struct raft_log *l, const raft_index index); +/* Check whether the hash map is already tracking an entry with the given + * @term and @index (that is not part of the "logical" log). If so, increment + * the refcount of that entry and set @reinstated to true; otherwise, set + * @reinstated to false. */ +int logReinstate(struct raft_log *l, + raft_term term, + unsigned short type, + bool *reinstated); + /* Append a new entry to the log. */ int logAppend(struct raft_log *l, raft_term term, @@ -108,7 +124,7 @@ int logAppendConfiguration(struct raft_log *l, const raft_term term, const struct raft_configuration *configuration); -/* Acquire an array of entries from the given index onwards. * The payload +/* Acquire an array of entries from the given index onwards. The payload * memory referenced by the @buf attribute of the returned entries is guaranteed * to be valid until logRelease() is called. */ int logAcquire(struct raft_log *l, diff --git a/src/replication.c b/src/replication.c index 6cc65acc7..0d79d7637 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1099,6 +1099,7 @@ int replicationAppend(struct raft *r, size_t n; size_t i; size_t j; + bool reinstated; int rv; assert(r != NULL); @@ -1171,6 +1172,23 @@ int replicationAppend(struct raft *r, * that we issue below actually completes. */ for (j = 0; j < n; j++) { struct raft_entry *entry = &args->entries[i + j]; + + /* We are trying to append an entry at index X with term T to our + * in-memory log. If we've gotten this far, we know that the log + * *logically* has no entry at this index. However, it's possible that + * we're still hanging on to such an entry, because we previously tried + * to append and replicate it, and the associated disk write failed, but + * some send requests are still pending that refer to it. Since the log + * is not capable of tracking multiple independent entries that share an + * index and term, we just piggyback on the already-stored entry in this + * case. */ + rv = logReinstate(r->log, entry->term, entry->type, &reinstated); + if (rv != 0) { + goto err_after_request_alloc; + } else if (reinstated) { + continue; + } + /* TODO This copy should not strictly be necessary, as the batch logic * will take care of freeing the batch buffer in which the entries are * received. However, this would lead to memory spikes in certain edge diff --git a/test/integration/test_election.c b/test/integration/test_election.c index 5452a7c40..7d0569b3d 100644 --- a/test/integration/test_election.c +++ b/test/integration/test_election.c @@ -516,24 +516,26 @@ TEST(election, receiveRejectResult, setUp, tearDown, 0, cluster_5_params) return MUNIT_OK; } -static char *ioErrorConvertDelay[] = {"0", "1", NULL}; -static MunitParameterEnum ioErrorConvert[] = { - {"delay", ioErrorConvertDelay}, - {NULL, NULL}, -}; +/* An I/O error occurs when converting to candidate. */ +TEST(election, ioErrorConvertTerm, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + CLUSTER_START; + + raft_fixture_term_fault(&f->cluster, 0, 0); + CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_UNAVAILABLE, 2000); + + return MUNIT_OK; +} /* An I/O error occurs when converting to candidate. */ -TEST(election, ioErrorConvert, setUp, tearDown, 0, ioErrorConvert) +TEST(election, ioErrorConvertVote, setUp, tearDown, 0, NULL) { struct fixture *f = data; - const char *delay = munit_parameters_get(params, "delay"); - return MUNIT_SKIP; CLUSTER_START; - /* The first server fails to convert to candidate. */ - CLUSTER_IO_FAULT(0, atoi(delay), 1); - CLUSTER_STEP; - ASSERT_UNAVAILABLE(0); + raft_fixture_vote_fault(&f->cluster, 0, 0); + CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_UNAVAILABLE, 2000); return MUNIT_OK; } @@ -542,16 +544,11 @@ TEST(election, ioErrorConvert, setUp, tearDown, 0, ioErrorConvert) TEST(election, ioErrorSendVoteRequest, setUp, tearDown, 0, NULL) { struct fixture *f = data; - return MUNIT_SKIP; CLUSTER_START; /* The first server fails to send a RequestVote RPC. */ - CLUSTER_IO_FAULT(0, 2, 1); - CLUSTER_STEP; - - /* The first server is still candidate. */ - CLUSTER_STEP; - ASSERT_CANDIDATE(0); + raft_fixture_send_fault(&f->cluster, 0, 0); + CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_LEADER, 5000); return MUNIT_OK; } @@ -560,18 +557,15 @@ TEST(election, ioErrorSendVoteRequest, setUp, tearDown, 0, NULL) TEST(election, ioErrorPersistVote, setUp, tearDown, 0, NULL) { struct fixture *f = data; - return MUNIT_SKIP; CLUSTER_START; /* The first server becomes candidate. */ - CLUSTER_STEP; - ASSERT_CANDIDATE(0); + CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_CANDIDATE, 2000); /* The second server receives a RequestVote RPC but fails to persist its * vote. */ - CLUSTER_IO_FAULT(1, 0, 1); - CLUSTER_STEP; - ASSERT_UNAVAILABLE(1); + raft_fixture_vote_fault(&f->cluster, 1, 0); + CLUSTER_STEP_UNTIL_STATE_IS(1, RAFT_UNAVAILABLE, 1000); return MUNIT_OK; } diff --git a/test/integration/test_replication.c b/test/integration/test_replication.c index f4081ad02..cff5ab982 100644 --- a/test/integration/test_replication.c +++ b/test/integration/test_replication.c @@ -456,14 +456,13 @@ TEST(replication, sendOom, setUp, tearDown, 0, send_oom_params) } /* A failure occurs upon submitting the I/O request. */ -TEST(replication, sendIoError, setUp, tearDown, 0, NULL) +TEST(replication, persistError, setUp, tearDown, 0, NULL) { struct fixture *f = data; - return MUNIT_SKIP; struct raft_apply req; BOOTSTRAP_START_AND_ELECT; - CLUSTER_IO_FAULT(0, 1, 1); + raft_fixture_append_fault(&f->cluster, 0, 0); CLUSTER_APPLY_ADD_X(0, &req, 1, NULL); CLUSTER_STEP; @@ -1090,7 +1089,7 @@ TEST(replication, diskWriteFailure, setUp, tearDown, 0, NULL) req->data = (void *)(intptr_t)RAFT_IOERR; BOOTSTRAP_START_AND_ELECT; - CLUSTER_IO_FAULT(0, 1, 1); + raft_fixture_append_fault(&f->cluster, 0, 0); CLUSTER_APPLY_ADD_X(0, req, 1, applyAssertStatusCb); /* The leader steps down when its disk write fails. */ CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_FOLLOWER, 2000); @@ -1198,7 +1197,7 @@ TEST(replication, failPersistBarrier, setUp, tearDown, 0, NULL) CLUSTER_GROW; /* Server 0 will fail to persist entry 2, a barrier */ - CLUSTER_IO_FAULT(0, 10, 1); + raft_fixture_append_fault(&f->cluster, 0, 0); /* Server 0 gets elected and creates a barrier entry at index 2 */ CLUSTER_BOOTSTRAP; @@ -1220,8 +1219,8 @@ TEST(replication, failPersistBarrierFollower, setUp, tearDown, 0, NULL) CLUSTER_GROW; /* The servers will fail to persist entry 2, a barrier */ - CLUSTER_IO_FAULT(1, 7, 1); - CLUSTER_IO_FAULT(2, 7, 1); + raft_fixture_append_fault(&f->cluster, 1, 0); + raft_fixture_append_fault(&f->cluster, 2, 0); /* Server 0 gets elected and creates a barrier entry at index 2 */ CLUSTER_BOOTSTRAP; @@ -1234,3 +1233,48 @@ TEST(replication, failPersistBarrierFollower, setUp, tearDown, 0, NULL) return MUNIT_OK; } + +/* A leader originates a log entry, fails to persist it, and steps down. + * A follower that received the entry wins the ensuing election and sends + * the same entry back to the original leader, while the original leader + * still has an outgoing pending message that references its copy of the + * entry. This triggers the original leader to reinstate the entry in its + * log. */ +TEST(replication, receiveSameWithPendingSend, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + struct raft_apply req; + + /* Three voters. */ + CLUSTER_GROW; + /* Server 0 is the leader. */ + BOOTSTRAP_START_AND_ELECT; + + /* Server 1 never gets the entry. */ + raft_fixture_set_send_latency(&f->cluster, 0, 1, 10000); + + /* Disk write fails, but not before the entry gets to server 2. */ + CLUSTER_SET_DISK_LATENCY(0, 1000); + raft_fixture_append_fault(&f->cluster, 0, 0); + req.data = (void *)(intptr_t)RAFT_IOERR; + CLUSTER_APPLY_ADD_X(0, &req, 1, NULL); + /* Server 0 steps down. */ + CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_FOLLOWER, 1500); + munit_assert_ullong(CLUSTER_RAFT(0)->current_term, ==, 2); + ASSERT_FOLLOWER(1); + ASSERT_FOLLOWER(2); + /* Only server 2 has the new entry. */ + munit_assert_ullong(CLUSTER_RAFT(0)->last_stored, ==, 2); + munit_assert_ullong(CLUSTER_RAFT(1)->last_stored, ==, 2); + munit_assert_ullong(CLUSTER_RAFT(2)->last_stored, ==, 3); + + /* Server 2 times out first and wins the election. */ + raft_set_election_timeout(CLUSTER_RAFT(2), 500); + raft_fixture_start_elect(&f->cluster, 2); + CLUSTER_STEP_UNTIL_STATE_IS(2, RAFT_LEADER, 1000); + munit_assert_ullong(CLUSTER_RAFT(2)->current_term, ==, 3); + + /* Server 0 gets the same entry back from server 2. */ + CLUSTER_STEP_UNTIL_APPLIED(2, 3, 1000); + return MUNIT_OK; +} diff --git a/test/lib/cluster.h b/test/lib/cluster.h index decfdb3c6..1cdfc6464 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -418,10 +418,6 @@ #define CLUSTER_ADD_ENTRY(I, ENTRY) \ raft_fixture_add_entry(&f->cluster, I, ENTRY) -/* Make an I/O error occur on the I'th server after @DELAY operations. */ -#define CLUSTER_IO_FAULT(I, DELAY, REPEAT) \ - raft_fixture_io_fault(&f->cluster, I, DELAY, REPEAT) - /* Return the number of messages sent by the given server. */ #define CLUSTER_N_SEND(I, TYPE) raft_fixture_n_send(&f->cluster, I, TYPE)