diff --git a/src/transactions.rs b/src/transactions.rs
index 369b2c60..c0f69071 100644
--- a/src/transactions.rs
+++ b/src/transactions.rs
@@ -23,7 +23,7 @@ use std::cmp::min;
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Debug, Display, Formatter};
 use std::marker::PhantomData;
-use std::ops::{RangeBounds, RangeFull};
+use std::ops::RangeBounds;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 use std::{panic, thread};
@@ -694,56 +694,28 @@ impl WriteTransaction {
         // Restoring a savepoint needs to accomplish the following:
         // 1) restore the table tree. This is trivial, since we have the old root
-        // 2) update the system tree to remove invalid persistent savepoints.
-        // 3) free all pages that were allocated since the savepoint and are unreachable
-        //    from the restored table tree root. This is non-trivial, since we want to avoid walking
-        //    the entire table tree, and we cannot simply restore the old allocator state as there
-        //    could be read transactions that reference this data.
-        // 3a) First we free all allocated pages that were not allocated in the savepoint,
-        //     except those referenced by the system table
-        // 3b) We re-process the freed table referenced by the savepoint, but ignore any pages that
-        //     are already free
-
-        let allocated_since_savepoint = self
-            .mem
-            .pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());
-
-        // We don't want to rollback the system tree, so keep any pages it references
-        let mut whitelist = self
-            .system_tables
+        // 1a) we also filter the freed tree to remove any pages referenced by the old root
+        // 2) free all pages that were allocated since the savepoint and are unreachable
+        //    from the restored table tree root. Here we diff the reachable pages from the old
+        //    and new roots
+        // 3) update the system tree to remove invalid persistent savepoints.
+
+        let old_table_tree = TableTreeMut::new(
+            savepoint.get_user_root(),
+            self.transaction_guard.clone(),
+            self.mem.clone(),
+            self.freed_pages.clone(),
+        );
+        // TODO: traversing these can be very slow in a large database. Speed this up.
+        let current_root_pages = self
+            .tables
             .lock()
             .unwrap()
             .table_tree
             .all_referenced_pages()?;
-        // The tracker page could have changed too. Don't erase it.
-        whitelist.insert(self.mem.region_tracker_page());
+        let old_root_pages = old_table_tree.all_referenced_pages()?;
 
-        // Find the oldest transaction in the current freed tree, for use below. We do this before
-        // freeing pages to ensure that this tree is still valid
-        let oldest_unprocessed_transaction = if let Some(entry) = self
-            .freed_tree
-            .lock()
-            .unwrap()
-            .range::<RangeFull>(&(..))?
-            .next()
-        {
-            entry?.key().transaction_id
-        } else {
-            self.transaction_id.raw_id()
-        };
-
-        let mut freed_pages = vec![];
-        for page in allocated_since_savepoint {
-            if whitelist.contains(&page) {
-                continue;
-            }
-            if self.mem.uncommitted(page) {
-                self.mem.free(page);
-            } else {
-                freed_pages.push(page);
-            }
-        }
-        *self.freed_pages.lock().unwrap() = freed_pages;
+        // 1) restore the table tree
         self.tables.lock().unwrap().table_tree = TableTreeMut::new(
             savepoint.get_user_root(),
             self.transaction_guard.clone(),
@@ -751,57 +723,82 @@ impl WriteTransaction {
             self.freed_pages.clone(),
         );
 
-        // Remove any freed pages that have already been processed.
-        // Otherwise this would result in a double free
-        let mut freed_tree = BtreeMut::new(
-            savepoint.get_freed_root(),
-            self.transaction_guard.clone(),
-            self.mem.clone(),
-            self.post_commit_frees.clone(),
-        );
-        let lookup_key = FreedTableKey {
-            transaction_id: oldest_unprocessed_transaction,
-            pagination_id: 0,
-        };
-        let mut freed_pages = self.freed_pages.lock().unwrap();
-        let mut freed_pages_hash = HashSet::new();
-        freed_pages_hash.extend(freed_pages.iter());
-        let mut to_remove = vec![];
-        for entry in freed_tree.range(&(..lookup_key))? {
-            let item = entry?;
-            to_remove.push(item.key());
-            let pages: FreedPageList = item.value();
-            for i in 0..pages.len() {
-                let page = pages.get(i);
-                if self.mem.is_page_out_of_bounds(page) {
-                    // Page no longer exists because the database file shrank
-                    continue;
-                }
-                // Re-process the freed pages, but ignore any that would be double-frees
-                if self.mem.is_allocated(page)
-                    && !freed_pages_hash.contains(&page)
-                    && !whitelist.contains(&page)
-                {
-                    if self.mem.uncommitted(page) {
-                        self.mem.free(page);
-                    } else {
-                        freed_pages.push(page);
-                        freed_pages_hash.insert(page);
+        // 1a) filter any pages referenced by the old data root to bring them back to the committed state
+        let mut txn_id = savepoint.get_transaction_id().raw_id();
+        let mut freed_tree = self.freed_tree.lock().unwrap();
+        loop {
+            let lower = FreedTableKey {
+                transaction_id: txn_id,
+                pagination_id: 0,
+            };
+
+            if freed_tree.range(&(lower..))?.next().is_none() {
+                break;
+            }
+            let lower = FreedTableKey {
+                transaction_id: txn_id,
+                pagination_id: 0,
+            };
+            let upper = FreedTableKey {
+                transaction_id: txn_id + 1,
+                pagination_id: 0,
+            };
+
+            // Find all the pending pages for this txn and filter them
+            let mut pending_pages = vec![];
+            for entry in freed_tree.extract_from_if(&(lower..upper), |_, _| true)? {
+                let item = entry?;
+                for i in 0..item.value().len() {
+                    let p = item.value().get(i);
+                    if !old_root_pages[p.region as usize].is_allocated(p.page_index, p.page_order) {
+                        pending_pages.push(p);
+                    }
                 }
             }
-        }
-        drop(freed_pages);
-        for key in to_remove {
-            freed_tree.remove(&key)?;
+
+            let mut pagination_counter = 0u64;
+            while !pending_pages.is_empty() {
+                let chunk_size = 100;
+                let buffer_size = FreedPageList::required_bytes(chunk_size);
+                let key = FreedTableKey {
+                    transaction_id: txn_id,
+                    pagination_id: pagination_counter,
+                };
+                let mut access_guard =
+                    freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
+
+                let len = pending_pages.len();
+                access_guard.as_mut().clear();
+                for page in pending_pages.drain(len - min(len, chunk_size)..) {
+                    access_guard.as_mut().push_back(page);
+                }
+                drop(access_guard);
+
+                pagination_counter += 1;
+            }
+
+            txn_id += 1;
         }
-        *self.freed_tree.lock().unwrap() = freed_tree;
+        // 2) free all pages that became unreachable
+        let mut freed_pages = self.freed_pages.lock().unwrap();
+        for i in 0..current_root_pages.len() {
+            let mut pages = vec![];
+            current_root_pages[i].difference(i.try_into().unwrap(), &old_root_pages[i], &mut pages);
+            for page in pages {
+                if self.mem.uncommitted(page) {
+                    self.mem.free(page);
+                } else {
+                    freed_pages.push(page);
+                }
+            }
+        }
+        drop(freed_pages);
 
-        // Invalidate all savepoints that are newer than the one being applied to prevent the user
+        // 3) Invalidate all savepoints that are newer than the one being applied to prevent the user
         // from later trying to restore a savepoint "on another timeline"
         self.transaction_tracker
             .invalidate_savepoints_after(savepoint.get_id());
-
         for persistent_savepoint in self.list_persistent_savepoints()? {
             if persistent_savepoint > savepoint.get_id().0 {
                 self.delete_persistent_savepoint(persistent_savepoint)?;
diff --git a/src/tree_store/page_store/buddy_allocator.rs b/src/tree_store/page_store/buddy_allocator.rs
index 6f81b4a2..284649f6 100644
--- a/src/tree_store/page_store/buddy_allocator.rs
+++ b/src/tree_store/page_store/buddy_allocator.rs
@@ -225,34 +225,23 @@ impl BuddyAllocator {
         result
     }
 
-    pub(crate) fn get_allocated_pages_since_savepoint(
+    pub(crate) fn difference(
         &self,
         region: u32,
-        state: &[u8],
+        other: &BuddyAllocator,
         output: &mut Vec<PageNumber>,
     ) {
-        let max_order = state[0];
-        let num_pages = u32::from_le_bytes(state[1..5].try_into().unwrap());
+        let num_pages = other.len();
 
-        let mut data_start = 5 + size_of::<u32>() * (max_order as usize + 1);
-
-        for order in 0..=max_order {
-            let offset_index = 5 + size_of::<u32>() * (order as usize);
-            let data_end = u32::from_le_bytes(
-                state[offset_index..(offset_index + size_of::<u32>())]
-                    .try_into()
-                    .unwrap(),
-            ) as usize;
-            let bytes = &state[data_start..data_end];
-            let savepoint_allocated = U64GroupedBitmap::from_bytes(bytes);
+        for order in 0..=self.max_order {
+            let other_allocated = other.get_order_allocated(order);
             let self_allocated = self.get_order_allocated(order);
-            for i in self_allocated.difference(&savepoint_allocated) {
+            for i in self_allocated.difference(other_allocated) {
                 if i >= num_pages {
                     break;
                 }
                 output.push(PageNumber::new(region, i, order));
             }
-            data_start = data_end;
         }
     }
diff --git a/src/tree_store/page_store/layout.rs b/src/tree_store/page_store/layout.rs
index f0ac5e38..7c8b2a16 100644
--- a/src/tree_store/page_store/layout.rs
+++ b/src/tree_store/page_store/layout.rs
@@ -83,7 +83,7 @@ impl RegionLayout {
 }
 
 #[derive(Clone, Copy, Debug)]
-pub(super) struct DatabaseLayout {
+pub(crate) struct DatabaseLayout {
     full_region_layout: RegionLayout,
     num_full_regions: u32,
     trailing_partial_region: Option<RegionLayout>,
diff --git a/src/tree_store/page_store/mod.rs b/src/tree_store/page_store/mod.rs
index eb642c8d..331a2ca7 100644
--- a/src/tree_store/page_store/mod.rs
+++ b/src/tree_store/page_store/mod.rs
@@ -20,5 +20,7 @@ pub use savepoint::Savepoint;
 pub(crate) use savepoint::SerializedSavepoint;
 
 pub(super) use base::{PageImpl, PageMut};
+pub(super) use buddy_allocator::BuddyAllocator;
 pub(crate) use cached_file::CachePriority;
+pub(super) use region::new_allocators;
 pub(super) use xxh3::hash128_with_seed;
diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs
index 05ccc80c..55001aae 100644
--- a/src/tree_store/page_store/page_manager.rs
+++ b/src/tree_store/page_store/page_manager.rs
@@ -341,19 +341,6 @@ impl TransactionalMemory {
         Ok(())
     }
 
-    // Returns true if the page is beyond the last region: i.e. it no longer exists
-    pub(crate) fn is_page_out_of_bounds(&self, page: PageNumber) -> bool {
-        let state = self.state.lock().unwrap();
-        page.region as usize >= state.allocators.region_allocators.len()
-    }
-
-    pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
-        let state = self.state.lock().unwrap();
-        let allocator = state.get_region(page.region);
-
-        allocator.is_allocated(page.page_index, page.page_order)
-    }
-
     pub(crate) fn mark_pages_allocated(
         &self,
         allocated_pages: impl Iterator<Item = Result<PageNumber>>,
@@ -423,10 +410,6 @@ impl TransactionalMemory {
         result
     }
 
-    pub(crate) fn region_tracker_page(&self) -> PageNumber {
-        self.state.lock().unwrap().header.region_tracker()
-    }
-
     // Relocates the region tracker to a lower page, if possible
     // Returns true if the page was moved
     pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
@@ -465,32 +448,6 @@ impl TransactionalMemory {
         regional_allocators
     }
 
-    // Diffs region_states, which must be the result of calling get_raw_allocator_states(), against
-    // the currently allocated set of pages
-    pub(crate) fn pages_allocated_since_raw_state(
-        &self,
-        region_states: &[Vec<u8>],
-    ) -> Vec<PageNumber> {
-        let mut result = vec![];
-        let state = self.state.lock().unwrap();
-
-        for i in 0..state.header.layout().num_regions() {
-            let current_state = state.get_region(i);
-            if let Some(old_state) = region_states.get(i as usize) {
-                current_state.get_allocated_pages_since_savepoint(i, old_state, &mut result);
-            } else {
-                // This region didn't exist, so everything is newly allocated
-                current_state.get_allocated_pages(i, &mut result);
-            }
-        }
-
-        // Don't include the region tracker, since we manage that internally to the TransactionalMemory
-        // Otherwise restoring a savepoint would free it.
-        result.retain(|x| *x != state.header.region_tracker());
-
-        result
-    }
-
     // Commit all outstanding changes and make them visible as the primary
     pub(crate) fn commit(
         &self,
@@ -778,6 +735,10 @@ impl TransactionalMemory {
         }
     }
 
+    pub(crate) fn get_layout(&self) -> DatabaseLayout {
+        self.state.lock().unwrap().header.layout()
+    }
+
     pub(crate) fn get_last_committed_transaction_id(&self) -> Result<TransactionId> {
         let state = self.state.lock().unwrap();
         if self.read_from_secondary.load(Ordering::Acquire) {
diff --git a/src/tree_store/page_store/region.rs b/src/tree_store/page_store/region.rs
index 147a87a6..37f8c8ef 100644
--- a/src/tree_store/page_store/region.rs
+++ b/src/tree_store/page_store/region.rs
@@ -112,6 +112,19 @@ impl RegionTracker {
     }
 }
 
+pub(crate) fn new_allocators(layout: DatabaseLayout) -> Vec<BuddyAllocator> {
+    let mut result = vec![];
+    for i in 0..layout.num_regions() {
+        let region_layout = layout.region_layout(i);
+        let allocator = BuddyAllocator::new(
+            region_layout.num_pages(),
+            layout.full_region_layout().num_pages(),
+        );
+        result.push(allocator);
+    }
+    result
+}
+
 pub(super) struct Allocators {
     pub(super) region_tracker: RegionTracker,
     pub(super) region_allocators: Vec<BuddyAllocator>,
@@ -138,6 +151,16 @@ impl Allocators {
         }
     }
 
+    // TODO: remove this at some point. It is useful for debugging though.
+    #[allow(dead_code)]
+    pub(super) fn print_all_allocated(&self) {
+        let mut pages = vec![];
+        for (i, allocator) in self.region_allocators.iter().enumerate() {
+            allocator.get_allocated_pages(i.try_into().unwrap(), &mut pages);
+        }
+        println!("Allocated pages: {pages:?}");
+    }
+
     pub(crate) fn xxh3_hash(&self) -> u128 {
         let mut result = xxh3_checksum(&self.region_tracker.to_vec());
         for allocator in self.region_allocators.iter() {
diff --git a/src/tree_store/page_store/savepoint.rs b/src/tree_store/page_store/savepoint.rs
index 5d4d9b72..72e3e3a3 100644
--- a/src/tree_store/page_store/savepoint.rs
+++ b/src/tree_store/page_store/savepoint.rs
@@ -82,10 +82,6 @@ impl Savepoint {
         self.freed_root
     }
 
-    pub(crate) fn get_regional_allocator_states(&self) -> &[Vec<u8>] {
-        &self.regional_allocators
-    }
-
     pub(crate) fn db_address(&self) -> *const TransactionTracker {
         self.transaction_tracker.as_ref() as *const _
     }
diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs
index b9e2051f..b4c6cb1d 100644
--- a/src/tree_store/table_tree.rs
+++ b/src/tree_store/table_tree.rs
@@ -1,18 +1,20 @@
 use crate::db::TransactionGuard;
 use crate::error::TableError;
 use crate::multimap_table::{
-    finalize_tree_and_subtree_checksums, multimap_btree_stats, verify_tree_and_subtree_checksums,
+    finalize_tree_and_subtree_checksums, multimap_btree_stats, parse_subtree_roots,
+    verify_tree_and_subtree_checksums, DynamicCollection,
 };
 use crate::tree_store::btree::{btree_stats, UntypedBtreeMut};
 use crate::tree_store::btree_base::BtreeHeader;
 use crate::tree_store::btree_iters::AllPageNumbersBtreeIter;
+use crate::tree_store::page_store::{new_allocators, BuddyAllocator};
 use crate::tree_store::{
     Btree, BtreeMut, BtreeRangeIter, PageHint, PageNumber, RawBtree, TransactionalMemory,
 };
 use crate::types::{Key, MutInPlaceValue, TypeName, Value};
 use crate::{DatabaseStats, Result};
 use std::cmp::max;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::mem;
 use std::mem::size_of;
 use std::ops::RangeFull;
@@ -556,12 +558,14 @@ impl<'txn> TableTreeMut<'txn> {
         }
     }
 
-    pub(crate) fn all_referenced_pages(&self) -> Result<HashSet<PageNumber>> {
+    pub(crate) fn all_referenced_pages(&self) -> Result<Vec<BuddyAllocator>> {
+        let mut result = new_allocators(self.mem.get_layout());
+
         // All the pages in the table tree itself
-        let mut result = HashSet::new();
         if let Some(iter) = self.tree.all_pages_iter()? {
             for page in iter {
-                result.insert(page?);
+                let page = page?;
+                result[page.region as usize].record_alloc(page.page_index, page.page_order);
             }
         }
 
@@ -580,13 +584,57 @@ impl<'txn> TableTreeMut<'txn> {
                 )?;
 
                 for page in table_pages_iter {
-                    result.insert(page?);
+                    let page = page?;
+                    result[page.region as usize].record_alloc(page.page_index, page.page_order);
                 }
             }
         }
 
-        if !self.list_tables(TableType::Multimap)?.is_empty() {
-            unimplemented!("Walking all multimap references is not currently supported");
+        for entry in self.list_tables(TableType::Multimap)? {
+            let definition = self
+                .get_table_untyped(&entry, TableType::Multimap)
+                .map_err(|e| e.into_storage_error_or_corrupted("Internal corruption"))?
+                .unwrap();
+            if let Some(header) = definition.get_root() {
+                let table_pages_iter = AllPageNumbersBtreeIter::new(
+                    header.root,
+                    definition.get_fixed_key_size(),
+                    DynamicCollection::<()>::fixed_width_with(definition.get_fixed_value_size()),
+                    self.mem.clone(),
+                )?;
+                for page in table_pages_iter {
+                    let page = page?;
+                    result[page.region as usize].record_alloc(page.page_index, page.page_order);
+                }
+
+                let table_pages_iter = AllPageNumbersBtreeIter::new(
+                    header.root,
+                    definition.get_fixed_key_size(),
+                    DynamicCollection::<()>::fixed_width_with(definition.get_fixed_value_size()),
+                    self.mem.clone(),
+                )?;
+                for table_page in table_pages_iter {
+                    let page = self.mem.get_page(table_page?)?;
+                    let subtree_roots = parse_subtree_roots(
+                        &page,
+                        definition.get_fixed_key_size(),
+                        definition.get_fixed_value_size(),
+                    );
+                    for subtree_header in subtree_roots {
+                        let sub_root_iter = AllPageNumbersBtreeIter::new(
+                            subtree_header.root,
+                            definition.get_fixed_value_size(),
+                            <()>::fixed_width(),
+                            self.mem.clone(),
+                        )?;
+                        for page in sub_root_iter {
+                            let page = page?;
+                            result[page.region as usize]
+                                .record_alloc(page.page_index, page.page_order);
+                        }
+                    }
+                }
+            }
         }
 
         Ok(result)
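Note on the approach (not part of the patch): the restore path now walks the table tree twice, building per-region "reachable pages" sets for the current root and for the savepoint's root, and then frees exactly the pages that are reachable now but were not reachable at the savepoint. The sketch below is a minimal, self-contained illustration of that diffing idea; `PageKey`, `RegionSet`, and `free_unreachable` are hypothetical stand-ins, not redb's actual `PageNumber`/`BuddyAllocator` API.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a page address: (region, index, order).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct PageKey {
    region: u32,
    index: u32,
    order: u8,
}

/// Simplified per-region set of allocated pages, standing in for a buddy-allocator bitmap.
#[derive(Default)]
struct RegionSet {
    allocated: HashSet<(u32, u8)>, // (index, order)
}

impl RegionSet {
    fn record_alloc(&mut self, index: u32, order: u8) {
        self.allocated.insert((index, order));
    }

    fn is_allocated(&self, index: u32, order: u8) -> bool {
        self.allocated.contains(&(index, order))
    }

    /// Pages present in `self` but not in `other`, analogous to the patch's
    /// BuddyAllocator::difference().
    fn difference(&self, region: u32, other: &RegionSet, output: &mut Vec<PageKey>) {
        for &(index, order) in &self.allocated {
            if !other.is_allocated(index, order) {
                output.push(PageKey { region, index, order });
            }
        }
    }
}

/// Collect every page reachable from the current root but not from the savepoint's root.
/// Assumes both snapshots cover the same regions.
fn free_unreachable(current: &[RegionSet], old: &[RegionSet]) -> Vec<PageKey> {
    let mut to_free = vec![];
    for (i, region) in current.iter().enumerate() {
        region.difference(i as u32, &old[i], &mut to_free);
    }
    to_free
}

fn main() {
    let mut old = vec![RegionSet::default()];
    let mut current = vec![RegionSet::default()];
    old[0].record_alloc(1, 0);
    current[0].record_alloc(1, 0); // still reachable from both roots: kept
    current[0].record_alloc(7, 0); // only reachable from the current root: freed
    let freed = free_unreachable(&current, &old);
    assert_eq!(freed, vec![PageKey { region: 0, index: 7, order: 0 }]);
    println!("pages to free: {freed:?}");
}
```

Representing the reachable sets as per-region allocator bitmaps (as the patch does with `new_allocators` plus `record_alloc`, rather than a `HashSet<PageNumber>`) is what lets `difference` and the freed-tree filtering reuse the existing `is_allocated` machinery.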