From b6e91310261386c5dd1a0d9a90cc95365fcebc45 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 12 Oct 2024 08:44:19 -0300 Subject: [PATCH] feat: allow manual wal pruning via CLI --- src/bin/dolos/data/mod.rs | 4 +++ src/bin/dolos/data/prune_wal.rs | 43 +++++++++++++++++++++++++++++ src/bin/dolos/doctor/compact_wal.rs | 43 ----------------------------- src/bin/dolos/doctor/mod.rs | 4 --- src/wal/redb.rs | 33 ++++++++++++++-------- 5 files changed, 68 insertions(+), 59 deletions(-) create mode 100644 src/bin/dolos/data/prune_wal.rs delete mode 100644 src/bin/dolos/doctor/compact_wal.rs diff --git a/src/bin/dolos/data/mod.rs b/src/bin/dolos/data/mod.rs index c44b460..20e8045 100644 --- a/src/bin/dolos/data/mod.rs +++ b/src/bin/dolos/data/mod.rs @@ -4,6 +4,7 @@ mod copy_wal; mod dump_wal; mod export; mod find_seq; +mod prune_wal; mod summary; #[derive(Debug, Subcommand)] @@ -18,6 +19,8 @@ pub enum Command { Export(export::Args), /// copies a range of slots from the WAL into a new db CopyWal(copy_wal::Args), + /// removes blocks from the WAL before a given slot + PruneWal(prune_wal::Args), } #[derive(Debug, Parser)] @@ -37,6 +40,7 @@ pub fn run( Command::FindSeq(x) => find_seq::run(config, x)?, Command::Export(x) => export::run(config, x, feedback)?, Command::CopyWal(x) => copy_wal::run(config, x)?, + Command::PruneWal(x) => prune_wal::run(config, x)?, } Ok(()) diff --git a/src/bin/dolos/data/prune_wal.rs b/src/bin/dolos/data/prune_wal.rs new file mode 100644 index 0000000..1c01725 --- /dev/null +++ b/src/bin/dolos/data/prune_wal.rs @@ -0,0 +1,43 @@ +use miette::{bail, Context, IntoDiagnostic}; +use tracing::info; + +#[derive(Debug, clap::Args)] +pub struct Args { + /// the maximum number of slots to keep in the WAL + #[arg(long)] + max_slots: Option, + + /// the maximum number of slots to prune in a single operation + #[arg(long)] + max_prune: Option, +} + +pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> { + crate::common::setup_tracing(&config.logging)?; + + let (mut wal, _) = crate::common::open_data_stores(config).context("opening data stores")?; + + let max_slots = match args.max_slots { + Some(x) => x, + None => match config.storage.max_wal_history { + Some(x) => x, + None => bail!("neither args or config provided for max_slots"), + }, + }; + + info!(max_slots, "prunning to max slots"); + + wal.prune_history(max_slots, args.max_prune) + .into_diagnostic() + .context("removing range from WAL")?; + + let db = wal.db_mut().unwrap(); + + while db.compact().into_diagnostic()? { + info!("wal compaction round"); + } + + info!("wal segment trimmed"); + + Ok(()) +} diff --git a/src/bin/dolos/doctor/compact_wal.rs b/src/bin/dolos/doctor/compact_wal.rs deleted file mode 100644 index 534a166..0000000 --- a/src/bin/dolos/doctor/compact_wal.rs +++ /dev/null @@ -1,43 +0,0 @@ -use dolos::wal::{ChainPoint, WalReader as _}; -use miette::{Context, IntoDiagnostic}; - -#[derive(Debug, clap::Args)] -pub struct Args {} - -pub fn run(config: &crate::Config, _args: &Args) -> miette::Result<()> { - crate::common::setup_tracing(&config.logging)?; - - let (mut wal, _) = crate::common::open_data_stores(config).context("opening data stores")?; - - let (_, tip) = wal - .find_tip() - .into_diagnostic() - .context("finding tip")? - .ok_or(miette::miette!("no tip"))?; - - let tip = match tip { - ChainPoint::Origin => 0, - ChainPoint::Specific(slot, _) => slot, - }; - - println!("tip is slot {}", tip); - - // TODO: make this configurable - let min_slot = tip - 1000; - - println!("removing blocks before slot {}", min_slot); - - wal.remove_before(min_slot) - .into_diagnostic() - .context("removing range from WAL")?; - - let db = wal.db_mut().unwrap(); - - while db.compact().into_diagnostic()? { - println!("wal compaction round"); - } - - println!("wal segment trimmed"); - - Ok(()) -} diff --git a/src/bin/dolos/doctor/mod.rs b/src/bin/dolos/doctor/mod.rs index 50310f2..be6bdb6 100644 --- a/src/bin/dolos/doctor/mod.rs +++ b/src/bin/dolos/doctor/mod.rs @@ -2,7 +2,6 @@ use clap::{Parser, Subcommand}; use crate::feedback::Feedback; -mod compact_wal; mod rebuild_ledger; mod wal_integrity; @@ -12,8 +11,6 @@ pub enum Command { RebuildLedger(rebuild_ledger::Args), /// checks the integrity of the WAL records WalIntegrity(wal_integrity::Args), - /// remove parts of the WAL - CompactWal(compact_wal::Args), } #[derive(Debug, Parser)] @@ -26,7 +23,6 @@ pub fn run(config: &super::Config, args: &Args, feedback: &Feedback) -> miette:: match &args.command { Command::RebuildLedger(x) => rebuild_ledger::run(config, x, feedback)?, Command::WalIntegrity(x) => wal_integrity::run(config, x)?, - Command::CompactWal(x) => compact_wal::run(config, x)?, } Ok(()) diff --git a/src/wal/redb.rs b/src/wal/redb.rs index d7e7c9c..8374e4b 100644 --- a/src/wal/redb.rs +++ b/src/wal/redb.rs @@ -2,7 +2,7 @@ use bincode; use itertools::Itertools; use redb::{Range, ReadableTable, TableDefinition}; use std::{path::Path, sync::Arc}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, trace, warn}; use super::{ BlockSlot, ChainPoint, LogEntry, LogSeq, LogValue, RawBlock, WalError, WalReader, WalWriter, @@ -195,8 +195,6 @@ impl WalStore { Ok(()) } - const MAX_PRUNE_SLOTS_PER_PASS: u64 = 10_000; - /// Prunes the WAL history to maintain a maximum number of slots. /// /// This method attempts to remove older entries from the WAL to keep the @@ -207,12 +205,13 @@ impl WalStore { /// 2. Calculates the number of slots that exceed the `max_slots` limit. /// 3. If pruning is necessary, it removes entries older than a calculated /// cutoff slot. - /// 4. Pruning is limited to a maximum of `MAX_PRUNE_SLOTS_PER_PASS` slots - /// per invocation to avoid long-running operations. + /// 4. Optionally limits the number of slots pruned per invocation. /// /// # Arguments /// /// * `max_slots` - The maximum number of slots to retain in the WAL. + /// * `max_prune` - Optional limit on the number of slots to prune in a + /// single operation. /// /// # Returns /// @@ -224,9 +223,14 @@ impl WalStore { /// - If the WAL doesn't exceed the `max_slots` limit, no pruning occurs. /// - This method is typically called periodically as part of housekeeping /// operations. - /// - The actual number of slots pruned may be less than the calculated - /// excess to avoid long-running operations. - pub fn prune_history(&mut self, max_slots: u64) -> Result<(), WalError> { + /// - If `max_prune` is specified, it limits the number of slots pruned in a + /// single operation, which can help avoid long-running operations. + /// - If `max_prune` is not specified, all excess slots will be pruned. + pub fn prune_history( + &mut self, + max_slots: u64, + max_prune: Option, + ) -> Result<(), WalError> { let start_slot = match self.find_start()? { Some((_, ChainPoint::Origin)) => 0, Some((_, ChainPoint::Specific(slot, _))) => slot, @@ -253,7 +257,10 @@ impl WalStore { return Ok(()); } - let max_prune = core::cmp::min(delta, Self::MAX_PRUNE_SLOTS_PER_PASS); + let max_prune = match max_prune { + Some(max) => core::cmp::min(delta, max), + None => delta, + }; let prune_before = start_slot + max_prune; @@ -264,10 +271,12 @@ impl WalStore { Ok(()) } + const MAX_PRUNE_SLOTS_PER_HOUSEKEEPING: u64 = 10_000; + pub fn housekeeping(&mut self) -> Result<(), WalError> { if let Some(max_slots) = self.max_slots { info!(max_slots, "pruning wal for excess history"); - self.prune_history(max_slots)?; + self.prune_history(max_slots, Some(Self::MAX_PRUNE_SLOTS_PER_HOUSEKEEPING))?; } Ok(()) @@ -425,7 +434,7 @@ impl WalStore { let mut to_remove = wal.extract_from_if(..last_seq, |_, _| true)?; while let Some(Ok((seq, _))) = to_remove.next() { - debug!(seq = seq.value(), "removing log entry"); + trace!(seq = seq.value(), "removing log entry"); } } @@ -434,7 +443,7 @@ impl WalStore { let mut to_remove = pos.extract_from_if(..(slot as i128), |_, _| true)?; while let Some(Ok((slot, _))) = to_remove.next() { - debug!(slot = slot.value(), "removing log entry"); + trace!(slot = slot.value(), "removing log entry"); } }