diff --git a/operator-k8s/src/config.rs b/operator-k8s/src/config.rs index 52690617..46ff29bf 100644 --- a/operator-k8s/src/config.rs +++ b/operator-k8s/src/config.rs @@ -20,10 +20,10 @@ pub struct Config { /// The kubernetes cluster DNS suffix #[arg(long, default_value = "cluster.local")] pub cluster_suffix: String, - /// maximum interval between accepted `HeartbeatStatus` + /// Maximum interval between accepted `HeartbeatStatus` #[arg(long, default_value = "2")] pub heartbeat_period: u64, - /// unreachable counter threshold + /// Sidecar unreachable counter threshold #[arg(long, default_value = "4")] pub unreachable_thresh: usize, } diff --git a/operator-k8s/src/controller/mod.rs b/operator-k8s/src/controller/mod.rs index 9d96afab..36b68cc5 100644 --- a/operator-k8s/src/controller/mod.rs +++ b/operator-k8s/src/controller/mod.rs @@ -1,5 +1,6 @@ use std::error::Error; use std::fmt::Debug; +use std::future::Future; use std::hash::Hash; use std::sync::Arc; @@ -84,4 +85,18 @@ where .for_each(|_| futures::future::ready(())) .await; } + + /// Run this controller with a shutdown signal + async fn run_with_shutdown( + controller: Arc, + api: Api, + trigger: impl Future + Send + Sync + 'static, + ) { + kube::runtime::Controller::new(api, WatcherConfig::default()) + .graceful_shutdown_on(trigger) + .run(Self::reconcile, Self::on_error, controller) + .filter_map(|res| async move { res.ok() }) + .for_each(|_| futures::future::ready(())) + .await; + } } diff --git a/operator-k8s/src/lib.rs b/operator-k8s/src/lib.rs index 60c93131..a3ad1015 100644 --- a/operator-k8s/src/lib.rs +++ b/operator-k8s/src/lib.rs @@ -159,9 +159,9 @@ mod controller; mod crd; /// Custom resource manager mod manager; +/// Sidecar monitor +mod monitor; /// Xline operator pub mod operator; /// Xline operator web server router mod router; -/// Maintain the state of sidecar operators -mod sidecar_state; diff --git a/operator-k8s/src/sidecar_state.rs b/operator-k8s/src/monitor.rs similarity index 92% rename 
from operator-k8s/src/sidecar_state.rs rename to operator-k8s/src/monitor.rs index 249843ca..34fc627f 100644 --- a/operator-k8s/src/sidecar_state.rs +++ b/operator-k8s/src/monitor.rs @@ -12,8 +12,8 @@ use tracing::{debug, error}; use crate::crd::Cluster; -/// State of sidecar operators -pub(crate) struct SidecarState { +/// Sidecar monitor +pub(crate) struct SidecarMonitor { /// Map for each sidecar operator and its status statuses: HashMap, /// Receiver for heartbeat status @@ -30,7 +30,7 @@ pub(crate) struct SidecarState { unreachable_thresh: usize, } -impl SidecarState { +impl SidecarMonitor { /// Creates a new `SidecarState` pub(crate) fn new( status_rx: Receiver, @@ -67,14 +67,20 @@ impl SidecarState { } } - /// Inner task for state update + /// Inner task for state update, return the unrecoverable error async fn state_update_inner(mut self) -> Result<()> { loop { let status = self.status_rx.recv_async().await?; debug!("received status: {status:?}"); let _prev = self.statuses.insert(status.name.clone(), status); - let spec_size = self.get_spec_size().await?; + let spec_size = match self.get_spec_size().await { + Ok(spec_size) => spec_size, + Err(err) => { + error!("get cluster size failed, error: {err}"); + continue; + } + }; let majority = (spec_size / 2).overflow_add(1); debug!("spec.size: {spec_size}, majority: {majority}"); @@ -182,7 +188,7 @@ mod test { ]; assert!( - SidecarState::get_reachable_counts(&statuses1.into(), heartbeat_period, majority) + SidecarMonitor::get_reachable_counts(&statuses1.into(), heartbeat_period, majority) .is_none(), "the reachable status should not be accepted" ); @@ -203,7 +209,7 @@ mod test { ]; let counts = - SidecarState::get_reachable_counts(&statuses0.into(), heartbeat_period, majority) + SidecarMonitor::get_reachable_counts(&statuses0.into(), heartbeat_period, majority) .expect("the status not accepted"); assert_eq!(counts[&id0], 1); diff --git a/operator-k8s/src/operator.rs b/operator-k8s/src/operator.rs index 
ace78170..9525c030 100644 --- a/operator-k8s/src/operator.rs +++ b/operator-k8s/src/operator.rs @@ -5,7 +5,6 @@ use anyhow::Result; use axum::routing::any; use axum::routing::post; use axum::{Extension, Router}; -use flume::Sender; use futures::FutureExt; use k8s_openapi::api::core::v1::Pod; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; @@ -15,8 +14,8 @@ use kube::runtime::wait::{await_condition, conditions}; use kube::{Api, Client, CustomResourceExt, Resource}; use operator_api::HeartbeatStatus; use prometheus::Registry; -use tokio::signal; -use tracing::{debug, info, warn}; +use tokio::sync::watch::{Receiver, Sender}; +use tracing::{debug, error, info, warn}; use crate::config::{Config, Namespace}; use crate::consts::FIELD_MANAGER; @@ -24,8 +23,8 @@ use crate::controller::cluster::{ClusterController, ClusterMetrics}; use crate::controller::{Controller, Metrics}; use crate::crd::version::ApiVersion; use crate::crd::Cluster; +use crate::monitor::SidecarMonitor; use crate::router::{healthz, metrics, sidecar_state}; -use crate::sidecar_state::SidecarState; /// wait crd to establish timeout const CRD_ESTABLISH_TIMEOUT: Duration = Duration::from_secs(20); @@ -63,72 +62,147 @@ impl Operator { (Api::all(kube_client.clone()), Api::all(kube_client.clone())) } }; + let (graceful_shutdown_event, _) = tokio::sync::watch::channel(()); + let forceful_shutdown = self.forceful_shutdown(&graceful_shutdown_event); let (status_tx, status_rx) = flume::unbounded(); - let graceful_shutdown_event = event_listener::Event::new(); - let forceful_shutdown = async { - info!("press ctrl+c to shut down gracefully"); - let _ctrl_c = tokio::signal::ctrl_c().await; - graceful_shutdown_event.notify(usize::MAX); - info!("graceful shutdown already requested, press ctrl+c again to force shut down"); - let _ctrl_c_c = tokio::signal::ctrl_c().await; - }; + let registry = Registry::new(); - let state_update_task = SidecarState::new( + 
self.start_sidecar_monitor( status_rx, - self.config.heartbeat_period, cluster_api.clone(), pod_api, - self.config.unreachable_thresh, - ) - .run_with_graceful_shutdown(graceful_shutdown_event.listen()); + graceful_shutdown_event.subscribe(), + ); + self.start_controller( + kube_client, + cluster_api, + &registry, + graceful_shutdown_event.subscribe(), + )?; + self.start_web_server(status_tx, registry, graceful_shutdown_event.subscribe())?; + + tokio::pin!(forceful_shutdown); + + #[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select` + { + tokio::select! { + _ = &mut forceful_shutdown => { + warn!("forceful shutdown"); + } + _ = graceful_shutdown_event.closed() => { + info!("graceful shutdown"); + } + } + } + + Ok(()) + } + /// Forceful shutdown + async fn forceful_shutdown(&self, graceful_shutdown_event: &Sender<()>) { + info!("press ctrl+c to shut down gracefully"); + let _ctrl_c = tokio::signal::ctrl_c().await; + let _ig = graceful_shutdown_event.send(()); + info!("graceful shutdown already requested to {} components, press ctrl+c again to force shut down", graceful_shutdown_event.receiver_count()); + let _ctrl_c_c = tokio::signal::ctrl_c().await; + } + + /// Start controller + fn start_controller( + &self, + kube_client: Client, + cluster_api: Api, + registry: &Registry, + graceful_shutdown: Receiver<()>, + ) -> Result<()> { let metrics = ClusterMetrics::new(); - let registry = Registry::new(); - metrics.register(&registry)?; + metrics.register(registry)?; + let controller = Arc::new(ClusterController { kube_client, cluster_suffix: self.config.cluster_suffix.clone(), metrics, }); - let mut controller = ClusterController::run(controller, cluster_api); + #[allow(unsafe_code)] // safe + let _ig = tokio::spawn(async move { + let mut shutdown = graceful_shutdown; - let web_server = self.web_server(status_tx, registry); + { + // Safety: + // Some hacking to make future generated from `graceful_shutdown` to be 'static + // The 'static 
marker is required by `kube::runtime::Controller::graceful_shutdown_on` + // and it is not good for our design. + let shutdown_static: &'static mut Receiver<()> = + unsafe { std::mem::transmute(&mut shutdown) }; + ClusterController::run_with_shutdown( + controller, + cluster_api, + shutdown_static.changed().map(|_| ()), + ) + .await; + } - tokio::pin!(forceful_shutdown); - tokio::pin!(web_server); - tokio::pin!(state_update_task); + // yes, we cheated the `ClusterController`, but now it is dead so `shutdown` can safely be dropped here + drop(shutdown); + info!("controller shutdown"); + }); + Ok(()) + } - let mut web_server_shutdown = false; - let mut controller_shutdown = false; - let mut state_update_shutdown = false; + /// Start sidecar monitor + fn start_sidecar_monitor( + &self, + status_rx: flume::Receiver, + cluster_api: Api, + pod_api: Api, + graceful_shutdown: Receiver<()>, + ) { + let monitor = SidecarMonitor::new( + status_rx, + self.config.heartbeat_period, + cluster_api, + pod_api, + self.config.unreachable_thresh, + ); - #[allow(clippy::integer_arithmetic)] // required by tokio::select - loop { - tokio::select! 
{ - _ = &mut forceful_shutdown => { - warn!("forceful shutdown"); - break - } - res = &mut state_update_task, if !state_update_shutdown => { - res?; - state_update_shutdown = true; - info!("state update task graceful shutdown"); - } - res = &mut web_server, if !web_server_shutdown => { - res?; - web_server_shutdown = true; - info!("web server graceful shutdown"); - } - _ = &mut controller, if !controller_shutdown => { - controller_shutdown = true; - info!("controller graceful shutdown"); - } + let _ig = tokio::spawn(async move { + let mut shutdown = graceful_shutdown; + let res = monitor + .run_with_graceful_shutdown(shutdown.changed().map(|_| ())) + .await; + if let Err(err) = res { + error!("monitor run failed, error: {err}"); } + info!("sidecar monitor shutdown"); + }); + } + + /// Start web server + fn start_web_server( + &self, + status_tx: flume::Sender, + registry: Registry, + graceful_shutdown: Receiver<()>, + ) -> Result<()> { + let status = Router::new() + .route("/status", post(sidecar_state)) + .route("/metrics", any(metrics)) + .route("/healthz", any(healthz)) + .layer(Extension(status_tx)) + .layer(Extension(registry)); + let server = axum::Server::bind(&self.config.listen_addr.parse()?); - if web_server_shutdown && controller_shutdown && state_update_shutdown { - break; + let _ig = tokio::spawn(async move { + let mut shutdown = graceful_shutdown; + let res = server + .serve(status.into_make_service()) + .with_graceful_shutdown(shutdown.changed().map(|_| ())) + .await; + if let Err(err) = res { + error!("web server starts failed, error: {err}"); } - } + info!("web server shut down"); + }); Ok(()) } @@ -282,25 +356,4 @@ impl Operator { Self::wait_crd_established(crd_api.clone(), Cluster::crd_name()).await?; Ok(()) } - - /// Run a server that receive sidecar operators' status - async fn web_server( - &self, - status_tx: Sender, - registry: Registry, - ) -> Result<()> { - let status = Router::new() - .route("/status", post(sidecar_state)) - 
.route("/metrics", any(metrics)) - .route("/healthz", any(healthz)) - .layer(Extension(status_tx)) - .layer(Extension(registry)); - - axum::Server::bind(&self.config.listen_addr.parse()?) - .serve(status.into_make_service()) - .with_graceful_shutdown(signal::ctrl_c().map(|_| ())) - .await?; - - Ok(()) - } }