Compaction for fragmented pages #3

Merged 9 commits on Sep 27, 2024
Changes from 6 commits
233 changes: 210 additions & 23 deletions src/payload_storage.rs
@@ -3,12 +3,13 @@ use crate::payload::Payload;
use crate::slotted_page::{SlotHeader, SlottedPageMmap};
use lz4_flex::compress_prepend_size;
use parking_lot::RwLock;
use std::cmp::Reverse;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

pub struct PayloadStorage {
page_tracker: PageTracker,
page_tracker: RwLock<PageTracker>,
pages: HashMap<u32, Arc<RwLock<SlottedPageMmap>>>, // page_id -> mmap page
max_page_id: u32,
base_path: PathBuf,
@@ -27,7 +28,7 @@ impl PayloadStorage {

pub fn new(path: PathBuf) -> Self {
Self {
page_tracker: PageTracker::new(&path, None),
page_tracker: RwLock::new(PageTracker::new(&path, None)),
pages: HashMap::new(),
max_page_id: 0,
base_path: path,
@@ -52,15 +53,15 @@
}
}
Some(Self {
page_tracker,
page_tracker: RwLock::new(page_tracker),
pages,
max_page_id,
base_path: path,
})
}

pub fn is_empty(&self) -> bool {
self.pages.is_empty() && self.page_tracker.is_empty()
self.pages.is_empty() && self.page_tracker.read().is_empty()
}

/// Get the path for a given page id
@@ -89,9 +90,9 @@
/// Get the payload for a given point offset
pub fn get_payload(&self, point_offset: PointOffset) -> Option<Payload> {
let PagePointer { page_id, slot_id } = self.get_pointer(point_offset)?;
let page = self.pages.get(page_id).expect("page not found");
let page = self.pages.get(&page_id).expect("page not found");
let page_guard = page.read();
let raw = page_guard.get_value(slot_id)?;
let raw = page_guard.get_value(&slot_id)?;
let decompressed = Self::decompress(raw);
let payload = Payload::from_bytes(&decompressed);
Some(payload)
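
(Aside, not part of the diff: the compress and decompress helpers live outside this hunk. Given the lz4_flex import above, they are presumably thin wrappers like the following sketch; the exact error handling is an assumption.)

use lz4_flex::{compress_prepend_size, decompress_size_prepended};

// Sketch: LZ4 round-trip with the uncompressed size prepended to the
// compressed bytes, matching the compress_prepend_size import above.
fn compress(raw: &[u8]) -> Vec<u8> {
    compress_prepend_size(raw)
}

fn decompress(compressed: &[u8]) -> Vec<u8> {
    // decompress_size_prepended reads the prepended size, then inflates;
    // panicking on corrupt data is an assumption made for this sketch.
    decompress_size_prepended(compressed).expect("invalid lz4 payload")
}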
@@ -139,8 +140,8 @@ impl PayloadStorage {
}

/// Get the mapping for a given point offset
fn get_pointer(&self, point_offset: PointOffset) -> Option<&PagePointer> {
self.page_tracker.get(point_offset)
fn get_pointer(&self, point_offset: PointOffset) -> Option<PagePointer> {
self.page_tracker.read().get(point_offset).copied()
}

/// Put a payload in the storage
Expand All @@ -149,7 +150,7 @@ impl PayloadStorage {
let comp_payload = Self::compress(&payload_bytes);
let payload_size = comp_payload.len();

if let Some(PagePointer { page_id, slot_id }) = self.get_pointer(point_offset).copied() {
if let Some(PagePointer { page_id, slot_id }) = self.get_pointer(point_offset) {
let page = self.pages.get_mut(&page_id).unwrap();
let mut page_guard = page.write();
let updated = page_guard.update_value(slot_id, &comp_payload);
@@ -166,9 +167,10 @@
self.create_new_page(Some(payload_size))
});
let mut page = self.pages.get_mut(&new_page_id).unwrap().write();
let new_slot_id = page.insert_value(&comp_payload).unwrap();
let new_slot_id = page.insert_value(point_offset, &comp_payload).unwrap();
// update page_tracker
self.page_tracker
.write()
.set(point_offset, PagePointer::new(new_page_id, new_slot_id));
}
} else {
@@ -181,24 +183,144 @@
});

let page = self.pages.get_mut(&page_id).unwrap();
let slot_id = page.write().insert_value(&comp_payload).unwrap();

let slot_id = page
.write()
.insert_value(point_offset, &comp_payload)
.unwrap();

// update page_tracker
self.page_tracker
.write()
.set(point_offset, PagePointer::new(page_id, slot_id));
}
}

/// Delete a payload from the storage
/// Returns None if the point_offset, page, or payload was not found
pub fn delete_payload(&mut self, point_offset: PointOffset) -> Option<()> {
let PagePointer { page_id, slot_id } = self.get_pointer(point_offset).copied()?;
let PagePointer { page_id, slot_id } = self.get_pointer(point_offset)?;
let page = self.pages.get_mut(&page_id)?;
// delete value from page
page.write().delete_value(slot_id);
// delete mapping
self.page_tracker.unset(point_offset);
self.page_tracker.write().unset(point_offset);
Some(())
}

/// Page ids with their amount of fragmented space, ordered from most to least fragmented
fn pages_to_defrag(&self) -> Vec<(u32, usize)> {
let mut fragmentation = self
.pages
.iter()
.filter_map(|(page_id, page)| {
let page = page.read();
let frag_space = page.fragmented_space();

// check if we should defrag this page
let frag_threshold =
SlottedPageMmap::FRAGMENTATION_THRESHOLD_RATIO * page.page_size() as f32;
if frag_space < frag_threshold.ceil() as usize {
// page is not fragmented enough, skip
return None;
}
Some((*page_id, frag_space))
})
.collect::<Vec<_>>();

// sort by most to least fragmented
fragmentation.sort_unstable_by_key(|(_, fragmented_space)| Reverse(*fragmented_space));

fragmentation
}
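
(For scale: the threshold is relative to page size. If FRAGMENTATION_THRESHOLD_RATIO were, say, 0.5 (the constant is defined in slotted_page.rs and not shown in this diff), a 32 MB page would become a compaction candidate once at least 16 MB of it sat in dead slots.)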

pub fn compact(&mut self) {
// find out which pages should be compacted
let pages_to_defrag = self.pages_to_defrag();

if pages_to_defrag.is_empty() {
return;
}

let mut pages_to_defrag = pages_to_defrag.into_iter();
let mut old_page_id = pages_to_defrag.next().unwrap().0;
let mut last_slot_id = 0;

// TODO: account for the fact that the first value could be larger than 32MB, in which case the
// newly created page would immediately go unused. We don't want to create empty pages, but this
// case is quite rare, so maybe we can just ignore it.
let mut size_hint = None;

// This is a loop because we might need to create more pages if the current new page is full
'new_page: loop {
// create new page
let new_page_id = self.create_new_page(size_hint);
Review comment (Member): I wonder if we should create this page as a tempfile and then move it at the end of compaction. This way there is nothing left over if compaction is interrupted.
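
(A minimal sketch of that suggestion, assuming a hypothetical write_page_atomically helper; this PR does not implement it.)

use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

// Hypothetical: write the compacted page to a sibling tempfile, fsync it,
// then atomically rename it into place. If compaction is interrupted
// before the rename, only a stray .tmp file is left behind.
fn write_page_atomically(final_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp_path = final_path.with_extension("tmp");
    let mut tmp = File::create(&tmp_path)?;
    tmp.write_all(bytes)?;
    tmp.sync_all()?; // ensure the data hits disk before the rename
    fs::rename(&tmp_path, final_path) // atomic on POSIX filesystems
}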


// go over each page to defrag
loop {
// lock the tracker at this point, to prevent other threads from using the old pointer
let mut page_tracker = self.page_tracker.write();

let mut new_page = self.pages.get(&new_page_id).unwrap().write();
let old_page = self.pages.get(&old_page_id).unwrap().read();

'slots: for (slot, value) in old_page.iter_slot_values_starting_from(last_slot_id) {
let point_offset = slot.point_offset();

if slot.deleted() {
continue 'slots;
}

let was_inserted = if let Some(value) = value {
new_page.insert_value(point_offset, value)
} else {
new_page.insert_placeholder_value(point_offset)
};

if was_inserted.is_none() {
// new page is full, create a new one
size_hint = value.map(|v| v.len());
continue 'new_page;
}

let slot_id = was_inserted.expect("a value should always fit at this point");

let new_pointer = PagePointer {
page_id: new_page_id,
slot_id,
};

// update page tracker
page_tracker.set(point_offset, new_pointer);

// prepare for next iteration
last_slot_id += 1;
}
// drop read and write guards
drop(old_page);
drop(new_page);

// delete old page
let page_to_remove = self.pages.remove(&old_page_id).unwrap();
last_slot_id = 0;

// All points in this page have been updated to the new page in the page tracker,
// so there should not be any outstanding references to this page.
// TODO: audit this part
Arc::into_inner(page_to_remove)
.unwrap()
.into_inner()
.drop_page();

match pages_to_defrag.next() {
Some((page_id, _defrag_space)) => {
old_page_id = page_id;
}
// No more pages to defrag, end compaction
None => break 'new_page,
};
}
}
}
}
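
(Hypothetical usage, not from this PR: callers decide when to pay the compaction cost, e.g. after a batch of updates.)

// compact() is a no-op when no page crosses the fragmentation threshold,
// so a caller inside this module can invoke it opportunistically.
fn maintain(storage: &mut PayloadStorage) {
    storage.compact();
}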

#[cfg(test)]
@@ -224,7 +346,7 @@ mod tests {
let payload = Payload::default();
storage.put_payload(0, payload);
assert_eq!(storage.pages.len(), 1);
assert_eq!(storage.page_tracker.raw_mapping_len(), 1);
assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1);

let stored_payload = storage.get_payload(0);
assert!(stored_payload.is_some());
Expand All @@ -242,7 +364,7 @@ mod tests {

storage.put_payload(0, payload.clone());
assert_eq!(storage.pages.len(), 1);
assert_eq!(storage.page_tracker.raw_mapping_len(), 1);
assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1);

let page_mapping = storage.get_pointer(0).unwrap();
assert_eq!(page_mapping.page_id, 1); // first page
@@ -320,7 +442,7 @@

storage.put_payload(0, payload.clone());
assert_eq!(storage.pages.len(), 1);
assert_eq!(storage.page_tracker.raw_mapping_len(), 1);
assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1);

let page_mapping = storage.get_pointer(0).unwrap();
assert_eq!(page_mapping.page_id, 1); // first page
@@ -338,7 +460,7 @@

storage.put_payload(0, updated_payload.clone());
assert_eq!(storage.pages.len(), 1);
assert_eq!(storage.page_tracker.raw_mapping_len(), 1);
assert_eq!(storage.page_tracker.read().raw_mapping_len(), 1);

let stored_payload = storage.get_payload(0);
assert!(stored_payload.is_some());
@@ -402,7 +524,7 @@
}

// assert same length
assert_eq!(storage.page_tracker.mapping_len(), model_hashmap.len());
assert_eq!(
storage.page_tracker.read().mapping_len(),
model_hashmap.len()
);

// validate storage and model_hashmap are the same
for point_offset in 0..=max_point_offset {
@@ -418,7 +543,7 @@
let storage = PayloadStorage::open(dir.path().to_path_buf()).unwrap();

// assert same length
assert_eq!(storage.page_tracker.mapping_len(), model_hashmap.len());
assert_eq!(
storage.page_tracker.read().mapping_len(),
model_hashmap.len()
);

// validate storage and model_hashmap are the same
for point_offset in 0..=max_point_offset {
@@ -581,16 +709,19 @@ mod tests {
// load data into storage
let point_offset = write_data(&mut storage, 0);
assert_eq!(point_offset, EXPECTED_LEN as u32);
assert_eq!(storage.page_tracker.mapping_len(), EXPECTED_LEN);
assert_eq!(storage.page_tracker.raw_mapping_len(), EXPECTED_LEN);
assert_eq!(storage.page_tracker.read().mapping_len(), EXPECTED_LEN);
assert_eq!(storage.page_tracker.read().raw_mapping_len(), EXPECTED_LEN);
assert_eq!(storage.pages.len(), 2);

// write the same payload a second time
let point_offset = write_data(&mut storage, point_offset);
assert_eq!(point_offset, EXPECTED_LEN as u32 * 2);
assert_eq!(storage.pages.len(), 3);
assert_eq!(storage.page_tracker.mapping_len(), EXPECTED_LEN * 2);
assert_eq!(storage.page_tracker.raw_mapping_len(), EXPECTED_LEN * 2);
assert_eq!(storage.page_tracker.read().mapping_len(), EXPECTED_LEN * 2);
assert_eq!(
storage.page_tracker.read().raw_mapping_len(),
EXPECTED_LEN * 2
);

// assert storage is consistent
storage_double_pass_is_consistent(&storage);
Expand All @@ -604,4 +735,60 @@ mod tests {
// assert storage is consistent after reopening
storage_double_pass_is_consistent(&storage);
}

#[test]
fn test_compaction() {
let (_dir, mut storage) = empty_storage();

let rng = &mut rand::thread_rng();
let max_point_offset = 20000;

let large_payloads = (0..max_point_offset)
.map(|_| one_random_payload_please(rng, 10))
.collect::<Vec<_>>();

for i in 0..max_point_offset {
storage.put_payload(i as u32, large_payloads[i].clone());
}

// sanity check
for i in 0..max_point_offset {
let stored_payload = storage.get_payload(i as u32);
assert_eq!(stored_payload.as_ref(), Some(&large_payloads[i]));
}

// check no fragmentation
for page in storage.pages.values() {
assert_eq!(page.read().fragmented_space(), 0);
}

// update with smaller values
let small_payloads = (0..max_point_offset)
.map(|_| one_random_payload_please(rng, 1))
.collect::<Vec<_>>();
for i in 0..max_point_offset {
storage.put_payload(i as u32, small_payloads[i].clone());
}

// sanity check
for i in 0..max_point_offset {
let stored_payload = storage.get_payload(i as u32);
assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i]));
}

// check fragmentation
assert!(!storage.pages_to_defrag().is_empty());

// compaction
storage.compact();

// check consistency
for i in 0..max_point_offset {
let stored_payload = storage.get_payload(i as u32);
assert_eq!(stored_payload.as_ref(), Some(&small_payloads[i]));
}

// check no outstanding fragmentation
assert!(storage.pages_to_defrag().is_empty());
}
}