From 76d71413245666fb3e0500aba1ea3e4922ba2108 Mon Sep 17 00:00:00 2001 From: Abner Zheng Date: Wed, 12 Jun 2024 02:53:30 +0800 Subject: [PATCH] finish w3/d7 --- mini-lsm-starter/src/compact.rs | 72 ++++++++++++++++++------ mini-lsm-starter/src/tests/week3_day7.rs | 11 +++- 2 files changed, 66 insertions(+), 17 deletions(-) diff --git a/mini-lsm-starter/src/compact.rs b/mini-lsm-starter/src/compact.rs index 4d77d90..6ce2786 100644 --- a/mini-lsm-starter/src/compact.rs +++ b/mini-lsm-starter/src/compact.rs @@ -17,7 +17,7 @@ use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::two_merge_iterator::TwoMergeIterator; use crate::iterators::StorageIterator; use crate::key::KeySlice; -use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; +use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState}; use crate::manifest::ManifestRecord; use crate::mvcc::watermark; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; @@ -128,6 +128,13 @@ impl LsmStorageInner { let guard = self.mvcc().ts.lock(); guard.1.watermark().unwrap_or(u64::MAX) }; + let compaction_filter_guard = self.compaction_filters.lock(); + let mut prefixs = Vec::with_capacity(compaction_filter_guard.len()); + for f in compaction_filter_guard.iter() { + match f { + CompactionFilter::Prefix(p) => prefixs.push(p.to_vec()), + } + } let mut latest_version_leq_added = false; while iter.is_valid() { @@ -140,34 +147,58 @@ impl LsmStorageInner { match &prev_key { None => { - prev_key = Some(cur_key); latest_version_leq_added = cur_ts <= watermark; if !(latest_version_leq_added && compact_to_bottom_level && iter.value().is_empty()) { - sst_builder.add(iter.key(), iter.value()); + if latest_version_leq_added { + let mut removed = false; + for p in &prefixs { + if cur_key.starts_with(p) { + removed = true; + break; + } + } + if !removed { + sst_builder.add(iter.key(), iter.value()); + } + } else { + sst_builder.add(iter.key(), iter.value()); + } } + prev_key = Some(cur_key); } Some(key) if *key != cur_key => { - prev_key = Some(cur_key); latest_version_leq_added = cur_ts <= watermark; if !(cur_ts <= watermark && compact_to_bottom_level && iter.value().is_empty()) { - sst_builder.add(iter.key(), iter.value()); + let mut removed = false; + if latest_version_leq_added { + for p in &prefixs { + if cur_key.starts_with(p) { + removed = true; + break; + } + } + } + if !removed { + sst_builder.add(iter.key(), iter.value()); - if sst_builder.estimated_size() > self.options.target_sst_size { - // split a new sst file - let sst_id = self.next_sst_id(); - let sst_table = sst_builder.build( - sst_id, - Some(self.block_cache.clone()), - self.path_of_sst(sst_id), - )?; - sst_to_add.push(Arc::new(sst_table)); - sst_builder = SsTableBuilder::new(self.options.block_size); + if sst_builder.estimated_size() > self.options.target_sst_size { + // split a new sst file + let sst_id = self.next_sst_id(); + let sst_table = sst_builder.build( + sst_id, + Some(self.block_cache.clone()), + self.path_of_sst(sst_id), + )?; + sst_to_add.push(Arc::new(sst_table)); + sst_builder = SsTableBuilder::new(self.options.block_size); + } } } + prev_key = Some(cur_key); } Some(key) if *key == cur_key => { if cur_ts > watermark { @@ -177,7 +208,16 @@ impl LsmStorageInner { latest_version_leq_added = true; if !(compact_to_bottom_level && iter.value().is_empty()) { - sst_builder.add(iter.key(), iter.value()); + let mut removed = false; + for p in &prefixs { + if key.starts_with(p) { + removed = true; + break; + } + } + if !removed { + sst_builder.add(iter.key(), iter.value()); + } } } else { println!("remove version@{cur_ts} of {key:?}"); diff --git a/mini-lsm-starter/src/tests/week3_day7.rs b/mini-lsm-starter/src/tests/week3_day7.rs index bfbc05d..5c6c658 100644 --- a/mini-lsm-starter/src/tests/week3_day7.rs +++ b/mini-lsm-starter/src/tests/week3_day7.rs @@ -1,12 +1,13 @@ use bytes::Bytes; use tempfile::tempdir; +use crate::iterators::StorageIterator; use crate::{ compact::CompactionOptions, lsm_storage::{CompactionFilter, LsmStorageOptions, MiniLsm, WriteBatchRecord}, }; -use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage}; +use super::harness::{as_bytes, check_iter_result_by_key, construct_merge_iterator_over_storage}; #[test] fn test_task3_mvcc_compaction() { @@ -60,6 +61,14 @@ fn test_task3_mvcc_compaction() { storage.force_full_compaction().unwrap(); let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + // while iter.is_valid() { + // println!( + // "actual: {:?}/{:?}", + // as_bytes(iter.key().for_testing_key_ref()), + // as_bytes(iter.value()), + // ); + // iter.next().unwrap(); + // } check_iter_result_by_key( &mut iter, vec![