Skip to content

Commit

Permalink
Add functionality in journal table to delete a range of the journal, …
Browse files Browse the repository at this point in the history
…rather than the whole journal.
  • Loading branch information
slinkydeveloper committed Nov 22, 2024
1 parent 3052cce commit de5e0d1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
18 changes: 11 additions & 7 deletions crates/partition-store/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use restate_types::identifiers::{
};
use restate_types::storage::StorageCodec;
use std::io::Cursor;
use std::ops::RangeInclusive;
use std::ops::{Range, RangeInclusive};

define_table_key!(
Journal,
Expand Down Expand Up @@ -121,14 +121,14 @@ fn all_journals<S: StorageAccess>(
}))
}

fn delete_journal<S: StorageAccess>(
fn delete_journal_range<S: StorageAccess>(
storage: &mut S,
invocation_id: &InvocationId,
journal_length: EntryIndex,
journal_range: Range<EntryIndex>,
) {
let mut key = write_journal_entry_key(invocation_id, 0);
let k = &mut key;
for journal_index in 0..journal_length {
for journal_index in journal_range {
k.journal_index = Some(journal_index);
storage.delete_key(k);
}
Expand Down Expand Up @@ -201,10 +201,14 @@ impl<'a> JournalTable for PartitionStoreTransaction<'a> {
put_journal_entry(self, invocation_id, journal_index, journal_entry)
}

async fn delete_journal(&mut self, invocation_id: &InvocationId, journal_length: EntryIndex) {
async fn delete_journal_range(
&mut self,
invocation_id: &InvocationId,
journal_range: Range<EntryIndex>,
) {
self.assert_partition_key(invocation_id);
let _x = RocksDbPerfGuard::new("delete-journal");
delete_journal(self, invocation_id, journal_length)
let _x = RocksDbPerfGuard::new("delete-journal-range");
delete_journal_range(self, invocation_id, journal_range)
}
}

Expand Down
10 changes: 9 additions & 1 deletion crates/storage-api/src/journal_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use restate_types::identifiers::{EntryIndex, InvocationId, JournalEntryId, Parti
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal::{CompletionResult, EntryType};
use std::future::Future;
use std::ops::RangeInclusive;
use std::ops::{Range, RangeInclusive};

/// Different types of journal entries persisted by the runtime
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -78,5 +78,13 @@ pub trait JournalTable: ReadOnlyJournalTable {
&mut self,
invocation_id: &InvocationId,
journal_length: EntryIndex,
) -> impl Future<Output = ()> + Send {
self.delete_journal_range(invocation_id, 0..journal_length)
}

fn delete_journal_range(
&mut self,
invocation_id: &InvocationId,
journal_range: Range<EntryIndex>,
) -> impl Future<Output = ()> + Send;
}

0 comments on commit de5e0d1

Please sign in to comment.