Skip to content

Commit

Permalink
finish w3/d7
Browse files Browse the repository at this point in the history
  • Loading branch information
AbnerZheng committed Jun 11, 2024
1 parent 727b26b commit 76d7141
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 17 deletions.
72 changes: 56 additions & 16 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::key::KeySlice;
use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState};
use crate::manifest::ManifestRecord;
use crate::mvcc::watermark;
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
Expand Down Expand Up @@ -128,6 +128,13 @@ impl LsmStorageInner {
let guard = self.mvcc().ts.lock();
guard.1.watermark().unwrap_or(u64::MAX)
};
let compaction_filter_guard = self.compaction_filters.lock();
let mut prefixs = Vec::with_capacity(compaction_filter_guard.len());
for f in compaction_filter_guard.iter() {
match f {
CompactionFilter::Prefix(p) => prefixs.push(p.to_vec()),
}
}

let mut latest_version_leq_added = false;
while iter.is_valid() {
Expand All @@ -140,34 +147,58 @@ impl LsmStorageInner {

match &prev_key {
None => {
prev_key = Some(cur_key);
latest_version_leq_added = cur_ts <= watermark;
if !(latest_version_leq_added
&& compact_to_bottom_level
&& iter.value().is_empty())
{
sst_builder.add(iter.key(), iter.value());
if latest_version_leq_added {
let mut removed = false;
for p in &prefixs {
if cur_key.starts_with(p) {
removed = true;
break;
}
}
if !removed {
sst_builder.add(iter.key(), iter.value());
}
} else {
sst_builder.add(iter.key(), iter.value());
}
}
prev_key = Some(cur_key);
}
Some(key) if *key != cur_key => {
prev_key = Some(cur_key);
latest_version_leq_added = cur_ts <= watermark;
if !(cur_ts <= watermark && compact_to_bottom_level && iter.value().is_empty())
{
sst_builder.add(iter.key(), iter.value());
let mut removed = false;
if latest_version_leq_added {
for p in &prefixs {
if cur_key.starts_with(p) {
removed = true;
break;
}
}
}
if !removed {
sst_builder.add(iter.key(), iter.value());

if sst_builder.estimated_size() > self.options.target_sst_size {
// split a new sst file
let sst_id = self.next_sst_id();
let sst_table = sst_builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
)?;
sst_to_add.push(Arc::new(sst_table));
sst_builder = SsTableBuilder::new(self.options.block_size);
if sst_builder.estimated_size() > self.options.target_sst_size {
// split a new sst file
let sst_id = self.next_sst_id();
let sst_table = sst_builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
)?;
sst_to_add.push(Arc::new(sst_table));
sst_builder = SsTableBuilder::new(self.options.block_size);
}
}
}
prev_key = Some(cur_key);
}
Some(key) if *key == cur_key => {
if cur_ts > watermark {
Expand All @@ -177,7 +208,16 @@ impl LsmStorageInner {
latest_version_leq_added = true;

if !(compact_to_bottom_level && iter.value().is_empty()) {
sst_builder.add(iter.key(), iter.value());
let mut removed = false;
for p in &prefixs {
if key.starts_with(p) {
removed = true;
break;
}
}
if !removed {
sst_builder.add(iter.key(), iter.value());
}
}
} else {
println!("remove version@{cur_ts} of {key:?}");
Expand Down
11 changes: 10 additions & 1 deletion mini-lsm-starter/src/tests/week3_day7.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use bytes::Bytes;
use tempfile::tempdir;

use crate::iterators::StorageIterator;
use crate::{
compact::CompactionOptions,
lsm_storage::{CompactionFilter, LsmStorageOptions, MiniLsm, WriteBatchRecord},
};

use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage};
use super::harness::{as_bytes, check_iter_result_by_key, construct_merge_iterator_over_storage};

#[test]
fn test_task3_mvcc_compaction() {
Expand Down Expand Up @@ -60,6 +61,14 @@ fn test_task3_mvcc_compaction() {
storage.force_full_compaction().unwrap();

let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
// while iter.is_valid() {
// println!(
// "actual: {:?}/{:?}",
// as_bytes(iter.key().for_testing_key_ref()),
// as_bytes(iter.value()),
// );
// iter.next().unwrap();
// }
check_iter_result_by_key(
&mut iter,
vec![
Expand Down

0 comments on commit 76d7141

Please sign in to comment.