Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to savepoint restoration #833

Merged
merged 4 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 53 additions & 21 deletions src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,28 +692,52 @@ impl WriteTransaction {
assert_eq!(self.mem.get_version(), savepoint.get_version());
self.dirty.store(true, Ordering::Release);

// Restoring a savepoint needs to accomplish the following:
// 1) restore the table tree. This is trivial, since we have the old root
// 2) update the system tree to remove invalid persistent savepoints.
// 3) free all pages that were allocated since the savepoint and are unreachable
// from the restored table tree root. This is non-trivial, since we want to avoid walking
// the entire table tree, and we cannot simply restore the old allocator state as there
// could be read transactions that reference this data.
// 3a) First we free all allocated pages that were not allocated in the savepoint,
// except those referenced by the system table
// 3b) We re-process the freed table referenced by the savepoint, but ignore any pages that
// are already free

let allocated_since_savepoint = self
.mem
.pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());

// We don't want to rollback the system tree, so keep any pages it references
let referenced_by_system_tree = self
let mut whitelist = self
.system_tables
.lock()
.unwrap()
.table_tree
.all_referenced_pages()?;
// The tracker page could have changed too. Don't erase it.
whitelist.insert(self.mem.region_tracker_page());

// Find the oldest transaction in the current freed tree, for use below. We do this before
// freeing pages to ensure that this tree is still valid
let oldest_unprocessed_transaction = if let Some(entry) = self
.freed_tree
.lock()
.unwrap()
.range::<RangeFull, FreedTableKey>(&(..))?
.next()
{
entry?.key().transaction_id
} else {
self.transaction_id.raw_id()
};

