diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 005ba319..f4138fe1 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -9,7 +9,11 @@ use dolos::{ pub struct Args {} #[tokio::main] -pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> { +pub async fn run( + config: super::Config, + policy: &gasket::runtime::Policy, + _args: &Args, +) -> Result<(), Error> { tracing::subscriber::set_global_default( tracing_subscriber::FmtSubscriber::builder() .with_max_level(tracing::Level::INFO) @@ -44,7 +48,7 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> { from_sync.try_into().unwrap(), )); - dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_serve) + dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_serve, policy) .unwrap() .block(); diff --git a/src/bin/dolos/main.rs b/src/bin/dolos/main.rs index 79effe4b..3fb0dcf6 100644 --- a/src/bin/dolos/main.rs +++ b/src/bin/dolos/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use clap::{Parser, Subcommand}; use miette::{IntoDiagnostic, Result}; use serde::Deserialize; @@ -43,6 +45,7 @@ pub struct Config { pub applydb: ApplydbConfig, pub upstream: dolos::sync::Config, pub serve: dolos::serve::Config, + pub retries: Option, } impl Config { @@ -67,13 +70,32 @@ impl Config { } } +fn define_gasket_policy(config: Option<&gasket::retries::Policy>) -> gasket::runtime::Policy { + let default_policy = gasket::retries::Policy { + max_retries: 20, + backoff_unit: Duration::from_secs(1), + backoff_factor: 2, + max_backoff: Duration::from_secs(60), + dismissible: false, + }; + + gasket::runtime::Policy { + tick_timeout: std::time::Duration::from_secs(120).into(), + bootstrap_retry: config.cloned().unwrap_or(default_policy.clone()), + work_retry: config.cloned().unwrap_or(default_policy.clone()), + teardown_retry: config.cloned().unwrap_or(default_policy.clone()), + } +} + fn main() -> Result<()> { let args = Cli::parse(); let config = Config::new(&args.config).into_diagnostic()?; + let retries = define_gasket_policy(config.retries.as_ref()); + match args.command { - Command::Daemon(x) => daemon::run(config, &x).into_diagnostic()?, - Command::Sync(x) => sync::run(&config, &x).into_diagnostic()?, + Command::Daemon(x) => daemon::run(config, &retries, &x).into_diagnostic()?, + Command::Sync(x) => sync::run(&config, &retries, &x).into_diagnostic()?, Command::Read(x) => read::run(&config, &x).into_diagnostic()?, Command::Serve(x) => serve::run(config, &x).into_diagnostic()?, }; diff --git a/src/bin/dolos/sync.rs b/src/bin/dolos/sync.rs index 74c25059..4586c819 100644 --- a/src/bin/dolos/sync.rs +++ b/src/bin/dolos/sync.rs @@ -8,7 +8,11 @@ use dolos::{ #[derive(Debug, clap::Args)] pub struct Args {} -pub fn run(config: &super::Config, _args: &Args) -> Result<(), Error> { +pub fn run( + config: &super::Config, + policy: &gasket::runtime::Policy, + _args: &Args, +) -> Result<(), Error> { tracing::subscriber::set_global_default( tracing_subscriber::FmtSubscriber::builder() .with_max_level(tracing::Level::INFO) @@ -36,7 +40,7 @@ pub fn run(config: &super::Config, _args: &Args) -> Result<(), Error> { // placeholder while we implement monitoring sink let (to_monitor, _) = gasket::messaging::tokio::broadcast_channel(100); - dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_monitor) + dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_monitor, policy) .unwrap() .block(); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ba43c09e..3f2cc1e4 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -19,6 +19,7 @@ pub fn pipeline( rolldb: RollDB, applydb: ApplyDB, output: gasket::messaging::tokio::ChannelSendAdapter, + policy: &gasket::runtime::Policy, ) -> Result { let pull_cursor = rolldb .intersect_options(5) @@ -49,9 +50,9 @@ pub fn pipeline( // output to outside of out pipeline apply.downstream.connect(output); - let pull = gasket::runtime::spawn_stage(pull, gasket::runtime::Policy::default()); - let roll = gasket::runtime::spawn_stage(roll, gasket::runtime::Policy::default()); - let apply = gasket::runtime::spawn_stage(apply, gasket::runtime::Policy::default()); + let pull = gasket::runtime::spawn_stage(pull, policy.clone()); + let roll = gasket::runtime::spawn_stage(roll, policy.clone()); + let apply = gasket::runtime::spawn_stage(apply, policy.clone()); Ok(gasket::daemon::Daemon(vec![pull, roll, apply])) }