diff --git a/Cargo.lock b/Cargo.lock index dfac1ad6..f631711e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4116,7 +4116,6 @@ dependencies = [ "reqwest", "serde", "serde_json", - "thiserror", "tokio", "toml 0.8.2", "tonic", diff --git a/operator-k8s/src/operator.rs b/operator-k8s/src/operator.rs index f084e881..dedd0868 100644 --- a/operator-k8s/src/operator.rs +++ b/operator-k8s/src/operator.rs @@ -168,8 +168,9 @@ impl Operator { .await; if let Err(err) = res { error!("monitor run failed, error: {err}"); + } else { + info!("sidecar monitor shutdown"); } - info!("sidecar monitor shutdown"); }); } @@ -196,8 +197,9 @@ impl Operator { .await; if let Err(err) = res { error!("web server starts failed, error: {err}"); + } else { + info!("web server shut down"); } - info!("web server shut down"); }); Ok(()) diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index 51ec75de..08a99161 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -23,7 +23,6 @@ operator-api = { path = "../operator-api" } reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.97" -thiserror = "1.0.40" tokio = { version = "1.0", features = [ "rt-multi-thread", "time", diff --git a/sidecar/src/controller.rs b/sidecar/src/controller.rs index 141f0c4f..abe583ce 100644 --- a/sidecar/src/controller.rs +++ b/sidecar/src/controller.rs @@ -1,11 +1,14 @@ #![allow(dead_code)] // TODO remove when it is implemented +use std::future::Future; use std::sync::Arc; -use thiserror::Error; +use std::time::Duration; + +use anyhow::Result; use tokio::select; -use tokio::sync::watch::Receiver; use tokio::sync::Mutex; -use tokio::time::{Instant, Interval}; +use tokio::time::{interval, MissedTickBehavior}; +use tracing::debug; use crate::types::StatePayload; use crate::xline::XlineHandle; @@ -18,61 +21,57 @@ pub(crate) struct Controller { /// Xline handle handle: Arc, /// Check interval - check_interval: Interval, - /// graceful shutdown signal - graceful_shutdown: Receiver<()>, -} - -/// All possible errors -#[derive(Error, Debug, PartialEq)] -pub(crate) enum Error { - /// Graceful shutdown error - #[error("operator has been shutdown")] - Shutdown, + reconcile_interval: Duration, } -/// Controller result -type Result = std::result::Result; - impl Controller { /// Constructor pub(crate) fn new( - handle: Arc, - check_interval: Interval, state: Arc>, - graceful_shutdown: Receiver<()>, + handle: Arc, + reconcile_interval: Duration, ) -> Self { Self { state, handle, - check_interval, - graceful_shutdown, + reconcile_interval, } } - /// Perform a reconciliation + /// Run reconcile loop with shutdown #[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select` - pub(crate) async fn reconcile_once(&mut self) -> Result { + pub(crate) async fn run_reconcile_with_shutdown( + self, + graceful_shutdown: impl Future, + ) -> Result<()> { select! { - _ = self.graceful_shutdown.changed() => { - // TODO notify the cluster of this node's shutdown - Err(Error::Shutdown) + _ = graceful_shutdown => { + Ok(()) } - instant = self.check_interval.tick() => { - self.reconcile_inner().await.map(|_| instant) + res = self.run_reconcile() => { + res } } } - /// Reconciliation inner - async fn reconcile_inner(&mut self) -> Result<()> { - self.evaluate().await?; - self.execute().await + /// Run reconcile loop + pub(crate) async fn run_reconcile(self) -> Result<()> { + let mut tick = interval(self.reconcile_interval); + tick.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + let instant = tick.tick().await; + let _result = self.evaluate().await; + let _result1 = self.execute().await; + debug!( + "successfully reconcile the cluster states within {:?}", + instant.elapsed() + ); + } } /// Evaluate cluster states #[allow(clippy::unused_async)] // TODO remove when it is implemented - async fn evaluate(&mut self) -> Result<()> { + async fn evaluate(&self) -> Result<()> { // TODO evaluate states Ok(()) } diff --git a/sidecar/src/sidecar.rs b/sidecar/src/sidecar.rs index 2cc08ba4..575c8df5 100644 --- a/sidecar/src/sidecar.rs +++ b/sidecar/src/sidecar.rs @@ -15,7 +15,6 @@ use tracing::{debug, error, info, warn}; use crate::backup::pv::Pv; use crate::backup::Provider; use crate::controller::Controller; -use crate::controller::Error; use crate::routers; use crate::types::{BackendConfig, BackupConfig, Config, State, StatePayload}; use crate::xline::XlineHandle; @@ -134,6 +133,7 @@ impl Sidecar { /// Start heartbeat fn start_heartbeat(&self, graceful_shutdown: Receiver<()>) { let Some(monitor) = self.config.monitor.clone() else { + info!("monitor did not set, disable heartbeat"); return; }; let cluster_name = self.config.cluster_name.clone(); @@ -177,29 +177,16 @@ impl Sidecar { state: Arc>, graceful_shutdown: Receiver<()>, ) { - let mut controller = Controller::new( - handle, - interval(self.config.reconcile_interval), - state, - graceful_shutdown, - ); + let controller = Controller::new(state, handle, self.config.reconcile_interval); let _ig = tokio::spawn(async move { - loop { - match controller.reconcile_once().await { - Ok(instant) => { - debug!( - "successfully reconcile the cluster states within {:?}", - instant.elapsed() - ); - } - Err(err) => { - if err == Error::Shutdown { - info!("controller graceful shutdown"); - break; - } - error!("reconcile failed, error: {}", err); - } - } + let mut shutdown = graceful_shutdown; + let res = controller + .run_reconcile_with_shutdown(shutdown.changed().map(|_| ())) + .await; + if let Err(err) = res { + error!("controller run failed, error: {err}"); + } else { + info!("controller shutdown"); } }); }