let mut freed_pages = vec![];
for page in allocated_since_savepoint {
if referenced_by_system_tree.contains(&page) {
if whitelist.contains(&page) {
continue;
}
if self.mem.uncommitted(page) {
self.mem.free(page);
} else {
freed_pages.push(page);
}
freed_pages.push(page);
}
*self.freed_pages.lock().unwrap() = freed_pages;
self.tables.lock().unwrap().table_tree = TableTreeMut::new(
Expand All @@ -724,19 +748,6 @@ impl WriteTransaction {
);

// Remove any freed pages that have already been processed. Otherwise this would result in a double free
// We assume below that PageNumber is length 8
let oldest_unprocessed_transaction = if let Some(entry) = self
.freed_tree
.lock()
.unwrap()
.range::<RangeFull, FreedTableKey>(&(..))?
.next()
{
entry?.key().transaction_id
} else {
self.transaction_id.raw_id()
};

let mut freed_tree = BtreeMut::new(
savepoint.get_freed_root(),
self.transaction_guard.clone(),
Expand All @@ -747,10 +758,31 @@ impl WriteTransaction {
transaction_id: oldest_unprocessed_transaction,
pagination_id: 0,
};
let mut freed_pages = self.freed_pages.lock().unwrap();
let mut freed_pages_hash = HashSet::new();
freed_pages_hash.extend(freed_pages.iter());
let mut to_remove = vec![];
for entry in freed_tree.range(&(..lookup_key))? {
to_remove.push(entry?.key());
let item = entry?;
to_remove.push(item.key());
let pages: FreedPageList = item.value();
for i in 0..pages.len() {
let page = pages.get(i);
if self.mem.is_page_out_of_bounds(page) {
// Page no longer exists because the database file shrank
continue;
}
// Re-process the freed pages, but ignore any that would be double-frees
if self.mem.is_allocated(page)
&& !freed_pages_hash.contains(&page)
&& !whitelist.contains(&page)
{
freed_pages.push(page);
freed_pages_hash.insert(page);
}
}
}
drop(freed_pages);
for key in to_remove {
freed_tree.remove(&key)?;
}
Expand Down
17 changes: 17 additions & 0 deletions src/tree_store/page_store/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ impl TransactionalMemory {
Ok(())
}

// Returns true if the page lies past the last region, i.e. the region it
// belonged to no longer exists (the database file shrank).
pub(crate) fn is_page_out_of_bounds(&self, page: PageNumber) -> bool {
    let guard = self.state.lock().unwrap();
    let region_count = guard.allocators.region_allocators.len();
    (page.region as usize) >= region_count
}

// Returns true if the given page is currently marked allocated in its
// region's allocator.
pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
    let guard = self.state.lock().unwrap();
    guard
        .get_region(page.region)
        .is_allocated(page.page_index, page.page_order)
}

pub(crate) fn mark_pages_allocated(
&self,
allocated_pages: impl Iterator<Item = Result<PageNumber>>,
Expand Down Expand Up @@ -410,6 +423,10 @@ impl TransactionalMemory {
result
}

// Returns the page currently holding the region tracker, as recorded in the
// database header.
pub(crate) fn region_tracker_page(&self) -> PageNumber {
    let guard = self.state.lock().unwrap();
    guard.header.region_tracker()
}

// Relocates the region tracker to a lower page, if possible
// Returns true if the page was moved
pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
Expand Down
67 changes: 67 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,73 @@ fn regression22() {
assert_eq!(allocated_pages, txn.stats().unwrap().allocated_pages());
}

#[test]
fn regression23() {
    let tmpfile = create_tempfile();

    let db = Database::create(tmpfile.path()).unwrap();

    // Commit 1: force the system table into existence (so it occupies a page)
    // by listing persistent savepoints, then insert a single row.
    let write_txn = db.begin_write().unwrap();
    {
        #[allow(unused_must_use)]
        {
            write_txn.list_persistent_savepoints().unwrap();
        }
        let mut table = write_txn.open_table(U64_TABLE).unwrap();
        table.insert(0, 0).unwrap();
    }
    write_txn.commit().unwrap();

    // Commit 2: delete the row again.
    let write_txn = db.begin_write().unwrap();
    {
        let mut table = write_txn.open_table(U64_TABLE).unwrap();
        table.remove(0).unwrap();
    }
    write_txn.commit().unwrap();

    // An empty commit so the pages freed above are fully processed.
    db.begin_write().unwrap().commit().unwrap();

    // Record the baseline page count, then re-insert the row.
    let write_txn = db.begin_write().unwrap();
    let baseline_pages = write_txn.stats().unwrap().allocated_pages();
    {
        let mut table = write_txn.open_table(U64_TABLE).unwrap();
        table.insert(0, 0).unwrap();
    }
    write_txn.commit().unwrap();

    // Remove the row once more so there is pending freed-page state.
    let write_txn = db.begin_write().unwrap();
    {
        let mut table = write_txn.open_table(U64_TABLE).unwrap();
        table.remove(0).unwrap();
    }
    write_txn.commit().unwrap();

    // Capture an ephemeral savepoint.
    let write_txn = db.begin_write().unwrap();
    let savepoint = write_txn.ephemeral_savepoint().unwrap();
    write_txn.commit().unwrap();

    // Allocate pages after the savepoint was taken.
    let write_txn = db.begin_write().unwrap();
    {
        let mut table = write_txn.open_table(U64_TABLE).unwrap();
        table.insert(0, 0).unwrap();
    }
    write_txn.commit().unwrap();

    // Roll back to the savepoint.
    let mut write_txn = db.begin_write().unwrap();
    write_txn.restore_savepoint(&savepoint).unwrap();
    write_txn.commit().unwrap();
    drop(savepoint);

    // Extra commit to finalize cleanup of the freed pages.
    // There was a bug where restoring the savepoint would leak pages here.
    db.begin_write().unwrap().commit().unwrap();

    // All pages allocated after the baseline should have been reclaimed.
    let verify_txn = db.begin_write().unwrap();
    assert_eq!(baseline_pages, verify_txn.stats().unwrap().allocated_pages());
}

#[test]
fn check_integrity_clean() {
let tmpfile = create_tempfile();
Expand Down
Loading