Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/Refactor controller logic #27

Merged
merged 10 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
cargo build --release
cd build
cp ../target/release/xline-operator .
docker build . -t datenlord/xline-operator:latest -f operator.Dockerfile
docker build . -t xline-kv/xline-operator:latest -f operator.Dockerfile
- name: 'E2E CI'
env:
KIND_CLUSTER_IMAGE: kindest/node:${{ matrix.k8s }}
Expand Down
20 changes: 6 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ members = [
"operator-api",
"operator-k8s",
"sidecar",
"utils",
]
5 changes: 0 additions & 5 deletions build/xline-operator.Dockerfile

This file was deleted.

4 changes: 4 additions & 0 deletions operator-api/src/consts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/// Default backup PV mount path in container, this path cannot be mounted by user
pub const DEFAULT_BACKUP_DIR: &str = "/xline-backup";
/// Default xline data dir, this path cannot be mounted by user
pub const DEFAULT_DATA_DIR: &str = "/usr/local/xline/data-dir";
2 changes: 2 additions & 0 deletions operator-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod consts;

use serde::{Deserialize, Serialize};

/// Heartbeat status
Expand Down
3 changes: 1 addition & 2 deletions operator-k8s/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ async-trait = "0.1.68"
axum = "0.6.18"
clap = { version = "4.3.4", features = ["derive"] }
clippy-utilities = "0.2.0"
event-listener = "2.5.3"
flume = "0.10.14"
futures = "0.3.28"
k8s-openapi = { version = "0.18.0", features = ["v1_26", "schemars"] }
Expand All @@ -37,7 +36,7 @@ tokio = { version = "1.0", features = [
] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
utils = { path = "../utils" }

[dev-dependencies]
garde = { version = "0.11.2", default-features = false, features = ["derive", "pattern"] }
serde_yaml = "0.9.25"
7 changes: 5 additions & 2 deletions operator-k8s/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ pub struct Config {
/// Whether to create CRD regardless of current version on k8s
#[arg(long, default_value = "false")]
pub create_crd: bool,
/// Whether to enable auto migration if CRD version is less than current version
#[arg(long, default_value = "false")]
pub auto_migration: bool,
/// The kubernetes cluster DNS suffix
#[arg(long, default_value = "cluster.local")]
pub cluster_suffix: String,
/// maximum interval between accepted `HeartbeatStatus`
/// Maximum interval between accepted `HeartbeatStatus`
#[arg(long, default_value = "2")]
pub heartbeat_period: u64,
/// unreachable counter threshold
/// Sidecar unreachable counter threshold
#[arg(long, default_value = "4")]
pub unreachable_thresh: usize,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#![allow(unused)] // TODO remove

use std::time::Duration;

/// The default requeue duration to achieve eventual consistency
pub(super) const DEFAULT_REQUEUE_DURATION: Duration = Duration::from_secs(600);
pub(crate) const DEFAULT_REQUEUE_DURATION: Duration = Duration::from_secs(600);
/// The field manager identifier of xline operator
pub(super) const FIELD_MANAGER: &str = "xlineoperator.datenlord.io";
/// The emptyDir volume name of each pod if there is no data pvc specified
pub(crate) const DATA_EMPTY_DIR_NAME: &str = "xline-data-empty-dir";
pub(crate) const FIELD_MANAGER: &str = "xlineoperator.datenlord.io";
/// The image used for cronjob to trigger backup
/// The following command line tool should be available in this image
/// 1. curl
Expand All @@ -21,3 +21,12 @@ pub(crate) const DEFAULT_XLINE_PORT: i32 = 2379;
pub(crate) const DEFAULT_SIDECAR_PORT: i32 = 2380;
/// The environment name of the xline pod name
pub(crate) const XLINE_POD_NAME_ENV: &str = "XLINE_POD_NAME";
/// The annotation used to inherit labels in `XlineCluster`
pub(crate) const ANNOTATION_INHERIT_LABELS_PREFIX: &str =
"xlineoperator.datenlord.io/inherit-label-prefix";
/// The label attached to subresources, indicating the xlinecluster name
pub(crate) const LABEL_CLUSTER_NAME: &str = "xlinecluster/name";
/// The label attached to subresources, indicating the component type of this subresource
pub(crate) const LABEL_CLUSTER_COMPONENT: &str = "xlinecluster/component";
/// Indicate the version of operator that creates this subresource
pub(crate) const LABEL_OPERATOR_VERSION: &str = "xlinecluster/operator-version";
73 changes: 73 additions & 0 deletions operator-k8s/src/controller/cluster/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use clippy_utilities::NumericCast;
use prometheus::{Error, Histogram, HistogramOpts, HistogramTimer, IntCounterVec, Opts, Registry};

use std::iter::repeat;
use std::ops::Mul;

use crate::controller::Metrics;

/// Cluster metrics: the Prometheus collectors tracked for the reconcile loop
pub(crate) struct ClusterMetrics {
    /// Histogram of reconcile durations (`operator_reconcile_duration_seconds`)
    reconcile_duration: Histogram,
    /// Failed reconcile counter, partitioned by the `reason` label
    /// (`operator_reconcile_failed_count`)
    reconcile_failed_count: IntCounterVec,
}

impl Default for ClusterMetrics {
    /// The default metrics are exactly what [`ClusterMetrics::new`] builds.
    fn default() -> Self {
        Self::new()
    }
}

impl Metrics for ClusterMetrics {
    /// Register both collectors with `registry`.
    ///
    /// The duration histogram is registered first; if that fails the error is
    /// returned immediately and the failure counter is not registered.
    fn register(&self, registry: &Registry) -> Result<(), Error> {
        let duration = Box::new(self.reconcile_duration.clone());
        registry.register(duration)?;

        let failed_count = Box::new(self.reconcile_failed_count.clone());
        registry.register(failed_count)
    }

    /// Start a timer on the reconcile duration histogram and return it to the caller
    fn record_duration(&self) -> HistogramTimer {
        self.reconcile_duration.start_timer()
    }

    /// Add one to the failure counter selected by `labels`
    fn record_failed_count(&self, labels: &[&str]) {
        let counter = self.reconcile_failed_count.with_label_values(labels);
        counter.inc();
    }
}

impl ClusterMetrics {
    /// Build the cluster metrics.
    ///
    /// The duration histogram uses the buckets produced by
    /// `exponential_time_bucket(0.1, 2.0, 10)`; the failure counter carries a
    /// single `reason` label.
    ///
    /// # Panics
    ///
    /// Panics if either collector cannot be created from its options.
    #[allow(clippy::expect_used)]
    pub(crate) fn new() -> Self {
        let duration_opts = HistogramOpts::new(
            "operator_reconcile_duration_seconds",
            "Duration of operator reconcile loop in seconds",
        )
        .buckets(exponential_time_bucket(0.1, 2.0, 10));
        let reconcile_duration = Histogram::with_opts(duration_opts)
            .expect("failed to create operator_reconcile_duration_seconds histogram");

        let failed_count_opts = Opts::new(
            "operator_reconcile_failed_count",
            "Number of failed times the operator reconcile loop has run",
        );
        let reconcile_failed_count = IntCounterVec::new(failed_count_opts, &["reason"])
            .expect("failed to create operator_reconcile_failed_count counter");

        Self {
            reconcile_duration,
            reconcile_failed_count,
        }
    }
}

/// Builds the bucket boundaries for the reconcile duration histogram.
///
/// Produces `count` values where the value at index `i` is
/// `start * factor^i`, so the first bucket equals `start` and an empty vector
/// is returned when `count` is zero.
fn exponential_time_bucket(start: f64, factor: f64, count: usize) -> Vec<f64> {
    let mut buckets = Vec::with_capacity(count);
    for (exponent, base) in repeat(factor).enumerate().take(count) {
        // The index doubles as the exponent; `numeric_cast` converts it to the
        // integer type `powi` expects.
        let scale = base.powi(exponent.numeric_cast());
        buckets.push(start.mul(scale));
    }
    buckets
}
81 changes: 4 additions & 77 deletions operator-k8s/src/controller/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,8 @@
use clippy_utilities::NumericCast;
use prometheus::{Error, Histogram, HistogramOpts, HistogramTimer, IntCounterVec, Opts, Registry};

use std::iter::repeat;
use std::ops::Mul;

use crate::controller::Metrics;

/// Controller v1alpha
mod v1alpha;
/// Controller v1alpha1
mod v1alpha1;

/// Current controller of cluster
pub(crate) type Controller = v1alpha::ClusterController;

/// Cluster metrics: the Prometheus collectors tracked for the reconcile loop
pub(crate) struct ClusterMetrics {
    /// Histogram of reconcile durations (`operator_reconcile_duration_seconds`)
    reconcile_duration: Histogram,
    /// Failed reconcile counter, partitioned by the `reason` label
    /// (`operator_reconcile_failed_count`)
    reconcile_failed_count: IntCounterVec,
}

impl Default for ClusterMetrics {
    /// The default metrics are exactly what [`ClusterMetrics::new`] builds.
    fn default() -> Self {
        Self::new()
    }
}

impl Metrics for ClusterMetrics {
    /// Register both collectors with `registry`.
    ///
    /// The duration histogram is registered first; if that fails the error is
    /// returned immediately and the failure counter is not registered.
    fn register(&self, registry: &Registry) -> Result<(), Error> {
        let duration = Box::new(self.reconcile_duration.clone());
        registry.register(duration)?;

        let failed_count = Box::new(self.reconcile_failed_count.clone());
        registry.register(failed_count)
    }

    /// Start a timer on the reconcile duration histogram and return it to the caller
    fn record_duration(&self) -> HistogramTimer {
        self.reconcile_duration.start_timer()
    }

    /// Add one to the failure counter selected by `labels`
    fn record_failed_count(&self, labels: &[&str]) {
        let counter = self.reconcile_failed_count.with_label_values(labels);
        counter.inc();
    }
}

impl ClusterMetrics {
    /// Build the cluster metrics.
    ///
    /// The duration histogram uses the buckets produced by
    /// `exponential_time_bucket(0.1, 2.0, 10)`; the failure counter carries a
    /// single `reason` label.
    ///
    /// # Panics
    ///
    /// Panics if either collector cannot be created from its options.
    #[allow(clippy::expect_used)]
    pub(crate) fn new() -> Self {
        let duration_opts = HistogramOpts::new(
            "operator_reconcile_duration_seconds",
            "Duration of operator reconcile loop in seconds",
        )
        .buckets(exponential_time_bucket(0.1, 2.0, 10));
        let reconcile_duration = Histogram::with_opts(duration_opts)
            .expect("failed to create operator_reconcile_duration_seconds histogram");

        let failed_count_opts = Opts::new(
            "operator_reconcile_failed_count",
            "Number of failed times the operator reconcile loop has run",
        );
        let reconcile_failed_count = IntCounterVec::new(failed_count_opts, &["reason"])
            .expect("failed to create operator_reconcile_failed_count counter");

        Self {
            reconcile_duration,
            reconcile_failed_count,
        }
    }
}
/// Controller metrics
mod metrics;

/// Builds the bucket boundaries for the reconcile duration histogram.
///
/// Produces `count` values where the value at index `i` is
/// `start * factor^i`, so the first bucket equals `start` and an empty vector
/// is returned when `count` is zero.
fn exponential_time_bucket(start: f64, factor: f64, count: usize) -> Vec<f64> {
    let mut buckets = Vec::with_capacity(count);
    for (exponent, base) in repeat(factor).enumerate().take(count) {
        // The index doubles as the exponent; `numeric_cast` converts it to the
        // integer type `powi` expects.
        let scale = base.powi(exponent.numeric_cast());
        buckets.push(start.mul(scale));
    }
    buckets
}
pub(crate) use metrics::ClusterMetrics;
pub(crate) use v1alpha1::ClusterController;
Loading
Loading