diff --git a/Cargo.lock b/Cargo.lock index c592767..05c9676 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -645,9 +645,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -660,9 +660,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -670,15 +670,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -687,15 +687,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -704,15 +704,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-timer" @@ -722,9 +722,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -775,6 +775,7 @@ dependencies = [ "clap", "croner", "ctrlc", + "futures", "gix", "human-errors", "log", diff --git a/Cargo.toml b/Cargo.toml index c5a5b40..0f8f890 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5.21", features = ["derive", "string"] } croner = "2.1.0" ctrlc = "3.4.5" +futures = "0.3.31" gix = { version = "0.68.0", features = [ "blocking-http-transport-reqwest-rust-tls", ] } diff --git a/src/main.rs b/src/main.rs index 3472974..166e531 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,9 @@ use clap::Parser; use engines::BackupState; use errors::Error; +use pairing::PairingHandler; use std::sync::atomic::AtomicBool; use std::time::Duration; -use tokio_stream::StreamExt; use tracing_batteries::prelude::*; #[macro_use] @@ -78,60 +78,21 @@ async fn run(args: Args) -> Result<(), Error> { match policy.kind.as_str() { k if k == GitHubKind::Repo.as_str() => { info!("Backing up repositories for {}", &policy); - - let stream = github_repo.run(policy, &CANCEL); - tokio::pin!(stream); - while let Some(result) = stream.next().await { - match result { - Ok((entity, BackupState::Skipped)) => { - debug!(" - {} ({})", entity, BackupState::Skipped); - } - Ok((entity, state)) => { - info!(" - {} ({})", entity, state); - } - Err(e) => { - warn!("Error: {}", e); - } - } - } + github_repo + .run(policy, &LoggingPairingHandler, &CANCEL) + .await; } k if k == GitHubKind::Star.as_str() => { info!("Backing up starred repositories for {}", &policy); - - let stream = github_star.run(policy, &CANCEL); - tokio::pin!(stream); - while let Some(result) = stream.next().await { - match result { - Ok((entity, BackupState::Skipped)) => { - debug!(" - {} ({})", entity, BackupState::Skipped); - } - Ok((entity, state)) => { - info!(" - {} ({})", entity, state); - } - Err(e) => { - warn!("Error: {}", e); - } - } - } + github_star + .run(policy, &LoggingPairingHandler, &CANCEL) + .await; } k if k == GitHubKind::Release.as_str() => { info!("Backing up release artifacts for {}", &policy); - - let stream = github_release.run(policy, &CANCEL); - tokio::pin!(stream); - while let Some(result) = stream.next().await { - match result { - Ok((entity, BackupState::Skipped)) => { - debug!(" - {} ({})", entity, BackupState::Skipped); - } - Ok((entity, state)) => { - info!(" - {} ({})", entity, state); - } - Err(e) => { - warn!("Error: {}", e); - } - } - } + github_release + .run(policy, &LoggingPairingHandler, &CANCEL) + .await; } _ => { error!("Unknown policy kind: {}", policy.kind); @@ -162,6 +123,18 @@ async fn run(args: Args) -> Result<(), Error> { Ok(()) } +pub struct LoggingPairingHandler; + +impl PairingHandler for LoggingPairingHandler { + fn on_complete(&self, entity: E, state: BackupState) { + info!(" - {} ({})", entity, state); + } + + fn on_error(&self, error: crate::Error) { + warn!("Error: {}", error); + } +} + #[tokio::main] async fn main() { ctrlc::set_handler(|| { diff --git a/src/pairing.rs b/src/pairing.rs index ebb3f75..02d0c4a 100644 --- a/src/pairing.rs +++ b/src/pairing.rs @@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::atomic::AtomicBool}; use crate::telemetry::StreamExt; use tokio::task::JoinSet; -use tokio_stream::Stream; +use tokio_stream::{Stream, StreamExt as _}; use tracing_batteries::prelude::*; use crate::{ @@ -49,7 +49,23 @@ impl< } } - pub fn run<'a>( + pub async fn run( + &self, + policy: &BackupPolicy, + handler: &dyn PairingHandler, + cancel: &'static AtomicBool, + ) { + let stream = self.run_all_backups(policy, cancel); + tokio::pin!(stream); + while let Some(result) = stream.next().await { + match result { + Ok((entity, state)) => handler.on_complete(entity, state), + Err(e) => handler.on_error(e), + } + } + } + + pub fn run_all_backups<'a>( &'a self, policy: &'a BackupPolicy, cancel: &'static AtomicBool, @@ -114,6 +130,11 @@ impl< } } +pub trait PairingHandler { + fn on_complete(&self, entity: E, state: BackupState); + fn on_error(&self, error: crate::Error); +} + #[cfg(test)] mod tests { use std::path::Path; @@ -221,7 +242,7 @@ mod tests { .with_concurrency_limit(5) .with_dry_run(false); - let stream = pairing.run(&policy, &CANCEL); + let stream = pairing.run_all_backups(&policy, &CANCEL); tokio::pin!(stream);