Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
replication: Try to reinstate entries that have been truncated away
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Miller <[email protected]>
  • Loading branch information
cole-miller committed Sep 28, 2023
1 parent b68076f commit daf5883
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 4 deletions.
52 changes: 48 additions & 4 deletions src/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ static int refsTryInsert(struct raft_entry_ref *table,
const raft_term term,
const raft_index index,
const unsigned short count,
struct raft_buffer buf,
void *batch,
bool *collision)
{
struct raft_entry_ref *bucket; /* Bucket associated with this index. */
Expand Down Expand Up @@ -103,6 +105,8 @@ static int refsTryInsert(struct raft_entry_ref *table,
slot->term = term;
slot->index = index;
slot->count = count;
slot->buf = buf;
slot->batch = batch;
slot->next = NULL;

*collision = false;
Expand Down Expand Up @@ -138,7 +142,7 @@ static int refsMove(struct raft_entry_ref *bucket,

/* Insert the reference count for this entry into the new table. */
rv = refsTryInsert(table, size, slot->term, slot->index, slot->count,
&collision);
slot->buf, slot->batch, &collision);

next_slot = slot->next;

Expand Down Expand Up @@ -205,7 +209,9 @@ static int refsGrow(struct raft_log *l)
* to 1. */
static int refsInit(struct raft_log *l,
const raft_term term,
const raft_index index)
const raft_index index,
struct raft_buffer buf,
void *batch)
{
int i;

Expand Down Expand Up @@ -233,7 +239,8 @@ static int refsInit(struct raft_log *l,
bool collision;
int rc;

rc = refsTryInsert(l->refs, l->refs_size, term, index, 1, &collision);
rc = refsTryInsert(l->refs, l->refs_size, term, index, 1, buf, batch,
&collision);
if (rc != 0) {
return RAFT_NOMEM;
}
Expand Down Expand Up @@ -480,6 +487,43 @@ static int ensureCapacity(struct raft_log *l)
return 0;
}

int logReinstate(struct raft_log *l, raft_term term, unsigned short type)
{
if (l->refs_size == 0) {
return 0;
}

raft_index index = logLastIndex(l) + 1;
size_t key = refsKey(index, l->refs_size);
struct raft_entry_ref *bucket = &l->refs[key];
struct raft_entry_ref *slot;
struct raft_entry *entry;

if (bucket->count == 0) {
return 0;
}

if (bucket->index != index) {
return 0;
}

for (slot = bucket; slot != NULL; slot = slot->next) {
if (slot->term == term) {
slot->count++;
l->back += 1;
l->back %= l->size;
entry = &l->entries[l->back];
entry->term = term;
entry->type = type;
entry->buf = slot->buf;
entry->batch = slot->batch;
return 1;
}
}

return 0;
}

int logAppend(struct raft_log *l,
const raft_term term,
const unsigned short type,
Expand All @@ -502,7 +546,7 @@ int logAppend(struct raft_log *l,

index = logLastIndex(l) + 1;

rv = refsInit(l, term, index);
rv = refsInit(l, term, index, *buf, batch);
if (rv != 0) {
return rv;
}
Expand Down
7 changes: 7 additions & 0 deletions src/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ struct raft_entry_ref
raft_term term; /* Term of the entry being ref-counted. */
raft_index index; /* Index of the entry being ref-counted. */
unsigned short count; /* Number of references. */
struct raft_buffer buf; /* Private buffer for the tracked entry */
void *batch; /* Batch pointer fore the tracked entry */
struct raft_entry_ref *next; /* Next item in the bucket (for collisions). */
};

Expand Down Expand Up @@ -90,6 +92,11 @@ raft_index logSnapshotIndex(struct raft_log *l);
* invoked. Return #NULL if there is no such entry. */
const struct raft_entry *logGet(struct raft_log *l, const raft_index index);

/* Check whether the hash map is already tracking an entry with the given
* @term and @index (that is not part of the "logical" log). If so, increment
* the refcount of that entry and return > 0; otherwise, return 0. */
int logReinstate(struct raft_log *l, raft_term term, unsigned short type);

/* Append a new entry to the log. */
int logAppend(struct raft_log *l,
raft_term term,
Expand Down
15 changes: 15 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,21 @@ int replicationAppend(struct raft *r,
* that we issue below actually completes. */
for (j = 0; j < n; j++) {
struct raft_entry *entry = &args->entries[i + j];

/* We are trying to append an entry at index X with term T to our
* in-memory log. If we've gotten this far, we know that the log
* *logically* has no entry at this index. However, it's possible that
* we're still hanging on to such an entry, because we previously tried
* to append and replicate it, and the associated disk write failed, but
* some send requests are still pending that refer to it. Since the log
* is not capable of tracking multiple independent entries that share an
* index and term, we just piggyback on the already-stored entry in this
* case. */
rv = logReinstate(r->log, entry->term, entry->type);
if (rv > 0) {
continue;
}

/* TODO This copy should not strictly be necessary, as the batch logic
* will take care of freeing the batch buffer in which the entries are
* received. However, this would lead to memory spikes in certain edge
Expand Down

0 comments on commit daf5883

Please sign in to comment.