Skip to content

Commit

Permalink
chore: refactor controller run reconcile
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 26, 2023
1 parent 8654047 commit c025e6d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 61 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions operator-k8s/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ impl Operator {
.await;
if let Err(err) = res {
error!("monitor run failed, error: {err}");
} else {
info!("sidecar monitor shutdown");
}
info!("sidecar monitor shutdown");
});
}

Expand All @@ -196,8 +197,9 @@ impl Operator {
.await;
if let Err(err) = res {
error!("web server starts failed, error: {err}");
} else {
info!("web server shut down");
}
info!("web server shut down");
});

Ok(())
Expand Down
1 change: 0 additions & 1 deletion sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ operator-api = { path = "../operator-api" }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.97"
thiserror = "1.0.40"
tokio = { version = "1.0", features = [
"rt-multi-thread",
"time",
Expand Down
67 changes: 33 additions & 34 deletions sidecar/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#![allow(dead_code)] // TODO remove when it is implemented

use std::future::Future;
use std::sync::Arc;
use thiserror::Error;
use std::time::Duration;

use anyhow::Result;
use tokio::select;
use tokio::sync::watch::Receiver;
use tokio::sync::Mutex;
use tokio::time::{Instant, Interval};
use tokio::time::{interval, MissedTickBehavior};
use tracing::debug;

use crate::types::StatePayload;
use crate::xline::XlineHandle;
Expand All @@ -18,61 +21,57 @@ pub(crate) struct Controller {
/// Xline handle
handle: Arc<XlineHandle>,
/// Check interval
check_interval: Interval,
/// graceful shutdown signal
graceful_shutdown: Receiver<()>,
}

/// All possible errors
#[derive(Error, Debug, PartialEq)]
pub(crate) enum Error {
/// Graceful shutdown error
#[error("operator has been shutdown")]
Shutdown,
reconcile_interval: Duration,
}

/// Controller result
type Result<T> = std::result::Result<T, Error>;

impl Controller {
/// Constructor
pub(crate) fn new(
handle: Arc<XlineHandle>,
check_interval: Interval,
state: Arc<Mutex<StatePayload>>,
graceful_shutdown: Receiver<()>,
handle: Arc<XlineHandle>,
reconcile_interval: Duration,
) -> Self {
Self {
state,
handle,
check_interval,
graceful_shutdown,
reconcile_interval,
}
}

/// Perform a reconciliation
/// Run reconcile loop with shutdown
#[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select`
pub(crate) async fn reconcile_once(&mut self) -> Result<Instant> {
pub(crate) async fn run_reconcile_with_shutdown(
self,
graceful_shutdown: impl Future<Output = ()>,
) -> Result<()> {
select! {
_ = self.graceful_shutdown.changed() => {
// TODO notify the cluster of this node's shutdown
Err(Error::Shutdown)
_ = graceful_shutdown => {
Ok(())
}
instant = self.check_interval.tick() => {
self.reconcile_inner().await.map(|_| instant)
res = self.run_reconcile() => {
res
}
}
}

/// Reconciliation inner
async fn reconcile_inner(&mut self) -> Result<()> {
self.evaluate().await?;
self.execute().await
/// Run reconcile loop
pub(crate) async fn run_reconcile(self) -> Result<()> {
let mut tick = interval(self.reconcile_interval);
tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
let instant = tick.tick().await;
let _result = self.evaluate().await;
let _result1 = self.execute().await;
debug!(
"successfully reconcile the cluster states within {:?}",
instant.elapsed()
);
}
}

/// Evaluate cluster states
#[allow(clippy::unused_async)] // TODO remove when it is implemented
async fn evaluate(&mut self) -> Result<()> {
async fn evaluate(&self) -> Result<()> {
// TODO evaluate states
Ok(())
}
Expand Down
33 changes: 10 additions & 23 deletions sidecar/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tracing::{debug, error, info, warn};
use crate::backup::pv::Pv;
use crate::backup::Provider;
use crate::controller::Controller;
use crate::controller::Error;
use crate::routers;
use crate::types::{BackendConfig, BackupConfig, Config, State, StatePayload};
use crate::xline::XlineHandle;
Expand Down Expand Up @@ -134,6 +133,7 @@ impl Sidecar {
/// Start heartbeat
fn start_heartbeat(&self, graceful_shutdown: Receiver<()>) {
let Some(monitor) = self.config.monitor.clone() else {
info!("monitor did not set, disable heartbeat");
return;
};
let cluster_name = self.config.cluster_name.clone();
Expand Down Expand Up @@ -177,29 +177,16 @@ impl Sidecar {
state: Arc<Mutex<StatePayload>>,
graceful_shutdown: Receiver<()>,
) {
let mut controller = Controller::new(
handle,
interval(self.config.reconcile_interval),
state,
graceful_shutdown,
);
let controller = Controller::new(state, handle, self.config.reconcile_interval);
let _ig = tokio::spawn(async move {
loop {
match controller.reconcile_once().await {
Ok(instant) => {
debug!(
"successfully reconcile the cluster states within {:?}",
instant.elapsed()
);
}
Err(err) => {
if err == Error::Shutdown {
info!("controller graceful shutdown");
break;
}
error!("reconcile failed, error: {}", err);
}
}
let mut shutdown = graceful_shutdown;
let res = controller
.run_reconcile_with_shutdown(shutdown.changed().map(|_| ()))
.await;
if let Err(err) = res {
error!("controller run failed, error: {err}");
} else {
info!("controller shutdown");
}
});
}
Expand Down

0 comments on commit c025e6d

Please sign in to comment.