Skip to content

Commit

Permalink
feat: added retry config
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Sep 4, 2023
1 parent 6a81321 commit 911b888
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use dolos::{
pub struct Args {}

#[tokio::main]
pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> {
pub async fn run(
config: super::Config,
policy: &gasket::runtime::Policy,
_args: &Args,
) -> Result<(), Error> {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::INFO)
Expand Down Expand Up @@ -44,7 +48,7 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> {
from_sync.try_into().unwrap(),
));

dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_serve)
dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_serve, policy)
.unwrap()
.block();

Expand Down
26 changes: 24 additions & 2 deletions src/bin/dolos/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use clap::{Parser, Subcommand};
use miette::{IntoDiagnostic, Result};
use serde::Deserialize;
Expand Down Expand Up @@ -43,6 +45,7 @@ pub struct Config {
pub applydb: ApplydbConfig,
pub upstream: dolos::sync::Config,
pub serve: dolos::serve::Config,
pub retries: Option<gasket::retries::Policy>,
}

impl Config {
Expand All @@ -67,13 +70,32 @@ impl Config {
}
}

fn define_gasket_policy(config: Option<&gasket::retries::Policy>) -> gasket::runtime::Policy {
let default_policy = gasket::retries::Policy {
max_retries: 20,
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: Duration::from_secs(60),
dismissible: false,
};

gasket::runtime::Policy {
tick_timeout: std::time::Duration::from_secs(120).into(),
bootstrap_retry: config.cloned().unwrap_or(default_policy.clone()),
work_retry: config.cloned().unwrap_or(default_policy.clone()),
teardown_retry: config.cloned().unwrap_or(default_policy.clone()),
}
}

fn main() -> Result<()> {
let args = Cli::parse();
let config = Config::new(&args.config).into_diagnostic()?;

let retries = define_gasket_policy(config.retries.as_ref());

match args.command {
Command::Daemon(x) => daemon::run(config, &x).into_diagnostic()?,
Command::Sync(x) => sync::run(&config, &x).into_diagnostic()?,
Command::Daemon(x) => daemon::run(config, &retries, &x).into_diagnostic()?,
Command::Sync(x) => sync::run(&config, &retries, &x).into_diagnostic()?,
Command::Read(x) => read::run(&config, &x).into_diagnostic()?,
Command::Serve(x) => serve::run(config, &x).into_diagnostic()?,
};
Expand Down
8 changes: 6 additions & 2 deletions src/bin/dolos/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use dolos::{
#[derive(Debug, clap::Args)]
pub struct Args {}

pub fn run(config: &super::Config, _args: &Args) -> Result<(), Error> {
pub fn run(
config: &super::Config,
policy: &gasket::runtime::Policy,
_args: &Args,
) -> Result<(), Error> {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::INFO)
Expand Down Expand Up @@ -36,7 +40,7 @@ pub fn run(config: &super::Config, _args: &Args) -> Result<(), Error> {
// placeholder while we implement monitoring sink
let (to_monitor, _) = gasket::messaging::tokio::broadcast_channel(100);

dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_monitor)
dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_monitor, policy)
.unwrap()
.block();

Expand Down
7 changes: 4 additions & 3 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn pipeline(
rolldb: RollDB,
applydb: ApplyDB,
output: gasket::messaging::tokio::ChannelSendAdapter<RollEvent>,
policy: &gasket::runtime::Policy,
) -> Result<gasket::daemon::Daemon, Error> {
let pull_cursor = rolldb
.intersect_options(5)
Expand Down Expand Up @@ -49,9 +50,9 @@ pub fn pipeline(
// output to outside of out pipeline
apply.downstream.connect(output);

let pull = gasket::runtime::spawn_stage(pull, gasket::runtime::Policy::default());
let roll = gasket::runtime::spawn_stage(roll, gasket::runtime::Policy::default());
let apply = gasket::runtime::spawn_stage(apply, gasket::runtime::Policy::default());
let pull = gasket::runtime::spawn_stage(pull, policy.clone());
let roll = gasket::runtime::spawn_stage(roll, policy.clone());
let apply = gasket::runtime::spawn_stage(apply, policy.clone());

Ok(gasket::daemon::Daemon(vec![pull, roll, apply]))
}

0 comments on commit 911b888

Please sign in to comment.