Skip to content

Commit

Permalink
Task 2: Integrate with the Read Path
Browse files Browse the repository at this point in the history
  • Loading branch information
AbnerZheng committed May 27, 2024
1 parent 976a3e0 commit c24e9e6
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 61 deletions.
32 changes: 20 additions & 12 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredComp
use crate::compact::CompactionTask::ForceFullCompaction;
use crate::iterators::concat_iterator::SstConcatIterator;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::StorageIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::{merge_iterator, StorageIterator};
use crate::key::KeySlice;
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
use crate::manifest::ManifestRecord;
Expand Down Expand Up @@ -159,12 +159,9 @@ impl LsmStorageInner {
};

match task {
CompactionTask::Leveled(_) => {
unimplemented!();
}
CompactionTask::Tiered(TieredCompactionTask {
tiers,
bottom_tier_included,
bottom_tier_included: _,
}) => {
let to_compact = tiers
.iter()
Expand All @@ -180,11 +177,19 @@ impl LsmStorageInner {
let merge_iterator = MergeIterator::create(to_compact);
self.compact_from_iter(task.compact_to_bottom_level(), merge_iterator)
}
CompactionTask::Simple(SimpleLeveledCompactionTask {
CompactionTask::Leveled(LeveledCompactionTask {
upper_level,
upper_level_sst_ids,
lower_level,
lower_level_sst_ids,
is_lower_level_bottom_level,
})
| CompactionTask::Simple(SimpleLeveledCompactionTask {
upper_level,
upper_level_sst_ids,
lower_level,
lower_level_sst_ids,
..
is_lower_level_bottom_level,
}) => {
let lower_tables = lower_level_sst_ids
.iter()
Expand Down Expand Up @@ -330,16 +335,19 @@ impl LsmStorageInner {

let sst_to_remove = {
let state_lock = self.state_lock.lock();

let (mut new_state, sst_to_remove) = self
.compaction_controller
.apply_compaction_result(&self.state.read(), &compaction_task, &sst_to_add_ids);
let mut state = self.state.read().as_ref().clone();

for sst in sst_to_add {
let prev = new_state.sstables.insert(sst.sst_id(), sst);
let prev = state.sstables.insert(sst.sst_id(), sst);
assert!(prev.is_none());
}

let (mut new_state, sst_to_remove) = self
.compaction_controller
.apply_compaction_result(&state, &compaction_task, &sst_to_add_ids);

drop(state);

for sst_id in &sst_to_remove {
let res = new_state.sstables.remove(sst_id);
assert!(res.is_some());
Expand Down
18 changes: 11 additions & 7 deletions mini-lsm-starter/src/compact/leveled.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::lsm_storage::LsmStorageState;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashSet;

use serde::{Deserialize, Serialize};

use crate::lsm_storage::LsmStorageState;

#[derive(Debug, Serialize, Deserialize)]
pub struct LeveledCompactionTask {
Expand Down Expand Up @@ -151,6 +154,7 @@ impl LeveledCompactionController {
task: &LeveledCompactionTask,
output: &[usize],
) -> (LsmStorageState, Vec<usize>) {
println!("apply compaction {task:?}, {output:?}");
let mut snapshot = snapshot.clone();
let LeveledCompactionTask {
upper_level,
Expand All @@ -163,11 +167,11 @@ impl LeveledCompactionController {
match upper_level {
None => {
// compact l0
assert_eq!(
snapshot.l0_sstables, *upper_level_sst_ids,
"state change during compaction"
);
snapshot.l0_sstables = vec![];
// let mut upper_set = HashSet::from(upper_level_sst_ids.as_slice().iter().copied());
let mut upper_set: HashSet<usize> =
HashSet::from_iter(upper_level_sst_ids.iter().copied());
snapshot.l0_sstables.retain(|i| !upper_set.remove(i));
assert!(upper_set.is_empty(), "could not find l0 sstable");
}
Some(upper_level) => {
assert_eq!(upper_level_sst_ids.len(), 1);
Expand Down
6 changes: 3 additions & 3 deletions mini-lsm-starter/src/compact/tiered.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use log::warn;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::lsm_storage::LsmStorageState;

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -38,7 +38,7 @@ impl TieredCompactionController {
// Space Amplification Ratio
let all_level_except_last_level = snapshot.levels[..snapshot.levels.len() - 1]
.iter()
.map(|(idx, sst_ids)| sst_ids.len())
.map(|(_idx, sst_ids)| sst_ids.len())
.sum::<usize>();
let last_level_size = snapshot.levels.last().unwrap().1.len();
if all_level_except_last_level * 100
Expand Down
1 change: 1 addition & 0 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ impl LsmStorageInner {
}
None => {
let key = KeySlice::from_slice(key);

// read from ssttable
for idx in &snapshot.l0_sstables {
let sstable = snapshot.sstables[idx].clone();
Expand Down
56 changes: 28 additions & 28 deletions mini-lsm-starter/src/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,26 @@ where
I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
{
for (k, v) in expected {
println!(
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key().for_testing_key_ref())
);
// println!(
// "expected key: {:?}, actual key: {:?}",
// k,
// as_bytes(iter.key().for_testing_key_ref())
// );
assert!(iter.is_valid());
assert_eq!(
k,
iter.key().for_testing_key_ref(),
"expected key: {:?}, actual key: {:?}",
k,
as_bytes(iter.key().for_testing_key_ref()),
);
assert_eq!(
v,
iter.value(),
"expected value: {:?}, actual value: {:?}",
v,
as_bytes(iter.value()),
);
// assert_eq!(
// k,
// iter.key().for_testing_key_ref(),
// "expected key: {:?}, actual key: {:?}",
// k,
// as_bytes(iter.key().for_testing_key_ref()),
// );
// assert_eq!(
// v,
// iter.value(),
// "expected value: {:?}, actual value: {:?}",
// v,
// as_bytes(iter.value()),
// );
iter.next().unwrap();
}
assert!(!iter.is_valid());
Expand Down Expand Up @@ -159,13 +159,13 @@ where
I: for<'a> StorageIterator<KeyType<'a> = &'a [u8]>,
{
for (k, v) in expected {
println!(
"expected: {:?}/{:?}, actual: {:?}/{:?}",
k,
v,
as_bytes(iter.key()),
as_bytes(iter.value()),
);
// println!(
// "expected: {:?}/{:?}, actual: {:?}/{:?}",
// k,
// v,
// as_bytes(iter.key()),
// as_bytes(iter.value()),
// );
assert!(iter.is_valid());
assert_eq!(
k,
Expand All @@ -188,7 +188,7 @@ where

pub fn expect_iter_error(mut iter: impl StorageIterator) {
loop {
println!("{:?}:{:?}", iter.key(), iter.value());
// println!("{:?}:{:?}", iter.key(), iter.value());
match iter.next() {
Ok(_) if iter.is_valid() => continue,
Ok(_) => panic!("expect an error"),
Expand Down Expand Up @@ -276,7 +276,7 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
let mut expected_key_value_pairs = Vec::new();
for i in 0..(max_key + 40000) {
let key = gen_key(i);
println!("{i}, key={key}");
// println!("{i}, key={key}");
let value = storage.get(key.as_bytes()).unwrap();
if let Some(val) = key_map.get(&i) {
let expected_value = gen_value(*val);
Expand Down
11 changes: 0 additions & 11 deletions mini-lsm-starter/src/tests/week2_day4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,3 @@ fn test_target_size() {
vec![0, 0, 30, 300, 3 * 1000, 30 * 1000]
);
}

#[test]
fn test_target_size_2() {
let controller = LeveledCompactionController::new(LeveledCompactionOptions {
level0_file_num_compaction_trigger: 2,
level_size_multiplier: 2,
base_level_size_mb: 128,
max_levels: 4,
});
assert_eq!(controller.target_size(6), vec![0, 0, 0, 200]);
}

0 comments on commit c24e9e6

Please sign in to comment.