Skip to content

Commit

Permalink
finish w2/d1, Task 1: Compaction Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
AbnerZheng committed May 21, 2024
1 parent d983185 commit 0e44192
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 10 deletions.
128 changes: 124 additions & 4 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@ mod leveled;
mod simple_leveled;
mod tiered;

use std::fs;
use std::os::macos::raw::stat;
use std::sync::Arc;
use std::time::Duration;

use crate::compact::CompactionTask::ForceFullCompaction;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::StorageIterator;
use anyhow::Result;
pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
use log::warn;
use serde::{Deserialize, Serialize};
pub use simple_leveled::{
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
};
pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};

use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
use crate::table::SsTable;
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};

#[derive(Debug, Serialize, Deserialize)]
pub enum CompactionTask {
Expand Down Expand Up @@ -107,12 +113,126 @@ pub enum CompactionOptions {
}

impl LsmStorageInner {
fn compact(&self, _task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
unimplemented!()
fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
match task {
CompactionTask::Leveled(_) => {
unimplemented!();
}
CompactionTask::Tiered(_) => {
unimplemented!();
}
CompactionTask::Simple(_) => {
unimplemented!();
}
ForceFullCompaction {
l0_sstables,
l1_sstables,
} => {
let mut sstables_to_compact = vec![];
sstables_to_compact.extend_from_slice(l0_sstables);
sstables_to_compact.extend_from_slice(l1_sstables);

let snapshot = {
let guard = self.state.read();
guard.clone()
};

let sstables = sstables_to_compact
.iter()
.map(|i| {
let sstable = snapshot.sstables[i].clone();
let sstable_iter = SsTableIterator::create_and_seek_to_first(sstable)?;
Ok(Box::new(sstable_iter))
})
.collect::<Result<Vec<_>>>()?;
let mut merge_iterator = MergeIterator::create(sstables);
let mut sst_builder = SsTableBuilder::new(self.options.block_size);

let mut sstable_to_add = vec![];
while merge_iterator.is_valid() {
if !(task.compact_to_bottom_level() && merge_iterator.value().is_empty()) {
sst_builder.add(merge_iterator.key(), merge_iterator.value());
if sst_builder.estimated_size() > self.options.target_sst_size {
// split a new sst file
let sst_id = self.next_sst_id();
let sst_table = sst_builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
)?;
sstable_to_add.push(Arc::new(sst_table));
sst_builder = SsTableBuilder::new(self.options.block_size);
}
}
merge_iterator.next()?;
}

if !sst_builder.is_empty() {
let sst_id = self.next_sst_id();
let sst_table = sst_builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
);
sstable_to_add.push(Arc::new(sst_table.unwrap()));
}

Ok(sstable_to_add)
}
}
}

/// Merges every L0 SST and every SST of the first level into a fresh set of
/// level-1 SSTs, installs them in the storage state, and deletes the old files.
///
/// The work happens in three phases:
/// 1. Snapshot the state and record which L0 / level-1 ids will be compacted.
/// 2. Build the replacement SSTs via `compact` (no state lock is held here).
/// 3. Under `state_lock`, copy the *current* state, drop the compacted ids,
///    install the new tables as level 1, and publish the new state. Only the
///    ids recorded in phase 1 are removed from `l0_sstables`, so L0 tables
///    added in the meantime are kept.
///
/// # Errors
///
/// Returns any error from `compact`, or from removing an obsolete SST file.
/// A failed file removal stops the cleanup; remaining old files are left on
/// disk even though the new state is already published.
///
/// # Panics
///
/// Panics if the first entry of `levels` is missing or is not level `1`, or
/// if a newly built SST id is already present in the state's `sstables` map.
pub fn force_full_compaction(&self) -> Result<()> {
    let snapshot = {
        let guard = self.state.read();
        guard.clone()
    };
    let l0_sstables_to_compact = snapshot.l0_sstables.clone();
    assert_eq!(snapshot.levels[0].0, 1);
    let l1_sstables_to_compact = snapshot.levels[0].1.clone();

    let new_l1_levels = self.compact(&ForceFullCompaction {
        l0_sstables: l0_sstables_to_compact.clone(),
        l1_sstables: l1_sstables_to_compact.clone(),
    })?;

    // Update the LSM state.
    {
        let _state_lock = self.state_lock.lock();
        // Start from the current state, not the earlier snapshot, so changes
        // made while compaction was running are preserved.
        let mut state = self.state.read().as_ref().clone();

        // Forget the tables that were merged away.
        for sst_id in l0_sstables_to_compact
            .iter()
            .chain(l1_sstables_to_compact.iter())
        {
            state.sstables.remove(sst_id);
        }

        // Set lookup keeps the retain pass linear in the number of L0 tables.
        let compacted_l0: std::collections::HashSet<_> =
            l0_sstables_to_compact.iter().copied().collect();
        state
            .l0_sstables
            .retain(|sst_id| !compacted_l0.contains(sst_id));

        let mut level1 = Vec::with_capacity(new_l1_levels.len());
        for sst in new_l1_levels {
            let sst_id = sst.sst_id();
            level1.push(sst_id);
            let result = state.sstables.insert(sst_id, sst);
            // Ids of newly built tables must not collide with existing ones.
            assert!(result.is_none());
        }
        state.levels[0] = (1, level1);
        *self.state.write() = Arc::new(state);
    }

    // Remove the files of the compacted tables; the published state no longer
    // references them.
    for sst_id in l0_sstables_to_compact
        .iter()
        .chain(l1_sstables_to_compact.iter())
    {
        fs::remove_file(self.path_of_sst(*sst_id))?;
    }

    Ok(())
}

fn trigger_compaction(&self) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ impl LsmStorageInner {
let sstable = builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path.join(format!("{}.sst", sst_id)),
self.path_of_sst(sst_id),
)?;

{
Expand Down
5 changes: 5 additions & 0 deletions mini-lsm-starter/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct SsTableBuilder {
pub(crate) key_hashes: Vec<u32>,
}

// NOTE(review): this impl block is empty and adds no items; it looks like an
// editing leftover and can likely be removed -- confirm nothing relies on it.
impl SsTableBuilder {}

impl SsTableBuilder {
/// Create a builder based on target block size.
pub fn new(block_size: usize) -> Self {
Expand Down Expand Up @@ -128,6 +130,9 @@ impl SsTableBuilder {
max_ts: 0,
})
}
/// Returns whether the inner `builder` reports itself as empty.
///
/// NOTE(review): this only consults `self.builder`; presumably that is the
/// block currently being filled, so the result reflects the whole table only
/// if already-finished blocks imply a non-empty current block -- confirm
/// against `add`.
pub(crate) fn is_empty(&self) -> bool {
self.builder.is_empty()
}

#[cfg(test)]
pub(crate) fn build_for_test(self, path: impl AsRef<Path>) -> Result<SsTable> {
Expand Down
10 changes: 5 additions & 5 deletions mini-lsm-starter/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! DO NOT MODIFY -- Mini-LSM tests modules
//! This file will be automatically rewritten by the copy-test command.
mod week1_day6;
mod harness;
mod week1_day1;
mod week1_day2;
mod week1_day3;
mod week1_day7;
mod harness;
mod week2_day1;
mod week1_day4;
mod week1_day5;
mod week1_day1;
mod week1_day6;
mod week1_day7;
mod week2_day1;

0 comments on commit 0e44192

Please sign in to comment.