From 51ca54cd913a075a3bd0acd8793f198aa6e12c29 Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Tue, 24 Dec 2024 20:42:32 -0800 Subject: [PATCH] Optimize restore_savepoint() This restores and fixes the optimization that was removed in baa86e7fe1b6b07acd0568f504d49404a19710d6 restore_savepoint() now scales with the number of database modifications since the savepoint was captured, rather than the size of the database savepoint_benchmark is now ~35x faster for an 8GiB database file --- src/transactions.rs | 171 +++++++++++++------ src/tree_store/btree.rs | 21 ++- src/tree_store/mod.rs | 4 +- src/tree_store/page_store/bitmap.rs | 1 - src/tree_store/page_store/buddy_allocator.rs | 56 +++++- src/tree_store/page_store/mod.rs | 3 +- src/tree_store/page_store/page_manager.rs | 30 +++- src/tree_store/page_store/region.rs | 13 -- src/tree_store/page_store/savepoint.rs | 15 +- src/tree_store/table_tree.rs | 44 +++-- 10 files changed, 265 insertions(+), 93 deletions(-) diff --git a/src/transactions.rs b/src/transactions.rs index 78a83f33..385e881b 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -5,9 +5,9 @@ use crate::sealed::Sealed; use crate::table::ReadOnlyUntypedTable; use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{ - Btree, BtreeHeader, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, Page, - PageHint, PageNumber, SerializedSavepoint, TableTree, TableTreeMut, TableType, - TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, + Btree, BtreeHeader, BtreeMut, BuddyAllocator, FreedPageList, FreedTableKey, + InternalTableDefinition, Page, PageHint, PageNumber, SerializedSavepoint, TableTree, + TableTreeMut, TableType, TransactionalMemory, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, }; use crate::types::{Key, Value}; use crate::{ @@ -25,7 +25,6 @@ use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; use std::mem::size_of; use std::ops::RangeBounds; -#[cfg(any(test, fuzzing))] use std::ops::RangeFull; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -563,34 +562,32 @@ impl WriteTransaction { println!("Tracker page"); println!("{tracker:?}"); - let table_allocators = self - .tables + let mut table_pages = vec![]; + self.tables .lock() .unwrap() .table_tree - .all_referenced_pages() + .visit_all_pages(|path| { + table_pages.push(path.page_number()); + Ok(()) + }) .unwrap(); - let mut table_pages = vec![]; - for (i, allocator) in table_allocators.iter().enumerate() { - allocator.get_allocated_pages(i.try_into().unwrap(), &mut table_pages); - } println!("Tables"); for p in table_pages { all_allocated.remove(&p); println!("{p:?}"); } - let system_table_allocators = self - .system_tables + let mut system_table_pages = vec![]; + self.system_tables .lock() .unwrap() .table_tree - .all_referenced_pages() + .visit_all_pages(|path| { + system_table_pages.push(path.page_number()); + Ok(()) + }) .unwrap(); - let mut system_table_pages = vec![]; - for (i, allocator) in system_table_allocators.iter().enumerate() { - allocator.get_allocated_pages(i.try_into().unwrap(), &mut system_table_pages); - } println!("System tables"); for p in system_table_pages { all_allocated.remove(&p); @@ -837,31 +834,51 @@ impl WriteTransaction { // and new roots // 3) update the system tree to remove invalid persistent savepoints. 
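For orientation, the user-facing flow whose restore path this patch optimizes looks roughly like the sketch below. It assumes redb's public `ephemeral_savepoint()` / `restore_savepoint()` API; the table name, key/value types, and file path are illustrative only.

```rust
use redb::{Database, TableDefinition};

// Hypothetical table used only for this illustration.
const TABLE: TableDefinition<u64, u64> = TableDefinition::new("example");

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Database::create("example.redb")?;

    // Capture a savepoint, then make some modifications.
    let txn = db.begin_write()?;
    let savepoint = txn.ephemeral_savepoint()?;
    {
        let mut table = txn.open_table(TABLE)?;
        for i in 0..1000u64 {
            table.insert(i, i * 2)?;
        }
    }
    txn.commit()?;

    // Roll the user data back to the savepoint. With this patch the cost is
    // proportional to the pages modified since the savepoint was captured,
    // not to the total size of the database.
    let mut txn = db.begin_write()?;
    txn.restore_savepoint(&savepoint)?;
    txn.commit()?;
    Ok(())
}
```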
- let old_table_tree = TableTreeMut::new( - savepoint.get_user_root(), + let old_system_tree = TableTree::new( + savepoint.get_system_root(), + PageHint::None, self.transaction_guard.clone(), self.mem.clone(), - self.freed_pages.clone(), - ); - // TODO: traversing these can be very slow in a large database. Speed this up. - let current_root_pages = self - .tables - .lock() - .unwrap() - .table_tree - .all_referenced_pages()?; - let old_root_pages = old_table_tree.all_referenced_pages()?; - - // 1) restore the table tree - self.tables.lock().unwrap().table_tree = TableTreeMut::new( - savepoint.get_user_root(), + )?; + let old_freed_tree: Btree> = Btree::new( + savepoint.get_freed_root(), + PageHint::None, self.transaction_guard.clone(), self.mem.clone(), - self.freed_pages.clone(), - ); + )?; - // 1a) filter any pages referenced by the old data root to bring them back to the committed state - let mut txn_id = savepoint.get_transaction_id().raw_id(); + // Pages which are part of the system and freed trees in the savepoint, should be freed + // even after the savepoint is restored, because the system and freed trees only roll + // forward + let mut old_system_and_freed_pages = HashSet::new(); + old_system_tree.visit_all_pages(|path| { + old_system_and_freed_pages.insert(path.page_number()); + Ok(()) + })?; + old_freed_tree.visit_all_pages(|path| { + old_system_and_freed_pages.insert(path.page_number()); + Ok(()) + })?; + + // 1) restore the table tree + { + self.tables.lock().unwrap().table_tree = TableTreeMut::new( + savepoint.get_user_root(), + self.transaction_guard.clone(), + self.mem.clone(), + self.freed_pages.clone(), + ); + } + + // 1a) purge all transactions that happened after the savepoint from freed tree, + // except pages from the old system or freed tree in the savepoint. Those still need to be + // freed, since the system tree only rolls forward, never back. 
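A simplified, self-contained model of this purge step is sketched below, with `BTreeMap`/`HashSet` standing in for redb's freed tree and page sets; none of these names are redb internals.

```rust
use std::collections::{BTreeMap, HashSet};

// Simplified stand-ins for redb's PageNumber and freed-tree key types.
type PageNumber = u64;
type TxnId = u64;

/// Drop freed-tree entries recorded after `savepoint_txn`, but keep any page
/// that belonged to the savepoint's system or freed tree: those trees only
/// roll forward, so their old pages must still be freed eventually.
fn purge_after_savepoint(
    freed_tree: &mut BTreeMap<TxnId, Vec<PageNumber>>,
    savepoint_txn: TxnId,
    old_system_and_freed_pages: &HashSet<PageNumber>,
) -> Vec<PageNumber> {
    let mut still_pending = Vec::new();
    // split_off() keeps entries <= savepoint_txn in `freed_tree` and returns the newer ones.
    let newer = freed_tree.split_off(&(savepoint_txn + 1));
    for (_txn, pages) in newer {
        for p in pages {
            if old_system_and_freed_pages.contains(&p) {
                still_pending.push(p);
            }
            // Everything else is forgotten here; the allocator diff in step 2
            // is what reclaims those pages.
        }
    }
    still_pending
}

fn main() {
    let mut freed = BTreeMap::from([(5, vec![10, 11]), (7, vec![12]), (9, vec![13, 14])]);
    let protected = HashSet::from([12, 14]);
    let pending = purge_after_savepoint(&mut freed, 6, &protected);
    assert_eq!(pending, vec![12, 14]);
    assert!(freed.contains_key(&5) && !freed.contains_key(&7));
}
```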
This brings all pages in the + // old data root back to the committed state + // This operation will also leak everything else that was allocated since the savepoint, + // but we fix that below -- noting that all the system trees that existed between the savepoint + // and now which might be referenced by other savepoints will become unreachable, since those + // savepoints are invalidated by this restoration + let mut txn_id = savepoint.get_transaction_id().next().raw_id(); let mut freed_tree = self.freed_tree.lock().unwrap(); loop { let lower = FreedTableKey { @@ -887,7 +904,8 @@ impl WriteTransaction { let item = entry?; for i in 0..item.value().len() { let p = item.value().get(i); - if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) { + // Keep the old system and freed tree pages, but purge anything else + if old_system_and_freed_pages.contains(&p) { pending_pages.push(p); } } @@ -917,19 +935,76 @@ impl WriteTransaction { txn_id += 1; } - // 2) free all pages that became unreachable - let mut freed_pages = self.freed_pages.lock().unwrap(); - for i in 0..current_root_pages.len() { - let mut pages = vec![]; - current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages); - for page in pages { - if self.mem.uncommitted(page) { - self.mem.free(page); + let mut current_system_and_freed_pages = HashSet::new(); + self.system_tables + .lock() + .unwrap() + .table_tree + .visit_all_pages(|path| { + current_system_and_freed_pages.insert(path.page_number()); + Ok(()) + })?; + freed_tree.visit_all_pages(|path| { + current_system_and_freed_pages.insert(path.page_number()); + Ok(()) + })?; + + let mut old_allocators: Vec = savepoint + .get_regional_allocators() + .iter() + .map(|data| BuddyAllocator::from_savepoint_state(data)) + .collect(); + + // Find the oldest transaction in the current freed tree, for use below + { + let oldest_unprocessed_transaction = + if let Some(entry) = freed_tree.range::(&(..))?.next() { + entry?.key().transaction_id } else { - freed_pages.push(page); + self.transaction_id.raw_id() + }; + + let lookup_key = FreedTableKey { + transaction_id: oldest_unprocessed_transaction, + pagination_id: 0, + }; + + // Replay all finalized frees into the old allocator state to ensure that a page which + // was pending free, freed, and then reallocated does not leak + for entry in old_freed_tree.range(&(..lookup_key))? { + let item = entry?; + let pages: FreedPageList = item.value(); + for i in 0..pages.len() { + let page = pages.get(i); + assert!(old_allocators[page.region as usize] + .is_allocated(page.page_index, page.page_order)); + old_allocators[page.region as usize].free(page.page_index, page.page_order); } } } + + // 2) free all pages that became unreachable + let mut freed_pages = self.freed_pages.lock().unwrap(); + let mut already_awaiting_free: HashSet = freed_pages.iter().copied().collect(); + already_awaiting_free.extend(self.post_commit_frees.lock().unwrap().iter().copied()); + let to_free = self.mem.pages_allocated_since_raw_state(&old_allocators); + for page in to_free { + if already_awaiting_free.contains(&page) { + // Make sure that we don't double free something that is already going to be freed + continue; + } + if current_system_and_freed_pages.contains(&page) { + // Don't free pages which are part of the current system or freed tree, even though + // these pages are new. 
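At heart, this step is a set difference between the current allocator state and the allocator state captured in the savepoint, minus a couple of exclusions. A minimal sketch of that idea using plain `HashSet`s follows; redb's real code diffs per-region `BuddyAllocator` bitmaps rather than hash sets.

```rust
use std::collections::HashSet;

type PageNumber = u64;

/// Pages allocated now but not at the savepoint, excluding pages that are
/// already queued for freeing or that belong to trees which only roll forward.
fn pages_to_free(
    allocated_now: &HashSet<PageNumber>,
    allocated_at_savepoint: &HashSet<PageNumber>,
    already_awaiting_free: &HashSet<PageNumber>,
    current_system_and_freed: &HashSet<PageNumber>,
) -> Vec<PageNumber> {
    allocated_now
        .difference(allocated_at_savepoint)
        .copied()
        .filter(|p| !already_awaiting_free.contains(p))
        .filter(|p| !current_system_and_freed.contains(p))
        .collect()
}

fn main() {
    let now: HashSet<_> = [1, 2, 3, 4, 5].into();
    let at_savepoint: HashSet<_> = [1, 2].into();
    let awaiting: HashSet<_> = [3].into();
    let system: HashSet<_> = [5].into();
    let mut result = pages_to_free(&now, &at_savepoint, &awaiting, &system);
    result.sort_unstable();
    assert_eq!(result, vec![4]);
}
```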
Again this is because these trees only move forward; + // never backwards as part of a savepoint restore + continue; + } + if self.mem.uncommitted(page) { + self.mem.free(page); + } else { + freed_pages.push(page); + } + } drop(freed_pages); // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index 541e9e3d..a675bd1a 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -383,13 +383,7 @@ impl BtreeMut<'_, K, V> { where F: FnMut(&PagePath) -> Result, { - let tree = UntypedBtree::new( - self.root, - self.mem.clone(), - K::fixed_width(), - V::fixed_width(), - ); - tree.visit_all_pages(visitor) + self.read_tree()?.visit_all_pages(visitor) } pub(crate) fn get_root(&self) -> Option { @@ -721,6 +715,19 @@ impl Btree { self.root } + pub(crate) fn visit_all_pages(&self, visitor: F) -> Result + where + F: FnMut(&PagePath) -> Result, + { + let tree = UntypedBtree::new( + self.root, + self.mem.clone(), + K::fixed_width(), + V::fixed_width(), + ); + tree.visit_all_pages(visitor) + } + pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result>> { if let Some(ref root_page) = self.cached_root { self.get_helper(root_page.clone(), K::as_bytes(key).as_ref()) diff --git a/src/tree_store/mod.rs b/src/tree_store/mod.rs index 7181dcd8..d8fb81d6 100644 --- a/src/tree_store/mod.rs +++ b/src/tree_store/mod.rs @@ -17,8 +17,8 @@ pub(crate) use btree_base::{ pub(crate) use btree_iters::{AllPageNumbersBtreeIter, BtreeExtractIf, BtreeRangeIter}; pub use page_store::{file_backend, InMemoryBackend, Savepoint}; pub(crate) use page_store::{ - Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory, FILE_FORMAT_VERSION2, - MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, PAGE_SIZE, + BuddyAllocator, Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory, + FILE_FORMAT_VERSION2, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, PAGE_SIZE, }; pub(crate) use table_tree::{FreedPageList, FreedTableKey, TableTree, TableTreeMut}; pub(crate) use table_tree_base::{InternalTableDefinition, TableType}; diff --git a/src/tree_store/page_store/bitmap.rs b/src/tree_store/page_store/bitmap.rs index 7acb9b2c..ab64bc5a 100644 --- a/src/tree_store/page_store/bitmap.rs +++ b/src/tree_store/page_store/bitmap.rs @@ -346,7 +346,6 @@ impl U64GroupedBitmap { U64GroupedBitmapDifference::new(&self.data, &exclusion.data) } - #[allow(dead_code)] pub fn iter(&self) -> U64GroupedBitmapIter { U64GroupedBitmapIter::new(self.len, &self.data) } diff --git a/src/tree_store/page_store/buddy_allocator.rs b/src/tree_store/page_store/buddy_allocator.rs index 99361d16..a2a31cb2 100644 --- a/src/tree_store/page_store/buddy_allocator.rs +++ b/src/tree_store/page_store/buddy_allocator.rs @@ -202,6 +202,43 @@ impl BuddyAllocator { free_pages } + // Inverse of make_state_for_savepoint() + pub(crate) fn from_savepoint_state(data: &[u8]) -> Self { + let mut offset = 0; + let max_order = data[offset]; + offset += 1; + let len = u32::from_le_bytes( + data[offset..(offset + size_of::())] + .try_into() + .unwrap(), + ); + offset += size_of::(); + + let mut data_start = offset + size_of::() * (max_order as usize + 1); + let mut allocated_sets = vec![]; + for _ in 0..=max_order { + let data_end = u32::from_le_bytes( + data[offset..(offset + size_of::())] + .try_into() + .unwrap(), + ) as usize; + offset += size_of::(); + allocated_sets.push(U64GroupedBitmap::from_bytes(&data[data_start..data_end])); + data_start = data_end; + } + assert_eq!(data_start, 
data.len()); + + let mut result = Self::new(len, allocated_sets[0].capacity()); + + for (order, allocated) in allocated_sets.iter().enumerate() { + for page_number in allocated.iter() { + result.record_alloc(page_number, order.try_into().unwrap()); + } + } + + result + } + // Reduced state for savepoint, which includes only the list of allocated pages // Format: // 1 byte: max order @@ -246,7 +283,6 @@ impl BuddyAllocator { } } - #[cfg(any(test, fuzzing))] pub(crate) fn get_allocated_pages(&self, region: u32, output: &mut Vec) { for order in 0..=self.max_order { let allocated = self.get_order_allocated(order); @@ -612,6 +648,24 @@ mod test { assert_eq!(allocator.count_allocated_pages(), 0); } + #[test] + fn make_savepoint_state() { + let num_pages = 256; + let mut allocator = BuddyAllocator::new(num_pages, num_pages); + + // Allocate some arbitrary stuff + allocator.record_alloc(7, 0); + allocator.alloc(0); + allocator.alloc(1); + allocator.alloc(3); + allocator.alloc(0); + allocator.alloc(0); + + let allocator2 = + BuddyAllocator::from_savepoint_state(&allocator.make_state_for_savepoint()); + assert_eq!(allocator.to_vec(), allocator2.to_vec()); + } + #[test] fn serialized_size() { // Check that serialized size is as expected for a full region diff --git a/src/tree_store/page_store/mod.rs b/src/tree_store/page_store/mod.rs index 143c2da7..13362d9e 100644 --- a/src/tree_store/page_store/mod.rs +++ b/src/tree_store/page_store/mod.rs @@ -21,6 +21,5 @@ pub use savepoint::Savepoint; pub(crate) use savepoint::SerializedSavepoint; pub(super) use base::{PageImpl, PageMut}; -pub(super) use buddy_allocator::BuddyAllocator; -pub(super) use region::new_allocators; +pub(crate) use buddy_allocator::BuddyAllocator; pub(super) use xxh3::hash128_with_seed; diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 5b7bd148..d7073c9b 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -606,6 +606,32 @@ impl TransactionalMemory { } } + // Diffs region_states, which must be derived from get_raw_allocator_states(), against + // the currently allocated set of pages + pub(crate) fn pages_allocated_since_raw_state( + &self, + old_states: &[BuddyAllocator], + ) -> Vec { + let mut result = vec![]; + let state = self.state.lock().unwrap(); + + for i in 0..state.header.layout().num_regions() { + let current_state = state.get_region(i); + if let Some(old_state) = old_states.get(i as usize) { + current_state.difference(i, old_state, &mut result); + } else { + // This region didn't exist, so everything is newly allocated + current_state.get_allocated_pages(i, &mut result); + } + } + + // Don't include the region tracker, since we manage that internally to the TranscationalMemory + // Otherwise restoring a savepoint would free it. 
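For reference, the offset-table layout that make_state_for_savepoint() and from_savepoint_state() agree on (a 1-byte max order, a u32 page count, one u32 end offset per order, then the concatenated per-order bitmaps) can be modeled in isolation as below. This is a standalone sketch; the byte-vector payloads stand in for the real U64GroupedBitmap encoding.

```rust
use std::mem::size_of;

/// Encode: [max_order: u8][len: u32 LE][end offset per order: u32 LE]...[bitmap bytes]...
fn encode(max_order: u8, len: u32, bitmaps: &[Vec<u8>]) -> Vec<u8> {
    assert_eq!(bitmaps.len(), max_order as usize + 1);
    let mut out = vec![max_order];
    out.extend_from_slice(&len.to_le_bytes());
    let header = 1 + size_of::<u32>() + size_of::<u32>() * bitmaps.len();
    let mut end = header;
    for bitmap in bitmaps {
        end += bitmap.len();
        out.extend_from_slice(&(end as u32).to_le_bytes());
    }
    for bitmap in bitmaps {
        out.extend_from_slice(bitmap);
    }
    out
}

/// Decode the same layout back into (max_order, len, bitmaps).
fn decode(data: &[u8]) -> (u8, u32, Vec<Vec<u8>>) {
    let max_order = data[0];
    let len = u32::from_le_bytes(data[1..5].try_into().unwrap());
    let mut offset = 5;
    let mut start = offset + size_of::<u32>() * (max_order as usize + 1);
    let mut bitmaps = vec![];
    for _ in 0..=max_order {
        let end = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
        offset += size_of::<u32>();
        bitmaps.push(data[start..end].to_vec());
        start = end;
    }
    assert_eq!(start, data.len());
    (max_order, len, bitmaps)
}

fn main() {
    let bitmaps = vec![vec![0b1010_0001], vec![0xff, 0x00], vec![]];
    let encoded = encode(2, 256, &bitmaps);
    assert_eq!(decode(&encoded), (2, 256, bitmaps));
}
```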
+ result.retain(|x| *x != state.header.region_tracker()); + + result + } + pub(crate) fn get_raw_allocator_states(&self) -> Vec> { let state = self.state.lock().unwrap(); @@ -902,10 +928,6 @@ impl TransactionalMemory { } } - pub(crate) fn get_layout(&self) -> DatabaseLayout { - self.state.lock().unwrap().header.layout() - } - pub(crate) fn get_last_committed_transaction_id(&self) -> Result { let state = self.state.lock().unwrap(); if self.read_from_secondary.load(Ordering::Acquire) { diff --git a/src/tree_store/page_store/region.rs b/src/tree_store/page_store/region.rs index 44b88e43..e0ea9f42 100644 --- a/src/tree_store/page_store/region.rs +++ b/src/tree_store/page_store/region.rs @@ -112,19 +112,6 @@ impl RegionTracker { } } -pub(crate) fn new_allocators(layout: DatabaseLayout) -> Vec { - let mut result = vec![]; - for i in 0..layout.num_regions() { - let region_layout = layout.region_layout(i); - let allocator = BuddyAllocator::new( - region_layout.num_pages(), - layout.full_region_layout().num_pages(), - ); - result.push(allocator); - } - result -} - pub(super) struct Allocators { pub(super) region_tracker: RegionTracker, pub(super) region_allocators: Vec, diff --git a/src/tree_store/page_store/savepoint.rs b/src/tree_store/page_store/savepoint.rs index 534fedfb..6c42a12d 100644 --- a/src/tree_store/page_store/savepoint.rs +++ b/src/tree_store/page_store/savepoint.rs @@ -29,7 +29,6 @@ pub struct Savepoint { // are not freed transaction_id: TransactionId, user_root: Option, - // For future use. This is not used in the restoration protocol. system_root: Option, freed_root: Option, regional_allocators: Vec>, @@ -78,6 +77,18 @@ impl Savepoint { self.user_root } + pub(crate) fn get_system_root(&self) -> Option { + self.system_root + } + + pub(crate) fn get_freed_root(&self) -> Option { + self.freed_root + } + + pub(crate) fn get_regional_allocators(&self) -> &Vec> { + &self.regional_allocators + } + pub(crate) fn db_address(&self) -> *const TransactionTracker { std::ptr::from_ref(self.transaction_tracker.as_ref()) } @@ -147,6 +158,8 @@ impl<'a> SerializedSavepoint<'a> { .to_le_bytes(), ); + // TODO: this seems like it is going to fail on databases with > 3GiB of regional allocators + // since that is the max size of a single stored value for region in &savepoint.regional_allocators { result.extend(region); } diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index 0d6d26a1..99bd4645 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -5,7 +5,6 @@ use crate::multimap_table::{ }; use crate::tree_store::btree::{btree_stats, UntypedBtreeMut}; use crate::tree_store::btree_base::BtreeHeader; -use crate::tree_store::page_store::{new_allocators, BuddyAllocator}; use crate::tree_store::{ Btree, BtreeMut, BtreeRangeIter, InternalTableDefinition, PageHint, PageNumber, PagePath, RawBtree, TableType, TransactionalMemory, @@ -196,6 +195,7 @@ impl Iterator for TableNameIter { pub(crate) struct TableTree { tree: Btree<&'static str, InternalTableDefinition>, + mem: Arc, } impl TableTree { @@ -206,7 +206,8 @@ impl TableTree { mem: Arc, ) -> Result { Ok(Self { - tree: Btree::new(master_root, page_hint, guard, mem)?, + tree: Btree::new(master_root, page_hint, guard, mem.clone())?, + mem, }) } @@ -258,6 +259,33 @@ impl TableTree { }, ) } + + pub(crate) fn visit_all_pages(&self, mut visitor: F) -> Result + where + F: FnMut(&PagePath) -> Result, + { + // All the pages in the table tree itself + self.tree.visit_all_pages(&mut visitor)?; + + // All the normal 
tables + for entry in self.list_tables(TableType::Normal)? { + let definition = self + .get_table_untyped(&entry, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))? + .unwrap(); + definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?; + } + + for entry in self.list_tables(TableType::Multimap)? { + let definition = self + .get_table_untyped(&entry, TableType::Multimap) + .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))? + .unwrap(); + definition.visit_all_pages(self.mem.clone(), |path| visitor(path))?; + } + + Ok(()) + } } pub(crate) struct TableTreeMut<'txn> { @@ -285,18 +313,6 @@ impl<'txn> TableTreeMut<'txn> { } } - pub(crate) fn all_referenced_pages(&self) -> Result> { - let mut result = new_allocators(self.mem.get_layout()); - - self.visit_all_pages(|path| { - let page = path.page_number(); - result[page.region as usize].record_alloc(page.page_index, page.page_order); - Ok(()) - })?; - - Ok(result) - } - pub(crate) fn visit_all_pages(&self, mut visitor: F) -> Result where F: FnMut(&PagePath) -> Result,
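The recurring shape in these table_tree.rs changes is "walk the tree once and hand every page number to a callback", which replaces building per-region BuddyAllocator state just to enumerate referenced pages. A toy sketch of that visitor pattern follows; the Node type and the depth-first walk are invented for illustration and are not redb's actual node or page types.

```rust
use std::collections::HashSet;

type PageNumber = u64;

// Toy stand-in for a B-tree node: a page plus its children.
struct Node {
    page: PageNumber,
    children: Vec<Node>,
}

/// Visit every page in the tree exactly once, in depth-first order.
fn visit_all_pages<F>(root: &Node, visitor: &mut F)
where
    F: FnMut(PageNumber),
{
    visitor(root.page);
    for child in &root.children {
        visit_all_pages(child, visitor);
    }
}

fn main() {
    let tree = Node {
        page: 1,
        children: vec![
            Node { page: 2, children: vec![] },
            Node { page: 3, children: vec![Node { page: 4, children: vec![] }] },
        ],
    };

    // Collect the reachable pages into a set, the way restore_savepoint() gathers
    // the savepoint's system and freed tree pages.
    let mut reachable = HashSet::new();
    visit_all_pages(&tree, &mut |p| {
        reachable.insert(p);
    });
    assert_eq!(reachable, HashSet::from([1, 2, 3, 4]));
}
```

The callback keeps the traversal cost proportional to the pages actually visited, which is what lets restore_savepoint() scale with the amount of change since the savepoint rather than with database size.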