diff --git a/Cargo.lock b/Cargo.lock index f0334ce9cfc1..49ccfc2c6458 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2331,6 +2331,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" name = "drainer" version = "0.1.0" dependencies = [ + "actix-web", "async-bb8-diesel", "async-trait", "bb8", @@ -2342,8 +2343,10 @@ dependencies = [ "error-stack", "external_services", "masking", + "mime", "once_cell", "redis_interface", + "reqwest", "router_env", "serde", "serde_json", diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index e4611f43bcf8..a8f3db11e39b 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -9,6 +9,7 @@ pub struct RouterHealthCheckResponse { } impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct SchedulerHealthCheckResponse { pub database: bool, diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml index 67169a151044..0533bd12dab8 100644 --- a/crates/drainer/Cargo.toml +++ b/crates/drainer/Cargo.toml @@ -14,13 +14,16 @@ hashicorp-vault = ["external_services/hashicorp-vault"] vergen = ["router_env/vergen"] [dependencies] +actix-web = "4.3.1" async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" } bb8 = "0.8" clap = { version = "4.3.2", default-features = false, features = ["std", "derive", "help", "usage"] } config = { version = "0.13.3", features = ["toml"] } diesel = { version = "2.1.0", features = ["postgres"] } error-stack = "0.3.1" +mime = "0.3.17" once_cell = "1.18.0" +reqwest = { version = "0.11.18" } serde = "1.0.193" serde_json = "1.0.108" serde_path_to_error = "0.1.14" diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs index 3034e849f8a9..8605ee2ba04e 100644 --- a/crates/drainer/src/errors.rs +++ b/crates/drainer/src/errors.rs @@ -15,6 +15,22 @@ pub enum DrainerError { ParsingError(error_stack::Report), #[error("Unexpected error occurred: {0}")] UnexpectedError(String), + #[error("I/O: {0}")] + IoError(std::io::Error), +} + +#[derive(Debug, Error, Clone, serde::Serialize)] +pub enum HealthCheckError { + #[error("Database health check is failing with error: {message}")] + DbError { message: String }, + #[error("Redis health check is failing with error: {message}")] + RedisError { message: String }, +} + +impl From for DrainerError { + fn from(err: std::io::Error) -> Self { + Self::IoError(err) + } } pub type DrainerResult = error_stack::Result; @@ -30,3 +46,13 @@ impl From> for DrainerError { Self::RedisError(err) } } + +impl actix_web::ResponseError for HealthCheckError { + fn status_code(&self) -> reqwest::StatusCode { + use reqwest::StatusCode; + + match self { + Self::DbError { .. } | Self::RedisError { .. } => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs new file mode 100644 index 000000000000..33b4a1395a8c --- /dev/null +++ b/crates/drainer/src/health_check.rs @@ -0,0 +1,268 @@ +use std::sync::Arc; + +use actix_web::{web, Scope}; +use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; +use common_utils::errors::CustomResult; +use diesel_models::{Config, ConfigNew}; +use error_stack::ResultExt; +use router_env::{instrument, logger, tracing}; + +use crate::{ + connection::{pg_connection, redis_connection}, + errors::HealthCheckError, + services::{self, Store}, + settings::Settings, +}; + +pub const TEST_STREAM_NAME: &str = "TEST_STREAM_0"; +pub const TEST_STREAM_DATA: &[(&str, &str)] = &[("data", "sample_data")]; + +pub struct Health; + +impl Health { + pub fn server(conf: Settings, store: Arc) -> Scope { + web::scope("health") + .app_data(web::Data::new(conf)) + .app_data(web::Data::new(store)) + .service(web::resource("").route(web::get().to(health))) + .service(web::resource("/ready").route(web::get().to(deep_health_check))) + } +} + +#[instrument(skip_all)] +pub async fn health() -> impl actix_web::Responder { + logger::info!("Drainer health was called"); + actix_web::HttpResponse::Ok().body("Drainer health is good") +} + +#[instrument(skip_all)] +pub async fn deep_health_check( + conf: web::Data, + store: web::Data>, +) -> impl actix_web::Responder { + match deep_health_check_func(conf, store).await { + Ok(response) => services::http_response_json( + serde_json::to_string(&response) + .map_err(|err| { + logger::error!(serialization_error=?err); + }) + .unwrap_or_default(), + ), + + Err(err) => services::log_and_return_error_response(err), + } +} + +#[instrument(skip_all)] +pub async fn deep_health_check_func( + conf: web::Data, + store: web::Data>, +) -> Result> { + logger::info!("Deep health check was called"); + + logger::debug!("Database health check begin"); + + let db_status = store.health_check_db().await.map(|_| true).map_err(|err| { + error_stack::report!(HealthCheckError::DbError { + message: err.to_string() + }) + })?; + + logger::debug!("Database health check end"); + + logger::debug!("Redis health check begin"); + + let redis_status = store + .health_check_redis(&conf.into_inner()) + .await + .map(|_| true) + .map_err(|err| { + error_stack::report!(HealthCheckError::RedisError { + message: err.to_string() + }) + })?; + + logger::debug!("Redis health check end"); + + Ok(DrainerHealthCheckResponse { + database: db_status, + redis: redis_status, + }) +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DrainerHealthCheckResponse { + pub database: bool, + pub redis: bool, +} + +#[async_trait::async_trait] +pub trait HealthCheckInterface { + async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError>; + async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError>; +} + +#[async_trait::async_trait] +impl HealthCheckInterface for Store { + async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError> { + let conn = pg_connection(&self.master_pool).await; + + conn + .transaction_async(|conn| { + Box::pin(async move { + let query = + diesel::select(diesel::dsl::sql::("1 + 1")); + let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { + logger::error!(read_err=?err,"Error while reading element in the database"); + HealthCheckDBError::DbReadError + })?; + + logger::debug!("Database read was successful"); + + let config = ConfigNew { + key: "test_key".to_string(), + config: "test_value".to_string(), + }; + + config.insert(&conn).await.map_err(|err| { + logger::error!(write_err=?err,"Error while writing to database"); + HealthCheckDBError::DbWriteError + })?; + + logger::debug!("Database write was successful"); + + Config::delete_by_key(&conn, "test_key").await.map_err(|err| { + logger::error!(delete_err=?err,"Error while deleting element in the database"); + HealthCheckDBError::DbDeleteError + })?; + + logger::debug!("Database delete was successful"); + + Ok::<_, HealthCheckDBError>(()) + }) + }) + .await?; + + Ok(()) + } + + async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError> { + let redis_conn = redis_connection(conf).await; + + redis_conn + .serialize_and_set_key_with_expiry("test_key", "test_value", 30) + .await + .change_context(HealthCheckRedisError::SetFailed)?; + + logger::debug!("Redis set_key was successful"); + + redis_conn + .get_key("test_key") + .await + .change_context(HealthCheckRedisError::GetFailed)?; + + logger::debug!("Redis get_key was successful"); + + redis_conn + .delete_key("test_key") + .await + .change_context(HealthCheckRedisError::DeleteFailed)?; + + logger::debug!("Redis delete_key was successful"); + + redis_conn + .stream_append_entry( + TEST_STREAM_NAME, + &redis_interface::RedisEntryId::AutoGeneratedID, + TEST_STREAM_DATA.to_vec(), + ) + .await + .change_context(HealthCheckRedisError::StreamAppendFailed)?; + + logger::debug!("Stream append succeeded"); + + let output = self + .redis_conn + .stream_read_entries(TEST_STREAM_NAME, "0-0", Some(10)) + .await + .change_context(HealthCheckRedisError::StreamReadFailed)?; + logger::debug!("Stream read succeeded"); + + let (_, id_to_trim) = output + .get(TEST_STREAM_NAME) + .and_then(|entries| { + entries + .last() + .map(|last_entry| (entries, last_entry.0.clone())) + }) + .ok_or(error_stack::report!( + HealthCheckRedisError::StreamReadFailed + ))?; + logger::debug!("Stream parse succeeded"); + + redis_conn + .stream_trim_entries( + TEST_STREAM_NAME, + ( + redis_interface::StreamCapKind::MinID, + redis_interface::StreamCapTrim::Exact, + id_to_trim, + ), + ) + .await + .change_context(HealthCheckRedisError::StreamTrimFailed)?; + logger::debug!("Stream trim succeeded"); + + Ok(()) + } +} + +#[allow(clippy::enum_variant_names)] +#[derive(Debug, thiserror::Error)] +pub enum HealthCheckDBError { + #[error("Error while connecting to database")] + DbError, + #[error("Error while writing to database")] + DbWriteError, + #[error("Error while reading element in the database")] + DbReadError, + #[error("Error while deleting element in the database")] + DbDeleteError, + #[error("Unpredictable error occurred")] + UnknownError, + #[error("Error in database transaction")] + TransactionError, +} + +impl From for HealthCheckDBError { + fn from(error: diesel::result::Error) -> Self { + match error { + diesel::result::Error::DatabaseError(_, _) => Self::DbError, + + diesel::result::Error::RollbackErrorOnCommit { .. } + | diesel::result::Error::RollbackTransaction + | diesel::result::Error::AlreadyInTransaction + | diesel::result::Error::NotInTransaction + | diesel::result::Error::BrokenTransactionManager => Self::TransactionError, + + _ => Self::UnknownError, + } + } +} + +#[allow(clippy::enum_variant_names)] +#[derive(Debug, thiserror::Error)] +pub enum HealthCheckRedisError { + #[error("Failed to set key value in Redis")] + SetFailed, + #[error("Failed to get key value in Redis")] + GetFailed, + #[error("Failed to delete key value in Redis")] + DeleteFailed, + #[error("Failed to append data to the stream in Redis")] + StreamAppendFailed, + #[error("Failed to read data from the stream in Redis")] + StreamReadFailed, + #[error("Failed to trim data from the stream in Redis")] + StreamTrimFailed, +} diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index abb32c877962..909ae065e265 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -1,6 +1,7 @@ mod connection; pub mod errors; mod handler; +mod health_check; pub mod logger; pub(crate) mod metrics; mod query; @@ -11,6 +12,7 @@ mod types; mod utils; use std::sync::Arc; +use actix_web::dev::Server; use common_utils::signals::get_allowed_signals; use diesel_models::kv; use error_stack::{IntoReport, ResultExt}; @@ -18,7 +20,10 @@ use router_env::{instrument, tracing}; use tokio::sync::mpsc; use crate::{ - connection::pg_connection, services::Store, settings::DrainerSettings, types::StreamData, + connection::pg_connection, + services::Store, + settings::{DrainerSettings, Settings}, + types::StreamData, }; pub async fn start_drainer(store: Arc, conf: DrainerSettings) -> errors::DrainerResult<()> { @@ -49,3 +54,18 @@ pub async fn start_drainer(store: Arc, conf: DrainerSettings) -> errors:: Ok(()) } + +pub async fn start_web_server( + conf: Settings, + store: Arc, +) -> Result { + let server = conf.server.clone(); + let web_server = actix_web::HttpServer::new(move || { + actix_web::App::new().service(health_check::Health::server(conf.clone(), store.clone())) + }) + .bind((server.host.as_str(), server.port))? + .run(); + let _ = web_server.handle(); + + Ok(web_server) +} diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index 34c1294d55fa..943a66f67911 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -1,4 +1,7 @@ -use drainer::{errors::DrainerResult, logger::logger, services, settings, start_drainer}; +use drainer::{ + errors::DrainerResult, logger::logger, services, settings, start_drainer, start_web_server, +}; +use router_env::tracing::Instrument; #[tokio::main] async fn main() -> DrainerResult<()> { @@ -24,6 +27,19 @@ async fn main() -> DrainerResult<()> { [router_env::service_name!()], ); + #[allow(clippy::expect_used)] + let web_server = Box::pin(start_web_server(conf.clone(), store.clone())) + .await + .expect("Failed to create the server"); + + tokio::spawn( + async move { + let _ = web_server.await; + logger::error!("The health check probe stopped working!"); + } + .in_current_span(), + ); + logger::debug!(startup_config=?conf); logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log); diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs index 4393ebb9dc97..0d0b78c0cc66 100644 --- a/crates/drainer/src/services.rs +++ b/crates/drainer/src/services.rs @@ -1,6 +1,12 @@ use std::sync::Arc; -use crate::connection::{diesel_make_pg_pool, PgPool}; +use actix_web::{body, HttpResponse, ResponseError}; +use error_stack::Report; + +use crate::{ + connection::{diesel_make_pg_pool, PgPool}, + logger, +}; #[derive(Clone)] pub struct Store { @@ -45,3 +51,23 @@ impl Store { } } } + +pub fn log_and_return_error_response(error: Report) -> HttpResponse +where + T: error_stack::Context + ResponseError + Clone, +{ + logger::error!(?error); + let body = serde_json::json!({ + "message": error.to_string() + }) + .to_string(); + HttpResponse::InternalServerError() + .content_type(mime::APPLICATION_JSON) + .body(body) +} + +pub fn http_response_json(response: T) -> HttpResponse { + HttpResponse::Ok() + .content_type(mime::APPLICATION_JSON) + .body(response) +} diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs index 5b80ee375f54..49cb5f4c7c21 100644 --- a/crates/drainer/src/settings.rs +++ b/crates/drainer/src/settings.rs @@ -30,6 +30,7 @@ pub struct CmdLineConf { #[derive(Debug, Deserialize, Clone, Default)] #[serde(default)] pub struct Settings { + pub server: Server, pub master_database: Database, pub redis: redis::RedisSettings, pub log: Log, @@ -62,6 +63,24 @@ pub struct DrainerSettings { pub loop_interval: u32, // in milliseconds } +#[derive(Debug, Deserialize, Clone)] +#[serde(default)] +pub struct Server { + pub port: u16, + pub workers: usize, + pub host: String, +} + +impl Server { + pub fn validate(&self) -> Result<(), errors::DrainerError> { + common_utils::fp_utils::when(self.host.is_default_or_empty(), || { + Err(errors::DrainerError::ConfigParsingError( + "server host must not be empty".into(), + )) + }) + } +} + impl Default for Database { fn default() -> Self { Self { @@ -88,6 +107,16 @@ impl Default for DrainerSettings { } } +impl Default for Server { + fn default() -> Self { + Self { + host: "127.0.0.1".to_string(), + port: 8080, + workers: 1, + } + } +} + impl Database { fn validate(&self) -> Result<(), errors::DrainerError> { use common_utils::fp_utils::when; @@ -169,6 +198,7 @@ impl Settings { } pub fn validate(&self) -> Result<(), errors::DrainerError> { + self.server.validate()?; self.master_database.validate()?; self.redis.validate().map_err(|error| { println!("{error}");