diff --git a/mini-lsm-starter/src/compact.rs b/mini-lsm-starter/src/compact.rs
index e70d899..fbef621 100644
--- a/mini-lsm-starter/src/compact.rs
+++ b/mini-lsm-starter/src/compact.rs
@@ -4,11 +4,15 @@ mod leveled;
 mod simple_leveled;
 mod tiered;
 
+use std::fs;
 use std::sync::Arc;
 use std::time::Duration;
 
+use crate::compact::CompactionTask::ForceFullCompaction;
+use crate::iterators::merge_iterator::MergeIterator;
+use crate::iterators::StorageIterator;
 use anyhow::Result;
 pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
 use serde::{Deserialize, Serialize};
 pub use simple_leveled::{
     SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
@@ -16,7 +20,7 @@ pub use simple_leveled::{
 pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
 
 use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
-use crate::table::SsTable;
+use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum CompactionTask {
@@ -107,12 +111,144 @@ pub enum CompactionOptions {
 }
 
 impl LsmStorageInner {
-    fn compact(&self, _task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
-        unimplemented!()
+    /// Runs `task` and returns the newly built SSTs without modifying `self.state`.
+    ///
+    /// Only `ForceFullCompaction` is implemented; the other task kinds hit `unimplemented!()`.
+    /// The listed L0 and L1 SSTs are fed (L0 ids first) into a `MergeIterator` and rewritten,
+    /// starting a new SST whenever the builder's estimated size exceeds `target_sst_size`.
+    /// Entries with an empty value are dropped when `task.compact_to_bottom_level()` is true.
+    /// Errors from opening, iterating, or building SSTs are returned to the caller.
+    fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
+        match task {
+            CompactionTask::Leveled(_) => {
+                unimplemented!();
+            }
+            CompactionTask::Tiered(_) => {
+                unimplemented!();
+            }
+            CompactionTask::Simple(_) => {
+                unimplemented!();
+            }
+            ForceFullCompaction {
+                l0_sstables,
+                l1_sstables,
+            } => {
+                let mut sstables_to_compact = vec![];
+                sstables_to_compact.extend_from_slice(l0_sstables);
+                sstables_to_compact.extend_from_slice(l1_sstables);
+
+                let snapshot = {
+                    let guard = self.state.read();
+                    guard.clone()
+                };
+
+                let sstables = sstables_to_compact
+                    .iter()
+                    .map(|i| {
+                        let sstable = snapshot.sstables[i].clone();
+                        let sstable_iter = SsTableIterator::create_and_seek_to_first(sstable)?;
+                        Ok(Box::new(sstable_iter))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let mut merge_iterator = MergeIterator::create(sstables);
+                let mut sst_builder = SsTableBuilder::new(self.options.block_size);
+
+                let mut sstable_to_add = vec![];
+                while merge_iterator.is_valid() {
+                    // Skip empty-value entries when compacting to the bottom level.
+                    if !(task.compact_to_bottom_level() && merge_iterator.value().is_empty()) {
+                        sst_builder.add(merge_iterator.key(), merge_iterator.value());
+                        if sst_builder.estimated_size() > self.options.target_sst_size {
+                            // split a new sst file
+                            let sst_id = self.next_sst_id();
+                            let sst_table = sst_builder.build(
+                                sst_id,
+                                Some(self.block_cache.clone()),
+                                self.path_of_sst(sst_id),
+                            )?;
+                            sstable_to_add.push(Arc::new(sst_table));
+                            sst_builder = SsTableBuilder::new(self.options.block_size);
+                        }
+                    }
+                    merge_iterator.next()?;
+                }
+
+                // Flush whatever is left in the builder into a final SST.
+                if !sst_builder.is_empty() {
+                    let sst_id = self.next_sst_id();
+                    let sst_table = sst_builder.build(
+                        sst_id,
+                        Some(self.block_cache.clone()),
+                        self.path_of_sst(sst_id),
+                    )?;
+                    sstable_to_add.push(Arc::new(sst_table));
+                }
+
+                Ok(sstable_to_add)
+            }
+        }
     }
 
+    /// Compacts every L0 SST together with the SSTs of `levels[0]` into a fresh set of SSTs.
+    ///
+    /// The merged output replaces `levels[0]`, the inputs are dropped from the state, and
+    /// their files are deleted from disk afterwards. L0 SSTs that appear while the
+    /// compaction is running are kept, because the state is re-read under `state_lock`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `levels[0]` is not tagged as level 1, or if a new SST id is already present.
     pub fn force_full_compaction(&self) -> Result<()> {
-        unimplemented!()
+        let snapshot = {
+            let guard = self.state.read();
+            guard.clone()
+        };
+        let l0_sstables_to_compact = snapshot.l0_sstables.clone();
+        assert_eq!(snapshot.levels[0].0, 1);
+        let l1_sstables_to_compact = snapshot.levels[0].1.clone();
+
+        let new_l1_levels = self.compact(&ForceFullCompaction {
+            l0_sstables: l0_sstables_to_compact.clone(),
+            l1_sstables: l1_sstables_to_compact.clone(),
+        })?;
+
+        // update state of lsm
+        {
+            let _state_lock = self.state_lock.lock();
+            let mut state = self.state.read().as_ref().clone();
+
+            // remove sstable
+            l0_sstables_to_compact
+                .iter()
+                .chain(l1_sstables_to_compact.iter())
+                .for_each(|sst_id| {
+                    state.sstables.remove(sst_id);
+                });
+
+            state
+                .l0_sstables
+                .retain(|sst_id| !l0_sstables_to_compact.contains(sst_id));
+
+            let mut level1 = Vec::with_capacity(new_l1_levels.len());
+            for sst in new_l1_levels {
+                let sst_id = sst.sst_id();
+                level1.push(sst_id);
+                let result = state.sstables.insert(sst_id, sst);
+                assert!(result.is_none());
+            }
+            state.levels[0] = (1, level1);
+            *self.state.write() = Arc::new(state);
+        }
+
+        // remove files
+        for sst_id in l0_sstables_to_compact
+            .iter()
+            .chain(l1_sstables_to_compact.iter())
+        {
+            fs::remove_file(self.path_of_sst(*sst_id))?;
+        }
+
+        Ok(())
     }
 
     fn trigger_compaction(&self) -> Result<()> {
diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs
index 42ec980..5231e5f 100644
--- a/mini-lsm-starter/src/lsm_storage.rs
+++ b/mini-lsm-starter/src/lsm_storage.rs
@@ -453,7 +453,7 @@ impl LsmStorageInner {
         let sstable = builder.build(
             sst_id,
             Some(self.block_cache.clone()),
-            self.path.join(format!("{}.sst", sst_id)),
+            self.path_of_sst(sst_id),
         )?;
 
         {
diff --git a/mini-lsm-starter/src/table/builder.rs b/mini-lsm-starter/src/table/builder.rs
index 1e4f746..b5b58cf 100644
--- a/mini-lsm-starter/src/table/builder.rs
+++ b/mini-lsm-starter/src/table/builder.rs
@@ -128,6 +128,14 @@ impl SsTableBuilder {
             max_ts: 0,
         })
     }
+
+    /// Reports whether the in-progress block builder is empty.
+    ///
+    /// NOTE(review): only the current block is inspected, not blocks already finished;
+    /// confirm `add` never leaves the current block empty once an entry has been added.
+    pub(crate) fn is_empty(&self) -> bool {
+        self.builder.is_empty()
+    }
 
     #[cfg(test)]
     pub(crate) fn build_for_test(self, path: impl AsRef<Path>) -> Result<SsTable> {
diff --git a/mini-lsm-starter/src/tests.rs b/mini-lsm-starter/src/tests.rs
index e5f9f90..738e5a9 100644
--- a/mini-lsm-starter/src/tests.rs
+++ b/mini-lsm-starter/src/tests.rs
@@ -1,12 +1,12 @@
 //! DO NOT MODIFY -- Mini-LSM tests modules
 //! This file will be automatically rewritten by the copy-test command.
 
-mod week1_day6;
+mod harness;
+mod week1_day1;
 mod week1_day2;
 mod week1_day3;
-mod week1_day7;
-mod harness;
-mod week2_day1;
 mod week1_day4;
 mod week1_day5;
-mod week1_day1;
+mod week1_day6;
+mod week1_day7;
+mod week2_day1;