Skip to content

Commit

Permalink
refactor: start components in separated tasks
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Oct 23, 2023
1 parent 6417e9e commit d375989
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 83 deletions.
4 changes: 2 additions & 2 deletions operator-k8s/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ pub struct Config {
/// The kubernetes cluster DNS suffix
#[arg(long, default_value = "cluster.local")]
pub cluster_suffix: String,
/// maximum interval between accepted `HeartbeatStatus`
/// Maximum interval between accepted `HeartbeatStatus`
#[arg(long, default_value = "2")]
pub heartbeat_period: u64,
/// unreachable counter threshold
/// Sidecar unreachable counter threshold
#[arg(long, default_value = "4")]
pub unreachable_thresh: usize,
}
Expand Down
15 changes: 15 additions & 0 deletions operator-k8s/src/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;

Expand Down Expand Up @@ -84,4 +85,18 @@ where
.for_each(|_| futures::future::ready(()))
.await;
}

/// Run this controller until the shutdown `trigger` resolves
///
/// The `trigger` future is handed to `graceful_shutdown_on`, and this function returns
/// once the reconcile stream has been polled to its end. Failed reconcile results are
/// filtered out of the stream here.
async fn run_with_shutdown(
    controller: Arc<Self>,
    api: Api<R>,
    trigger: impl Future<Output = ()> + Send + Sync + 'static,
) {
    let reconciled = kube::runtime::Controller::new(api, WatcherConfig::default())
        .graceful_shutdown_on(trigger)
        .run(Self::reconcile, Self::on_error, controller)
        .filter_map(|outcome| futures::future::ready(outcome.ok()));
    // Nothing consumes the individual results, the stream only has to be driven to completion
    reconciled.for_each(|_| async {}).await;
}
}
4 changes: 2 additions & 2 deletions operator-k8s/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ mod controller;
mod crd;
/// Custom resource manager
mod manager;
/// Sidecar monitor
mod monitor;
/// Xline operator
pub mod operator;
/// Xline operator web server router
mod router;
/// Maintain the state of sidecar operators
mod sidecar_state;
20 changes: 13 additions & 7 deletions operator-k8s/src/sidecar_state.rs → operator-k8s/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use tracing::{debug, error};

use crate::crd::Cluster;

