Skip to content

Commit

Permalink
refactor: Reduce the amount of code written to add new backup types
Browse files Browse the repository at this point in the history
  • Loading branch information
notheotherben committed Nov 30, 2024
1 parent 437f1e8 commit ff5ea54
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 70 deletions.
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5.21", features = ["derive", "string"] }
croner = "2.1.0"
ctrlc = "3.4.5"
futures = "0.3.31"
gix = { version = "0.68.0", features = [
"blocking-http-transport-reqwest-rust-tls",
] }
Expand Down
71 changes: 22 additions & 49 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use clap::Parser;
use engines::BackupState;
use errors::Error;
use pairing::PairingHandler;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio_stream::StreamExt;
use tracing_batteries::prelude::*;

#[macro_use]
Expand Down Expand Up @@ -78,60 +78,21 @@ async fn run(args: Args) -> Result<(), Error> {
match policy.kind.as_str() {
k if k == GitHubKind::Repo.as_str() => {
info!("Backing up repositories for {}", &policy);

let stream = github_repo.run(policy, &CANCEL);
tokio::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((entity, BackupState::Skipped)) => {
debug!(" - {} ({})", entity, BackupState::Skipped);
}
Ok((entity, state)) => {
info!(" - {} ({})", entity, state);
}
Err(e) => {
warn!("Error: {}", e);
}
}
}
github_repo
.run(policy, &LoggingPairingHandler, &CANCEL)
.await;
}
k if k == GitHubKind::Star.as_str() => {
info!("Backing up starred repositories for {}", &policy);

let stream = github_star.run(policy, &CANCEL);
tokio::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((entity, BackupState::Skipped)) => {
debug!(" - {} ({})", entity, BackupState::Skipped);
}
Ok((entity, state)) => {
info!(" - {} ({})", entity, state);
}
Err(e) => {
warn!("Error: {}", e);
}
}
}
github_star
.run(policy, &LoggingPairingHandler, &CANCEL)
.await;
}
k if k == GitHubKind::Release.as_str() => {
info!("Backing up release artifacts for {}", &policy);

let stream = github_release.run(policy, &CANCEL);
tokio::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((entity, BackupState::Skipped)) => {
debug!(" - {} ({})", entity, BackupState::Skipped);
}
Ok((entity, state)) => {
info!(" - {} ({})", entity, state);
}
Err(e) => {
warn!("Error: {}", e);
}
}
}
github_release
.run(policy, &LoggingPairingHandler, &CANCEL)
.await;
}
_ => {
error!("Unknown policy kind: {}", policy.kind);
Expand Down Expand Up @@ -162,6 +123,18 @@ async fn run(args: Args) -> Result<(), Error> {
Ok(())
}

pub struct LoggingPairingHandler;

impl<E: BackupEntity> PairingHandler<E> for LoggingPairingHandler {
fn on_complete(&self, entity: E, state: BackupState) {
info!(" - {} ({})", entity, state);
}

fn on_error(&self, error: crate::Error) {
warn!("Error: {}", error);
}
}

#[tokio::main]
async fn main() {
ctrlc::set_handler(|| {
Expand Down
27 changes: 24 additions & 3 deletions src/pairing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::atomic::AtomicBool};

use crate::telemetry::StreamExt;
use tokio::task::JoinSet;
use tokio_stream::Stream;
use tokio_stream::{Stream, StreamExt as _};
use tracing_batteries::prelude::*;

use crate::{
Expand Down Expand Up @@ -49,7 +49,23 @@ impl<
}
}

pub fn run<'a>(
pub async fn run(
&self,
policy: &BackupPolicy,
handler: &dyn PairingHandler<E>,
cancel: &'static AtomicBool,
) {
let stream = self.run_all_backups(policy, cancel);
tokio::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok((entity, state)) => handler.on_complete(entity, state),
Err(e) => handler.on_error(e),
}
}
}

pub fn run_all_backups<'a>(
&'a self,
policy: &'a BackupPolicy,
cancel: &'static AtomicBool,
Expand Down Expand Up @@ -114,6 +130,11 @@ impl<
}
}

pub trait PairingHandler<E: BackupEntity> {
fn on_complete(&self, entity: E, state: BackupState);
fn on_error(&self, error: crate::Error);
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand Down Expand Up @@ -221,7 +242,7 @@ mod tests {
.with_concurrency_limit(5)
.with_dry_run(false);

let stream = pairing.run(&policy, &CANCEL);
let stream = pairing.run_all_backups(&policy, &CANCEL);

tokio::pin!(stream);

Expand Down

0 comments on commit ff5ea54

Please sign in to comment.