Skip to content

Commit

Permalink
feat: allow manual wal pruning via CLI (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Oct 12, 2024
1 parent 637eeef commit b2c056e
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 59 deletions.
4 changes: 4 additions & 0 deletions src/bin/dolos/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod copy_wal;
mod dump_wal;
mod export;
mod find_seq;
mod prune_wal;
mod summary;

#[derive(Debug, Subcommand)]
Expand All @@ -18,6 +19,8 @@ pub enum Command {
Export(export::Args),
/// copies a range of slots from the WAL into a new db
CopyWal(copy_wal::Args),
/// removes blocks from the WAL before a given slot
PruneWal(prune_wal::Args),
}

#[derive(Debug, Parser)]
Expand All @@ -37,6 +40,7 @@ pub fn run(
Command::FindSeq(x) => find_seq::run(config, x)?,
Command::Export(x) => export::run(config, x, feedback)?,
Command::CopyWal(x) => copy_wal::run(config, x)?,
Command::PruneWal(x) => prune_wal::run(config, x)?,
}

Ok(())
Expand Down
43 changes: 43 additions & 0 deletions src/bin/dolos/data/prune_wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use miette::{bail, Context, IntoDiagnostic};
use tracing::info;

#[derive(Debug, clap::Args)]
pub struct Args {
/// the maximum number of slots to keep in the WAL
#[arg(long)]
max_slots: Option<u64>,

/// the maximum number of slots to prune in a single operation
#[arg(long)]
max_prune: Option<u64>,
}

pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (mut wal, _) = crate::common::open_data_stores(config).context("opening data stores")?;

let max_slots = match args.max_slots {
Some(x) => x,
None => match config.storage.max_wal_history {
Some(x) => x,
None => bail!("neither args or config provided for max_slots"),
},
};

info!(max_slots, "prunning to max slots");

wal.prune_history(max_slots, args.max_prune)
.into_diagnostic()
.context("removing range from WAL")?;

let db = wal.db_mut().unwrap();

while db.compact().into_diagnostic()? {
info!("wal compaction round");
}

info!("wal segment trimmed");

Ok(())
}
43 changes: 0 additions & 43 deletions src/bin/dolos/doctor/compact_wal.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/bin/dolos/doctor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::{Parser, Subcommand};

use crate::feedback::Feedback;

mod compact_wal;
mod rebuild_ledger;
mod wal_integrity;

Expand All @@ -12,8 +11,6 @@ pub enum Command {
RebuildLedger(rebuild_ledger::Args),
/// checks the integrity of the WAL records
WalIntegrity(wal_integrity::Args),
/// remove parts of the WAL
CompactWal(compact_wal::Args),
}

#[derive(Debug, Parser)]
Expand All @@ -26,7 +23,6 @@ pub fn run(config: &super::Config, args: &Args, feedback: &Feedback) -> miette::
match &args.command {
Command::RebuildLedger(x) => rebuild_ledger::run(config, x, feedback)?,
Command::WalIntegrity(x) => wal_integrity::run(config, x)?,
Command::CompactWal(x) => compact_wal::run(config, x)?,
}

Ok(())
Expand Down
33 changes: 21 additions & 12 deletions src/wal/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bincode;
use itertools::Itertools;
use redb::{Range, ReadableTable, TableDefinition};
use std::{path::Path, sync::Arc};
use tracing::{debug, info, warn};
use tracing::{debug, info, trace, warn};

use super::{
BlockSlot, ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, WalError, WalReader, WalWriter,
Expand Down Expand Up @@ -195,8 +195,6 @@ impl WalStore {
Ok(())
}

const MAX_PRUNE_SLOTS_PER_PASS: u64 = 10_000;

/// Prunes the WAL history to maintain a maximum number of slots.
///
/// This method attempts to remove older entries from the WAL to keep the
Expand All @@ -207,12 +205,13 @@ impl WalStore {
/// 2. Calculates the number of slots that exceed the `max_slots` limit.
/// 3. If pruning is necessary, it removes entries older than a calculated
/// cutoff slot.
/// 4. Pruning is limited to a maximum of `MAX_PRUNE_SLOTS_PER_PASS` slots
/// per invocation to avoid long-running operations.
/// 4. Optionally limits the number of slots pruned per invocation.
///
/// # Arguments
///
/// * `max_slots` - The maximum number of slots to retain in the WAL.
/// * `max_prune` - Optional limit on the number of slots to prune in a
/// single operation.
///
/// # Returns
///
Expand All @@ -224,9 +223,14 @@ impl WalStore {
/// - If the WAL doesn't exceed the `max_slots` limit, no pruning occurs.
/// - This method is typically called periodically as part of housekeeping
/// operations.
/// - The actual number of slots pruned may be less than the calculated
/// excess to avoid long-running operations.
pub fn prune_history(&mut self, max_slots: u64) -> Result<(), WalError> {
/// - If `max_prune` is specified, it limits the number of slots pruned in a
/// single operation, which can help avoid long-running operations.
/// - If `max_prune` is not specified, all excess slots will be pruned.
pub fn prune_history(
&mut self,
max_slots: u64,
max_prune: Option<u64>,
) -> Result<(), WalError> {
let start_slot = match self.find_start()? {
Some((_, ChainPoint::Origin)) => 0,
Some((_, ChainPoint::Specific(slot, _))) => slot,
Expand All @@ -253,7 +257,10 @@ impl WalStore {
return Ok(());
}

let max_prune = core::cmp::min(delta, Self::MAX_PRUNE_SLOTS_PER_PASS);
let max_prune = match max_prune {
Some(max) => core::cmp::min(delta, max),
None => delta,
};

let prune_before = start_slot + max_prune;

Expand All @@ -264,10 +271,12 @@ impl WalStore {
Ok(())
}

const MAX_PRUNE_SLOTS_PER_HOUSEKEEPING: u64 = 10_000;

pub fn housekeeping(&mut self) -> Result<(), WalError> {
if let Some(max_slots) = self.max_slots {
info!(max_slots, "pruning wal for excess history");
self.prune_history(max_slots)?;
self.prune_history(max_slots, Some(Self::MAX_PRUNE_SLOTS_PER_HOUSEKEEPING))?;
}

Ok(())
Expand Down Expand Up @@ -425,7 +434,7 @@ impl WalStore {
let mut to_remove = wal.extract_from_if(..last_seq, |_, _| true)?;

while let Some(Ok((seq, _))) = to_remove.next() {
debug!(seq = seq.value(), "removing log entry");
trace!(seq = seq.value(), "removing log entry");
}
}

Expand All @@ -434,7 +443,7 @@ impl WalStore {
let mut to_remove = pos.extract_from_if(..(slot as i128), |_, _| true)?;

while let Some(Ok((slot, _))) = to_remove.next() {
debug!(slot = slot.value(), "removing log entry");
trace!(slot = slot.value(), "removing log entry");
}
}

Expand Down

0 comments on commit b2c056e

Please sign in to comment.