/// State of sidecar operators
pub(crate) struct SidecarState {
/// Sidecar monitor
pub(crate) struct SidecarMonitor {
/// Map for each sidecar operator and its status
statuses: HashMap<String, HeartbeatStatus>,
/// Receiver for heartbeat status
Expand All @@ -30,7 +30,7 @@ pub(crate) struct SidecarState {
unreachable_thresh: usize,
}

impl SidecarState {
impl SidecarMonitor {
/// Creates a new `SidecarMonitor`
pub(crate) fn new(
status_rx: Receiver<HeartbeatStatus>,
Expand Down Expand Up @@ -67,14 +67,20 @@ impl SidecarState {
}
}

/// Inner task for state update
/// Inner task for state update, returns the unrecoverable error
async fn state_update_inner(mut self) -> Result<()> {
loop {
let status = self.status_rx.recv_async().await?;
debug!("received status: {status:?}");
let _prev = self.statuses.insert(status.name.clone(), status);

let spec_size = self.get_spec_size().await?;
let spec_size = match self.get_spec_size().await {
Ok(spec_size) => spec_size,
Err(err) => {
error!("get cluster size failed, error: {err}");
continue;
}
};
let majority = (spec_size / 2).overflow_add(1);
debug!("spec.size: {spec_size}, majority: {majority}");

Expand Down Expand Up @@ -182,7 +188,7 @@ mod test {
];

assert!(
SidecarState::get_reachable_counts(&statuses1.into(), heartbeat_period, majority)
SidecarMonitor::get_reachable_counts(&statuses1.into(), heartbeat_period, majority)
.is_none(),
"the reachable status should not be accepted"
);
Expand All @@ -203,7 +209,7 @@ mod test {
];

let counts =
SidecarState::get_reachable_counts(&statuses0.into(), heartbeat_period, majority)
SidecarMonitor::get_reachable_counts(&statuses0.into(), heartbeat_period, majority)
.expect("the status not accepted");

assert_eq!(counts[&id0], 1);
Expand Down
197 changes: 125 additions & 72 deletions operator-k8s/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use anyhow::Result;
use axum::routing::any;
use axum::routing::post;
use axum::{Extension, Router};
use flume::Sender;
use futures::FutureExt;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
Expand All @@ -15,17 +14,17 @@ use kube::runtime::wait::{await_condition, conditions};
use kube::{Api, Client, CustomResourceExt, Resource};
use operator_api::HeartbeatStatus;
use prometheus::Registry;
use tokio::signal;
use tracing::{debug, info, warn};
use tokio::sync::watch::{Receiver, Sender};
use tracing::{debug, error, info, warn};

use crate::config::{Config, Namespace};
use crate::consts::FIELD_MANAGER;
use crate::controller::cluster::{ClusterController, ClusterMetrics};
use crate::controller::{Controller, Metrics};
use crate::crd::version::ApiVersion;
use crate::crd::Cluster;
use crate::monitor::SidecarMonitor;
use crate::router::{healthz, metrics, sidecar_state};
use crate::sidecar_state::SidecarState;

/// Timeout for waiting for the CRD to be established
const CRD_ESTABLISH_TIMEOUT: Duration = Duration::from_secs(20);
Expand Down Expand Up @@ -63,72 +62,147 @@ impl Operator {
(Api::all(kube_client.clone()), Api::all(kube_client.clone()))
}
};
let (graceful_shutdown_event, _) = tokio::sync::watch::channel(());
let forceful_shutdown = self.forceful_shutdown(&graceful_shutdown_event);
let (status_tx, status_rx) = flume::unbounded();
let graceful_shutdown_event = event_listener::Event::new();
let forceful_shutdown = async {
info!("press ctrl+c to shut down gracefully");
let _ctrl_c = tokio::signal::ctrl_c().await;
graceful_shutdown_event.notify(usize::MAX);
info!("graceful shutdown already requested, press ctrl+c again to force shut down");
let _ctrl_c_c = tokio::signal::ctrl_c().await;
};
let registry = Registry::new();

let state_update_task = SidecarState::new(
self.start_sidecar_monitor(
status_rx,
self.config.heartbeat_period,
cluster_api.clone(),
pod_api,
self.config.unreachable_thresh,
)
.run_with_graceful_shutdown(graceful_shutdown_event.listen());
graceful_shutdown_event.subscribe(),
);
self.start_controller(
kube_client,
cluster_api,
&registry,
graceful_shutdown_event.subscribe(),
)?;
self.start_web_server(status_tx, registry, graceful_shutdown_event.subscribe())?;

tokio::pin!(forceful_shutdown);

#[allow(clippy::integer_arithmetic)] // this error originates in the macro `tokio::select`
{
tokio::select! {
_ = &mut forceful_shutdown => {
warn!("forceful shutdown");
}
_ = graceful_shutdown_event.closed() => {
info!("graceful shutdown");
}
}
}

Ok(())
}

/// Forceful shutdown
async fn forceful_shutdown(&self, graceful_shutdown_event: &Sender<()>) {
info!("press ctrl+c to shut down gracefully");
let _ctrl_c = tokio::signal::ctrl_c().await;
let _ig = graceful_shutdown_event.send(());
info!("graceful shutdown already requested to {} components, press ctrl+c again to force shut down", graceful_shutdown_event.receiver_count());
let _ctrl_c_c = tokio::signal::ctrl_c().await;
}

/// Start controller
fn start_controller(
&self,
kube_client: Client,
cluster_api: Api<Cluster>,
registry: &Registry,
graceful_shutdown: Receiver<()>,
) -> Result<()> {
let metrics = ClusterMetrics::new();
let registry = Registry::new();
metrics.register(&registry)?;
metrics.register(registry)?;

let controller = Arc::new(ClusterController {
kube_client,
cluster_suffix: self.config.cluster_suffix.clone(),
metrics,
});
let mut controller = ClusterController::run(controller, cluster_api);
#[allow(unsafe_code)] // safe
let _ig = tokio::spawn(async move {
let mut shutdown = graceful_shutdown;

let web_server = self.web_server(status_tx, registry);
{
// Safety:
// This is a hack that makes the future created from `graceful_shutdown` satisfy 'static.
// The 'static bound is required by `kube::runtime::Controller::graceful_shutdown_on`,
// which does not fit our design.
let shutdown_static: &'static mut Receiver<()> =
unsafe { std::mem::transmute(&mut shutdown) };
ClusterController::run_with_shutdown(
controller,
cluster_api,
shutdown_static.changed().map(|_| ()),
)
.await;
}

tokio::pin!(forceful_shutdown);
tokio::pin!(web_server);
tokio::pin!(state_update_task);
// yes, we cheated the `ClusterController`, but it is dead now, so `shutdown` can safely be dropped here
drop(shutdown);
info!("controller shutdown");
});
Ok(())
}

let mut web_server_shutdown = false;
let mut controller_shutdown = false;
let mut state_update_shutdown = false;
/// Start sidecar monitor
fn start_sidecar_monitor(
&self,
status_rx: flume::Receiver<HeartbeatStatus>,
cluster_api: Api<Cluster>,
pod_api: Api<Pod>,
graceful_shutdown: Receiver<()>,
) {
let monitor = SidecarMonitor::new(
status_rx,
self.config.heartbeat_period,
cluster_api,
pod_api,
self.config.unreachable_thresh,
);

#[allow(clippy::integer_arithmetic)] // required by tokio::select
loop {
tokio::select! {
_ = &mut forceful_shutdown => {
warn!("forceful shutdown");
break
}
res = &mut state_update_task, if !state_update_shutdown => {
res?;
state_update_shutdown = true;
info!("state update task graceful shutdown");
}
res = &mut web_server, if !web_server_shutdown => {
res?;
web_server_shutdown = true;
info!("web server graceful shutdown");
}
_ = &mut controller, if !controller_shutdown => {
controller_shutdown = true;
info!("controller graceful shutdown");
}
let _ig = tokio::spawn(async move {
let mut shutdown = graceful_shutdown;
let res = monitor
.run_with_graceful_shutdown(shutdown.changed().map(|_| ()))
.await;
if let Err(err) = res {
error!("monitor run failed, error: {err}");
}
info!("sidecar monitor shutdown");
});
}

/// Start web server
fn start_web_server(
&self,
status_tx: flume::Sender<HeartbeatStatus>,
registry: Registry,
graceful_shutdown: Receiver<()>,
) -> Result<()> {
let status = Router::new()
.route("/status", post(sidecar_state))
.route("/metrics", any(metrics))
.route("/healthz", any(healthz))
.layer(Extension(status_tx))
.layer(Extension(registry));
let server = axum::Server::bind(&self.config.listen_addr.parse()?);

if web_server_shutdown && controller_shutdown && state_update_shutdown {
break;
let _ig = tokio::spawn(async move {
let mut shutdown = graceful_shutdown;
let res = server
.serve(status.into_make_service())
.with_graceful_shutdown(shutdown.changed().map(|_| ()))
.await;
if let Err(err) = res {
error!("web server starts failed, error: {err}");
}
}
info!("web server shut down");
});

Ok(())
}
Expand Down Expand Up @@ -282,25 +356,4 @@ impl Operator {
Self::wait_crd_established(crd_api.clone(), Cluster::crd_name()).await?;
Ok(())
}

/// Run the web server that receives the status of sidecar operators
///
/// Routes: `/status` accepts POST, `/metrics` and `/healthz` accept any method. The
/// status sender and the metrics registry are shared with the handlers as extensions.
/// The server is bound to `config.listen_addr` and begins its graceful shutdown when
/// ctrl+c is received.
async fn web_server(
    &self,
    status_tx: Sender<HeartbeatStatus>,
    registry: Registry,
) -> Result<()> {
    let routes = Router::new()
        .route("/status", post(sidecar_state))
        .route("/metrics", any(metrics))
        .route("/healthz", any(healthz));
    let app = routes
        .layer(Extension(status_tx))
        .layer(Extension(registry));

    let server = axum::Server::bind(&self.config.listen_addr.parse()?)
        .serve(app.into_make_service())
        .with_graceful_shutdown(signal::ctrl_c().map(|_| ()));
    server.await?;

    Ok(())
}
}

0 comments on commit d375989

Please sign in to comment.