From 6672ed5b896fd5fc6a6febb79a4092913b5e9139 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Tue, 24 Sep 2024 22:44:44 -0300 Subject: [PATCH 1/9] Untested but compiler not complaining --- src/payload_storage.rs | 121 +++++++++++++++++++++++++++++++- src/slotted_page.rs | 155 ++++++++++++++++++++++++++++++----------- 2 files changed, 234 insertions(+), 42 deletions(-) diff --git a/src/payload_storage.rs b/src/payload_storage.rs index 617434b..6123040 100644 --- a/src/payload_storage.rs +++ b/src/payload_storage.rs @@ -3,6 +3,7 @@ use crate::payload::Payload; use crate::slotted_page::{SlotHeader, SlottedPageMmap}; use lz4_flex::compress_prepend_size; use parking_lot::RwLock; +use std::cmp::Reverse; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -166,7 +167,7 @@ impl PayloadStorage { self.create_new_page(Some(payload_size)) }); let mut page = self.pages.get_mut(&new_page_id).unwrap().write(); - let new_slot_id = page.insert_value(&comp_payload).unwrap(); + let new_slot_id = page.insert_value(point_offset, &comp_payload).unwrap(); // update page_tracker self.page_tracker .set(point_offset, PagePointer::new(new_page_id, new_slot_id)); @@ -181,7 +182,12 @@ impl PayloadStorage { }); let page = self.pages.get_mut(&page_id).unwrap(); - let slot_id = page.write().insert_value(&comp_payload).unwrap(); + + let slot_id = page + .write() + .insert_value(point_offset, &comp_payload) + .unwrap(); + // update page_tracker self.page_tracker .set(point_offset, PagePointer::new(page_id, slot_id)); @@ -199,6 +205,117 @@ impl PayloadStorage { self.page_tracker.unset(point_offset); Some(()) } + + /// Page ids with amount of fragmentation, ordered by most to least fragmentation + fn pages_to_defrag(&self) -> Vec<(u32, usize)> { + let mut fragmentation = self + .pages + .iter() + .filter_map(|(page_id, page)| { + let page = page.read(); + let frag_space = page.fragmented_space(); + + // check if we should defrag this page + let frag_threshold = + SlottedPageMmap::FRAGMENTATION_THRESHOLD_RATIO * page.page_size() as f32; + if frag_space < frag_threshold.ceil() as usize { + // page is not fragmented enough, skip + return None; + } + Some((*page_id, frag_space)) + }) + .collect::>(); + + // sort by most to least fragmented + fragmentation.sort_unstable_by_key(|(_, fragmented_space)| Reverse(*fragmented_space)); + + fragmentation + } + + pub fn compact(&mut self) { + // find out which pages should be compacted + let pages_to_defrag = self.pages_to_defrag(); + + if pages_to_defrag.is_empty() { + return; + } + + let mut pages_to_defrag = pages_to_defrag.into_iter(); + let mut old_page_id = pages_to_defrag.next().unwrap().0; + let mut last_slot_id = 0; + + // TODO: account for the fact that the first value could be larger than 32MB and the newly created page will + // immediately not be used? we don't want to create empty pages. But it is a quite rare case, so maybe we can just ignore it + let mut size_hint = None; + + // This is a loop because we might need to create more pages if the current new page is full + 'new_page: loop { + // create new page + let new_page_id = self.create_new_page(size_hint); + + // go over each page to defrag + loop { + let mut new_page = self.pages.get(&new_page_id).unwrap().write(); + let old_page = self.pages.get(&old_page_id).unwrap().read(); + + 'slots: for (slot, value) in old_page.iter_slot_values_starting_from(last_slot_id) { + let point_offset = slot.point_offset(); + + if slot.deleted() { + continue 'slots; + } + + let was_inserted = if let Some(value) = value { + new_page.insert_value(point_offset, value) + } else { + new_page.insert_placeholder_value(point_offset) + }; + + if was_inserted.is_none() { + // new page is full, create a new one + size_hint = value.map(|v| v.len()); + continue 'new_page; + } + + let slot_id = was_inserted.expect("a value should always fit at this point"); + + let new_pointer = PagePointer { + page_id: new_page_id, + slot_id, + }; + + // update page tracker + self.page_tracker.set(point_offset, new_pointer); + + // prepare for next iteration + last_slot_id += 1; + } + // drop read and write guards + drop(old_page); + drop(new_page); + + // delete old page + let page_to_remove = self.pages.remove(&old_page_id).unwrap(); + last_slot_id = 0; + + // All points in this page have been updated to the new page in the page tracker, + // so there should not be any outstanding references to this page. + // TODO: audit this part + Arc::into_inner(page_to_remove) + .unwrap() + .into_inner() + .drop_page(); + + match pages_to_defrag.next() { + Some((page_id, _defrag_space)) => { + old_page_id = page_id; + } + // No more pages to defrag, end compaction + None => break 'new_page, + }; + } + } + } } #[cfg(test)] diff --git a/src/slotted_page.rs b/src/slotted_page.rs index b6c92fa..f8c24c5 100644 --- a/src/slotted_page.rs +++ b/src/slotted_page.rs @@ -41,30 +41,46 @@ impl SlottedPageHeader { pub struct SlotHeader { offset: u64, // offset in the page (8 bytes) length: u64, // length of the value (8 bytes) + point_offset: u32, // point id (4 bytes) right_padding: u8, // padding within the value for small values (1 byte) deleted: bool, // whether the value has been deleted (1 byte) - _align: [u8; 6], // 6 bytes padding for alignment + _align: [u8; 2], // 2 bytes padding for alignment } impl SlotHeader { pub const fn size_in_bytes() -> usize { size_of::() - // 24 // 8 + 8 + 1 + 1 + 6 padding + // 24 // 8 + 8 + 4 + 1 + 1 + 2 padding } - fn new(offset: u64, length: u64, right_padding: u8, deleted: bool) -> SlotHeader { + fn new( + point_offset: u32, + offset: u64, + length: u64, + right_padding: u8, + deleted: bool, + ) -> SlotHeader { assert!( length >= SlottedPageMmap::MIN_VALUE_SIZE_BYTES as u64, "Value too small" ); SlotHeader { + point_offset, offset, length, right_padding, deleted, - _align: [0; 6], + _align: [0; 2], } } + + pub fn point_offset(&self) -> u32 { + self.point_offset + } + + pub fn deleted(&self) -> bool { + self.deleted + } } #[derive(Debug)] @@ -87,33 +103,45 @@ impl SlottedPageMmap { const PLACEHOLDER_VALUE: [u8; SlottedPageMmap::MIN_VALUE_SIZE_BYTES] = [0; SlottedPageMmap::MIN_VALUE_SIZE_BYTES]; + pub const FRAGMENTATION_THRESHOLD_RATIO: f32 = 0.5; + /// Flushes outstanding memory map modifications to disk. fn flush(&self) { self.mmap.flush().unwrap(); } - /// Return all values in the page with deleted values as None - fn all_values(&self) -> Vec> { - let mut values = Vec::new(); - for i in 0..self.header.slot_count { - let slot = self.get_slot(&(i as u32)).unwrap(); - // skip values associated with deleted slots - if slot.deleted { - values.push(None); - continue; - } - match self.get_slot_value(&slot) { - Some(value) => values.push(Some(value)), - None => values.push(None), - } + /// Return all values in the page with placeholder or deleted values as `None` + pub fn all_values(&self) -> Vec> { + self.iter_slot_values_starting_from(0) + .map(|(_, value)| value) + .collect() + } + + /// Iterate over all values in the page, starting at the provided slot id. + /// + /// `None` values can be either placeholders, or deleted values + pub fn iter_slot_values_starting_from( + &self, + slot_id: SlotId, + ) -> impl Iterator)> + '_ { + if slot_id as u64 >= self.header.slot_count { + panic!("Slot id out of bounds") } - // values are stored in reverse order - values.reverse(); - values + + (slot_id as u64..self.header.slot_count).map(move |i| { + let slot = self.get_slot_ref(&(i as u32)).unwrap(); + let value = if slot.deleted { + None + } else { + self.get_slot_value(slot) + }; + + (slot, value) + }) } - /// Returns all non deleted values in the page - fn values(&self) -> Vec<&[u8]> { + /// Returns all non deleted values in the page. `None` values means that the slot is a placeholder + fn non_deleted_values(&self) -> Vec<&[u8]> { let mut values = Vec::new(); for i in 0..self.header.slot_count { let slot = self.get_slot(&(i as u32)).unwrap(); @@ -125,8 +153,6 @@ impl SlottedPageMmap { values.push(value) } } - // values are stored in reverse order - values.reverse(); values } @@ -180,6 +206,10 @@ impl SlottedPageMmap { /// Get the slot associated with the slot id. fn get_slot(&self, slot_id: &u32) -> Option { + self.get_slot_ref(slot_id).cloned() + } + + fn get_slot_ref(&self, slot_id: &SlotId) -> Option<&SlotHeader> { let slot_count = self.header.slot_count; if *slot_id >= slot_count as u32 { return None; @@ -190,7 +220,8 @@ impl SlottedPageMmap { let start = slot_offset; let end = start + size_of::(); let slot: &SlotHeader = transmute_from_u8(&self.mmap[start..end]); - Some(slot.clone()) + + Some(slot) } /// Get value associated with the slot @@ -237,6 +268,35 @@ impl SlottedPageMmap { data_start_offset.saturating_sub(last_slot_offset) } + pub fn page_size(&self) -> usize { + self.header.page_size() + } + + /// Sums the amount of unused space in between the data. + pub fn fragmented_space(&self) -> usize { + let mut fragmented_space = 0; + + let mut slot_id = 0; + let mut last_offset = 0u64; + while let Some(slot) = self.get_slot_ref(&slot_id) { + // if the slot is deleted, we can consider it empty space + if slot.deleted { + fragmented_space += slot.length; + } + + // check the empty space between the last slot and the end of the current slot + fragmented_space += last_offset + .checked_sub(slot.offset + slot.length) + .expect("last_offset should be to the right of the current slot end"); + + // update for next iteration + last_offset = slot.offset; + slot_id += 1; + } + + fragmented_space as usize + } + /// Compute the start and end offsets for the slot fn offsets_for_slot(&self, slot_id: SlotId) -> (usize, usize) { let slot_offset = @@ -247,8 +307,8 @@ impl SlottedPageMmap { } /// Insert a new placeholder into the page - pub fn insert_placeholder_value(&mut self) -> Option { - self.insert_value(&SlottedPageMmap::PLACEHOLDER_VALUE) + pub fn insert_placeholder_value(&mut self, point_id: u32) -> Option { + self.insert_value(point_id, &SlottedPageMmap::PLACEHOLDER_VALUE) } /// Insert a new value into the page @@ -256,7 +316,7 @@ impl SlottedPageMmap { /// Returns /// - None if there is not enough space for a new slot + value /// - Some(slot_id) if the value was successfully added - pub fn insert_value(&mut self, value: &[u8]) -> Option { + pub fn insert_value(&mut self, point_offset: u32, value: &[u8]) -> Option { // size of the value in bytes let real_value_size = value.len(); @@ -278,6 +338,7 @@ impl SlottedPageMmap { let slot_count = self.header.slot_count; let next_slot_id = slot_count as SlotId; let slot = SlotHeader::new( + point_offset, new_data_start_offset as u64, value_len as u64, padding as u8, @@ -357,6 +418,7 @@ impl SlottedPageMmap { // actual value size accounting for the minimum value size let value_len = real_value_size + right_padding; let update_slot = SlotHeader::new( + slot.point_offset, value_start as u64, // new offset value value_len as u64, // new value size right_padding as u8, // new padding @@ -369,6 +431,12 @@ impl SlottedPageMmap { true } + /// Delete the page from the filesystem. + pub fn drop_page(self) { + drop(self.mmap); + std::fs::remove_file(&self.path).unwrap(); + } + // TODO // - remove deleted slots and values // - shift all values to the end of the page @@ -455,9 +523,11 @@ mod tests { let mut mmap = SlottedPageMmap::open(path).unwrap(); let mut free_space = mmap.free_space(); + let mut sequence = 0u32..; // add placeholder values while mmap.has_capacity_for_min_value() { - mmap.insert_placeholder_value().unwrap(); + mmap.insert_placeholder_value(sequence.next().unwrap()) + .unwrap(); let new_free_space = mmap.free_space(); assert!(new_free_space < free_space); free_space = new_free_space; @@ -468,7 +538,10 @@ mod tests { assert_eq!(mmap.free_space(), 104); // not enough space for a new slot + placeholder value // can't add more values - assert_eq!(mmap.insert_placeholder_value(), None); + assert_eq!( + mmap.insert_placeholder_value(sequence.next().unwrap()), + None + ); // drop and reopen drop(mmap); @@ -477,7 +550,7 @@ mod tests { assert_eq!(mmap.header.data_start_offset, 5_298_176); assert_eq!(mmap.all_values().len(), expected_slot_count as usize); - assert_eq!(mmap.values().len(), 0); + assert_eq!(mmap.non_deleted_values().len(), 0); } #[test] @@ -494,8 +567,8 @@ mod tests { assert_eq!(values.len(), 0); // add 10 placeholder values - for _ in 0..10 { - mmap.insert_placeholder_value().unwrap(); + for i in 0..10 { + mmap.insert_placeholder_value(i).unwrap(); } assert_eq!(mmap.header.slot_count, 10); @@ -540,7 +613,8 @@ mod tests { bar: i, qux: i % 2 == 0, }; - mmap.insert_value(foo.to_bytes().as_slice()).unwrap(); + mmap.insert_value(i as u32, foo.to_bytes().as_slice()) + .unwrap(); } assert_eq!(mmap.header.slot_count, 100); @@ -589,7 +663,8 @@ mod tests { bar: i, qux: i % 2 == 0, }; - mmap.insert_value(foo.to_bytes().as_slice()).unwrap(); + mmap.insert_value(i as u32, foo.to_bytes().as_slice()) + .unwrap(); } // delete slot 10 @@ -598,7 +673,7 @@ mod tests { assert!(mmap.get_slot(&10).unwrap().deleted); assert_eq!(mmap.all_values().len(), 100); - assert_eq!(mmap.values().len(), 99) + assert_eq!(mmap.non_deleted_values().len(), 99) } #[test] @@ -616,7 +691,7 @@ mod tests { // push one value let foo = Foo { bar: 1, qux: true }; - mmap.insert_value(foo.to_bytes().as_slice()).unwrap(); + mmap.insert_value(0, foo.to_bytes().as_slice()).unwrap(); // read slots & values let slot = mmap.get_slot(&0).unwrap(); @@ -651,7 +726,7 @@ mod tests { assert_eq!(values.len(), 0); // push placeholder value - mmap.insert_placeholder_value().unwrap(); + mmap.insert_placeholder_value(0).unwrap(); let values = mmap.all_values(); assert_eq!(values.len(), 1); assert_eq!(mmap.get_value(&0), None); @@ -687,7 +762,7 @@ mod tests { assert_eq!(values.len(), 0); // push placeholder value - mmap.insert_placeholder_value().unwrap(); + mmap.insert_placeholder_value(0).unwrap(); let values = mmap.all_values(); assert_eq!(values.len(), 1); assert_eq!(mmap.get_value(&0), None); From c92c1c9790151aac2a68510676c476a27c3774e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Wed, 25 Sep 2024 12:36:15 -0300 Subject: [PATCH 2/9] put the page tracker behind a RwLock --- src/payload_storage.rs | 49 ++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/src/payload_storage.rs b/src/payload_storage.rs index 6123040..ef91e09 100644 --- a/src/payload_storage.rs +++ b/src/payload_storage.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::sync::Arc; pub struct PayloadStorage { - page_tracker: PageTracker, + page_tracker: RwLock, pages: HashMap>>, // page_id -> mmap page max_page_id: u32, base_path: PathBuf, @@ -28,7 +28,7 @@ impl PayloadStorage { pub fn new(path: PathBuf) -> Self { Self { - page_tracker: PageTracker::new(&path, None), + page_tracker: RwLock::new(PageTracker::new(&path, None)), pages: HashMap::new(), max_page_id: 0, base_path: path, @@ -53,7 +53,7 @@ impl PayloadStorage { } } Some(Self { - page_tracker, + page_tracker: RwLock::new(page_tracker), pages, max_page_id, base_path: path, @@ -61,7 +61,7 @@ impl PayloadStorage { } pub fn is_empty(&self) -> bool { - self.pages.is_empty() && self.page_tracker.is_empty() + self.pages.is_empty() && self.page_tracker.read().is_empty() } /// Get the path for a given page id @@ -90,9 +90,9 @@ impl PayloadStorage { /// Get the payload for a given point offset pub fn get_payload(&self, point_offset: PointOffset) -> Option { let PagePointer { page_id, slot_id } = self.get_pointer(point_offset)?; - let page = self.pages.get(page_id).expect("page not found"); + let page = self.pages.get(&page_id).expect("page not found"); let page_guard = page.read(); - let raw = page_guard.get_value(slot_id)?; + let raw = page_guard.get_value(&slot_id)?; let decompressed = Self::decompress(raw); let payload = Payload::from_bytes(&decompressed); Some(payload) @@ -140,8 +140,8 @@ impl PayloadStorage { } /// Get the mapping for a given point offset - fn get_pointer(&self, point_offset: PointOffset) -> Option<&PagePointer> { - self.page_tracker.get(point_offset) + fn get_pointer(&self, point_offset: PointOffset) -> Option { + self.page_tracker.read().get(point_offset).copied() } /// Put a payload in the storage @@ -150,7 +150,7 @@ impl PayloadStorage { let comp_payload = Self::compress(&payload_bytes); let payload_size = comp_payload.len(); - if let Some(PagePointer { page_id, slot_id }) = self.get_pointer(point_offset).copied() { + if let Some(PagePointer { page_id, slot_id }) = self.get_pointer(point_offset) { let page = self.pages.get_mut(&page_id).unwrap(); let mut page_guard = page.write(); let updated = page_guard.update_value(slot_id, &comp_payload); @@ -170,6 +170,7 @@ impl PayloadStorage { let new_slot_id = page.insert_value(point_offset, &comp_payload).unwrap(); // update page_tracker self.page_tracker + .write() .set(point_offset, PagePointer::new(new_page_id, new_slot_id)); } } else { @@ -187,9 +188,10 @@ impl PayloadStorage { .write() .insert_value(point_offset, &comp_payload) .unwrap(); - + // update page_tracker self.page_tracker + .write() .set(point_offset, PagePointer::new(page_id, slot_id)); } } @@ -197,12 +199,12 @@ impl PayloadStorage { /// Delete a payload from the storage /// Returns None if the point_offset, page, or payload was not found pub fn delete_payload(&mut self, point_offset: PointOffset) -> Option<()> { - let PagePointer { page_id, slot_id } = self.get_pointer(point_offset).copied()?; + let PagePointer { page_id, slot_id } = self.get_pointer(point_offset)?; let page = self.pages.get_mut(&page_id)?; // delete value from page page.write().delete_value(slot_id); // delete mapping - self.page_tracker.unset(point_offset); + self.page_tracker.write().unset(point_offset); Some(()) } @@ -255,6 +257,9 @@ impl PayloadStorage { // go over each page to defrag loop { + // lock the tracker at this point, to prevent updates to the page tracker while we are defragging + let mut page_tracker = self.page_tracker.write(); + let mut new_page = self.pages.get(&new_page_id).unwrap().write(); let old_page = self.pages.get(&old_page_id).unwrap().read(); @@ -285,7 +290,7 @@ impl PayloadStorage { }; // update page tracker - self.page_tracker.set(point_offset, new_pointer); + page_tracker.set(point_offset, new_pointer); // prepare for next iteration last_slot_id += 1; @@ -341,7 +346,7 @@ mod tests { let payload = Payload::default(); storage.put_payload(0, payload); assert_eq!(storage.pages.len(), 1); - assert_eq!(storage.page_tracker.raw_mapping_len(), 1); + assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1); let stored_payload = storage.get_payload(0); assert!(stored_payload.is_some()); @@ -359,7 +364,7 @@ mod tests { storage.put_payload(0, payload.clone()); assert_eq!(storage.pages.len(), 1); - assert_eq!(storage.page_tracker.raw_mapping_len(), 1); + assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1); let page_mapping = storage.get_pointer(0).unwrap(); assert_eq!(page_mapping.page_id, 1); // first page @@ -437,7 +442,7 @@ mod tests { storage.put_payload(0, payload.clone()); assert_eq!(storage.pages.len(), 1); - assert_eq!(storage.page_tracker.raw_mapping_len(), 1); + assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1); let page_mapping = storage.get_pointer(0).unwrap(); assert_eq!(page_mapping.page_id, 1); // first page @@ -455,7 +460,7 @@ mod tests { storage.put_payload(0, updated_payload.clone()); assert_eq!(storage.pages.len(), 1); - assert_eq!(storage.page_tracker.raw_mapping_len(), 1); + assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1); let stored_payload = storage.get_payload(0); assert!(stored_payload.is_some()); @@ -519,7 +524,10 @@ mod tests { } // asset same length - assert_eq!(storage.page_tracker.mapping_len(), model_hashmap.len()); + assert_eq!( + storage.page_tracker.read().mapping_len(), + model_hashmap.len() + ); // validate storage and model_hashmap are the same for point_offset in 0..=max_point_offset { @@ -535,7 +543,10 @@ mod tests { let storage = PayloadStorage::open(dir.path().to_path_buf()).unwrap(); // asset same length - assert_eq!(storage.page_tracker.mapping_len(), model_hashmap.len()); + assert_eq!( + storage.page_tracker.read().mapping_len(), + model_hashmap.len() + ); // validate storage and model_hashmap are the same for point_offset in 0..=max_point_offset { From 21daf0011865456a91400a249e460db611ab2d53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 09:59:50 -0300 Subject: [PATCH 3/9] fix refactor --- src/slotted_page.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/slotted_page.rs b/src/slotted_page.rs index f8c24c5..87483f5 100644 --- a/src/slotted_page.rs +++ b/src/slotted_page.rs @@ -123,8 +123,8 @@ impl SlottedPageMmap { pub fn iter_slot_values_starting_from( &self, slot_id: SlotId, - ) -> impl Iterator)> + '_ { - if slot_id as u64 >= self.header.slot_count { + ) -> impl Iterator)> + '_ { + if slot_id as u64 >= self.header.slot_count && self.header.slot_count > 0 { panic!("Slot id out of bounds") } From 5f6983612fafe8eb7c6fef38262dd8387cca2cb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 10:02:39 -0300 Subject: [PATCH 4/9] fmt --- src/slotted_page.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/slotted_page.rs b/src/slotted_page.rs index 87483f5..6e6ca45 100644 --- a/src/slotted_page.rs +++ b/src/slotted_page.rs @@ -123,7 +123,7 @@ impl SlottedPageMmap { pub fn iter_slot_values_starting_from( &self, slot_id: SlotId, - ) -> impl Iterator)> + '_ { + ) -> impl Iterator)> + '_ { if slot_id as u64 >= self.header.slot_count && self.header.slot_count > 0 { panic!("Slot id out of bounds") } From 18c4e084d6fd84c048cf3a5e9f6dd0b30f1fc44b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 12:30:13 -0300 Subject: [PATCH 5/9] test compaction --- src/payload_storage.rs | 66 ++++++++++++++++++++++++++++++++++++++---- src/slotted_page.rs | 2 +- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/src/payload_storage.rs b/src/payload_storage.rs index ef91e09..6beb45f 100644 --- a/src/payload_storage.rs +++ b/src/payload_storage.rs @@ -257,7 +257,7 @@ impl PayloadStorage { // go over each page to defrag loop { - // lock the tracker at this point, to prevent updates to the page tracker while we are defragging + // lock the tracker at this point, to prevent other processes from using the old pointer let mut page_tracker = self.page_tracker.write(); let mut new_page = self.pages.get(&new_page_id).unwrap().write(); @@ -709,16 +709,16 @@ mod tests { // load data into storage let point_offset = write_data(&mut storage, 0); assert_eq!(point_offset, EXPECTED_LEN as u32); - assert_eq!(storage.page_tracker.mapping_len(), EXPECTED_LEN); - assert_eq!(storage.page_tracker.raw_mapping_len(), EXPECTED_LEN); + assert_eq!(storage.page_tracker.read().mapping_len(), EXPECTED_LEN); + assert_eq!(storage.page_tracker.read().raw_mapping_len(), EXPECTED_LEN); assert_eq!(storage.pages.len(), 2); // write the same payload a second time let point_offset = write_data(&mut storage, point_offset); assert_eq!(point_offset, EXPECTED_LEN as u32 * 2); assert_eq!(storage.pages.len(), 3); - assert_eq!(storage.page_tracker.mapping_len(), EXPECTED_LEN * 2); - assert_eq!(storage.page_tracker.raw_mapping_len(), EXPECTED_LEN * 2); + assert_eq!(storage.page_tracker.read().mapping_len(), EXPECTED_LEN * 2); + assert_eq!(storage.page_tracker.read().raw_mapping_len(), EXPECTED_LEN * 2); // assert storage is consistent storage_double_pass_is_consistent(&storage); @@ -732,4 +732,60 @@ mod tests { // assert storage is consistent after reopening storage_double_pass_is_consistent(&storage); } + + #[test] + fn test_compaction() { + let (_dir, mut storage) = empty_storage(); + + let rng = &mut rand::thread_rng(); + let max_point_offset = 20000; + + let large_payloads = (0..max_point_offset) + .map(|_| one_random_payload_please(rng, 10)) + .collect::>(); + + for i in 0..max_point_offset { + storage.put_payload(i as u32, large_payloads[i].clone()); + } + + // sanity check + for i in 0..max_point_offset { + let stored_payload = storage.get_payload(i as u32); + assert_eq!(stored_payload.as_ref(), Some(&large_payloads[i])); + } + + // check no fragmentation + for page in storage.pages.values() { + assert_eq!(page.read().fragmented_space(), 0); + } + + // update with smaller values + let small_payloads = (0..max_point_offset) + .map(|_| one_random_payload_please(rng, 1)) + .collect::>(); + for i in 0..max_point_offset { + storage.put_payload(i as u32, small_payloads[i].clone()); + } + + // sanity check + for i in 0..max_point_offset { + let stored_payload = storage.get_payload(i as u32); + assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i])); + } + + // check fragmentation + assert!(!storage.pages_to_defrag().is_empty()); + + // compaction + storage.compact(); + + // check consistency + for i in 0..max_point_offset { + let stored_payload = storage.get_payload(i as u32); + assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i])); + } + + // check no outstanding fragmentation + assert!(storage.pages_to_defrag().is_empty()); + } } diff --git a/src/slotted_page.rs b/src/slotted_page.rs index 6e6ca45..4711d24 100644 --- a/src/slotted_page.rs +++ b/src/slotted_page.rs @@ -277,7 +277,7 @@ impl SlottedPageMmap { let mut fragmented_space = 0; let mut slot_id = 0; - let mut last_offset = 0u64; + let mut last_offset = self.page_size() as u64; while let Some(slot) = self.get_slot_ref(&slot_id) { // if the slot is deleted, we can consider it empty space if slot.deleted { From 8b06f0dcb0096ba0adacaa9a3786d2630ed00f81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 14:03:28 -0300 Subject: [PATCH 6/9] fmt --- src/payload_storage.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/payload_storage.rs b/src/payload_storage.rs index 6beb45f..68702ee 100644 --- a/src/payload_storage.rs +++ b/src/payload_storage.rs @@ -718,7 +718,10 @@ mod tests { assert_eq!(point_offset, EXPECTED_LEN as u32 * 2); assert_eq!(storage.pages.len(), 3); assert_eq!(storage.page_tracker.read().mapping_len(), EXPECTED_LEN * 2); - assert_eq!(storage.page_tracker.read().raw_mapping_len(), EXPECTED_LEN * 2); + assert_eq!( + storage.page_tracker.read().raw_mapping_len(), + EXPECTED_LEN * 2 + ); // assert storage is consistent storage_double_pass_is_consistent(&storage); @@ -732,7 +735,7 @@ mod tests { // assert storage is consistent after reopening storage_double_pass_is_consistent(&storage); } - + #[test] fn test_compaction() { let (_dir, mut storage) = empty_storage(); @@ -743,22 +746,22 @@ mod tests { let large_payloads = (0..max_point_offset) .map(|_| one_random_payload_please(rng, 10)) .collect::>(); - + for i in 0..max_point_offset { storage.put_payload(i as u32, large_payloads[i].clone()); } - + // sanity check for i in 0..max_point_offset { let stored_payload = storage.get_payload(i as u32); assert_eq!(stored_payload.as_ref(), Some(&large_payloads[i])); } - + // check no fragmentation for page in storage.pages.values() { assert_eq!(page.read().fragmented_space(), 0); } - + // update with smaller values let small_payloads = (0..max_point_offset) .map(|_| one_random_payload_please(rng, 1)) @@ -766,25 +769,25 @@ mod tests { for i in 0..max_point_offset { storage.put_payload(i as u32, small_payloads[i].clone()); } - + // sanity check for i in 0..max_point_offset { let stored_payload = storage.get_payload(i as u32); assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i])); } - + // check fragmentation assert!(!storage.pages_to_defrag().is_empty()); - + // compaction storage.compact(); - + // check consistency for i in 0..max_point_offset { let stored_payload = storage.get_payload(i as u32); assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i])); } - + // check no outstanding fragmentation assert!(storage.pages_to_defrag().is_empty()); } From 6ea1d1f2dac345f92ec9be6e5285d9b5dcf85f5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 16:57:02 -0300 Subject: [PATCH 7/9] refactor compaction with ControlFlow --- src/payload_storage.rs | 174 ++++++++++++++++++++++++++++------------- src/slotted_page.rs | 5 +- 2 files changed, 122 insertions(+), 57 deletions(-) diff --git a/src/payload_storage.rs b/src/payload_storage.rs index 68702ee..ca35764 100644 --- a/src/payload_storage.rs +++ b/src/payload_storage.rs @@ -1,10 +1,11 @@ -use crate::page_tracker::{PagePointer, PageTracker, PointOffset}; +use crate::page_tracker::{PageId, PagePointer, PageTracker, PointOffset}; use crate::payload::Payload; -use crate::slotted_page::{SlotHeader, SlottedPageMmap}; +use crate::slotted_page::{SlotHeader, SlotId, SlottedPageMmap}; use lz4_flex::compress_prepend_size; use parking_lot::RwLock; use std::cmp::Reverse; use std::collections::HashMap; +use std::ops::ControlFlow; use std::path::PathBuf; use std::sync::Arc; @@ -37,7 +38,7 @@ impl PayloadStorage { /// Open an existing PayloadStorage at the given path /// Returns None if the storage does not exist - fn open(path: PathBuf) -> Option { + pub fn open(path: PathBuf) -> Option { let page_tracker = PageTracker::open(&path)?; let page_ids = page_tracker.all_page_ids(); // load pages @@ -90,7 +91,7 @@ impl PayloadStorage { /// Get the payload for a given point offset pub fn get_payload(&self, point_offset: PointOffset) -> Option { let PagePointer { page_id, slot_id } = self.get_pointer(point_offset)?; - let page = self.pages.get(&page_id).expect("page not found"); + let page = self.pages.get(&page_id)?; let page_guard = page.read(); let raw = page_guard.get_value(&slot_id)?; let decompressed = Self::decompress(raw); @@ -234,12 +235,13 @@ impl PayloadStorage { fragmentation } - pub fn compact(&mut self) { + /// Compact the storage by defragmenting pages into new ones. Returns true if at least one page was defragmented. + pub fn compact(&mut self) -> bool { // find out which pages should be compacted let pages_to_defrag = self.pages_to_defrag(); if pages_to_defrag.is_empty() { - return; + return false; } let mut pages_to_defrag = pages_to_defrag.into_iter(); @@ -253,55 +255,40 @@ impl PayloadStorage { // This is a loop because we might need to create more pages if the current new page is full 'new_page: loop { // create new page - let new_page_id = self.create_new_page(size_hint); + let new_page_id = { + let this = &mut *self; + let new_page_id = this.max_page_id + 1; + let path = this.page_path(new_page_id); + let was_created = + this.add_page(new_page_id, SlottedPageMmap::new(&path, size_hint)); + + assert!(was_created); + + new_page_id + }; // go over each page to defrag loop { - // lock the tracker at this point, to prevent other processes from using the old pointer + // lock the tracker at this point, to prevent other threads from waiting on the old page let mut page_tracker = self.page_tracker.write(); - let mut new_page = self.pages.get(&new_page_id).unwrap().write(); - let old_page = self.pages.get(&old_page_id).unwrap().read(); - - 'slots: for (slot, value) in old_page.iter_slot_values_starting_from(last_slot_id) { - let point_offset = slot.point_offset(); - - if slot.deleted() { - continue 'slots; - } - - let was_inserted = if let Some(value) = value { - new_page.insert_value(point_offset, value) - } else { - new_page.insert_placeholder_value(point_offset) - }; - - if was_inserted.is_none() { - // new page is full, create a new one - size_hint = value.map(|v| v.len()); + match self.transfer_page_values( + old_page_id, + new_page_id, + last_slot_id, + &mut page_tracker, + ) { + ControlFlow::Break((slot_id, hint)) => { + // page is full, create a new one + size_hint = hint; + last_slot_id = slot_id; continue 'new_page; } - - let slot_id = was_inserted.expect("a value should always fit at this point"); - - let new_pointer = PagePointer { - page_id: new_page_id, - slot_id, - }; - - // update page tracker - page_tracker.set(point_offset, new_pointer); - - // prepare for next iteration - last_slot_id += 1; + ControlFlow::Continue(()) => {} } - // drop read and write guards - drop(old_page); - drop(new_page); - // delete old page + // delete old page, page tracker shouldn't point to this page anymore. let page_to_remove = self.pages.remove(&old_page_id).unwrap(); - last_slot_id = 0; // All points in this page have been updated to the new page in the page tracker, // so there should not be any outstanding references to this page. @@ -309,10 +296,11 @@ impl PayloadStorage { Arc::into_inner(page_to_remove) .unwrap() .into_inner() - .drop_page(); + .delete_page(); match pages_to_defrag.next() { Some((page_id, _defrag_space)) => { + last_slot_id = 0; old_page_id = page_id; } // No more pages to defrag, end compaction @@ -320,6 +308,78 @@ impl PayloadStorage { }; } } + + true + } + + /// Transfer all values from one page to the other, and update the page tracker. + /// + /// If the values do not fit, returns `ControlFlow::Break` with the pending slot id and a size hint to create another page. + fn transfer_page_values( + &self, + old_page_id: PageId, + new_page_id: PageId, + from_slot_id: SlotId, + page_tracker: &mut PageTracker, + ) -> ControlFlow<(SlotId, Option), ()> { + let mut current_slot_id = from_slot_id; + + let old_page = self.pages.get(&old_page_id).unwrap().read(); + let mut new_page = self.pages.get(&new_page_id).unwrap().write(); + + for (slot, value) in old_page.iter_slot_values_starting_from(current_slot_id) { + match Self::move_value(slot, value, &mut new_page, new_page_id, page_tracker) { + ControlFlow::Break(size_hint) => { + return ControlFlow::Break((current_slot_id, size_hint)) + } + ControlFlow::Continue(()) => {} + } + + // prepare for next iteration + current_slot_id += 1; + } + + ControlFlow::Continue(()) + } + + /// Move a value from one page to another. + /// + /// If the value does not fit in the new one, returns `ControlFlow::Break` with a size hint to create another page. + fn move_value( + current_slot: &SlotHeader, + value: Option<&[u8]>, + new_page: &mut SlottedPageMmap, + new_page_id: PageId, + page_tracker: &mut PageTracker, + ) -> ControlFlow, ()> { + let point_offset = current_slot.point_offset(); + + if current_slot.deleted() { + // value was deleted, skip + return ControlFlow::Continue(()); + } + + let was_inserted = if let Some(value) = value { + new_page.insert_value(point_offset, value) + } else { + new_page.insert_placeholder_value(point_offset) + }; + + let Some(slot_id) = was_inserted else { + // new page is full, request to create a new one + let size_hint = value.map(|v| v.len()); + return ControlFlow::Break(size_hint); + }; + + let new_pointer = PagePointer { + page_id: new_page_id, + slot_id, + }; + + // update page tracker + page_tracker.set(point_offset, new_pointer); + + ControlFlow::Continue(()) } } @@ -747,14 +807,14 @@ mod tests { .map(|_| one_random_payload_please(rng, 10)) .collect::>(); - for i in 0..max_point_offset { - storage.put_payload(i as u32, large_payloads[i].clone()); + for (i, payload) in large_payloads.iter().enumerate() { + storage.put_payload(i as u32, payload.clone()); } // sanity check - for i in 0..max_point_offset { + for (i, payload) in large_payloads.iter().enumerate() { let stored_payload = storage.get_payload(i as u32); - assert_eq!(stored_payload.as_ref(), Some(&large_payloads[i])); + assert_eq!(stored_payload.as_ref(), Some(payload)); } // check no fragmentation @@ -766,14 +826,16 @@ mod tests { let small_payloads = (0..max_point_offset) .map(|_| one_random_payload_please(rng, 1)) .collect::>(); - for i in 0..max_point_offset { - storage.put_payload(i as u32, small_payloads[i].clone()); + + for (i, payload) in small_payloads.iter().enumerate() { + storage.put_payload(i as u32, payload.clone()); } // sanity check - for i in 0..max_point_offset { + // check consistency + for (i, payload) in small_payloads.iter().enumerate() { let stored_payload = storage.get_payload(i as u32); - assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i])); + assert_eq!(stored_payload.as_ref(), Some(payload)); } // check fragmentation @@ -783,9 +845,9 @@ mod tests { storage.compact(); // check consistency - for i in 0..max_point_offset { + for (i, payload) in small_payloads.iter().enumerate() { let stored_payload = storage.get_payload(i as u32); - assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i])); + assert_eq!(stored_payload.as_ref(), Some(payload)); } // check no outstanding fragmentation diff --git a/src/slotted_page.rs b/src/slotted_page.rs index 4711d24..73f6faa 100644 --- a/src/slotted_page.rs +++ b/src/slotted_page.rs @@ -111,6 +111,7 @@ impl SlottedPageMmap { } /// Return all values in the page with placeholder or deleted values as `None` + #[cfg(test)] pub fn all_values(&self) -> Vec> { self.iter_slot_values_starting_from(0) .map(|(_, value)| value) @@ -141,6 +142,7 @@ impl SlottedPageMmap { } /// Returns all non deleted values in the page. `None` values means that the slot is a placeholder + #[cfg(test)] fn non_deleted_values(&self) -> Vec<&[u8]> { let mut values = Vec::new(); for i in 0..self.header.slot_count { @@ -242,6 +244,7 @@ impl SlottedPageMmap { } /// Check if there is enough space for a new slot + min value + #[cfg(test)] fn has_capacity_for_min_value(&self) -> bool { self.free_space() .saturating_sub(SlotHeader::size_in_bytes() + SlottedPageMmap::MIN_VALUE_SIZE_BYTES) @@ -432,7 +435,7 @@ impl SlottedPageMmap { } /// Delete the page from the filesystem. - pub fn drop_page(self) { + pub fn delete_page(self) { drop(self.mmap); std::fs::remove_file(&self.path).unwrap(); } From c2c54d92707d23b1d62d7d8140bf95256041155c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 16:57:32 -0300 Subject: [PATCH 8/9] fmt --- src/payload_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/payload_storage.rs b/src/payload_storage.rs index ca35764..cf650b3 100644 --- a/src/payload_storage.rs +++ b/src/payload_storage.rs @@ -826,7 +826,7 @@ mod tests { let small_payloads = (0..max_point_offset) .map(|_| one_random_payload_please(rng, 1)) .collect::>(); - + for (i, payload) in small_payloads.iter().enumerate() { storage.put_payload(i as u32, payload.clone()); } From ced191838fd6dcca19a239260013b6df66f91be0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Thu, 26 Sep 2024 17:46:41 -0300 Subject: [PATCH 9/9] test fragmentation calculation --- src/slotted_page.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/slotted_page.rs b/src/slotted_page.rs index 73f6faa..86a7833 100644 --- a/src/slotted_page.rs +++ b/src/slotted_page.rs @@ -787,4 +787,53 @@ mod tests { // The caller must delete and create a new value assert!(!mmap.update_value(0, large_value.as_slice())); } + + #[test] + fn test_fragmentation_calculation() { + let file = Builder::new() + .prefix("test-pages") + .suffix(".data") + .tempfile() + .unwrap(); + let path = file.path(); + + let mut mmap = SlottedPageMmap::new(path, None); + + let big_value = [1; 200]; + for i in 0..500 { + mmap.insert_value(i, &big_value); + } + + let mut fragmented_space = mmap.fragmented_space(); + + assert_eq!(fragmented_space, 0); + + // delete some values + for i in 0..500 { + if i % 2 == 0 { + mmap.delete_value(i); + } + } + + fragmented_space = mmap.fragmented_space(); + + // 250 values are deleted, so 250 * 200 bytes are fragmented + assert_eq!(fragmented_space, 250 * 200); + + // update some values + let min_value = [1; SlottedPageMmap::MIN_VALUE_SIZE_BYTES]; + for i in 0..500 { + if i % 2 == 1 { + mmap.update_value(i, &min_value); + } + } + + fragmented_space = mmap.fragmented_space(); + + // 250 values are updated, so 250 * (200 - MIN_VALUE_SIZE_BYTES) bytes are fragmented. + // Plus the ones that were deleted before. + let expected_fragmentation = + 250 * (200 - SlottedPageMmap::MIN_VALUE_SIZE_BYTES) + 250 * 200; + assert_eq!(fragmented_space, expected_fragmentation); + } }