diff --git a/crates/benchmark/src/runner.rs b/crates/benchmark/src/runner.rs index 26ae232d3..e62552a6d 100644 --- a/crates/benchmark/src/runner.rs +++ b/crates/benchmark/src/runner.rs @@ -19,7 +19,7 @@ use tokio::{ time::{Duration, Instant}, }; use tracing::debug; -use utils::config::ClientConfig; +use utils::config::client::ClientConfig; use xline_client::{types::kv::PutRequest, ClientOptions}; use crate::{args::Commands, bench_client::BenchClient, Benchmark}; diff --git a/crates/curp-test-utils/src/test_cmd.rs b/crates/curp-test-utils/src/test_cmd.rs index 1fdde4f1a..2674c2046 100644 --- a/crates/curp-test-utils/src/test_cmd.rs +++ b/crates/curp-test-utils/src/test_cmd.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::{sync::mpsc, time::sleep}; use tracing::debug; -use utils::config::EngineConfig; +use utils::config::engine::EngineConfig; use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE}; @@ -422,7 +422,7 @@ impl TestCE { after_sync_sender: mpsc::UnboundedSender<(TestCommand, LogIndex)>, engine_cfg: EngineConfig, ) -> Self { - let engine_type = match engine_cfg { + let engine_type: EngineType = match engine_cfg { EngineConfig::Memory => EngineType::Memory, EngineConfig::RocksDB(path) => EngineType::Rocks(path), _ => unreachable!("Not supported storage type"), diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 17ded1a7d..88d925a9b 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -32,7 +32,7 @@ use tonic::transport::ClientTlsConfig; use tracing::debug; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{build_endpoint, config::ClientConfig}; +use utils::{build_endpoint, config::client::ClientConfig}; use self::{ retry::{Retry, RetryConfig}, diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index f97be228c..d5023934e 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -422,7 +422,7 @@ mod tests { use test_macros::abort_on_panic; use tokio::{sync::mpsc, time::Instant}; use tracing_test::traced_test; - use utils::config::EngineConfig; + use utils::config::engine::EngineConfig; use super::*; use crate::{log_entry::LogEntry, rpc::ProposeId}; diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index ea0f0806b..d93c65dcd 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -21,15 +21,17 @@ use tracing::{debug, error, info, trace, warn}; #[cfg(madsim)] use utils::ClientTlsConfig; use utils::{ - config::CurpConfig, + config::curp::CurpConfig, task_manager::{tasks::TaskName, Listener, State, TaskManager}, }; use super::{ cmd_board::{CmdBoardRef, CommandBoard}, cmd_worker::{conflict_checked_mpmc, start_cmd_workers}, - conflict::spec_pool_new::{SpObject, SpeculativePool}, - conflict::uncommitted_pool::{UcpObject, UncommittedPool}, + conflict::{ + spec_pool_new::{SpObject, SpeculativePool}, + uncommitted_pool::{UcpObject, UncommittedPool}, + }, gc::gc_cmd_board, lease_manager::LeaseManager, raw_curp::{AppendEntries, RawCurp, Vote}, diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index b6fca3a99..4aed0b8de 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -7,11 +7,13 @@ use tonic::transport::ClientTlsConfig; use tracing::instrument; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{config::CurpConfig, task_manager::TaskManager, tracing::Extract}; +use utils::{config::curp::CurpConfig, task_manager::TaskManager, tracing::Extract}; use self::curp_node::CurpNode; -pub use self::raw_curp::RawCurp; -pub use self::{conflict::spec_pool_new::SpObject, conflict::uncommitted_pool::UcpObject}; +pub use self::{ + conflict::{spec_pool_new::SpObject, uncommitted_pool::UcpObject}, + raw_curp::RawCurp, +}; use crate::{ cmd::{Command, CommandExecutor}, members::{ClusterInfo, ServerId}, diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs index 4aee089d3..b5410b04a 100644 --- a/crates/curp/src/server/raw_curp/log.rs +++ b/crates/curp/src/server/raw_curp/log.rs @@ -455,7 +455,7 @@ mod tests { use std::{iter::repeat, ops::Index, sync::Arc}; use curp_test_utils::test_cmd::TestCommand; - use utils::config::{default_batch_max_size, default_log_entries_cap}; + use utils::config::curp::{default_batch_max_size, default_log_entries_cap}; use super::*; diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index 86c62eace..76a0fd658 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -37,7 +37,7 @@ use tracing::{ #[cfg(madsim)] use utils::ClientTlsConfig; use utils::{ - config::CurpConfig, + config::curp::CurpConfig, parking_lot_lock::{MutexMap, RwLockMap}, task_manager::TaskManager, }; @@ -47,9 +47,11 @@ use self::{ state::{CandidateState, LeaderState, State}, }; use super::{ - cmd_worker::CEEventTxApi, conflict::spec_pool_new::SpeculativePool, - conflict::uncommitted_pool::UncommittedPool, lease_manager::LeaseManagerRef, - storage::StorageApi, DB, + cmd_worker::CEEventTxApi, + conflict::{spec_pool_new::SpeculativePool, uncommitted_pool::UncommittedPool}, + lease_manager::LeaseManagerRef, + storage::StorageApi, + DB, }; use crate::{ cmd::Command, diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs index 5e3896c37..703a5d361 100644 --- a/crates/curp/src/server/raw_curp/tests.rs +++ b/crates/curp/src/server/raw_curp/tests.rs @@ -7,7 +7,7 @@ use tokio::{ time::{sleep, Instant}, }; use tracing_test::traced_test; -use utils::config::{ +use utils::config::curp::{ default_candidate_timeout_ticks, default_follower_timeout_ticks, default_heartbeat_interval, CurpConfigBuilder, }; diff --git a/crates/curp/src/server/storage/db.rs b/crates/curp/src/server/storage/db.rs index 0c8433080..a8e9cd2b9 100644 --- a/crates/curp/src/server/storage/db.rs +++ b/crates/curp/src/server/storage/db.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use async_trait::async_trait; use engine::{Engine, EngineType, StorageEngine, WriteOperation}; use prost::Message; -use utils::config::EngineConfig; +use utils::config::engine::EngineConfig; use super::{StorageApi, StorageError}; use crate::{ diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index 8a4cccfc2..188b5cc42 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -39,7 +39,10 @@ use tracing::debug; use utils::{ build_endpoint, config::{ - default_quota, ClientConfig, CurpConfig, CurpConfigBuilder, EngineConfig, StorageConfig, + client::ClientConfig, + curp::{CurpConfig, CurpConfigBuilder}, + engine::EngineConfig, + storage::{default_quota, StorageConfig}, }, task_manager::{tasks::TaskName, Listener, TaskManager}, }; diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index 5ea89a808..b23b1b6e6 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -15,7 +15,7 @@ use curp_test_utils::{ use madsim::rand::{thread_rng, Rng}; use test_macros::abort_on_panic; use tokio::net::TcpListener; -use utils::{config::ClientConfig, timestamp}; +use utils::{config::client::ClientConfig, timestamp}; use crate::common::curp_group::{ commandpb::ProposeId, CurpGroup, FetchClusterRequest, ProposeRequest, ProposeResponse, diff --git a/crates/simulation/src/curp_group.rs b/crates/simulation/src/curp_group.rs index ebca5fa2b..01f629e8e 100644 --- a/crates/simulation/src/curp_group.rs +++ b/crates/simulation/src/curp_group.rs @@ -29,7 +29,7 @@ use parking_lot::Mutex; use tokio::sync::mpsc; use tracing::debug; use utils::{ - config::{ClientConfig, CurpConfigBuilder, EngineConfig}, + config::{client::ClientConfig, curp::CurpConfigBuilder, engine::EngineConfig}, task_manager::TaskManager, }; diff --git a/crates/simulation/src/xline_group.rs b/crates/simulation/src/xline_group.rs index 9943400dd..81f8692c5 100644 --- a/crates/simulation/src/xline_group.rs +++ b/crates/simulation/src/xline_group.rs @@ -5,8 +5,14 @@ use madsim::runtime::NodeHandle; use tonic::transport::Channel; use tracing::debug; use utils::config::{ - AuthConfig, ClientConfig, ClusterConfig, CompactConfig, CurpConfig, InitialClusterState, - ServerTimeout, StorageConfig, TlsConfig, + auth::AuthConfig, + client::ClientConfig, + cluster::{ClusterConfig, InitialClusterState}, + compact::CompactConfig, + curp::CurpConfig, + server::ServerTimeout, + storage::StorageConfig, + tls::TlsConfig, }; use xline::server::XlineServer; use xline_client::{ diff --git a/crates/utils/src/config.rs b/crates/utils/src/config.rs deleted file mode 100644 index dcf7e547d..000000000 --- a/crates/utils/src/config.rs +++ /dev/null @@ -1,1526 +0,0 @@ -use std::{collections::HashMap, path::PathBuf, time::Duration}; - -use derive_builder::Builder; -use getset::Getters; -use serde::Deserialize; -use tracing_appender::rolling::RollingFileAppender; - -/// Xline server configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] -pub struct XlineServerConfig { - /// cluster configuration object - #[getset(get = "pub")] - cluster: ClusterConfig, - /// xline storage configuration object - #[getset(get = "pub")] - storage: StorageConfig, - /// log configuration object - #[getset(get = "pub")] - log: LogConfig, - /// trace configuration object - #[getset(get = "pub")] - trace: TraceConfig, - /// auth configuration object - #[getset(get = "pub")] - auth: AuthConfig, - /// compactor configuration object - #[getset(get = "pub")] - compact: CompactConfig, - /// tls configuration object - #[getset(get = "pub")] - tls: TlsConfig, - /// Metrics config - #[getset(get = "pub")] - #[serde(default = "MetricsConfig::default")] - metrics: MetricsConfig, -} - -/// Cluster Range type alias -pub type ClusterRange = std::ops::Range; -/// Log verbosity level alias -#[allow(clippy::module_name_repetitions)] -pub type LevelConfig = tracing::metadata::LevelFilter; - -/// `Duration` deserialization formatter -pub mod duration_format { - use std::time::Duration; - - use serde::{Deserialize, Deserializer}; - - use crate::parse_duration; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] // the false positive case blocks us - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_duration(&s).map_err(serde::de::Error::custom) - } -} - -/// batch size deserialization formatter -pub mod bytes_format { - use serde::{Deserialize, Deserializer}; - - use crate::parse_batch_bytes; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] // the false positive case blocks us - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_batch_bytes(&s).map_err(serde::de::Error::custom) - } -} - -/// Cluster configuration object, including cluster relevant configuration fields -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct ClusterConfig { - /// Get xline server name - #[getset(get = "pub")] - name: String, - /// Xline server peer listen urls - #[getset(get = "pub")] - peer_listen_urls: Vec, - /// Xline server peer advertise urls - #[getset(get = "pub")] - peer_advertise_urls: Vec, - /// Xline server client listen urls - #[getset(get = "pub")] - client_listen_urls: Vec, - /// Xline server client advertise urls - #[getset(get = "pub")] - client_advertise_urls: Vec, - /// All the nodes in the xline cluster - #[getset(get = "pub")] - peers: HashMap>, - /// Leader node. - #[getset(get = "pub")] - is_leader: bool, - /// Curp server timeout settings - #[getset(get = "pub")] - #[serde(default = "CurpConfig::default")] - curp_config: CurpConfig, - /// Curp client config settings - #[getset(get = "pub")] - #[serde(default = "ClientConfig::default")] - client_config: ClientConfig, - /// Xline server timeout settings - #[getset(get = "pub")] - #[serde(default = "ServerTimeout::default")] - server_timeout: ServerTimeout, - /// Xline server initial state - #[getset(get = "pub")] - #[serde(with = "state_format", default = "InitialClusterState::default")] - initial_cluster_state: InitialClusterState, -} - -impl Default for ClusterConfig { - #[inline] - fn default() -> Self { - Self { - name: "default".to_owned(), - peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()], - peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()], - client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], - client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], - peers: HashMap::from([( - "default".to_owned(), - vec!["http://127.0.0.1:2379".to_owned()], - )]), - is_leader: false, - curp_config: CurpConfig::default(), - client_config: ClientConfig::default(), - server_timeout: ServerTimeout::default(), - initial_cluster_state: InitialClusterState::default(), - } - } -} - -/// Initial cluster state of xline server -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] -#[non_exhaustive] -pub enum InitialClusterState { - /// Create a new cluster - #[default] - New, - /// Join an existing cluster - Existing, -} - -/// `InitialClusterState` deserialization formatter -pub mod state_format { - use serde::{Deserialize, Deserializer}; - - use super::InitialClusterState; - use crate::parse_state; - - /// deserializes a cluster log rotation strategy - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_state(&s).map_err(serde::de::Error::custom) - } -} - -impl ClusterConfig { - /// Generate a new `ClusterConfig` object - #[must_use] - #[inline] - #[allow(clippy::too_many_arguments)] - pub fn new( - name: String, - peer_listen_urls: Vec, - peer_advertise_urls: Vec, - client_listen_urls: Vec, - client_advertise_urls: Vec, - peers: HashMap>, - is_leader: bool, - curp: CurpConfig, - client_config: ClientConfig, - server_timeout: ServerTimeout, - initial_cluster_state: InitialClusterState, - ) -> Self { - Self { - name, - peer_listen_urls, - peer_advertise_urls, - client_listen_urls, - client_advertise_urls, - peers, - is_leader, - curp_config: curp, - client_config, - server_timeout, - initial_cluster_state, - } - } -} - -/// Compaction configuration -#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] -#[allow(clippy::module_name_repetitions)] -pub struct CompactConfig { - /// The max number of historical versions processed in a single compact operation - #[getset(get = "pub")] - #[serde(default = "default_compact_batch_size")] - compact_batch_size: usize, - /// The interval between two compaction batches - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_compact_sleep_interval")] - compact_sleep_interval: Duration, - /// The auto compactor config - #[getset(get = "pub")] - auto_compact_config: Option, -} - -impl Default for CompactConfig { - #[inline] - fn default() -> Self { - Self { - compact_batch_size: default_compact_batch_size(), - compact_sleep_interval: default_compact_sleep_interval(), - auto_compact_config: None, - } - } -} - -impl CompactConfig { - /// Create a new compact config - #[must_use] - #[inline] - pub fn new( - compact_batch_size: usize, - compact_sleep_interval: Duration, - auto_compact_config: Option, - ) -> Self { - Self { - compact_batch_size, - compact_sleep_interval, - auto_compact_config, - } - } -} - -/// default compact batch size -#[must_use] -#[inline] -pub const fn default_compact_batch_size() -> usize { - 1000 -} - -/// default compact interval -#[must_use] -#[inline] -pub const fn default_compact_sleep_interval() -> Duration { - Duration::from_millis(10) -} - -/// Curp server timeout settings -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] -#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] -pub struct CurpConfig { - /// Heartbeat Interval - #[builder(default = "default_heartbeat_interval()")] - #[serde(with = "duration_format", default = "default_heartbeat_interval")] - pub heartbeat_interval: Duration, - - /// Curp wait sync timeout - #[builder(default = "default_server_wait_synced_timeout()")] - #[serde( - with = "duration_format", - default = "default_server_wait_synced_timeout" - )] - pub wait_synced_timeout: Duration, - - /// Curp propose retry count - #[builder(default = "default_retry_count()")] - #[serde(default = "default_retry_count")] - pub retry_count: usize, - - /// Curp rpc timeout - #[builder(default = "default_rpc_timeout()")] - #[serde(with = "duration_format", default = "default_rpc_timeout")] - pub rpc_timeout: Duration, - - /// Curp append entries batch timeout - /// If the `batch_timeout` has expired, then it will be dispatched - /// whether its size reaches the `BATCHING_MSG_MAX_SIZE` or not. - #[builder(default = "default_batch_timeout()")] - #[serde(with = "duration_format", default = "default_batch_timeout")] - pub batch_timeout: Duration, - - /// The maximum number of bytes per batch. - #[builder(default = "default_batch_max_size()")] - #[serde(with = "bytes_format", default = "default_batch_max_size")] - pub batch_max_size: u64, - - /// How many ticks a follower is allowed to miss before it starts a new round of election - /// The actual timeout will be randomized and in between heartbeat_interval * [follower_timeout_ticks, 2 * follower_timeout_ticks) - #[builder(default = "default_follower_timeout_ticks()")] - #[serde(default = "default_follower_timeout_ticks")] - pub follower_timeout_ticks: u8, - - /// How many ticks a candidate needs to wait before it starts a new round of election - /// It should be smaller than `follower_timeout_ticks` - /// The actual timeout will be randomized and in between heartbeat_interval * [candidate_timeout_ticks, 2 * candidate_timeout_ticks) - #[builder(default = "default_candidate_timeout_ticks()")] - #[serde(default = "default_candidate_timeout_ticks")] - pub candidate_timeout_ticks: u8, - - /// Curp storage path - #[builder(default = "EngineConfig::default()")] - #[serde(default = "EngineConfig::default")] - pub engine_cfg: EngineConfig, - - /// Number of command execute workers - #[builder(default = "default_cmd_workers()")] - #[serde(default = "default_cmd_workers")] - pub cmd_workers: u8, - - /// How often should the gc task run - #[builder(default = "default_gc_interval()")] - #[serde(with = "duration_format", default = "default_gc_interval")] - pub gc_interval: Duration, - - /// Number of log entries to keep in memory - #[builder(default = "default_log_entries_cap()")] - #[serde(default = "default_log_entries_cap")] - pub log_entries_cap: usize, -} - -/// default heartbeat interval -#[must_use] -#[inline] -pub const fn default_heartbeat_interval() -> Duration { - Duration::from_millis(300) -} - -/// default batch timeout -#[must_use] -#[inline] -pub const fn default_batch_timeout() -> Duration { - Duration::from_millis(15) -} - -/// default batch timeout -#[must_use] -#[inline] -#[allow(clippy::arithmetic_side_effects)] -pub const fn default_batch_max_size() -> u64 { - 2 * 1024 * 1024 -} - -/// default wait synced timeout -#[must_use] -#[inline] -pub const fn default_server_wait_synced_timeout() -> Duration { - Duration::from_secs(5) -} - -/// default initial retry timeout -#[must_use] -#[inline] -pub const fn default_initial_retry_timeout() -> Duration { - Duration::from_millis(1500) -} - -/// default max retry timeout -#[must_use] -#[inline] -pub const fn default_max_retry_timeout() -> Duration { - Duration::from_millis(10_000) -} - -/// default retry count -#[cfg(not(madsim))] -#[must_use] -#[inline] -pub const fn default_retry_count() -> usize { - 3 -} -/// default retry count -#[cfg(madsim)] -#[must_use] -#[inline] -pub const fn default_retry_count() -> usize { - 10 -} - -/// default use backoff -#[must_use] -#[inline] -pub const fn default_fixed_backoff() -> bool { - false -} - -/// default rpc timeout -#[must_use] -#[inline] -pub const fn default_rpc_timeout() -> Duration { - Duration::from_millis(150) -} - -/// default candidate timeout ticks -#[must_use] -#[inline] -pub const fn default_candidate_timeout_ticks() -> u8 { - 2 -} - -/// default client wait synced timeout -#[must_use] -#[inline] -pub const fn default_client_wait_synced_timeout() -> Duration { - Duration::from_secs(2) -} - -/// default client propose timeout -#[must_use] -#[inline] -pub const fn default_propose_timeout() -> Duration { - Duration::from_secs(1) -} - -/// default client id keep alive interval -#[must_use] -#[inline] -pub const fn default_client_id_keep_alive_interval() -> Duration { - Duration::from_secs(1) -} - -/// default follower timeout -#[must_use] -#[inline] -pub const fn default_follower_timeout_ticks() -> u8 { - 5 -} - -/// default number of execute workers -#[must_use] -#[inline] -pub const fn default_cmd_workers() -> u8 { - 8 -} - -/// default range retry timeout -#[must_use] -#[inline] -pub const fn default_range_retry_timeout() -> Duration { - Duration::from_secs(2) -} - -/// default compact timeout -#[must_use] -#[inline] -pub const fn default_compact_timeout() -> Duration { - Duration::from_secs(5) -} - -/// default sync victims interval -#[must_use] -#[inline] -pub const fn default_sync_victims_interval() -> Duration { - Duration::from_millis(10) -} - -/// default gc interval -#[must_use] -#[inline] -pub const fn default_gc_interval() -> Duration { - Duration::from_secs(20) -} - -/// default number of log entries to keep in memory -#[must_use] -#[inline] -pub const fn default_log_entries_cap() -> usize { - 5000 -} - -/// default watch progress notify interval -#[must_use] -#[inline] -pub const fn default_watch_progress_notify_interval() -> Duration { - Duration::from_secs(600) -} - -impl Default for CurpConfig { - #[inline] - fn default() -> Self { - Self { - heartbeat_interval: default_heartbeat_interval(), - wait_synced_timeout: default_server_wait_synced_timeout(), - retry_count: default_retry_count(), - rpc_timeout: default_rpc_timeout(), - batch_timeout: default_batch_timeout(), - batch_max_size: default_batch_max_size(), - follower_timeout_ticks: default_follower_timeout_ticks(), - candidate_timeout_ticks: default_candidate_timeout_ticks(), - engine_cfg: EngineConfig::default(), - cmd_workers: default_cmd_workers(), - gc_interval: default_gc_interval(), - log_entries_cap: default_log_entries_cap(), - } - } -} - -/// Curp client settings -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -#[allow(clippy::module_name_repetitions)] -pub struct ClientConfig { - /// Curp client wait sync timeout - #[getset(get = "pub")] - #[serde( - with = "duration_format", - default = "default_client_wait_synced_timeout" - )] - wait_synced_timeout: Duration, - - /// Curp client propose request timeout - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_propose_timeout")] - propose_timeout: Duration, - - /// Curp client initial retry interval - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_initial_retry_timeout")] - initial_retry_timeout: Duration, - - /// Curp client max retry interval - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_max_retry_timeout")] - max_retry_timeout: Duration, - - /// Curp client retry interval - #[getset(get = "pub")] - #[serde(default = "default_retry_count")] - retry_count: usize, - - /// Whether to use exponential backoff in retries - #[getset(get = "pub")] - #[serde(default = "default_fixed_backoff")] - fixed_backoff: bool, - - /// Curp client keep client id alive interval - #[getset(get = "pub")] - #[serde( - with = "duration_format", - default = "default_client_id_keep_alive_interval" - )] - keep_alive_interval: Duration, -} - -impl ClientConfig { - /// Create a new client timeout - /// - /// # Panics - /// - /// Panics if `initial_retry_timeout` is larger than `max_retry_timeout` - #[must_use] - #[inline] - pub fn new( - wait_synced_timeout: Duration, - propose_timeout: Duration, - initial_retry_timeout: Duration, - max_retry_timeout: Duration, - retry_count: usize, - fixed_backoff: bool, - keep_alive_interval: Duration, - ) -> Self { - assert!( - initial_retry_timeout <= max_retry_timeout, - "`initial_retry_timeout` should less or equal to `max_retry_timeout`" - ); - Self { - wait_synced_timeout, - propose_timeout, - initial_retry_timeout, - max_retry_timeout, - retry_count, - fixed_backoff, - keep_alive_interval, - } - } -} - -impl Default for ClientConfig { - #[inline] - fn default() -> Self { - Self { - wait_synced_timeout: default_client_wait_synced_timeout(), - propose_timeout: default_propose_timeout(), - initial_retry_timeout: default_initial_retry_timeout(), - max_retry_timeout: default_max_retry_timeout(), - retry_count: default_retry_count(), - fixed_backoff: default_fixed_backoff(), - keep_alive_interval: default_client_id_keep_alive_interval(), - } - } -} - -/// Xline server settings -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct ServerTimeout { - /// Range request retry timeout settings - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_range_retry_timeout")] - range_retry_timeout: Duration, - /// Range request retry timeout settings - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_compact_timeout")] - compact_timeout: Duration, - /// Sync victims interval - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_sync_victims_interval")] - sync_victims_interval: Duration, - /// Watch progress notify interval settings - #[getset(get = "pub")] - #[serde( - with = "duration_format", - default = "default_watch_progress_notify_interval" - )] - watch_progress_notify_interval: Duration, -} - -impl ServerTimeout { - /// Create a new server timeout - #[must_use] - #[inline] - pub fn new( - range_retry_timeout: Duration, - compact_timeout: Duration, - sync_victims_interval: Duration, - watch_progress_notify_interval: Duration, - ) -> Self { - Self { - range_retry_timeout, - compact_timeout, - sync_victims_interval, - watch_progress_notify_interval, - } - } -} - -impl Default for ServerTimeout { - #[inline] - fn default() -> Self { - Self { - range_retry_timeout: default_range_retry_timeout(), - compact_timeout: default_compact_timeout(), - sync_victims_interval: default_sync_victims_interval(), - watch_progress_notify_interval: default_watch_progress_notify_interval(), - } - } -} - -/// Auto Compactor Configuration -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] -#[serde( - tag = "mode", - content = "retention", - rename_all(deserialize = "lowercase") -)] -pub enum AutoCompactConfig { - /// auto periodic compactor - #[serde(with = "duration_format")] - Periodic(Duration), - /// auto revision compactor - Revision(i64), -} - -/// Engine Configuration -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[serde( - tag = "type", - content = "data_dir", - rename_all(deserialize = "lowercase") -)] -pub enum EngineConfig { - /// Memory Storage Engine - Memory, - /// RocksDB Storage Engine - RocksDB(PathBuf), -} - -impl Default for EngineConfig { - #[inline] - fn default() -> Self { - Self::Memory - } -} - -/// /// Storage Configuration -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -pub struct StorageConfig { - /// Engine Configuration - #[serde(default = "EngineConfig::default")] - pub engine: EngineConfig, - /// Quota - #[serde(default = "default_quota")] - pub quota: u64, -} - -impl StorageConfig { - /// Create a new storage config - #[inline] - #[must_use] - pub fn new(engine: EngineConfig, quota: u64) -> Self { - Self { engine, quota } - } -} - -impl Default for StorageConfig { - #[inline] - fn default() -> Self { - Self { - engine: EngineConfig::default(), - quota: default_quota(), - } - } -} - -/// Default quota: 8GB -#[inline] -#[must_use] -pub fn default_quota() -> u64 { - // 8 * 1024 * 1024 * 1024 - 0x0002_0000_0000 -} - -/// Log configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct LogConfig { - /// Log file path - #[getset(get = "pub")] - #[serde(default)] - path: Option, - /// Log rotation strategy - #[getset(get = "pub")] - #[serde(with = "rotation_format", default = "default_rotation")] - rotation: RotationConfig, - /// Log verbosity level - #[getset(get = "pub")] - #[serde(with = "level_format", default = "default_log_level")] - level: LevelConfig, -} - -impl Default for LogConfig { - #[inline] - fn default() -> Self { - Self { - path: None, - rotation: default_rotation(), - level: default_log_level(), - } - } -} - -/// `LevelConfig` deserialization formatter -pub mod level_format { - use serde::{Deserialize, Deserializer}; - - use super::LevelConfig; - use crate::parse_log_level; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_log_level(&s).map_err(serde::de::Error::custom) - } -} - -/// default log level -#[must_use] -#[inline] -pub const fn default_log_level() -> LevelConfig { - LevelConfig::INFO -} - -impl LogConfig { - /// Generate a new `LogConfig` object - #[must_use] - #[inline] - pub fn new(path: PathBuf, rotation: RotationConfig, level: LevelConfig) -> Self { - Self { - path: Some(path), - rotation, - level, - } - } -} - -/// Xline log rotation strategy -#[non_exhaustive] -#[allow(clippy::module_name_repetitions)] -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq)] -#[serde(rename_all(deserialize = "lowercase"))] -pub enum RotationConfig { - /// Rotate log file in every hour - Hourly, - /// Rotate log file every day - Daily, - /// Never rotate log file - Never, -} - -impl std::fmt::Display for RotationConfig { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match *self { - RotationConfig::Hourly => write!(f, "hourly"), - RotationConfig::Daily => write!(f, "daily"), - RotationConfig::Never => write!(f, "never"), - } - } -} - -/// `RotationConfig` deserialization formatter -pub mod rotation_format { - use serde::{Deserialize, Deserializer}; - - use super::RotationConfig; - use crate::parse_rotation; - - /// deserializes a cluster log rotation strategy - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_rotation(&s).map_err(serde::de::Error::custom) - } -} - -/// default log rotation strategy -#[must_use] -#[inline] -pub const fn default_rotation() -> RotationConfig { - RotationConfig::Daily -} - -/// Generates a `RollingFileAppender` from the given `RotationConfig` and `name` -#[must_use] -#[inline] -pub fn file_appender( - rotation: RotationConfig, - file_path: &PathBuf, - name: &str, -) -> RollingFileAppender { - match rotation { - RotationConfig::Hourly => { - tracing_appender::rolling::hourly(file_path, format!("xline_{name}.log")) - } - RotationConfig::Daily => { - tracing_appender::rolling::daily(file_path, format!("xline_{name}.log")) - } - RotationConfig::Never => { - tracing_appender::rolling::never(file_path, format!("xline_{name}.log")) - } - #[allow(unreachable_patterns)] - // It's ok because `parse_rotation` have check the validity before. - _ => unreachable!("should not call file_appender when parse_rotation failed"), - } -} - -/// Xline tracing configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct TraceConfig { - /// Open jaeger online, sending data to jaeger agent directly - #[getset(get = "pub")] - jaeger_online: bool, - /// Open jaeger offline, saving data to the `jaeger_output_dir` - #[getset(get = "pub")] - jaeger_offline: bool, - /// The dir path to save the data when `jaeger_offline` is on - #[getset(get = "pub")] - jaeger_output_dir: PathBuf, - /// The verbosity level of tracing - #[getset(get = "pub")] - #[serde(with = "level_format", default = "default_log_level")] - jaeger_level: LevelConfig, -} - -impl Default for TraceConfig { - #[inline] - fn default() -> Self { - Self { - jaeger_online: false, - jaeger_offline: false, - jaeger_output_dir: "".into(), - jaeger_level: default_log_level(), - } - } -} - -impl TraceConfig { - /// Generate a new `TraceConfig` object - #[must_use] - #[inline] - pub fn new( - jaeger_online: bool, - jaeger_offline: bool, - jaeger_output_dir: PathBuf, - jaeger_level: LevelConfig, - ) -> Self { - Self { - jaeger_online, - jaeger_offline, - jaeger_output_dir, - jaeger_level, - } - } -} - -/// Xline tracing configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] -pub struct AuthConfig { - /// The public key file - #[getset(get = "pub")] - auth_public_key: Option, - /// The private key file - #[getset(get = "pub")] - auth_private_key: Option, -} - -impl AuthConfig { - /// Generate a new `AuthConfig` object - #[must_use] - #[inline] - pub fn new(auth_public_key: Option, auth_private_key: Option) -> Self { - Self { - auth_public_key, - auth_private_key, - } - } -} - -/// Xline tls configuration object -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] -pub struct TlsConfig { - /// The CA certificate file used by peer to verify client certificates - #[getset(get = "pub")] - pub peer_ca_cert_path: Option, - /// The public key file used by peer - #[getset(get = "pub")] - pub peer_cert_path: Option, - /// The private key file used by peer - #[getset(get = "pub")] - pub peer_key_path: Option, - /// The CA certificate file used by client to verify peer certificates - #[getset(get = "pub")] - pub client_ca_cert_path: Option, - /// The public key file used by client - #[getset(get = "pub")] - pub client_cert_path: Option, - /// The private key file used by client - #[getset(get = "pub")] - pub client_key_path: Option, -} - -impl TlsConfig { - /// Create a new `TlsConfig` object - #[must_use] - #[inline] - pub fn new( - peer_ca_cert_path: Option, - peer_cert_path: Option, - peer_key_path: Option, - client_ca_cert_path: Option, - client_cert_path: Option, - client_key_path: Option, - ) -> Self { - Self { - peer_ca_cert_path, - peer_cert_path, - peer_key_path, - client_ca_cert_path, - client_cert_path, - client_key_path, - } - } - - /// Whether the server tls is enabled - #[must_use] - #[inline] - pub fn server_tls_enabled(&self) -> bool { - self.peer_cert_path.is_some() && self.peer_key_path.is_some() - } -} - -/// Xline metrics push protocol -#[non_exhaustive] -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Default)] -#[serde(rename_all(deserialize = "lowercase"))] -pub enum MetricsPushProtocol { - /// HTTP protocol - HTTP, - /// GRPC protocol - #[default] - GRPC, -} - -impl std::fmt::Display for MetricsPushProtocol { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match *self { - MetricsPushProtocol::HTTP => write!(f, "http"), - MetricsPushProtocol::GRPC => write!(f, "grpc"), - } - } -} - -/// Metrics push protocol format -pub mod protocol_format { - use serde::{Deserialize, Deserializer}; - - use super::MetricsPushProtocol; - use crate::parse_metrics_push_protocol; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_metrics_push_protocol(&s).map_err(serde::de::Error::custom) - } -} - -/// Xline metrics configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct MetricsConfig { - /// Enable or not - #[getset(get = "pub")] - #[serde(default = "default_metrics_enable")] - enable: bool, - /// The http port to expose - #[getset(get = "pub")] - #[serde(default = "default_metrics_port")] - port: u16, - /// The http path to expose - #[getset(get = "pub")] - #[serde(default = "default_metrics_path")] - path: String, - /// Enable push or not - #[getset(get = "pub")] - #[serde(default = "default_metrics_push")] - push: bool, - /// Push endpoint - #[getset(get = "pub")] - #[serde(default = "default_metrics_push_endpoint")] - push_endpoint: String, - /// Push protocol - #[getset(get = "pub")] - #[serde(with = "protocol_format", default = "default_metrics_push_protocol")] - push_protocol: MetricsPushProtocol, -} - -impl MetricsConfig { - /// Create a new `MetricsConfig` - #[must_use] - #[inline] - pub fn new( - enable: bool, - port: u16, - path: String, - push: bool, - push_endpoint: String, - push_protocol: MetricsPushProtocol, - ) -> Self { - Self { - enable, - port, - path, - push, - push_endpoint, - push_protocol, - } - } -} - -impl Default for MetricsConfig { - #[inline] - fn default() -> Self { - Self { - enable: default_metrics_enable(), - port: default_metrics_port(), - path: default_metrics_path(), - push: default_metrics_push(), - push_endpoint: default_metrics_push_endpoint(), - push_protocol: default_metrics_push_protocol(), - } - } -} - -/// Default metrics enable -#[must_use] -#[inline] -pub const fn default_metrics_enable() -> bool { - true -} - -/// Default metrics port -#[must_use] -#[inline] -pub const fn default_metrics_port() -> u16 { - 9100 -} - -/// Default metrics path -#[must_use] -#[inline] -pub fn default_metrics_path() -> String { - "/metrics".to_owned() -} - -/// Default metrics push option -#[must_use] -#[inline] -pub fn default_metrics_push() -> bool { - false -} - -/// Default metrics push protocol -#[must_use] -#[inline] -pub fn default_metrics_push_protocol() -> MetricsPushProtocol { - MetricsPushProtocol::GRPC -} - -/// Default metrics push endpoint -#[must_use] -#[inline] -pub fn default_metrics_push_endpoint() -> String { - "http://127.0.0.1:4318".to_owned() -} - -impl XlineServerConfig { - /// Generates a new `XlineServerConfig` object - #[must_use] - #[inline] - #[allow(clippy::too_many_arguments)] - pub fn new( - cluster: ClusterConfig, - storage: StorageConfig, - log: LogConfig, - trace: TraceConfig, - auth: AuthConfig, - compact: CompactConfig, - tls: TlsConfig, - metrics: MetricsConfig, - ) -> Self { - Self { - cluster, - storage, - log, - trace, - auth, - compact, - tls, - metrics, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[allow(clippy::too_many_lines)] // just a testcase, not too bad - #[test] - fn test_xline_server_config_should_be_loaded() { - let config: XlineServerConfig = toml::from_str( - r#"[cluster] - name = 'node1' - is_leader = true - initial_cluster_state = 'new' - peer_listen_urls = ['127.0.0.1:2380'] - peer_advertise_urls = ['127.0.0.1:2380'] - client_listen_urls = ['127.0.0.1:2379'] - client_advertise_urls = ['127.0.0.1:2379'] - - [cluster.server_timeout] - range_retry_timeout = '3s' - compact_timeout = '5s' - sync_victims_interval = '20ms' - watch_progress_notify_interval = '1s' - - [cluster.peers] - node1 = ['127.0.0.1:2378', '127.0.0.1:2379'] - node2 = ['127.0.0.1:2380'] - node3 = ['127.0.0.1:2381'] - - [cluster.curp_config] - heartbeat_interval = '200ms' - wait_synced_timeout = '100ms' - rpc_timeout = '100ms' - retry_timeout = '100ms' - - [cluster.client_config] - initial_retry_timeout = '5s' - max_retry_timeout = '50s' - - [storage] - engine = { type = 'memory'} - - [compact] - compact_batch_size = 123 - compact_sleep_interval = '5ms' - - [compact.auto_compact_config] - mode = 'periodic' - retention = '10h' - - [log] - path = '/var/log/xline' - rotation = 'daily' - level = 'info' - - [trace] - jaeger_online = false - jaeger_offline = false - jaeger_output_dir = './jaeger_jsons' - jaeger_level = 'info' - - [auth] - auth_public_key = './public_key.pem' - auth_private_key = './private_key.pem' - - [tls] - peer_cert_path = './cert.pem' - peer_key_path = './key.pem' - client_ca_cert_path = './ca.pem' - - [metrics] - enable = true - port = 9100 - path = "/metrics" - push = true - push_endpoint = 'http://some-endpoint.com:4396' - push_protocol = 'http' - "#, - ) - .unwrap(); - - let curp_config = CurpConfigBuilder::default() - .heartbeat_interval(Duration::from_millis(200)) - .wait_synced_timeout(Duration::from_millis(100)) - .rpc_timeout(Duration::from_millis(100)) - .build() - .unwrap(); - - let client_config = ClientConfig::new( - default_client_wait_synced_timeout(), - default_propose_timeout(), - Duration::from_secs(5), - Duration::from_secs(50), - default_retry_count(), - default_fixed_backoff(), - default_client_id_keep_alive_interval(), - ); - - let server_timeout = ServerTimeout::new( - Duration::from_secs(3), - Duration::from_secs(5), - Duration::from_millis(20), - Duration::from_secs(1), - ); - - assert_eq!( - config.cluster, - ClusterConfig::new( - "node1".to_owned(), - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - HashMap::from_iter([ - ( - "node1".to_owned(), - vec!["127.0.0.1:2378".to_owned(), "127.0.0.1:2379".to_owned()] - ), - ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), - ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), - ]), - true, - curp_config, - client_config, - server_timeout, - InitialClusterState::New - ) - ); - - assert_eq!( - config.storage, - StorageConfig::new(EngineConfig::Memory, default_quota()) - ); - - assert_eq!( - config.log, - LogConfig::new( - PathBuf::from("/var/log/xline"), - RotationConfig::Daily, - LevelConfig::INFO - ) - ); - assert_eq!( - config.trace, - TraceConfig::new( - false, - false, - PathBuf::from("./jaeger_jsons"), - LevelConfig::INFO - ) - ); - - assert_eq!( - config.compact, - CompactConfig { - compact_batch_size: 123, - compact_sleep_interval: Duration::from_millis(5), - auto_compact_config: Some(AutoCompactConfig::Periodic(Duration::from_secs( - 10 * 60 * 60 - ))) - } - ); - - assert_eq!( - config.auth, - AuthConfig { - auth_private_key: Some(PathBuf::from("./private_key.pem")), - auth_public_key: Some(PathBuf::from("./public_key.pem")), - } - ); - - assert_eq!( - config.tls, - TlsConfig { - peer_cert_path: Some(PathBuf::from("./cert.pem")), - peer_key_path: Some(PathBuf::from("./key.pem")), - client_ca_cert_path: Some(PathBuf::from("./ca.pem")), - ..Default::default() - } - ); - - assert_eq!( - config.metrics, - MetricsConfig { - enable: true, - port: 9100, - path: "/metrics".to_owned(), - push: true, - push_endpoint: "http://some-endpoint.com:4396".to_owned(), - push_protocol: MetricsPushProtocol::HTTP, - }, - ); - } - - #[test] - fn test_xline_server_default_config_should_be_loaded() { - let config: XlineServerConfig = toml::from_str( - "[cluster] - name = 'node1' - is_leader = true - peer_listen_urls = ['127.0.0.1:2380'] - peer_advertise_urls = ['127.0.0.1:2380'] - client_listen_urls = ['127.0.0.1:2379'] - client_advertise_urls = ['127.0.0.1:2379'] - - [cluster.peers] - node1 = ['127.0.0.1:2379'] - node2 = ['127.0.0.1:2380'] - node3 = ['127.0.0.1:2381'] - - [cluster.storage] - - [log] - path = '/var/log/xline' - - [storage] - engine = { type = 'rocksdb', data_dir = '/usr/local/xline/data-dir' } - - [compact] - - [trace] - jaeger_online = false - jaeger_offline = false - jaeger_output_dir = './jaeger_jsons' - jaeger_level = 'info' - - [auth] - - [tls] - ", - ) - .unwrap(); - - assert_eq!( - config.cluster, - ClusterConfig::new( - "node1".to_owned(), - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - HashMap::from([ - ("node1".to_owned(), vec!["127.0.0.1:2379".to_owned()]), - ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), - ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), - ]), - true, - CurpConfigBuilder::default().build().unwrap(), - ClientConfig::default(), - ServerTimeout::default(), - InitialClusterState::default() - ) - ); - - if let EngineConfig::RocksDB(path) = config.storage.engine { - assert_eq!(path, PathBuf::from("/usr/local/xline/data-dir")); - } else { - unreachable!(); - } - - assert_eq!( - config.log, - LogConfig::new( - PathBuf::from("/var/log/xline"), - RotationConfig::Daily, - LevelConfig::INFO - ) - ); - assert_eq!( - config.trace, - TraceConfig::new( - false, - false, - PathBuf::from("./jaeger_jsons"), - LevelConfig::INFO - ) - ); - assert_eq!(config.compact, CompactConfig::default()); - assert_eq!(config.auth, AuthConfig::default()); - assert_eq!(config.tls, TlsConfig::default()); - assert_eq!(config.metrics, MetricsConfig::default()); - } - - #[test] - fn test_auto_revision_compactor_config_should_be_loaded() { - let config: XlineServerConfig = toml::from_str( - "[cluster] - name = 'node1' - is_leader = true - peer_listen_urls = ['127.0.0.1:2380'] - peer_advertise_urls = ['127.0.0.1:2380'] - client_listen_urls = ['127.0.0.1:2379'] - client_advertise_urls = ['127.0.0.1:2379'] - - [cluster.peers] - node1 = ['127.0.0.1:2379'] - node2 = ['127.0.0.1:2380'] - node3 = ['127.0.0.1:2381'] - - [cluster.storage] - - [log] - path = '/var/log/xline' - - [storage] - engine = { type = 'memory' } - - [compact] - - [compact.auto_compact_config] - mode = 'revision' - retention = 10000 - - [trace] - jaeger_online = false - jaeger_offline = false - jaeger_output_dir = './jaeger_jsons' - jaeger_level = 'info' - - [auth] - - [tls] - ", - ) - .unwrap(); - - assert_eq!( - config.compact, - CompactConfig { - auto_compact_config: Some(AutoCompactConfig::Revision(10000)), - ..Default::default() - } - ); - } -} diff --git a/crates/utils/src/config/auth.rs b/crates/utils/src/config/auth.rs new file mode 100644 index 000000000..6cdf6e649 --- /dev/null +++ b/crates/utils/src/config/auth.rs @@ -0,0 +1,58 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +/// Xline auth configuration object +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct AuthConfig { + /// The public key file + #[getset(get = "pub")] + pub auth_public_key: Option, + /// The private key file + #[getset(get = "pub")] + pub auth_private_key: Option, +} + +impl AuthConfig { + /// Creates a new `AuthConfigBuilder` for building `AuthConfig` objects. + pub fn new() -> AuthConfigBuilder { + AuthConfigBuilder { + auth_public_key: None, + auth_private_key: None, + } + } +} + +/// `AuthConfigBuilder` is a builder for `AuthConfig` objects. +#[derive(Debug)] +pub struct AuthConfigBuilder { + /// The public key file + auth_public_key: Option, + /// The private key file + auth_private_key: Option, +} + +impl AuthConfigBuilder { + /// Sets the public key file path for the `AuthConfig`. + pub fn auth_public_key(&mut self, path: PathBuf) -> &mut Self { + self.auth_public_key = Some(path); + self + } + + /// Sets the private key file path for the `AuthConfig`. + pub fn auth_private_key(&mut self, path: PathBuf) -> &mut Self { + self.auth_private_key = Some(path); + self + } + + /// Builds the `AuthConfig` object with the provided configurations. + pub fn build(&mut self) -> AuthConfig { + AuthConfig { + auth_public_key: self.auth_public_key.take(), + auth_private_key: self.auth_private_key.take(), + } + } +} diff --git a/crates/utils/src/config/client.rs b/crates/utils/src/config/client.rs new file mode 100644 index 000000000..8763f0304 --- /dev/null +++ b/crates/utils/src/config/client.rs @@ -0,0 +1,175 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Curp client settings +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct ClientConfig { + /// Curp client wait sync timeout + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_client_wait_synced_timeout" + )] + wait_synced_timeout: Duration, + + /// Curp client propose request timeout + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_propose_timeout")] + propose_timeout: Duration, + + /// Curp client initial retry interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_initial_retry_timeout")] + initial_retry_timeout: Duration, + + /// Curp client max retry interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_max_retry_timeout")] + max_retry_timeout: Duration, + + /// Curp client retry interval + #[getset(get = "pub")] + #[serde(default = "default_retry_count")] + retry_count: usize, + + /// Whether to use exponential backoff in retries + #[getset(get = "pub")] + #[serde(default = "default_fixed_backoff")] + fixed_backoff: bool, + + /// Curp client keep client id alive interval + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_client_id_keep_alive_interval" + )] + keep_alive_interval: Duration, +} + +impl ClientConfig { + /// Create a new client timeout + /// + /// # Panics + /// + /// Panics if `initial_retry_timeout` is larger than `max_retry_timeout` + #[must_use] + #[inline] + pub fn new( + wait_synced_timeout: Duration, + propose_timeout: Duration, + initial_retry_timeout: Duration, + max_retry_timeout: Duration, + retry_count: usize, + fixed_backoff: bool, + keep_alive_interval: Duration, + ) -> Self { + assert!( + initial_retry_timeout <= max_retry_timeout, + "`initial_retry_timeout` should less or equal to `max_retry_timeout`" + ); + Self { + wait_synced_timeout, + propose_timeout, + initial_retry_timeout, + max_retry_timeout, + retry_count, + fixed_backoff, + keep_alive_interval, + } + } +} + +impl Default for ClientConfig { + #[inline] + fn default() -> Self { + Self { + wait_synced_timeout: default_client_wait_synced_timeout(), + propose_timeout: default_propose_timeout(), + initial_retry_timeout: default_initial_retry_timeout(), + max_retry_timeout: default_max_retry_timeout(), + retry_count: default_retry_count(), + fixed_backoff: default_fixed_backoff(), + keep_alive_interval: default_client_id_keep_alive_interval(), + } + } +} + +/// default client wait synced timeout +#[must_use] +#[inline] +pub const fn default_client_wait_synced_timeout() -> Duration { + Duration::from_secs(2) +} + +/// default client propose timeout +#[must_use] +#[inline] +pub const fn default_propose_timeout() -> Duration { + Duration::from_secs(1) +} + +/// default client id keep alive interval +#[must_use] +#[inline] +pub const fn default_client_id_keep_alive_interval() -> Duration { + Duration::from_secs(1) +} + +/// default initial retry timeout +#[must_use] +#[inline] +pub const fn default_initial_retry_timeout() -> Duration { + Duration::from_millis(1500) +} + +/// default max retry timeout +#[must_use] +#[inline] +pub const fn default_max_retry_timeout() -> Duration { + Duration::from_millis(10_000) +} + +/// default retry count +#[cfg(not(madsim))] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 3 +} + +/// default retry count +#[cfg(madsim)] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 10 +} + +/// default use backoff +#[must_use] +#[inline] +pub const fn default_fixed_backoff() -> bool { + false +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/cluster.rs b/crates/utils/src/config/cluster.rs new file mode 100644 index 000000000..62f2fca95 --- /dev/null +++ b/crates/utils/src/config/cluster.rs @@ -0,0 +1,138 @@ +use std::collections::HashMap; + +use getset::Getters; +use serde::Deserialize; + +use super::{client::ClientConfig, curp::CurpConfig, server::ServerTimeout}; + +/// Cluster configuration object, including cluster relevant configuration fields +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct ClusterConfig { + /// Get xline server name + #[getset(get = "pub")] + name: String, + /// Xline server peer listen urls + #[getset(get = "pub")] + peer_listen_urls: Vec, + /// Xline server peer advertise urls + #[getset(get = "pub")] + peer_advertise_urls: Vec, + /// Xline server client listen urls + #[getset(get = "pub")] + client_listen_urls: Vec, + /// Xline server client advertise urls + #[getset(get = "pub")] + client_advertise_urls: Vec, + /// All the nodes in the xline cluster + #[getset(get = "pub")] + peers: HashMap>, + /// Leader node. + #[getset(get = "pub")] + is_leader: bool, + /// Curp server timeout settings + #[getset(get = "pub")] + #[serde(default = "CurpConfig::default")] + curp_config: CurpConfig, + /// Curp client config settings + #[getset(get = "pub")] + #[serde(default = "ClientConfig::default")] + client_config: ClientConfig, + /// Xline server timeout settings + #[getset(get = "pub")] + #[serde(default = "ServerTimeout::default")] + server_timeout: ServerTimeout, + /// Xline server initial state + #[getset(get = "pub")] + #[serde(with = "state_format", default = "InitialClusterState::default")] + initial_cluster_state: InitialClusterState, +} + +impl Default for ClusterConfig { + #[inline] + fn default() -> Self { + Self { + name: "default".to_owned(), + peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()], + peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()], + client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], + client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], + peers: HashMap::from([( + "default".to_owned(), + vec!["http://127.0.0.1:2379".to_owned()], + )]), + is_leader: false, + curp_config: CurpConfig::default(), + client_config: ClientConfig::default(), + server_timeout: ServerTimeout::default(), + initial_cluster_state: InitialClusterState::default(), + } + } +} + +impl ClusterConfig { + /// Generate a new `ClusterConfig` object + #[must_use] + #[inline] + #[allow(clippy::too_many_arguments)] + pub fn new( + name: String, + peer_listen_urls: Vec, + peer_advertise_urls: Vec, + client_listen_urls: Vec, + client_advertise_urls: Vec, + members: HashMap>, + is_leader: bool, + curp: CurpConfig, + client_config: ClientConfig, + server_timeout: ServerTimeout, + initial_cluster_state: InitialClusterState, + ) -> Self { + Self { + name, + peer_listen_urls, + peer_advertise_urls, + client_listen_urls, + client_advertise_urls, + peers: members, + is_leader, + curp_config: curp, + client_config, + server_timeout, + initial_cluster_state, + } + } +} + +/// Cluster Range type alias +#[allow(clippy::module_name_repetitions)] +pub type ClusterRange = std::ops::Range; + +/// Initial cluster state of xline server +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] +#[non_exhaustive] +pub enum InitialClusterState { + /// Create a new cluster + #[default] + New, + /// Join an existing cluster + Existing, +} + +/// `InitialClusterState` deserialization formatter +pub mod state_format { + use serde::{Deserialize, Deserializer}; + + use super::InitialClusterState; + use crate::parse_state; + + /// deserializes a cluster log rotation strategy + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_state(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/compact.rs b/crates/utils/src/config/compact.rs new file mode 100644 index 000000000..8fcea72b3 --- /dev/null +++ b/crates/utils/src/config/compact.rs @@ -0,0 +1,100 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Compaction configuration +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +pub struct CompactConfig { + /// The max number of historical versions processed in a single compact operation + #[getset(get = "pub")] + #[serde(default = "default_compact_batch_size")] + pub compact_batch_size: usize, + /// The interval between two compaction batches + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_sleep_interval")] + pub compact_sleep_interval: Duration, + /// The auto compactor config + #[getset(get = "pub")] + pub auto_compact_config: Option, +} + +impl Default for CompactConfig { + #[inline] + fn default() -> Self { + Self { + compact_batch_size: default_compact_batch_size(), + compact_sleep_interval: default_compact_sleep_interval(), + auto_compact_config: None, + } + } +} + +impl CompactConfig { + /// Create a new compact config + #[must_use] + #[inline] + pub fn new( + compact_batch_size: usize, + compact_sleep_interval: Duration, + auto_compact_config: Option, + ) -> Self { + Self { + compact_batch_size, + compact_sleep_interval, + auto_compact_config, + } + } +} + +/// Auto Compactor Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] +#[serde( + tag = "mode", + content = "retention", + rename_all(deserialize = "lowercase") +)] +pub enum AutoCompactConfig { + /// auto periodic compactor + #[serde(with = "duration_format")] + Periodic(Duration), + /// auto revision compactor + Revision(i64), +} + +/// default compact batch size +#[must_use] +#[inline] +pub const fn default_compact_batch_size() -> usize { + 1000 +} + +/// default compact interval +#[must_use] +#[inline] +pub const fn default_compact_sleep_interval() -> Duration { + Duration::from_millis(10) +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/curp.rs b/crates/utils/src/config/curp.rs new file mode 100644 index 000000000..e4b234e87 --- /dev/null +++ b/crates/utils/src/config/curp.rs @@ -0,0 +1,223 @@ +use std::time::Duration; + +use derive_builder::Builder; +use getset::Getters; +use serde::Deserialize; + +use super::engine::EngineConfig; + +/// Curp server timeout settings +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] +#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] +pub struct CurpConfig { + /// Heartbeat Interval + #[builder(default = "default_heartbeat_interval()")] + #[serde(with = "duration_format", default = "default_heartbeat_interval")] + pub heartbeat_interval: Duration, + + /// Curp wait sync timeout + #[builder(default = "default_server_wait_synced_timeout()")] + #[serde( + with = "duration_format", + default = "default_server_wait_synced_timeout" + )] + pub wait_synced_timeout: Duration, + + /// Curp propose retry count + #[builder(default = "default_retry_count()")] + #[serde(default = "default_retry_count")] + pub retry_count: usize, + + /// Curp rpc timeout + #[builder(default = "default_rpc_timeout()")] + #[serde(with = "duration_format", default = "default_rpc_timeout")] + pub rpc_timeout: Duration, + + /// Curp append entries batch timeout + /// If the `batch_timeout` has expired, then it will be dispatched + /// whether its size reaches the `BATCHING_MSG_MAX_SIZE` or not. + #[builder(default = "default_batch_timeout()")] + #[serde(with = "duration_format", default = "default_batch_timeout")] + pub batch_timeout: Duration, + + /// The maximum number of bytes per batch. + #[builder(default = "default_batch_max_size()")] + #[serde(with = "bytes_format", default = "default_batch_max_size")] + pub batch_max_size: u64, + + /// How many ticks a follower is allowed to miss before it starts a new round of election + /// The actual timeout will be randomized and in between heartbeat_interval * [follower_timeout_ticks, 2 * follower_timeout_ticks) + #[builder(default = "default_follower_timeout_ticks()")] + #[serde(default = "default_follower_timeout_ticks")] + pub follower_timeout_ticks: u8, + + /// How many ticks a candidate needs to wait before it starts a new round of election + /// It should be smaller than `follower_timeout_ticks` + /// The actual timeout will be randomized and in between heartbeat_interval * [candidate_timeout_ticks, 2 * candidate_timeout_ticks) + #[builder(default = "default_candidate_timeout_ticks()")] + #[serde(default = "default_candidate_timeout_ticks")] + pub candidate_timeout_ticks: u8, + + /// Curp storage path + #[builder(default = "EngineConfig::default()")] + #[serde(default = "EngineConfig::default")] + pub engine_cfg: EngineConfig, + + /// Number of command execute workers + #[builder(default = "default_cmd_workers()")] + #[serde(default = "default_cmd_workers")] + pub cmd_workers: u8, + + /// How often should the gc task run + #[builder(default = "default_gc_interval()")] + #[serde(with = "duration_format", default = "default_gc_interval")] + pub gc_interval: Duration, + + /// Number of log entries to keep in memory + #[builder(default = "default_log_entries_cap()")] + #[serde(default = "default_log_entries_cap")] + pub log_entries_cap: usize, +} + +impl Default for CurpConfig { + #[inline] + fn default() -> Self { + Self { + heartbeat_interval: default_heartbeat_interval(), + wait_synced_timeout: default_server_wait_synced_timeout(), + retry_count: default_retry_count(), + rpc_timeout: default_rpc_timeout(), + batch_timeout: default_batch_timeout(), + batch_max_size: default_batch_max_size(), + follower_timeout_ticks: default_follower_timeout_ticks(), + candidate_timeout_ticks: default_candidate_timeout_ticks(), + engine_cfg: EngineConfig::default(), + cmd_workers: default_cmd_workers(), + gc_interval: default_gc_interval(), + log_entries_cap: default_log_entries_cap(), + } + } +} + +/// default heartbeat interval +#[must_use] +#[inline] +pub const fn default_heartbeat_interval() -> Duration { + Duration::from_millis(300) +} + +/// default wait synced timeout +#[must_use] +#[inline] +pub const fn default_server_wait_synced_timeout() -> Duration { + Duration::from_secs(5) +} + +/// default retry count +#[cfg(not(madsim))] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 3 +} + +/// default retry count +#[cfg(madsim)] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 10 +} + +/// default rpc timeout +#[must_use] +#[inline] +pub const fn default_rpc_timeout() -> Duration { + Duration::from_millis(150) +} + +/// default batch timeout +#[must_use] +#[inline] +pub const fn default_batch_timeout() -> Duration { + Duration::from_millis(15) +} + +/// default batch timeout +#[must_use] +#[inline] +#[allow(clippy::arithmetic_side_effects)] +pub const fn default_batch_max_size() -> u64 { + 2 * 1024 * 1024 +} + +/// default follower timeout +#[must_use] +#[inline] +pub const fn default_follower_timeout_ticks() -> u8 { + 5 +} + +/// default candidate timeout ticks +#[must_use] +#[inline] +pub const fn default_candidate_timeout_ticks() -> u8 { + 2 +} + +/// default number of execute workers +#[must_use] +#[inline] +pub const fn default_cmd_workers() -> u8 { + 8 +} + +/// default gc interval +#[must_use] +#[inline] +pub const fn default_gc_interval() -> Duration { + Duration::from_secs(20) +} + +/// default number of log entries to keep in memory +#[must_use] +#[inline] +pub const fn default_log_entries_cap() -> usize { + 5000 +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} + +/// batch size deserialization formatter +pub mod bytes_format { + use serde::{Deserialize, Deserializer}; + + use crate::parse_batch_bytes; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_batch_bytes(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/engine.rs b/crates/utils/src/config/engine.rs new file mode 100644 index 000000000..12a8a5057 --- /dev/null +++ b/crates/utils/src/config/engine.rs @@ -0,0 +1,26 @@ +use std::path::PathBuf; + +use serde::Deserialize; + +/// Engine Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde( + tag = "type", + content = "data_dir", + rename_all(deserialize = "lowercase") +)] +pub enum EngineConfig { + /// Memory Storage Engine + Memory, + /// RocksDB Storage Engine + RocksDB(PathBuf), +} + +impl Default for EngineConfig { + #[inline] + fn default() -> Self { + Self::Memory + } +} diff --git a/crates/utils/src/config/log.rs b/crates/utils/src/config/log.rs new file mode 100644 index 000000000..29b74d527 --- /dev/null +++ b/crates/utils/src/config/log.rs @@ -0,0 +1,150 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; +use tracing_appender::rolling::RollingFileAppender; + +/// Log verbosity level alias +#[allow(clippy::module_name_repetitions)] +pub type LevelConfig = tracing::metadata::LevelFilter; + +/// Log configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct LogConfig { + /// Log file path + #[getset(get = "pub")] + #[serde(default)] + path: Option, + /// Log rotation strategy + #[getset(get = "pub")] + #[serde(with = "rotation_format", default = "default_rotation")] + rotation: RotationConfig, + /// Log verbosity level + #[getset(get = "pub")] + #[serde(with = "level_format", default = "default_log_level")] + level: LevelConfig, +} + +impl Default for LogConfig { + #[inline] + fn default() -> Self { + Self { + path: None, + rotation: default_rotation(), + level: default_log_level(), + } + } +} + +/// `LevelConfig` deserialization formatter +pub mod level_format { + use serde::{Deserialize, Deserializer}; + + use super::LevelConfig; + use crate::parse_log_level; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_log_level(&s).map_err(serde::de::Error::custom) + } +} + +/// default log level +#[must_use] +#[inline] +pub const fn default_log_level() -> LevelConfig { + LevelConfig::INFO +} + +impl LogConfig { + /// Generate a new `LogConfig` object + #[must_use] + #[inline] + pub fn new(path: PathBuf, rotation: RotationConfig, level: LevelConfig) -> Self { + Self { + path: Some(path), + rotation, + level, + } + } +} + +/// Xline log rotation strategy +#[non_exhaustive] +#[allow(clippy::module_name_repetitions)] +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(rename_all(deserialize = "lowercase"))] +pub enum RotationConfig { + /// Rotate log file in every hour + Hourly, + /// Rotate log file every day + Daily, + /// Never rotate log file + Never, +} + +impl std::fmt::Display for RotationConfig { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + RotationConfig::Hourly => write!(f, "hourly"), + RotationConfig::Daily => write!(f, "daily"), + RotationConfig::Never => write!(f, "never"), + } + } +} + +/// `RotationConfig` deserialization formatter +pub mod rotation_format { + use serde::{Deserialize, Deserializer}; + + use super::RotationConfig; + use crate::parse_rotation; + + /// deserializes a cluster log rotation strategy + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_rotation(&s).map_err(serde::de::Error::custom) + } +} + +/// default log rotation strategy +#[must_use] +#[inline] +pub const fn default_rotation() -> RotationConfig { + RotationConfig::Daily +} + +/// Generates a `RollingFileAppender` from the given `RotationConfig` and `name` +#[must_use] +#[inline] +pub fn file_appender( + rotation: RotationConfig, + file_path: &PathBuf, + name: &str, +) -> RollingFileAppender { + match rotation { + RotationConfig::Hourly => { + tracing_appender::rolling::hourly(file_path, format!("xline_{name}.log")) + } + RotationConfig::Daily => { + tracing_appender::rolling::daily(file_path, format!("xline_{name}.log")) + } + RotationConfig::Never => { + tracing_appender::rolling::never(file_path, format!("xline_{name}.log")) + } + #[allow(unreachable_patterns)] + // It's ok because `parse_rotation` have check the validity before. + _ => unreachable!("should not call file_appender when parse_rotation failed"), + } +} diff --git a/crates/utils/src/config/metrics.rs b/crates/utils/src/config/metrics.rs new file mode 100644 index 000000000..614451791 --- /dev/null +++ b/crates/utils/src/config/metrics.rs @@ -0,0 +1,153 @@ +use getset::Getters; +use serde::Deserialize; + +/// Xline metrics configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +#[non_exhaustive] +pub struct MetricsConfig { + /// Enable or not + #[getset(get = "pub")] + #[serde(default = "default_metrics_enable")] + pub enable: bool, + /// The http port to expose + #[getset(get = "pub")] + #[serde(default = "default_metrics_port")] + pub port: u16, + /// The http path to expose + #[getset(get = "pub")] + #[serde(default = "default_metrics_path")] + pub path: String, + /// Enable push or not + #[getset(get = "pub")] + #[serde(default = "default_metrics_push")] + pub push: bool, + /// Push endpoint + #[getset(get = "pub")] + #[serde(default = "default_metrics_push_endpoint")] + pub push_endpoint: String, + /// Push protocol + #[getset(get = "pub")] + #[serde(with = "protocol_format", default = "default_metrics_push_protocol")] + pub push_protocol: MetricsPushProtocol, +} + +impl MetricsConfig { + /// Create a new `MetricsConfig` + #[must_use] + #[inline] + pub fn new( + enable: bool, + port: u16, + path: String, + push: bool, + push_endpoint: String, + push_protocol: MetricsPushProtocol, + ) -> Self { + Self { + enable, + port, + path, + push, + push_endpoint, + push_protocol, + } + } +} + +impl Default for MetricsConfig { + #[inline] + fn default() -> Self { + Self { + enable: default_metrics_enable(), + port: default_metrics_port(), + path: default_metrics_path(), + push: default_metrics_push(), + push_endpoint: default_metrics_push_endpoint(), + push_protocol: default_metrics_push_protocol(), + } + } +} + +/// Default metrics enable +#[must_use] +#[inline] +pub const fn default_metrics_enable() -> bool { + true +} + +/// Default metrics port +#[must_use] +#[inline] +pub const fn default_metrics_port() -> u16 { + 9100 +} + +/// Default metrics path +#[must_use] +#[inline] +pub fn default_metrics_path() -> String { + "/metrics".to_owned() +} + +/// Default metrics push option +#[must_use] +#[inline] +pub fn default_metrics_push() -> bool { + false +} + +/// Default metrics push protocol +#[must_use] +#[inline] +pub fn default_metrics_push_protocol() -> MetricsPushProtocol { + MetricsPushProtocol::GRPC +} + +/// Default metrics push endpoint +#[must_use] +#[inline] +pub fn default_metrics_push_endpoint() -> String { + "http://127.0.0.1:4318".to_owned() +} + +/// Xline metrics push protocol +#[non_exhaustive] +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all(deserialize = "lowercase"))] +#[allow(clippy::module_name_repetitions)] +pub enum MetricsPushProtocol { + /// HTTP protocol + HTTP, + /// GRPC protocol + #[default] + GRPC, +} + +impl std::fmt::Display for MetricsPushProtocol { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + MetricsPushProtocol::HTTP => write!(f, "http"), + MetricsPushProtocol::GRPC => write!(f, "grpc"), + } + } +} + +/// Metrics push protocol format +pub mod protocol_format { + use serde::{Deserialize, Deserializer}; + + use super::MetricsPushProtocol; + use crate::parse_metrics_push_protocol; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_metrics_push_protocol(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/mod.rs b/crates/utils/src/config/mod.rs new file mode 100644 index 000000000..8c4e57a99 --- /dev/null +++ b/crates/utils/src/config/mod.rs @@ -0,0 +1,442 @@ +/// Xline auth configuration module +pub mod auth; +/// Curp client module +pub mod client; +/// Cluster configuration module +pub mod cluster; +/// Compaction configuration module +pub mod compact; +/// Curp server module +pub mod curp; +/// Engine Configuration module +pub mod engine; +/// Log configuration module +pub mod log; +/// Xline metrics configuration module +pub mod metrics; +/// Xline server module +pub mod server; +/// Storage Configuration module +pub mod storage; +/// Xline tls configuration module +pub mod tls; +/// Xline tracing configuration module +pub mod trace; + +use getset::Getters; +use serde::Deserialize; + +use crate::config::{ + auth::AuthConfig, cluster::ClusterConfig, compact::CompactConfig, log::LogConfig, + metrics::MetricsConfig, storage::StorageConfig, tls::TlsConfig, trace::TraceConfig, +}; + +/// Xline server configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct XlineServerConfig { + /// cluster configuration object + #[getset(get = "pub")] + cluster: ClusterConfig, + /// xline storage configuration object + #[getset(get = "pub")] + storage: StorageConfig, + /// log configuration object + #[getset(get = "pub")] + log: LogConfig, + /// trace configuration object + #[getset(get = "pub")] + trace: TraceConfig, + /// auth configuration object + #[getset(get = "pub")] + auth: AuthConfig, + /// compactor configuration object + #[getset(get = "pub")] + compact: CompactConfig, + /// tls configuration object + #[getset(get = "pub")] + tls: TlsConfig, + /// Metrics config + #[getset(get = "pub")] + #[serde(default = "MetricsConfig::default")] + metrics: MetricsConfig, +} + +impl XlineServerConfig { + /// Generates a new `XlineServerConfig` object + #[must_use] + #[inline] + #[allow(clippy::too_many_arguments)] + pub fn new( + cluster: ClusterConfig, + storage: StorageConfig, + log: LogConfig, + trace: TraceConfig, + auth: AuthConfig, + compact: CompactConfig, + tls: TlsConfig, + metrics: MetricsConfig, + ) -> Self { + Self { + cluster, + storage, + log, + trace, + auth, + compact, + tls, + metrics, + } + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, path::PathBuf, time::Duration}; + + use super::*; + use crate::{ + config::{ + client::{ + default_client_id_keep_alive_interval, default_client_wait_synced_timeout, + default_fixed_backoff, default_propose_timeout, default_retry_count, ClientConfig, + }, + compact::AutoCompactConfig, + curp::CurpConfigBuilder, + engine::EngineConfig, + server::ServerTimeout, + storage::default_quota, + }, + InitialClusterState, LevelConfig, MetricsPushProtocol, RotationConfig, + }; + + #[allow(clippy::too_many_lines)] // just a testcase, not too bad + #[test] + fn test_xline_server_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + initial_cluster_state = 'new' + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.server_timeout] + range_retry_timeout = '3s' + compact_timeout = '5s' + sync_victims_interval = '20ms' + watch_progress_notify_interval = '1s' + + [cluster.peers] + node1 = ['127.0.0.1:2378', '127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.curp_config] + heartbeat_interval = '200ms' + wait_synced_timeout = '100ms' + rpc_timeout = '100ms' + retry_timeout = '100ms' + + [cluster.client_config] + initial_retry_timeout = '5s' + max_retry_timeout = '50s' + + [storage] + engine = { type = 'memory'} + + [compact] + compact_batch_size = 123 + compact_sleep_interval = '5ms' + + [compact.auto_compact_config] + mode = 'periodic' + retention = '10h' + + [log] + path = '/var/log/xline' + rotation = 'daily' + level = 'info' + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + auth_public_key = './public_key.pem' + auth_private_key = './private_key.pem' + + [tls] + peer_cert_path = './cert.pem' + peer_key_path = './key.pem' + client_ca_cert_path = './ca.pem' + + [metrics] + enable = true + port = 9100 + path = "/metrics" + push = true + push_endpoint = 'http://some-endpoint.com:4396' + push_protocol = 'http' + "#, + ) + .unwrap(); + + let curp_config = CurpConfigBuilder::default() + .heartbeat_interval(Duration::from_millis(200)) + .wait_synced_timeout(Duration::from_millis(100)) + .rpc_timeout(Duration::from_millis(100)) + .build() + .unwrap(); + + let client_config = ClientConfig::new( + default_client_wait_synced_timeout(), + default_propose_timeout(), + Duration::from_secs(5), + Duration::from_secs(50), + default_retry_count(), + default_fixed_backoff(), + default_client_id_keep_alive_interval(), + ); + + let server_timeout = ServerTimeout::new( + Duration::from_secs(3), + Duration::from_secs(5), + Duration::from_millis(20), + Duration::from_secs(1), + ); + + assert_eq!( + config.cluster, + ClusterConfig::new( + "node1".to_owned(), + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + HashMap::from_iter([ + ( + "node1".to_owned(), + vec!["127.0.0.1:2378".to_owned(), "127.0.0.1:2379".to_owned()] + ), + ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), + ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), + ]), + true, + curp_config, + client_config, + server_timeout, + InitialClusterState::New + ) + ); + + assert_eq!( + config.storage, + StorageConfig::new(EngineConfig::Memory, default_quota()) + ); + + assert_eq!( + config.log, + LogConfig::new( + PathBuf::from("/var/log/xline"), + RotationConfig::Daily, + LevelConfig::INFO + ) + ); + assert_eq!( + config.trace, + TraceConfig::new( + false, + false, + PathBuf::from("./jaeger_jsons"), + LevelConfig::INFO + ) + ); + + assert_eq!( + config.compact, + CompactConfig { + compact_batch_size: 123, + compact_sleep_interval: Duration::from_millis(5), + auto_compact_config: Some(AutoCompactConfig::Periodic(Duration::from_secs( + 10 * 60 * 60 + ))) + } + ); + + assert_eq!( + config.auth, + AuthConfig { + auth_private_key: Some(PathBuf::from("./private_key.pem")), + auth_public_key: Some(PathBuf::from("./public_key.pem")), + } + ); + + assert_eq!( + config.tls, + TlsConfig { + peer_cert_path: Some(PathBuf::from("./cert.pem")), + peer_key_path: Some(PathBuf::from("./key.pem")), + client_ca_cert_path: Some(PathBuf::from("./ca.pem")), + ..Default::default() + } + ); + + assert_eq!( + config.metrics, + MetricsConfig { + enable: true, + port: 9100, + path: "/metrics".to_owned(), + push: true, + push_endpoint: "http://some-endpoint.com:4396".to_owned(), + push_protocol: MetricsPushProtocol::HTTP, + }, + ); + } + + #[test] + fn test_xline_server_default_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + "[cluster] + name = 'node1' + is_leader = true + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.peers] + node1 = ['127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = { type = 'rocksdb', data_dir = '/usr/local/xline/data-dir' } + + [compact] + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + + [tls] + ", + ) + .unwrap(); + + assert_eq!( + config.cluster, + ClusterConfig::new( + "node1".to_owned(), + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + HashMap::from([ + ("node1".to_owned(), vec!["127.0.0.1:2379".to_owned()]), + ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), + ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), + ]), + true, + CurpConfigBuilder::default().build().unwrap(), + ClientConfig::default(), + ServerTimeout::default(), + InitialClusterState::default() + ) + ); + + if let EngineConfig::RocksDB(path) = config.storage.engine { + assert_eq!(path, PathBuf::from("/usr/local/xline/data-dir")); + } else { + unreachable!(); + } + + assert_eq!( + config.log, + LogConfig::new( + PathBuf::from("/var/log/xline"), + RotationConfig::Daily, + LevelConfig::INFO + ) + ); + assert_eq!( + config.trace, + TraceConfig::new( + false, + false, + PathBuf::from("./jaeger_jsons"), + LevelConfig::INFO + ) + ); + assert_eq!(config.compact, CompactConfig::default()); + assert_eq!(config.auth, AuthConfig::default()); + assert_eq!(config.tls, TlsConfig::default()); + assert_eq!(config.metrics, MetricsConfig::default()); + } + + #[test] + fn test_auto_revision_compactor_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + "[cluster] + name = 'node1' + is_leader = true + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.peers] + node1 = ['127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = { type = 'memory' } + + [compact] + + [compact.auto_compact_config] + mode = 'revision' + retention = 10000 + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + + [tls] + ", + ) + .unwrap(); + + assert_eq!( + config.compact, + CompactConfig { + auto_compact_config: Some(AutoCompactConfig::Revision(10000)), + ..Default::default() + } + ); + } +} diff --git a/crates/utils/src/config/server.rs b/crates/utils/src/config/server.rs new file mode 100644 index 000000000..4cf9e5424 --- /dev/null +++ b/crates/utils/src/config/server.rs @@ -0,0 +1,107 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Xline server settings +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct ServerTimeout { + /// Range request retry timeout settings + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_range_retry_timeout")] + range_retry_timeout: Duration, + /// Range request retry timeout settings + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_timeout")] + compact_timeout: Duration, + /// Sync victims interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_sync_victims_interval")] + sync_victims_interval: Duration, + /// Watch progress notify interval settings + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_watch_progress_notify_interval" + )] + watch_progress_notify_interval: Duration, +} + +impl ServerTimeout { + /// Create a new server timeout + #[must_use] + #[inline] + pub fn new( + range_retry_timeout: Duration, + compact_timeout: Duration, + sync_victims_interval: Duration, + watch_progress_notify_interval: Duration, + ) -> Self { + Self { + range_retry_timeout, + compact_timeout, + sync_victims_interval, + watch_progress_notify_interval, + } + } +} + +impl Default for ServerTimeout { + #[inline] + fn default() -> Self { + Self { + range_retry_timeout: default_range_retry_timeout(), + compact_timeout: default_compact_timeout(), + sync_victims_interval: default_sync_victims_interval(), + watch_progress_notify_interval: default_watch_progress_notify_interval(), + } + } +} + +/// default range retry timeout +#[must_use] +#[inline] +pub const fn default_range_retry_timeout() -> Duration { + Duration::from_secs(2) +} + +/// default compact timeout +#[must_use] +#[inline] +pub const fn default_compact_timeout() -> Duration { + Duration::from_secs(5) +} + +/// default sync victims interval +#[must_use] +#[inline] +pub const fn default_sync_victims_interval() -> Duration { + Duration::from_millis(10) +} + +/// default watch progress notify interval +#[must_use] +#[inline] +pub const fn default_watch_progress_notify_interval() -> Duration { + Duration::from_secs(600) +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/storage.rs b/crates/utils/src/config/storage.rs new file mode 100644 index 000000000..9466d7f39 --- /dev/null +++ b/crates/utils/src/config/storage.rs @@ -0,0 +1,43 @@ +use serde::Deserialize; + +use crate::config::engine::EngineConfig; + +/// Storage Configuration +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +pub struct StorageConfig { + /// Engine Configuration + #[serde(default = "EngineConfig::default")] + pub engine: EngineConfig, + /// Quota + #[serde(default = "default_quota")] + pub quota: u64, +} + +impl StorageConfig { + /// Create a new storage config + #[inline] + #[must_use] + pub fn new(engine: EngineConfig, quota: u64) -> Self { + Self { engine, quota } + } +} + +impl Default for StorageConfig { + #[inline] + fn default() -> Self { + Self { + engine: EngineConfig::default(), + quota: default_quota(), + } + } +} + +/// Default quota: 8GB +#[inline] +#[must_use] +pub fn default_quota() -> u64 { + // 8 * 1024 * 1024 * 1024 + 0x0002_0000_0000 +} diff --git a/crates/utils/src/config/tls.rs b/crates/utils/src/config/tls.rs new file mode 100644 index 000000000..de56d1bc3 --- /dev/null +++ b/crates/utils/src/config/tls.rs @@ -0,0 +1,59 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +/// Xline tls configuration object +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct TlsConfig { + /// The CA certificate file used by peer to verify client certificates + #[getset(get = "pub")] + pub peer_ca_cert_path: Option, + /// The public key file used by peer + #[getset(get = "pub")] + pub peer_cert_path: Option, + /// The private key file used by peer + #[getset(get = "pub")] + pub peer_key_path: Option, + /// The CA certificate file used by client to verify peer certificates + #[getset(get = "pub")] + pub client_ca_cert_path: Option, + /// The public key file used by client + #[getset(get = "pub")] + pub client_cert_path: Option, + /// The private key file used by client + #[getset(get = "pub")] + pub client_key_path: Option, +} + +impl TlsConfig { + /// Create a new `TlsConfig` object + #[must_use] + #[inline] + pub fn new( + peer_ca_cert_path: Option, + peer_cert_path: Option, + peer_key_path: Option, + client_ca_cert_path: Option, + client_cert_path: Option, + client_key_path: Option, + ) -> Self { + Self { + peer_ca_cert_path, + peer_cert_path, + peer_key_path, + client_ca_cert_path, + client_cert_path, + client_key_path, + } + } + + /// Whether the server tls is enabled + #[must_use] + #[inline] + pub fn server_tls_enabled(&self) -> bool { + self.peer_cert_path.is_some() && self.peer_key_path.is_some() + } +} diff --git a/crates/utils/src/config/trace.rs b/crates/utils/src/config/trace.rs new file mode 100644 index 000000000..7e44f9ca8 --- /dev/null +++ b/crates/utils/src/config/trace.rs @@ -0,0 +1,56 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +use super::log::{default_log_level, level_format, LevelConfig}; + +/// Xline tracing configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct TraceConfig { + /// Open jaeger online, sending data to jaeger agent directly + #[getset(get = "pub")] + jaeger_online: bool, + /// Open jaeger offline, saving data to the `jaeger_output_dir` + #[getset(get = "pub")] + jaeger_offline: bool, + /// The dir path to save the data when `jaeger_offline` is on + #[getset(get = "pub")] + jaeger_output_dir: PathBuf, + /// The verbosity level of tracing + #[getset(get = "pub")] + #[serde(with = "level_format", default = "default_log_level")] + jaeger_level: LevelConfig, +} + +impl Default for TraceConfig { + #[inline] + fn default() -> Self { + Self { + jaeger_online: false, + jaeger_offline: false, + jaeger_output_dir: "".into(), + jaeger_level: default_log_level(), + } + } +} + +impl TraceConfig { + /// Generate a new `TraceConfig` object + #[must_use] + #[inline] + pub fn new( + jaeger_online: bool, + jaeger_offline: bool, + jaeger_output_dir: PathBuf, + jaeger_level: LevelConfig, + ) -> Self { + Self { + jaeger_online, + jaeger_offline, + jaeger_output_dir, + jaeger_level, + } + } +} diff --git a/crates/utils/src/parser.rs b/crates/utils/src/parser.rs index ba7186074..bb9e1c050 100644 --- a/crates/utils/src/parser.rs +++ b/crates/utils/src/parser.rs @@ -3,8 +3,10 @@ use std::{collections::HashMap, time::Duration}; use clippy_utilities::OverflowArithmetic; use thiserror::Error; -use crate::config::{ - ClusterRange, InitialClusterState, LevelConfig, MetricsPushProtocol, RotationConfig, +pub use crate::config::{ + cluster::{ClusterRange, InitialClusterState}, + log::{LevelConfig, RotationConfig}, + metrics::MetricsPushProtocol, }; /// seconds per minute diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index 11f780cdc..b29ab2e58 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -174,7 +174,7 @@ use tonic::transport::ClientTlsConfig; use tower::Service; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{build_endpoint, config::ClientConfig}; +use utils::{build_endpoint, config::client::ClientConfig}; use xlineapi::command::{Command, CurpClient}; use crate::{ diff --git a/crates/xline-test-utils/src/lib.rs b/crates/xline-test-utils/src/lib.rs index e1bfd24de..d0fa9aadc 100644 --- a/crates/xline-test-utils/src/lib.rs +++ b/crates/xline-test-utils/src/lib.rs @@ -10,8 +10,16 @@ use tokio::{ }; use tonic::transport::ClientTlsConfig; use utils::config::{ - default_quota, AuthConfig, ClusterConfig, CompactConfig, EngineConfig, InitialClusterState, - LogConfig, MetricsConfig, StorageConfig, TlsConfig, TraceConfig, XlineServerConfig, + auth::AuthConfig, + cluster::{ClusterConfig, InitialClusterState}, + compact::CompactConfig, + engine::EngineConfig, + log::LogConfig, + metrics::MetricsConfig, + storage::{default_quota, StorageConfig}, + tls::TlsConfig, + trace::TraceConfig, + XlineServerConfig, }; use xline::server::XlineServer; use xline_client::types::auth::{ diff --git a/crates/xline/src/server/maintenance.rs b/crates/xline/src/server/maintenance.rs index 8ea586818..4b8eac235 100644 --- a/crates/xline/src/server/maintenance.rs +++ b/crates/xline/src/server/maintenance.rs @@ -288,7 +288,7 @@ mod test { use test_macros::abort_on_panic; use tokio_stream::StreamExt; - use utils::config::EngineConfig; + use utils::config::engine::EngineConfig; use super::*; use crate::storage::db::DB; diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index ecc99987f..288fe27ef 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -435,7 +435,7 @@ mod test { sync::mpsc, time::{sleep, timeout}, }; - use utils::config::{default_watch_progress_notify_interval, EngineConfig}; + use utils::config::{engine::EngineConfig, server::default_watch_progress_notify_interval}; use xlineapi::RequestWrapper; use super::*; diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index 661c4a38d..3f44d4f8b 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -24,8 +24,12 @@ use tonic::transport::{server::Router, Server}; use tracing::{info, warn}; use utils::{ config::{ - AuthConfig, ClusterConfig, CompactConfig, EngineConfig, InitialClusterState, StorageConfig, - TlsConfig, + auth::AuthConfig, + cluster::{ClusterConfig, InitialClusterState}, + compact::CompactConfig, + engine::EngineConfig, + storage::StorageConfig, + tls::TlsConfig, }, task_manager::{tasks::TaskName, TaskManager}, }; diff --git a/crates/xline/src/storage/auth_store/store.rs b/crates/xline/src/storage/auth_store/store.rs index e91c23776..f86257324 100644 --- a/crates/xline/src/storage/auth_store/store.rs +++ b/crates/xline/src/storage/auth_store/store.rs @@ -1173,7 +1173,7 @@ mod test { use std::collections::HashMap; use merged_range::MergedRange; - use utils::config::EngineConfig; + use utils::config::engine::EngineConfig; use super::*; use crate::{ diff --git a/crates/xline/src/storage/compact/mod.rs b/crates/xline/src/storage/compact/mod.rs index 36dcb19f5..48541fa0c 100644 --- a/crates/xline/src/storage/compact/mod.rs +++ b/crates/xline/src/storage/compact/mod.rs @@ -7,7 +7,7 @@ use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; use utils::{ - config::AutoCompactConfig, + config::compact::AutoCompactConfig, task_manager::{tasks::TaskName, Listener, TaskManager}, }; use xlineapi::{command::Command, execute_error::ExecuteError, RequestWrapper}; diff --git a/crates/xline/src/storage/db.rs b/crates/xline/src/storage/db.rs index 9eaf04aae..8ebc54b6f 100644 --- a/crates/xline/src/storage/db.rs +++ b/crates/xline/src/storage/db.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use engine::{Engine, EngineType, Snapshot, StorageEngine, WriteOperation}; use prost::Message; use utils::{ - config::EngineConfig, + config::engine::EngineConfig, table_names::{ ALARM_TABLE, AUTH_TABLE, KV_TABLE, LEASE_TABLE, META_TABLE, ROLE_TABLE, USER_TABLE, XLINE_TABLES, diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index ebbedf68e..57f826e01 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -928,7 +928,7 @@ mod test { use test_macros::abort_on_panic; use tokio::{runtime::Handle, task::block_in_place}; use utils::{ - config::EngineConfig, + config::engine::EngineConfig, task_manager::{tasks::TaskName, TaskManager}, }; diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 70b39fd0d..f545bac18 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -604,7 +604,7 @@ mod test { use clippy_utilities::{NumericCast, OverflowArithmetic}; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; - use utils::config::EngineConfig; + use utils::config::engine::EngineConfig; use xlineapi::RequestWrapper; use super::*; diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index 9986ec40e..051aeb478 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -373,7 +373,7 @@ mod test { use std::{error::Error, time::Duration}; use test_macros::abort_on_panic; - use utils::config::EngineConfig; + use utils::config::engine::EngineConfig; use super::*; use crate::storage::db::DB; diff --git a/crates/xline/src/utils/args.rs b/crates/xline/src/utils/args.rs index 199d26b7e..bf6e03807 100644 --- a/crates/xline/src/utils/args.rs +++ b/crates/xline/src/utils/args.rs @@ -5,19 +5,37 @@ use clap::Parser; use tokio::fs; use utils::{ config::{ - default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks, - default_client_id_keep_alive_interval, default_client_wait_synced_timeout, - default_cmd_workers, default_compact_batch_size, default_compact_sleep_interval, - default_compact_timeout, default_follower_timeout_ticks, default_gc_interval, - default_heartbeat_interval, default_initial_retry_timeout, default_log_entries_cap, - default_log_level, default_max_retry_timeout, default_metrics_enable, default_metrics_path, - default_metrics_port, default_metrics_push_endpoint, default_metrics_push_protocol, - default_propose_timeout, default_quota, default_range_retry_timeout, default_retry_count, - default_rotation, default_rpc_timeout, default_server_wait_synced_timeout, - default_sync_victims_interval, default_watch_progress_notify_interval, AuthConfig, - AutoCompactConfig, ClientConfig, ClusterConfig, CompactConfig, CurpConfigBuilder, - EngineConfig, InitialClusterState, LevelConfig, LogConfig, MetricsConfig, - MetricsPushProtocol, RotationConfig, ServerTimeout, StorageConfig, TlsConfig, TraceConfig, + auth::AuthConfig, + client::{ + default_client_id_keep_alive_interval, default_client_wait_synced_timeout, + default_initial_retry_timeout, default_max_retry_timeout, default_propose_timeout, + default_retry_count, ClientConfig, + }, + cluster::{ClusterConfig, InitialClusterState}, + compact::{ + default_compact_batch_size, default_compact_sleep_interval, AutoCompactConfig, + CompactConfig, + }, + curp::{ + default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks, + default_cmd_workers, default_follower_timeout_ticks, default_gc_interval, + default_heartbeat_interval, default_log_entries_cap, default_rpc_timeout, + default_server_wait_synced_timeout, CurpConfigBuilder, + }, + engine::EngineConfig, + log::{default_log_level, default_rotation, LevelConfig, LogConfig, RotationConfig}, + metrics::{ + default_metrics_enable, default_metrics_path, default_metrics_port, + default_metrics_push_endpoint, default_metrics_push_protocol, MetricsConfig, + MetricsPushProtocol, + }, + server::{ + default_compact_timeout, default_range_retry_timeout, default_sync_victims_interval, + default_watch_progress_notify_interval, ServerTimeout, + }, + storage::{default_quota, StorageConfig}, + tls::TlsConfig, + trace::TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_metrics_push_protocol, @@ -299,7 +317,10 @@ impl From for XlineServerConfig { args.jaeger_output_dir, args.jaeger_level, ); - let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key); + let auth = AuthConfig::new() + .auth_public_key(args.auth_public_key.unwrap()) + .auth_private_key(args.auth_private_key.unwrap()) + .build(); let auto_compactor_cfg = if let Some(mode) = args.auto_compact_mode { match mode.as_str() { "periodic" => { diff --git a/crates/xline/src/utils/metrics.rs b/crates/xline/src/utils/metrics.rs index 97a22896d..7d4af693e 100644 --- a/crates/xline/src/utils/metrics.rs +++ b/crates/xline/src/utils/metrics.rs @@ -2,7 +2,7 @@ use opentelemetry::global; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio}; use tracing::info; -use utils::config::{MetricsConfig, MetricsPushProtocol}; +use utils::config::metrics::{MetricsConfig, MetricsPushProtocol}; /// Start metrics server /// # Errors diff --git a/crates/xline/src/utils/trace.rs b/crates/xline/src/utils/trace.rs index 3b3fda98c..c2e196fa5 100644 --- a/crates/xline/src/utils/trace.rs +++ b/crates/xline/src/utils/trace.rs @@ -3,7 +3,10 @@ use opentelemetry_contrib::trace::exporter::jaeger_json::JaegerJsonExporter; use opentelemetry_sdk::runtime::Tokio; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt::format, layer::SubscriberExt, util::SubscriberInitExt, Layer}; -use utils::config::{file_appender, LogConfig, TraceConfig}; +use utils::config::{ + log::{file_appender, LogConfig}, + trace::TraceConfig, +}; /// init tracing subscriber /// # Errors diff --git a/crates/xline/tests/it/auth_test.rs b/crates/xline/tests/it/auth_test.rs index 2692ffaa4..24588fdaf 100644 --- a/crates/xline/tests/it/auth_test.rs +++ b/crates/xline/tests/it/auth_test.rs @@ -2,8 +2,9 @@ use std::{error::Error, iter, path::PathBuf}; use test_macros::abort_on_panic; use utils::config::{ - AuthConfig, ClusterConfig, CompactConfig, LogConfig, MetricsConfig, StorageConfig, TlsConfig, - TraceConfig, XlineServerConfig, + auth::AuthConfig, cluster::ClusterConfig, compact::CompactConfig, log::LogConfig, + metrics::MetricsConfig, storage::StorageConfig, tls::TlsConfig, trace::TraceConfig, + XlineServerConfig, }; use xline_test_utils::{ enable_auth, set_user, diff --git a/crates/xline/tests/it/tls_test.rs b/crates/xline/tests/it/tls_test.rs index 527b305c1..5275ccaad 100644 --- a/crates/xline/tests/it/tls_test.rs +++ b/crates/xline/tests/it/tls_test.rs @@ -4,8 +4,9 @@ use etcd_client::ConnectOptions; use test_macros::abort_on_panic; use tonic::transport::{Certificate, ClientTlsConfig, Identity}; use utils::config::{ - AuthConfig, ClusterConfig, CompactConfig, LogConfig, MetricsConfig, StorageConfig, TlsConfig, - TraceConfig, XlineServerConfig, + auth::AuthConfig, cluster::ClusterConfig, compact::CompactConfig, log::LogConfig, + metrics::MetricsConfig, storage::StorageConfig, tls::TlsConfig, trace::TraceConfig, + XlineServerConfig, }; use xline_client::types::kv::PutRequest; use xline_test_utils::{enable_auth, set_user, Cluster}; diff --git a/crates/xlinectl/src/main.rs b/crates/xlinectl/src/main.rs index 42fbd82b3..fe76d5ff2 100644 --- a/crates/xlinectl/src/main.rs +++ b/crates/xlinectl/src/main.rs @@ -162,7 +162,7 @@ use std::{path::PathBuf, time::Duration}; use anyhow::Result; use clap::{arg, value_parser, Command}; use command::compaction; -use ext_utils::config::ClientConfig; +use ext_utils::config::client::ClientConfig; use tokio::fs; use tonic::transport::{Certificate, ClientTlsConfig}; use xline_client::{Client, ClientOptions};