Commit 9446d9f: refactor
AbnerZheng committed May 28, 2024
1 parent a48d2bf
Showing 7 changed files with 75 additions and 41 deletions.
10 changes: 9 additions & 1 deletion mini-lsm-starter/README.md
@@ -77,17 +77,25 @@ Starter code for Mini-LSM.
### day 4
#### Test Your Understanding
* What is the time complexity of seeking a key in the SST?
-
- Using the bloom filter, ruling out a missing key takes O(1). If the SST may contain the key, we locate the candidate block by binary search over the BlockMeta entries, and, as discussed above, seeking the key within that block takes O(n) time on average (see the sketch after this list).
* Where does the cursor stop when you seek a non-existent key in your implementation?
- Stop at the first key that is `>=` the provided key.
* Is it possible (or necessary) to do in-place updates of SST files?
- It is unnecessary to do in-place updates of SST files: the LSM tree only appends new SSTs and deletes obsolete ones, it never modifies them in place.
* An SST is usually large (i.e., 256MB). In this case, the cost of copying/expanding the Vec would be significant. Does your implementation allocate enough space for your SST builder in advance? How did you implement it?
-
* Looking at the moka block cache, why does it return Arc<Error> instead of the original Error? Does the usage of a block cache guarantee that there will be at most a fixed number of blocks in memory? For example, if you have a moka block cache of 4GB and block size of 4KB, will there be more than 4GB/4KB number of blocks in memory at the same time?
* Is it possible to store columnar data (i.e., a table of 100 integer columns) in an LSM engine? Is the current SST format still a good choice?
- It is possible to store columnar data in the current SST format, but it is not a good choice: every update to a single column has to write a record containing all 100 integer columns, so the write/read/space amplification is very large.
* Consider the case that the LSM engine is built on object store services (i.e., S3). How would you optimize/change the SST format/parameters and the block cache to make it suitable for such services?
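A minimal sketch of the block-lookup step described above, using a simplified stand-in for `BlockMeta` (just an offset and a first key, unlike the real type in this repository) and omitting the bloom-filter check and the in-block seek:

```rust
/// Simplified stand-in for the real BlockMeta: only what the lookup needs.
struct BlockMeta {
    offset: usize,
    first_key: Vec<u8>,
}

/// Returns the index of the block that may contain `key`: the last block whose
/// first_key is <= key. `partition_point` does the binary search, so this step
/// is O(log m) in the number of blocks.
fn find_block_idx(metas: &[BlockMeta], key: &[u8]) -> usize {
    metas
        .partition_point(|meta| meta.first_key.as_slice() <= key)
        .saturating_sub(1)
}

fn main() {
    let metas = vec![
        BlockMeta { offset: 0, first_key: b"a".to_vec() },
        BlockMeta { offset: 4096, first_key: b"m".to_vec() },
        BlockMeta { offset: 8192, first_key: b"t".to_vec() },
    ];
    assert_eq!(find_block_idx(&metas, b"c"), 0); // falls inside the first block
    assert_eq!(find_block_idx(&metas, b"m"), 1); // exact match on a first key
    assert_eq!(find_block_idx(&metas, b"zzz"), 2); // past the end: last block
    println!("candidate block offset: {}", metas[find_block_idx(&metas, b"c")].offset);
}
```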

### day 5
#### Test Your Understanding
* Consider the case that a user has an iterator that iterates the whole storage engine, and the storage engine is 1TB large, so that it takes ~1 hour to scan all the data. What would be the problems if the user does so? (This is a good question and we will ask it several times at different points of the tutorial...)
-
* Another popular interface provided by some LSM-tree storage engines is multi-get (or vectored get). The user can pass a list of keys that they want to retrieve, and the interface returns the value of each of those keys. For example, multi_get(vec!["a", "b", "c", "d"]) -> a=1,b=2,c=3,d=4. Obviously, an easy implementation is to simply do a single get for each key. How would you implement the multi-get interface, and what optimizations can you do to make it more efficient? (Hint: some operations during the get process only need to be done once for all keys, and beyond that, you can think of an improved disk I/O interface to better support multi-get.)
- When searching an unsorted run, such as the memtables and L0, we still have to probe each key individually.
- When searching a sorted run, we can sort the requested keys first and then walk the run only once for all of them, since both the keys and the run are ordered (see the sketch after this list).
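A hedged sketch of that idea over one sorted run, assuming the run is just a sorted slice of `(key, value)` pairs rather than the engine's real iterator types; the cursor never moves backwards, so the whole batch costs a single pass over the run plus the initial sort of the requested keys:

```rust
use std::collections::HashMap;

/// Look up a batch of keys against one sorted run in a single forward pass.
/// `run` must be sorted by key; keys not present in the run are simply absent
/// from the result.
fn multi_get_sorted_run<'a>(
    run: &'a [(Vec<u8>, Vec<u8>)],
    keys: &[&[u8]],
) -> HashMap<Vec<u8>, &'a [u8]> {
    let mut sorted_keys: Vec<&[u8]> = keys.to_vec();
    sorted_keys.sort();

    let mut result = HashMap::new();
    let mut cursor = 0usize; // shared across all keys, never rewinds
    for key in sorted_keys {
        while cursor < run.len() && run[cursor].0.as_slice() < key {
            cursor += 1;
        }
        if cursor < run.len() && run[cursor].0.as_slice() == key {
            result.insert(key.to_vec(), run[cursor].1.as_slice());
        }
    }
    result
}

fn main() {
    let run = vec![
        (b"a".to_vec(), b"1".to_vec()),
        (b"b".to_vec(), b"2".to_vec()),
        (b"d".to_vec(), b"4".to_vec()),
    ];
    let got = multi_get_sorted_run(&run, &[b"d".as_slice(), b"a".as_slice(), b"c".as_slice()]);
    assert_eq!(got.get(b"a".as_slice()).copied(), Some(b"1".as_slice()));
    assert!(got.get(b"c".as_slice()).is_none()); // missing key: no entry
    println!("found {} of 3 keys in one pass over the run", got.len());
}
```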

### day 6
#### Test Your Understanding
14 changes: 12 additions & 2 deletions mini-lsm-starter/src/compact.rs
@@ -327,6 +327,9 @@ impl LsmStorageInner {
return Ok(());
};

self.dump_structure();
println!("running compaction task: {:?}", compaction_task);

let sst_to_add = self.compact(&compaction_task)?;
let sst_to_add_ids = sst_to_add
.iter()
@@ -356,16 +359,23 @@ impl LsmStorageInner {
if let Some(manifest) = &self.manifest {
manifest.add_record(
&state_lock,
ManifestRecord::Compaction(compaction_task, sst_to_add_ids),
ManifestRecord::Compaction(compaction_task, sst_to_add_ids.clone()),
)?;
}
*self.state.write() = Arc::new(new_state);
println!(
"compaction finished: {} files removed, {} files added, output={:?}",
sst_to_remove.len(),
sst_to_add_ids.len(),
sst_to_add_ids
);
sst_to_remove
};

for sst_id in &sst_to_remove {
fs::remove_file(self.path_of_sst(*sst_id))?
}
self.sync_dir()?;
Ok(())
}

@@ -399,7 +409,7 @@ impl LsmStorageInner {
let snapshot = self.state.read();
snapshot.imm_memtables.len()
};
if imm_memtable_len + 1 >= self.options.num_memtable_limit {
if imm_memtable_len >= self.options.num_memtable_limit {
self.force_flush_next_imm_memtable()
} else {
Ok(())
4 changes: 2 additions & 2 deletions mini-lsm-starter/src/compact/simple_leveled.rs
@@ -41,7 +41,7 @@ impl SimpleLeveledCompactionController {
< self.options.size_ratio_percent * snapshot.l0_sstables.len()
{
println!(
"compaction triggered at level 0 and 1 with size ratio {}",
"compaction triggered at level l0 and 0 with size ratio {}",
snapshot.levels[0].1.len() / snapshot.l0_sstables.len()
);
return Some(SimpleLeveledCompactionTask {
@@ -73,7 +73,7 @@ impl SimpleLeveledCompactionController {
upper_level_sst_ids: upper_level_sst.clone(),
lower_level,
lower_level_sst_ids: lower_level_sst.clone(),
is_lower_level_bottom_level: lower_level == self.options.max_levels,
is_lower_level_bottom_level: lower_level == self.options.max_levels - 1,
});
}
}
17 changes: 13 additions & 4 deletions mini-lsm-starter/src/iterators/concat_iterator.rs
@@ -1,6 +1,3 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod

use std::sync::Arc;

use anyhow::Result;
@@ -20,7 +17,19 @@ pub struct SstConcatIterator {
}

impl SstConcatIterator {
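/// Sanity check used by the constructors below: keys inside each SST must be
/// ordered, and consecutive SSTs must not overlap, since the concat iterator
/// relies on both invariants.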
fn check_sst_valid(sstables: &[Arc<SsTable>]) {
for sst in sstables {
assert!(sst.first_key() <= sst.last_key());
}
if !sstables.is_empty() {
for i in 0..(sstables.len() - 1) {
assert!(sstables[i].last_key() < sstables[i + 1].first_key());
}
}
}

pub fn create_and_seek_to_first(sstables: Vec<Arc<SsTable>>) -> Result<Self> {
Self::check_sst_valid(&sstables);
let current = match sstables.get(0) {
None => None,
Some(sst) => Some(SsTableIterator::create_and_seek_to_first(sst.clone())?),
@@ -35,7 +44,7 @@ impl SstConcatIterator {
}

pub fn create_and_seek_to_key(sstables: Vec<Arc<SsTable>>, key: KeySlice) -> Result<Self> {
let sst_idx = 0;
Self::check_sst_valid(&sstables);
for (idx, sst) in sstables.iter().enumerate() {
if sst.last_key().as_key_slice() >= key {
let mut iter = Self {
25 changes: 19 additions & 6 deletions mini-lsm-starter/src/lsm_storage.rs
@@ -583,6 +583,7 @@ impl LsmStorageInner {
if let Some(manifest) = &self.manifest {
manifest.add_record(state_lock_observer, ManifestRecord::NewMemtable(sst_id))?;
}
self.sync_dir()?;

Ok(())
}
@@ -617,10 +618,6 @@ impl LsmStorageInner {
self.path_of_sst(sst_id),
)?;

if let Some(manifest) = &self.manifest {
manifest.add_record(&_state_lock, ManifestRecord::Flush(sst_id))?;
}

{
let mut guard = self.state.write();
let mut snapshot = guard.as_ref().clone();
@@ -634,11 +631,23 @@
} else {
snapshot.levels.insert(0, (sst_id, vec![sst_id]));
}

println!("flushed {}.sst with size={}", sst_id, sstable.table_size());
snapshot.sstables.insert(sst_id, Arc::new(sstable));

*guard = Arc::new(snapshot);
}

if self.options.enable_wal {
fs::remove_file(self.path_of_wal(sst_id))?;
}

if let Some(manifest) = &self.manifest {
manifest.add_record(&_state_lock, ManifestRecord::Flush(sst_id))?;
}

self.sync_dir()?;

Ok(())
}

@@ -684,7 +693,9 @@ impl LsmStorageInner {
sstable,
KeySlice::from_slice(key),
)?;
iter.next()?;
if iter.is_valid() && iter.key().raw_ref() == key {
iter.next()?;
}
iter
}

@@ -714,7 +725,9 @@
levels_sst,
KeySlice::from_slice(key),
)?;
iter.next()?;
if iter.is_valid() && iter.key().raw_ref() == key {
iter.next()?;
}
iter
}
Bound::Unbounded => SstConcatIterator::create_and_seek_to_first(levels_sst)?,
45 changes: 20 additions & 25 deletions mini-lsm-starter/src/table/builder.rs
@@ -23,8 +23,6 @@ pub struct SsTableBuilder {
pub(crate) key_hashes: Vec<u32>,
}

impl SsTableBuilder {}

impl SsTableBuilder {
/// Create a builder based on target block size.
pub fn new(block_size: usize) -> Self {
@@ -40,37 +38,34 @@ impl SsTableBuilder {
}

/// Adds a key-value pair to SSTable.
///
/// Note: You should split a new block when the current block is full.(`std::mem::replace` may
/// be helpful here)
pub fn add(&mut self, key: KeySlice, value: &[u8]) {
if self.first_key.is_empty() {
self.first_key.set_from_slice(key);
}

self.key_hashes.push(fingerprint32(key.raw_ref()));
let added = self.builder.add(key, value);
if !added {
// split a new block
let mut new_builder = BlockBuilder::new(self.block_size);
let added = new_builder.add(key, value);
assert!(added, "Not able to add key pair to a new created block");

let old_builder = mem::replace(&mut self.builder, new_builder);
let block = old_builder.build().encode();
let old_first_key = mem::replace(&mut self.first_key, key.to_key_vec());
let old_last_key = mem::replace(&mut self.last_key, key.to_key_vec());

self.meta.push(BlockMeta {
offset: self.data.len(),
first_key: old_first_key.into_key_bytes(),
last_key: old_last_key.into_key_bytes(),
});

self.data.extend_from_slice(&block);
} else {
if self.builder.add(key, value) {
self.last_key.set_from_slice(key);
return;
}

// split a new block
let mut new_builder = BlockBuilder::new(self.block_size);
let added = new_builder.add(key, value);
assert!(added, "Not able to add key pair to a new created block");

let old_builder = mem::replace(&mut self.builder, new_builder);
let block = old_builder.build().encode();
let old_first_key = mem::replace(&mut self.first_key, key.to_key_vec());
let old_last_key = mem::replace(&mut self.last_key, key.to_key_vec());

self.meta.push(BlockMeta {
offset: self.data.len(),
first_key: old_first_key.into_key_bytes(),
last_key: old_last_key.into_key_bytes(),
});

self.data.extend_from_slice(&block);
}

/// Get the estimated size of the SSTable.
1 change: 0 additions & 1 deletion mini-lsm-starter/src/tests/harness.rs
@@ -276,7 +276,6 @@ pub fn compaction_bench(storage: Arc<MiniLsm>) {
let mut expected_key_value_pairs = Vec::new();
for i in 0..(max_key + 40000) {
let key = gen_key(i);
// println!("{i}, key={key}");
let value = storage.get(key.as_bytes()).unwrap();
if let Some(val) = key_map.get(&i) {
let expected_value = gen_value(*val);
