Skip to content

Commit

Permalink
Fix leak of pages in restore_savepoint()
Browse files Browse the repository at this point in the history
Pages allocated in the system tree or freed tree when the savepoint was
captured are currently leaked, because they are in a pending free state,
and so do not appear in the diff of the allocator state, but they are
also currently being dropped from the freed tree.

This fixes the leak, but makes restore_savepoint() slower and makes it
require more RAM.
  • Loading branch information
cberner committed Aug 11, 2024
1 parent 3e69eaa commit 5ac1e6a
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 161 deletions.
173 changes: 85 additions & 88 deletions src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::ops::{RangeBounds, RangeFull};
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{panic, thread};
Expand Down Expand Up @@ -694,114 +694,111 @@ impl WriteTransaction {

// Restoring a savepoint needs to accomplish the following:
// 1) restore the table tree. This is trivial, since we have the old root
// 2) update the system tree to remove invalid persistent savepoints.
// 3) free all pages that were allocated since the savepoint and are unreachable
// from the restored table tree root. This is non-trivial, since we want to avoid walking
// the entire table tree, and we cannot simply restore the old allocator state as there
// could be read transactions that reference this data.
// 3a) First we free all allocated pages that were not allocated in the savepoint,
// except those referenced by the system table
// 3b) We re-process the freed table referenced by the savepoint, but ignore any pages that
// are already free

let allocated_since_savepoint = self
.mem
.pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());

// We don't want to rollback the system tree, so keep any pages it references
let mut whitelist = self
.system_tables
// 1a) we also filter the freed tree to remove any pages referenced by the old root
// 2) free all pages that were allocated since the savepoint and are unreachable
// from the restored table tree root. Here we diff the reachable pages from the old
// and new roots
// 3) update the system tree to remove invalid persistent savepoints.

let old_table_tree = TableTreeMut::new(
savepoint.get_user_root(),
self.transaction_guard.clone(),
self.mem.clone(),
self.freed_pages.clone(),
);
// TODO: traversing these can be very slow in a large database. Speed this up.
let current_root_pages = self
.tables
.lock()
.unwrap()
.table_tree
.all_referenced_pages()?;
// The tracker page could have changed too. Don't erase it.
whitelist.insert(self.mem.region_tracker_page());
let old_root_pages = old_table_tree.all_referenced_pages()?;

// Find the oldest transaction in the current freed tree, for use below. We do this before
// freeing pages to ensure that this tree is still valid
let oldest_unprocessed_transaction = if let Some(entry) = self
.freed_tree
.lock()
.unwrap()
.range::<RangeFull, FreedTableKey>(&(..))?
.next()
{
entry?.key().transaction_id
} else {
self.transaction_id.raw_id()
};

let mut freed_pages = vec![];
for page in allocated_since_savepoint {
if whitelist.contains(&page) {
continue;
}
if self.mem.uncommitted(page) {
self.mem.free(page);
} else {
freed_pages.push(page);
}
}
*self.freed_pages.lock().unwrap() = freed_pages;
// 1) restore the table tree
self.tables.lock().unwrap().table_tree = TableTreeMut::new(
savepoint.get_user_root(),
self.transaction_guard.clone(),
self.mem.clone(),
self.freed_pages.clone(),
);

