Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow manual WAL pruning via CLI #349

Merged
merged 1 commit into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bin/dolos/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod copy_wal;
mod dump_wal;
mod export;
mod find_seq;
mod prune_wal;
mod summary;

#[derive(Debug, Subcommand)]
Expand All @@ -18,6 +19,8 @@ pub enum Command {
Export(export::Args),
/// copies a range of slots from the WAL into a new db
CopyWal(copy_wal::Args),
/// removes blocks from the WAL before a given slot
PruneWal(prune_wal::Args),
}

#[derive(Debug, Parser)]
Expand All @@ -37,6 +40,7 @@ pub fn run(
Command::FindSeq(x) => find_seq::run(config, x)?,
Command::Export(x) => export::run(config, x, feedback)?,
Command::CopyWal(x) => copy_wal::run(config, x)?,
Command::PruneWal(x) => prune_wal::run(config, x)?,
}

Ok(())
Expand Down
43 changes: 43 additions & 0 deletions src/bin/dolos/data/prune_wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use miette::{bail, Context, IntoDiagnostic};
use tracing::info;

#[derive(Debug, clap::Args)]
pub struct Args {
/// the maximum number of slots to keep in the WAL
#[arg(long)]
max_slots: Option<u64>,

/// the maximum number of slots to prune in a single operation
#[arg(long)]
max_prune: Option<u64>,
}

pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (mut wal, _) = crate::common::open_data_stores(config).context("opening data stores")?;

let max_slots = match args.max_slots {
Some(x) => x,
None => match config.storage.max_wal_history {
Some(x) => x,
None => bail!("neither args or config provided for max_slots"),
},
};

info!(max_slots, "prunning to max slots");

wal.prune_history(max_slots, args.max_prune)
.into_diagnostic()
.context("removing range from WAL")?;

let db = wal.db_mut().unwrap();

while db.compact().into_diagnostic()? {
info!("wal compaction round");
}

info!("wal segment trimmed");

Ok(())
}
43 changes: 0 additions & 43 deletions src/bin/dolos/doctor/compact_wal.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/bin/dolos/doctor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::{Parser, Subcommand};

use crate::feedback::Feedback;

mod compact_wal;
mod rebuild_ledger;
mod wal_integrity;

Expand All @@ -12,8 +11,6 @@ pub enum Command {
RebuildLedger(rebuild_ledger::Args),
/// checks the integrity of the WAL records
WalIntegrity(wal_integrity::Args),
/// remove parts of the WAL
CompactWal(compact_wal::Args),
}

#[derive(Debug, Parser)]
Expand All @@ -26,7 +23,6 @@ pub fn run(config: &super::Config, args: &Args, feedback: &Feedback) -> miette::
match &args.command {
Command::RebuildLedger(x) => rebuild_ledger::run(config, x, feedback)?,
Command::WalIntegrity(x) => wal_integrity::run(config, x)?,
Command::CompactWal(x) => compact_wal::run(config, x)?,
}

Ok(())
Expand Down
33 changes: 21 additions & 12 deletions src/wal/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use itertools::Itertools;
use redb::{Range, ReadableTable, TableDefinition};
use std::{path::Path, sync::Arc};
use tracing::{debug, info, warn};
use tracing::{debug, info, trace, warn};

use super::{
BlockSlot, ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, WalError, WalReader, WalWriter,
Expand Down Expand Up @@ -195,8 +195,6 @@
Ok(())
}

const MAX_PRUNE_SLOTS_PER_PASS: u64 = 10_000;

/// Prunes the WAL history to maintain a maximum number of slots.
///
/// This method attempts to remove older entries from the WAL to keep the
Expand All @@ -207,12 +205,13 @@
/// 2. Calculates the number of slots that exceed the `max_slots` limit.
/// 3. If pruning is necessary, it removes entries older than a calculated
/// cutoff slot.
/// 4. Pruning is limited to a maximum of `MAX_PRUNE_SLOTS_PER_PASS` slots
/// per invocation to avoid long-running operations.
/// 4. Optionally limits the number of slots pruned per invocation.
///
/// # Arguments
///
/// * `max_slots` - The maximum number of slots to retain in the WAL.
/// * `max_prune` - Optional limit on the number of slots to prune in a
/// single operation.
///
/// # Returns
///
Expand All @@ -224,9 +223,14 @@
/// - If the WAL doesn't exceed the `max_slots` limit, no pruning occurs.
/// - This method is typically called periodically as part of housekeeping
/// operations.
/// - The actual number of slots pruned may be less than the calculated
/// excess to avoid long-running operations.
pub fn prune_history(&mut self, max_slots: u64) -> Result<(), WalError> {
/// - If `max_prune` is specified, it limits the number of slots pruned in a
/// single operation, which can help avoid long-running operations.
/// - If `max_prune` is not specified, all excess slots will be pruned.
pub fn prune_history(
&mut self,
max_slots: u64,
max_prune: Option<u64>,
) -> Result<(), WalError> {
let start_slot = match self.find_start()? {
Some((_, ChainPoint::Origin)) => 0,
Some((_, ChainPoint::Specific(slot, _))) => slot,
Expand All @@ -253,7 +257,10 @@
return Ok(());
}

let max_prune = core::cmp::min(delta, Self::MAX_PRUNE_SLOTS_PER_PASS);
let max_prune = match max_prune {
Some(max) => core::cmp::min(delta, max),
None => delta,
};

let prune_before = start_slot + max_prune;

Expand All @@ -264,10 +271,12 @@
Ok(())
}

const MAX_PRUNE_SLOTS_PER_HOUSEKEEPING: u64 = 10_000;

pub fn housekeeping(&mut self) -> Result<(), WalError> {
if let Some(max_slots) = self.max_slots {
info!(max_slots, "pruning wal for excess history");
self.prune_history(max_slots)?;
self.prune_history(max_slots, Some(Self::MAX_PRUNE_SLOTS_PER_HOUSEKEEPING))?;
}

Ok(())
Expand Down Expand Up @@ -355,9 +364,9 @@
/// # Examples
///
/// ```
/// let result = wal.approximate_slot_with_retry(

Check failure on line 367 in src/wal/redb.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find value `wal` in this scope
/// slot,

Check failure on line 368 in src/wal/redb.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find value `slot` in this scope
/// |retry| slot - 100 * retry..=slot + 100 * retry

Check failure on line 369 in src/wal/redb.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find value `slot` in this scope

Check failure on line 369 in src/wal/redb.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find value `slot` in this scope
/// )?;
/// ```
pub fn approximate_slot_with_retry<F, R>(
Expand Down Expand Up @@ -425,7 +434,7 @@
let mut to_remove = wal.extract_from_if(..last_seq, |_, _| true)?;

while let Some(Ok((seq, _))) = to_remove.next() {
debug!(seq = seq.value(), "removing log entry");
trace!(seq = seq.value(), "removing log entry");
}
}

Expand All @@ -434,7 +443,7 @@
let mut to_remove = pos.extract_from_if(..(slot as i128), |_, _| true)?;

while let Some(Ok((slot, _))) = to_remove.next() {
debug!(slot = slot.value(), "removing log entry");
trace!(slot = slot.value(), "removing log entry");
}
}

Expand Down
Loading