diff --git a/src/transactions.rs b/src/transactions.rs index dd072737..fa474e22 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -692,28 +692,52 @@ impl WriteTransaction { assert_eq!(self.mem.get_version(), savepoint.get_version()); self.dirty.store(true, Ordering::Release); + // Restoring a savepoint needs to accomplish the following: + // 1) restore the table tree. This is trivial, since we have the old root + // 2) update the system tree to remove invalid persistent savepoints. + // 3) free all pages that were allocated since the savepoint and are unreachable + // from the restored table tree root. This is non-trivial, since we want to avoid walking + // the entire table tree, and we cannot simply restore the old allocator state as there + // could be read transactions that reference this data. + // 3a) First we free all allocated pages that were not allocated in the savepoint, + // except those referenced by the system table + // 3b) We re-process the freed table referenced by the savepoint, but ignore any pages that + // are already free + let allocated_since_savepoint = self .mem .pages_allocated_since_raw_state(savepoint.get_regional_allocator_states()); // We don't want to rollback the system tree, so keep any pages it references - let referenced_by_system_tree = self + let mut whitelist = self .system_tables .lock() .unwrap() .table_tree .all_referenced_pages()?; + // The tracker page could have changed too. Don't erase it. + whitelist.insert(self.mem.region_tracker_page()); + + // Find the oldest transaction in the current freed tree, for use below. We do this before + // freeing pages to ensure that this tree is still valid + let oldest_unprocessed_transaction = if let Some(entry) = self + .freed_tree + .lock() + .unwrap() + .range::(&(..))? + .next() + { + entry?.key().transaction_id + } else { + self.transaction_id.raw_id() + }; let mut freed_pages = vec![]; for page in allocated_since_savepoint { - if referenced_by_system_tree.contains(&page) { + if whitelist.contains(&page) { continue; } - if self.mem.uncommitted(page) { - self.mem.free(page); - } else { - freed_pages.push(page); - } + freed_pages.push(page); } *self.freed_pages.lock().unwrap() = freed_pages; self.tables.lock().unwrap().table_tree = TableTreeMut::new( @@ -724,19 +748,6 @@ impl WriteTransaction { ); // Remove any freed pages that have already been processed. Otherwise this would result in a double free - // We assume below that PageNumber is length 8 - let oldest_unprocessed_transaction = if let Some(entry) = self - .freed_tree - .lock() - .unwrap() - .range::(&(..))? - .next() - { - entry?.key().transaction_id - } else { - self.transaction_id.raw_id() - }; - let mut freed_tree = BtreeMut::new( savepoint.get_freed_root(), self.transaction_guard.clone(), @@ -747,10 +758,31 @@ impl WriteTransaction { transaction_id: oldest_unprocessed_transaction, pagination_id: 0, }; + let mut freed_pages = self.freed_pages.lock().unwrap(); + let mut freed_pages_hash = HashSet::new(); + freed_pages_hash.extend(freed_pages.iter()); let mut to_remove = vec![]; for entry in freed_tree.range(&(..lookup_key))? { - to_remove.push(entry?.key()); + let item = entry?; + to_remove.push(item.key()); + let pages: FreedPageList = item.value(); + for i in 0..pages.len() { + let page = pages.get(i); + if self.mem.is_page_out_of_bounds(page) { + // Page no longer exists because the database file shrank + continue; + } + // Re-process the freed pages, but ignore any that would be double-frees + if self.mem.is_allocated(page) + && !freed_pages_hash.contains(&page) + && !whitelist.contains(&page) + { + freed_pages.push(page); + freed_pages_hash.insert(page); + } + } } + drop(freed_pages); for key in to_remove { freed_tree.remove(&key)?; } diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index cfd48feb..05ccc80c 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -341,6 +341,19 @@ impl TransactionalMemory { Ok(()) } + // Returns true if the page is beyond the last region: i.e. it no longer exists + pub(crate) fn is_page_out_of_bounds(&self, page: PageNumber) -> bool { + let state = self.state.lock().unwrap(); + page.region as usize >= state.allocators.region_allocators.len() + } + + pub(crate) fn is_allocated(&self, page: PageNumber) -> bool { + let state = self.state.lock().unwrap(); + let allocator = state.get_region(page.region); + + allocator.is_allocated(page.page_index, page.page_order) + } + pub(crate) fn mark_pages_allocated( &self, allocated_pages: impl Iterator>, @@ -410,6 +423,10 @@ impl TransactionalMemory { result } + pub(crate) fn region_tracker_page(&self) -> PageNumber { + self.state.lock().unwrap().header.region_tracker() + } + // Relocates the region tracker to a lower page, if possible // Returns true if the page was moved pub(crate) fn relocate_region_tracker(&self) -> Result { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index aeca518c..e7ca97a3 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1002,6 +1002,73 @@ fn regression22() { assert_eq!(allocated_pages, txn.stats().unwrap().allocated_pages()); } +#[test] +fn regression23() { + let tmpfile = create_tempfile(); + + let db = Database::create(tmpfile.path()).unwrap(); + let txn = db.begin_write().unwrap(); + { + // List the savepoints to ensure the system table is created and occupies a page + #[allow(unused_must_use)] + { + txn.list_persistent_savepoints().unwrap(); + } + let mut table = txn.open_table(U64_TABLE).unwrap(); + table.insert(0, 0).unwrap(); + } + txn.commit().unwrap(); + + let txn = db.begin_write().unwrap(); + { + let mut table = txn.open_table(U64_TABLE).unwrap(); + table.remove(0).unwrap(); + } + txn.commit().unwrap(); + + // Extra commit to finalize the cleanup of the freed pages + let txn = db.begin_write().unwrap(); + txn.commit().unwrap(); + + let txn = db.begin_write().unwrap(); + let allocated_pages = txn.stats().unwrap().allocated_pages(); + { + let mut table = txn.open_table(U64_TABLE).unwrap(); + table.insert(0, 0).unwrap(); + } + txn.commit().unwrap(); + + let txn = db.begin_write().unwrap(); + { + let mut table = txn.open_table(U64_TABLE).unwrap(); + table.remove(0).unwrap(); + } + txn.commit().unwrap(); + + let txn = db.begin_write().unwrap(); + let savepoint = txn.ephemeral_savepoint().unwrap(); + txn.commit().unwrap(); + + let txn = db.begin_write().unwrap(); + { + let mut table = txn.open_table(U64_TABLE).unwrap(); + table.insert(0, 0).unwrap(); + } + txn.commit().unwrap(); + + let mut txn = db.begin_write().unwrap(); + txn.restore_savepoint(&savepoint).unwrap(); + txn.commit().unwrap(); + drop(savepoint); + + // Extra commit to finalize the cleanup of the freed pages. + // There was a bug where the restoration of the savepoint would leak pages + db.begin_write().unwrap().commit().unwrap(); + + let txn = db.begin_write().unwrap(); + assert_eq!(allocated_pages, txn.stats().unwrap().allocated_pages()); +} + #[test] fn check_integrity_clean() { let tmpfile = create_tempfile();