From 9f3b09e41b624cff330b055d3639882ada20956e Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Sun, 11 Aug 2024 15:14:14 -0700 Subject: [PATCH] Fix leak of pages in restore_savepoint() Pages allocated in the system tree or freed tree when the savepoint was captured are currently leaked, because they are in a pending free state, and so do not appear in the diff of the allocator state, but they are also curretly being dropped from the freed tree. This fixes the leak, but make restore_savepoint() slower --- src/transactions.rs | 173 +++++++++---------- src/tree_store/page_store/buddy_allocator.rs | 23 +-- src/tree_store/page_store/layout.rs | 2 +- src/tree_store/page_store/mod.rs | 2 + src/tree_store/page_store/page_manager.rs | 47 +---- src/tree_store/page_store/region.rs | 23 +++ src/tree_store/page_store/savepoint.rs | 4 - src/tree_store/table_tree.rs | 64 ++++++- 8 files changed, 177 insertions(+), 161 deletions(-) diff --git a/src/transactions.rs b/src/transactions.rs index 369b2c60..c0f69071 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -23,7 +23,7 @@ use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; -use std::ops::{RangeBounds, RangeFull}; +use std::ops::RangeBounds; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::{panic, thread}; @@ -694,56 +694,28 @@ impl WriteTransaction { // Restoring a savepoint needs to accomplish the following: // 1) restore the table tree. This is trivial, since we have the old root - // 2) update the system tree to remove invalid persistent savepoints. - // 3) free all pages that were allocated since the savepoint and are unreachable - // from the restored table tree root. This is non-trivial, since we want to avoid walking - // the entire table tree, and we cannot simply restore the old allocator state as there - // could be read transactions that reference this data. - // 3a) First we free all allocated pages that were not allocated in the savepoint, - // except those referenced by the system table - // 3b) We re-process the freed table referenced by the savepoint, but ignore any pages that - // are already free - - let allocated_since_savepoint = self - .mem - .pages_allocated_since_raw_state(savepoint.get_regional_allocator_states()); - - // We don't want to rollback the system tree, so keep any pages it references - let mut whitelist = self - .system_tables + // 1a) we also filter the freed tree to remove any pages referenced by the old root + // 2) free all pages that were allocated since the savepoint and are unreachable + // from the restored table tree root. Here we diff the reachable pages from the old + // and new roots + // 3) update the system tree to remove invalid persistent savepoints. + + let old_table_tree = TableTreeMut::new( + savepoint.get_user_root(), + self.transaction_guard.clone(), + self.mem.clone(), + self.freed_pages.clone(), + ); + // TODO: traversing these can be very slow in a large database. Speed this up. + let current_root_pages = self + .tables .lock() .unwrap() .table_tree .all_referenced_pages()?; - // The tracker page could have changed too. Don't erase it. - whitelist.insert(self.mem.region_tracker_page()); + let old_root_pages = old_table_tree.all_referenced_pages()?; - // Find the oldest transaction in the current freed tree, for use below. We do this before - // freeing pages to ensure that this tree is still valid - let oldest_unprocessed_transaction = if let Some(entry) = self - .freed_tree - .lock() - .unwrap() - .range::(&(..))? - .next() - { - entry?.key().transaction_id - } else { - self.transaction_id.raw_id() - }; - - let mut freed_pages = vec![]; - for page in allocated_since_savepoint { - if whitelist.contains(&page) { - continue; - } - if self.mem.uncommitted(page) { - self.mem.free(page); - } else { - freed_pages.push(page); - } - } - *self.freed_pages.lock().unwrap() = freed_pages; + // 1) restore the table tree self.tables.lock().unwrap().table_tree = TableTreeMut::new( savepoint.get_user_root(), self.transaction_guard.clone(), @@ -751,57 +723,82 @@ impl WriteTransaction { self.freed_pages.clone(), ); - // Remove any freed pages that have already been processed. Otherwise this would result in a double free - let mut freed_tree = BtreeMut::new( - savepoint.get_freed_root(), - self.transaction_guard.clone(), - self.mem.clone(), - self.post_commit_frees.clone(), - ); - let lookup_key = FreedTableKey { - transaction_id: oldest_unprocessed_transaction, - pagination_id: 0, - }; - let mut freed_pages = self.freed_pages.lock().unwrap(); - let mut freed_pages_hash = HashSet::new(); - freed_pages_hash.extend(freed_pages.iter()); - let mut to_remove = vec![]; - for entry in freed_tree.range(&(..lookup_key))? { - let item = entry?; - to_remove.push(item.key()); - let pages: FreedPageList = item.value(); - for i in 0..pages.len() { - let page = pages.get(i); - if self.mem.is_page_out_of_bounds(page) { - // Page no longer exists because the database file shrank - continue; - } - // Re-process the freed pages, but ignore any that would be double-frees - if self.mem.is_allocated(page) - && !freed_pages_hash.contains(&page) - && !whitelist.contains(&page) - { - if self.mem.uncommitted(page) { - self.mem.free(page); - } else { - freed_pages.push(page); - freed_pages_hash.insert(page); + // 1a) filter any pages referenced by the old data root to bring them back to the committed state + let mut txn_id = savepoint.get_transaction_id().raw_id(); + let mut freed_tree = self.freed_tree.lock().unwrap(); + loop { + let lower = FreedTableKey { + transaction_id: txn_id, + pagination_id: 0, + }; + + if freed_tree.range(&(lower..))?.next().is_none() { + break; + } + let lower = FreedTableKey { + transaction_id: txn_id, + pagination_id: 0, + }; + let upper = FreedTableKey { + transaction_id: txn_id + 1, + pagination_id: 0, + }; + + // Find all the pending pages for this txn and filter them + let mut pending_pages = vec![]; + for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? { + let item = entry?; + for i in 0..item.value().len() { + let p = item.value().get(i); + if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) { + pending_pages.push(p); } } } - } - drop(freed_pages); - for key in to_remove { - freed_tree.remove(&key)?; + + let mut pagination_counter = 0u64; + while !pending_pages.is_empty() { + let chunk_size = 100; + let buffer_size = FreedPageList::required_bytes(chunk_size); + let key = FreedTableKey { + transaction_id: txn_id, + pagination_id: pagination_counter, + }; + let mut access_guard = + freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?; + + let len = pending_pages.len(); + access_guard.as_mut().clear(); + for page in pending_pages.drain(len - min(len, chunk_size)..) { + access_guard.as_mut().push_back(page); + } + drop(access_guard); + + pagination_counter += 1; + } + + txn_id += 1; } - *self.freed_tree.lock().unwrap() = freed_tree; + // 2) free all pages that became unreachable + let mut freed_pages = self.freed_pages.lock().unwrap(); + for i in 0..current_root_pages.len() { + let mut pages = vec![]; + current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages); + for page in pages { + if self.mem.uncommitted(page) { + self.mem.free(page); + } else { + freed_pages.push(page); + } + } + } + drop(freed_pages); - // Invalidate all savepoints that are newer than the one being applied to prevent the user + // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user // from later trying to restore a savepoint "on another timeline" self.transaction_tracker .invalidate_savepoints_after(savepoint.get_id()); - for persistent_savepoint in self.list_persistent_savepoints()? { if persistent_savepoint > savepoint.get_id().0 { self.delete_persistent_savepoint(persistent_savepoint)?; diff --git a/src/tree_store/page_store/buddy_allocator.rs b/src/tree_store/page_store/buddy_allocator.rs index 6f81b4a2..284649f6 100644 --- a/src/tree_store/page_store/buddy_allocator.rs +++ b/src/tree_store/page_store/buddy_allocator.rs @@ -225,34 +225,23 @@ impl BuddyAllocator { result } - pub(crate) fn get_allocated_pages_since_savepoint( + pub(crate) fn difference( &self, region: u32, - state: &[u8], + other: &BuddyAllocator, output: &mut Vec, ) { - let max_order = state[0]; - let num_pages = u32::from_le_bytes(state[1..5].try_into().unwrap()); + let num_pages = other.len(); - let mut data_start = 5 + size_of::() * (max_order as usize + 1); - - for order in 0..=max_order { - let offset_index = 5 + size_of::() * (order as usize); - let data_end = u32::from_le_bytes( - state[offset_index..(offset_index + size_of::())] - .try_into() - .unwrap(), - ) as usize; - let bytes = &state[data_start..data_end]; - let savepoint_allocated = U64GroupedBitmap::from_bytes(bytes); + for order in 0..=self.max_order { + let other_allocated = other.get_order_allocated(order); let self_allocated = self.get_order_allocated(order); - for i in self_allocated.difference(&savepoint_allocated) { + for i in self_allocated.difference(other_allocated) { if i >= num_pages { break; } output.push(PageNumber::new(region, i, order)); } - data_start = data_end; } } diff --git a/src/tree_store/page_store/layout.rs b/src/tree_store/page_store/layout.rs index f0ac5e38..7c8b2a16 100644 --- a/src/tree_store/page_store/layout.rs +++ b/src/tree_store/page_store/layout.rs @@ -83,7 +83,7 @@ impl RegionLayout { } #[derive(Clone, Copy, Debug)] -pub(super) struct DatabaseLayout { +pub(crate) struct DatabaseLayout { full_region_layout: RegionLayout, num_full_regions: u32, trailing_partial_region: Option, diff --git a/src/tree_store/page_store/mod.rs b/src/tree_store/page_store/mod.rs index eb642c8d..331a2ca7 100644 --- a/src/tree_store/page_store/mod.rs +++ b/src/tree_store/page_store/mod.rs @@ -20,5 +20,7 @@ pub use savepoint::Savepoint; pub(crate) use savepoint::SerializedSavepoint; pub(super) use base::{PageImpl, PageMut}; +pub(super) use buddy_allocator::BuddyAllocator; pub(crate) use cached_file::CachePriority; +pub(super) use region::new_allocators; pub(super) use xxh3::hash128_with_seed; diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 05ccc80c..55001aae 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -341,19 +341,6 @@ impl TransactionalMemory { Ok(()) } - // Returns true if the page is beyond the last region: i.e. it no longer exists - pub(crate) fn is_page_out_of_bounds(&self, page: PageNumber) -> bool { - let state = self.state.lock().unwrap(); - page.region as usize >= state.allocators.region_allocators.len() - } - - pub(crate) fn is_allocated(&self, page: PageNumber) -> bool { - let state = self.state.lock().unwrap(); - let allocator = state.get_region(page.region); - - allocator.is_allocated(page.page_index, page.page_order) - } - pub(crate) fn mark_pages_allocated( &self, allocated_pages: impl Iterator>, @@ -423,10 +410,6 @@ impl TransactionalMemory { result } - pub(crate) fn region_tracker_page(&self) -> PageNumber { - self.state.lock().unwrap().header.region_tracker() - } - // Relocates the region tracker to a lower page, if possible // Returns true if the page was moved pub(crate) fn relocate_region_tracker(&self) -> Result { @@ -465,32 +448,6 @@ impl TransactionalMemory { regional_allocators } - // Diffs region_states, which must be the result of calling get_raw_allocator_states(), against - // the currently allocated set of pages - pub(crate) fn pages_allocated_since_raw_state( - &self, - region_states: &[Vec], - ) -> Vec { - let mut result = vec![]; - let state = self.state.lock().unwrap(); - - for i in 0..state.header.layout().num_regions() { - let current_state = state.get_region(i); - if let Some(old_state) = region_states.get(i as usize) { - current_state.get_allocated_pages_since_savepoint(i, old_state, &mut result); - } else { - // This region didn't exist, so everything is newly allocated - current_state.get_allocated_pages(i, &mut result); - } - } - - // Don't include the region tracker, since we manage that internally to the TranscationalMemory - // Otherwise restoring a savepoint would free it. - result.retain(|x| *x != state.header.region_tracker()); - - result - } - // Commit all outstanding changes and make them visible as the primary pub(crate) fn commit( &self, @@ -778,6 +735,10 @@ impl TransactionalMemory { } } + pub(crate) fn get_layout(&self) -> DatabaseLayout { + self.state.lock().unwrap().header.layout() + } + pub(crate) fn get_last_committed_transaction_id(&self) -> Result { let state = self.state.lock().unwrap(); if self.read_from_secondary.load(Ordering::Acquire) { diff --git a/src/tree_store/page_store/region.rs b/src/tree_store/page_store/region.rs index 147a87a6..37f8c8ef 100644 --- a/src/tree_store/page_store/region.rs +++ b/src/tree_store/page_store/region.rs @@ -112,6 +112,19 @@ impl RegionTracker { } } +pub(crate) fn new_allocators(layout: DatabaseLayout) -> Vec { + let mut result = vec![]; + for i in 0..layout.num_regions() { + let region_layout = layout.region_layout(i); + let allocator = BuddyAllocator::new( + region_layout.num_pages(), + layout.full_region_layout().num_pages(), + ); + result.push(allocator); + } + result +} + pub(super) struct Allocators { pub(super) region_tracker: RegionTracker, pub(super) region_allocators: Vec, @@ -138,6 +151,16 @@ impl Allocators { } } + // TODO: remove this at some point. It is useful for debugging though. + #[allow(dead_code)] + pub(super) fn print_all_allocated(&self) { + let mut pages = vec![]; + for (i, allocator) in self.region_allocators.iter().enumerate() { + allocator.get_allocated_pages(i.try_into().unwrap(), &mut pages); + } + println!("Allocated pages: {pages:?}"); + } + pub(crate) fn xxh3_hash(&self) -> u128 { let mut result = xxh3_checksum(&self.region_tracker.to_vec()); for allocator in self.region_allocators.iter() { diff --git a/src/tree_store/page_store/savepoint.rs b/src/tree_store/page_store/savepoint.rs index 5d4d9b72..72e3e3a3 100644 --- a/src/tree_store/page_store/savepoint.rs +++ b/src/tree_store/page_store/savepoint.rs @@ -82,10 +82,6 @@ impl Savepoint { self.freed_root } - pub(crate) fn get_regional_allocator_states(&self) -> &[Vec] { - &self.regional_allocators - } - pub(crate) fn db_address(&self) -> *const TransactionTracker { self.transaction_tracker.as_ref() as *const _ } diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index b9e2051f..b4c6cb1d 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -1,18 +1,20 @@ use crate::db::TransactionGuard; use crate::error::TableError; use crate::multimap_table::{ - finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums, + finalize_tree_and_subtree_checksums, multimap_btree_stats, parse_subtree_roots, + verify_tree_and_subtree_checksums, DynamicCollection, }; use crate::tree_store::btree::{btree_stats, UntypedBtreeMut}; use crate::tree_store::btree_base::BtreeHeader; use crate::tree_store::btree_iters::AllPageNumbersBtreeIter; +use crate::tree_store::page_store::{new_allocators, BuddyAllocator}; use crate::tree_store::{ Btree, BtreeMut, BtreeRangeIter, PageHint, PageNumber, RawBtree, TransactionalMemory, }; use crate::types::{Key, MutInPlaceValue, TypeName, Value}; use crate::{DatabaseStats, Result}; use std::cmp::max; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::mem; use std::mem::size_of; use std::ops::RangeFull; @@ -556,12 +558,14 @@ impl<'txn> TableTreeMut<'txn> { } } - pub(crate) fn all_referenced_pages(&self) -> Result> { + pub(crate) fn all_referenced_pages(&self) -> Result> { + let mut result = new_allocators(self.mem.get_layout()); + // All the pages in the table tree itself - let mut result = HashSet::new(); if let Some(iter) = self.tree.all_pages_iter()? { for page in iter { - result.insert(page?); + let page = page?; + result[page.region as usize].record_alloc(page.page_index, page.page_order); } } @@ -580,13 +584,57 @@ impl<'txn> TableTreeMut<'txn> { )?; for page in table_pages_iter { - result.insert(page?); + let page = page?; + result[page.region as usize].record_alloc(page.page_index, page.page_order); } } } - if !self.list_tables(TableType::Multimap)?.is_empty() { - unimplemented!("Walking all multimap references is not currently supported"); + for entry in self.list_tables(TableType::Multimap)? { + let definition = self + .get_table_untyped(&entry, TableType::Multimap) + .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))? + .unwrap(); + if let Some(header) = definition.get_root() { + let table_pages_iter = AllPageNumbersBtreeIter::new( + header.root, + definition.get_fixed_key_size(), + DynamicCollection::<()>::fixed_width_with(definition.get_fixed_value_size()), + self.mem.clone(), + )?; + for page in table_pages_iter { + let page = page?; + result[page.region as usize].record_alloc(page.page_index, page.page_order); + } + + let table_pages_iter = AllPageNumbersBtreeIter::new( + header.root, + definition.get_fixed_key_size(), + DynamicCollection::<()>::fixed_width_with(definition.get_fixed_value_size()), + self.mem.clone(), + )?; + for table_page in table_pages_iter { + let page = self.mem.get_page(table_page?)?; + let subtree_roots = parse_subtree_roots( + &page, + definition.get_fixed_key_size(), + definition.get_fixed_value_size(), + ); + for subtree_header in subtree_roots { + let sub_root_iter = AllPageNumbersBtreeIter::new( + subtree_header.root, + definition.get_fixed_value_size(), + <()>::fixed_width(), + self.mem.clone(), + )?; + for page in sub_root_iter { + let page = page?; + result[page.region as usize] + .record_alloc(page.page_index, page.page_order); + } + } + } + } } Ok(result)