// Remove any freed pages that have already been processed. Otherwise this would result in a double free
let mut freed_tree = BtreeMut::new(
savepoint.get_freed_root(),
self.transaction_guard.clone(),
self.mem.clone(),
self.post_commit_frees.clone(),
);
let lookup_key = FreedTableKey {
transaction_id: oldest_unprocessed_transaction,
pagination_id: 0,
};
let mut freed_pages = self.freed_pages.lock().unwrap();
let mut freed_pages_hash = HashSet::new();
freed_pages_hash.extend(freed_pages.iter());
let mut to_remove = vec![];
for entry in freed_tree.range(&(..lookup_key))? {
let item = entry?;
to_remove.push(item.key());
let pages: FreedPageList = item.value();
for i in 0..pages.len() {
let page = pages.get(i);
if self.mem.is_page_out_of_bounds(page) {
// Page no longer exists because the database file shrank
continue;
}
// Re-process the freed pages, but ignore any that would be double-frees
if self.mem.is_allocated(page)
&& !freed_pages_hash.contains(&page)
&& !whitelist.contains(&page)
{
if self.mem.uncommitted(page) {
self.mem.free(page);
} else {
freed_pages.push(page);
freed_pages_hash.insert(page);
// 1a) filter any pages referenced by the old data root to bring them back to the committed state
let mut txn_id = savepoint.get_transaction_id().raw_id();
let mut freed_tree = self.freed_tree.lock().unwrap();
loop {
let lower = FreedTableKey {
transaction_id: txn_id,
pagination_id: 0,
};

if freed_tree.range(&(lower..))?.next().is_none() {
break;
}
let lower = FreedTableKey {
transaction_id: txn_id,
pagination_id: 0,
};
let upper = FreedTableKey {
transaction_id: txn_id + 1,
pagination_id: 0,
};

// Find all the pending pages for this txn and filter them
let mut pending_pages = vec![];
for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
let item = entry?;
for i in 0..item.value().len() {
let p = item.value().get(i);
if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) {
pending_pages.push(p);
}
}
}
}
drop(freed_pages);
for key in to_remove {
freed_tree.remove(&key)?;

let mut pagination_counter = 0u64;
while !pending_pages.is_empty() {
let chunk_size = 100;
let buffer_size = FreedPageList::required_bytes(chunk_size);
let key = FreedTableKey {
transaction_id: txn_id,
pagination_id: pagination_counter,
};
let mut access_guard =
freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

let len = pending_pages.len();
access_guard.as_mut().clear();
for page in pending_pages.drain(len - min(len, chunk_size)..) {
access_guard.as_mut().push_back(page);
}
drop(access_guard);

pagination_counter += 1;
}

txn_id += 1;
}

*self.freed_tree.lock().unwrap() = freed_tree;
// 2) free all pages that became unreachable
let mut freed_pages = self.freed_pages.lock().unwrap();
for i in 0..current_root_pages.len() {
let mut pages = vec![];
current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages);
for page in pages {
if self.mem.uncommitted(page) {
self.mem.free(page);
} else {
freed_pages.push(page);
}
}
}
drop(freed_pages);

// Invalidate all savepoints that are newer than the one being applied to prevent the user
// 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
// from later trying to restore a savepoint "on another timeline"
self.transaction_tracker
.invalidate_savepoints_after(savepoint.get_id());

for persistent_savepoint in self.list_persistent_savepoints()? {
if persistent_savepoint > savepoint.get_id().0 {
self.delete_persistent_savepoint(persistent_savepoint)?;
Expand Down
23 changes: 6 additions & 17 deletions src/tree_store/page_store/buddy_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,34 +225,23 @@ impl BuddyAllocator {
result
}

pub(crate) fn get_allocated_pages_since_savepoint(
pub(crate) fn difference(
&self,
region: u32,
state: &[u8],
other: &BuddyAllocator,
output: &mut Vec<PageNumber>,
) {
let max_order = state[0];
let num_pages = u32::from_le_bytes(state[1..5].try_into().unwrap());
let num_pages = other.len();

let mut data_start = 5 + size_of::<u32>() * (max_order as usize + 1);

for order in 0..=max_order {
let offset_index = 5 + size_of::<u32>() * (order as usize);
let data_end = u32::from_le_bytes(
state[offset_index..(offset_index + size_of::<u32>())]
.try_into()
.unwrap(),
) as usize;
let bytes = &state[data_start..data_end];
let savepoint_allocated = U64GroupedBitmap::from_bytes(bytes);
for order in 0..=self.max_order {
let other_allocated = other.get_order_allocated(order);
let self_allocated = self.get_order_allocated(order);
for i in self_allocated.difference(&savepoint_allocated) {
for i in self_allocated.difference(other_allocated) {
if i >= num_pages {
break;
}
output.push(PageNumber::new(region, i, order));
}
data_start = data_end;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/tree_store/page_store/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl RegionLayout {
}

#[derive(Clone, Copy, Debug)]
pub(super) struct DatabaseLayout {
pub(crate) struct DatabaseLayout {
full_region_layout: RegionLayout,
num_full_regions: u32,
trailing_partial_region: Option<RegionLayout>,
Expand Down
2 changes: 2 additions & 0 deletions src/tree_store/page_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ pub use savepoint::Savepoint;
pub(crate) use savepoint::SerializedSavepoint;

pub(super) use base::{PageImpl, PageMut};
pub(super) use buddy_allocator::BuddyAllocator;
pub(crate) use cached_file::CachePriority;
pub(super) use region::new_allocators;
pub(super) use xxh3::hash128_with_seed;
47 changes: 4 additions & 43 deletions src/tree_store/page_store/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,19 +341,6 @@ impl TransactionalMemory {
Ok(())
}

// Returns true if the page is beyond the last region: i.e. it no longer exists.
// The check compares the page's region index against the number of region
// allocators currently held in the locked state.
pub(crate) fn is_page_out_of_bounds(&self, page: PageNumber) -> bool {
    let region_index = page.region as usize;
    let guard = self.state.lock().unwrap();
    let region_count = guard.allocators.region_allocators.len();
    region_index >= region_count
}

// Asks the allocator of the page's region whether the page's (index, order)
// pair is currently marked as allocated. The state lock is held for the lookup.
pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
    self.state
        .lock()
        .unwrap()
        .get_region(page.region)
        .is_allocated(page.page_index, page.page_order)
}

pub(crate) fn mark_pages_allocated(
&self,
allocated_pages: impl Iterator<Item = Result<PageNumber>>,
Expand Down Expand Up @@ -423,10 +410,6 @@ impl TransactionalMemory {
result
}

// Returns the region tracker page number recorded in the header, read while
// holding the state lock
pub(crate) fn region_tracker_page(&self) -> PageNumber {
    let guard = self.state.lock().unwrap();
    guard.header.region_tracker()
}

// Relocates the region tracker to a lower page, if possible
// Returns true if the page was moved
pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
Expand Down Expand Up @@ -465,32 +448,6 @@ impl TransactionalMemory {
regional_allocators
}

// Collects the pages that are allocated now but were not allocated in region_states.
// region_states must be the result of calling get_raw_allocator_states(); entry `i` is
// diffed against the current allocator of region `i`.
pub(crate) fn pages_allocated_since_raw_state(
    &self,
    region_states: &[Vec<u8>],
) -> Vec<PageNumber> {
    let state = self.state.lock().unwrap();
    let mut newly_allocated = Vec::new();

    let region_count = state.header.layout().num_regions();
    for region in 0..region_count {
        let allocator = state.get_region(region);
        match region_states.get(region as usize) {
            Some(snapshot) => {
                allocator.get_allocated_pages_since_savepoint(
                    region,
                    snapshot,
                    &mut newly_allocated,
                );
            }
            None => {
                // This region didn't exist, so everything is newly allocated
                allocator.get_allocated_pages(region, &mut newly_allocated);
            }
        }
    }

    // Don't include the region tracker, since we manage that internally to the
    // TransactionalMemory. Otherwise restoring a savepoint would free it.
    let tracker_page = state.header.region_tracker();
    newly_allocated.retain(|page| *page != tracker_page);

    newly_allocated
}

// Commit all outstanding changes and make them visible as the primary
pub(crate) fn commit(
&self,
Expand Down Expand Up @@ -778,6 +735,10 @@ impl TransactionalMemory {
}
}

// Returns the database layout reported by the header, read while holding the state lock
pub(crate) fn get_layout(&self) -> DatabaseLayout {
    let guard = self.state.lock().unwrap();
    guard.header.layout()
}

pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
let state = self.state.lock().unwrap();
if self.read_from_secondary.load(Ordering::Acquire) {
Expand Down
23 changes: 23 additions & 0 deletions src/tree_store/page_store/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ impl RegionTracker {
}
}

// Builds one BuddyAllocator per region of `layout`, in region order. Each allocator is
// constructed from that region's own page count and the page count of a full region layout.
pub(crate) fn new_allocators(layout: DatabaseLayout) -> Vec<BuddyAllocator> {
    (0..layout.num_regions())
        .map(|region| {
            BuddyAllocator::new(
                layout.region_layout(region).num_pages(),
                layout.full_region_layout().num_pages(),
            )
        })
        .collect()
}

pub(super) struct Allocators {
pub(super) region_tracker: RegionTracker,
pub(super) region_allocators: Vec<BuddyAllocator>,
Expand All @@ -138,6 +151,16 @@ impl Allocators {
}
}

// TODO: remove this at some point. It is useful for debugging though.
// Gathers the allocated pages of every region allocator into one list and prints it
// to stdout in Debug format.
#[allow(dead_code)]
pub(super) fn print_all_allocated(&self) {
    let mut pages = Vec::new();
    self.region_allocators
        .iter()
        .enumerate()
        .for_each(|(region, allocator)| {
            // The region number is converted from the enumerate index; panics if it does not fit
            allocator.get_allocated_pages(region.try_into().unwrap(), &mut pages);
        });
    println!("Allocated pages: {pages:?}");
}

pub(crate) fn xxh3_hash(&self) -> u128 {
let mut result = xxh3_checksum(&self.region_tracker.to_vec());
for allocator in self.region_allocators.iter() {
Expand Down
4 changes: 0 additions & 4 deletions src/tree_store/page_store/savepoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ impl Savepoint {
self.freed_root
}

// Borrows the per-region allocator state buffers stored on this savepoint
// (presumably the raw states captured when the savepoint was created -- confirm against the constructor)
pub(crate) fn get_regional_allocator_states(&self) -> &[Vec<u8>] {
    &self.regional_allocators[..]
}

pub(crate) fn db_address(&self) -> *const TransactionTracker {
self.transaction_tracker.as_ref() as *const _
}
Expand Down
Loading

0 comments on commit 5ac1e6a

Please sign in to comment.