Skip to content

Commit

Permalink
feat: add counter of running compactions #117
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jan 6, 2025
1 parent ca18cb0 commit f54a94b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
9 changes: 8 additions & 1 deletion src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
use super::manager::CompactionManager;
use crate::snapshot_tracker::SnapshotTracker;
use lsm_tree::AbstractTree;
use std::sync::atomic::AtomicUsize;

/// Runs a single run of compaction.
pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTracker) {
pub fn run(
compaction_manager: &CompactionManager,
snapshot_tracker: &SnapshotTracker,
compaction_counter: &AtomicUsize,
) {
let Some(item) = compaction_manager.pop() else {
return;
};
Expand All @@ -21,10 +26,12 @@ pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTr

// TODO: loop if there's more work to do

compaction_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Err(e) = item
.tree
.compact(strategy.inner(), snapshot_tracker.get_seqno_safe_to_gc())
{
log::error!("Compaction failed: {e:?}");
};
compaction_counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
20 changes: 19 additions & 1 deletion src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub struct KeyspaceInner {
/// True if fsync failed
pub(crate) is_poisoned: Arc<AtomicBool>,

/// Active compaction conter
pub(crate) active_compaction_count: Arc<AtomicUsize>,

#[doc(hidden)]
pub snapshot_tracker: SnapshotTracker,
}
Expand Down Expand Up @@ -194,6 +197,14 @@ impl Keyspace {
self.write_buffer_manager.get()
}

/// Returns the number of active compactions currently running.
#[doc(hidden)]
#[must_use]
pub fn active_compactions(&self) -> usize {
self.active_compaction_count
.load(std::sync::atomic::Ordering::Relaxed)
}

/// Returns the amount of journals on disk.
///
/// # Examples
Expand Down Expand Up @@ -580,6 +591,7 @@ impl Keyspace {
write_buffer_manager: WriteBufferManager::default(),
is_poisoned: Arc::default(),
snapshot_tracker: SnapshotTracker::default(),
active_compaction_count: Arc::default(),
};

let keyspace = Self(Arc::new(inner));
Expand Down Expand Up @@ -713,6 +725,7 @@ impl Keyspace {
write_buffer_manager: WriteBufferManager::default(),
is_poisoned: Arc::default(),
snapshot_tracker: SnapshotTracker::default(),
active_compaction_count: Arc::default(),
};

// NOTE: Lastly, fsync .fjall marker, which contains the version
Expand Down Expand Up @@ -792,6 +805,7 @@ impl Keyspace {
let stop_signal = self.stop_signal.clone();
let thread_counter = self.active_background_threads.clone();
let snapshot_tracker = self.snapshot_tracker.clone();
let compaction_counter = self.active_compaction_count.clone();

thread_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

Expand All @@ -802,7 +816,11 @@ impl Keyspace {
log::trace!("compaction: waiting for work");
compaction_manager.wait_for();

crate::compaction::worker::run(&compaction_manager, &snapshot_tracker);
crate::compaction::worker::run(
&compaction_manager,
&snapshot_tracker,
&compaction_counter,
);
}

log::trace!("compaction thread: exiting because keyspace is dropping");
Expand Down

0 comments on commit f54a94b

Please sign in to comment.