From d1a47448f74ee1a2226aeb11f83b8e9ee32fe773 Mon Sep 17 00:00:00 2001
From: Chethan
Date: Thu, 18 Jan 2024 23:42:31 +0530
Subject: [PATCH 01/14] feat: add deep health check for drainer

---
 Cargo.lock | 2 +
 crates/drainer/Cargo.toml | 2 +
 crates/drainer/src/errors.rs | 8 ++
 crates/drainer/src/health_check.rs | 209 +++++++++++++++++++++++++++++
 crates/drainer/src/lib.rs | 22 ++-
 crates/drainer/src/main.rs | 11 +-
 crates/drainer/src/settings.rs | 23 ++++
 7 files changed, 275 insertions(+), 2 deletions(-)
 create mode 100644 crates/drainer/src/health_check.rs

diff --git a/Cargo.lock b/Cargo.lock
index 7ce0851ba159..7a72a062112f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2265,6 +2265,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
 name = "drainer"
 version = "0.1.0"
 dependencies = [
+ "actix-web",
  "async-bb8-diesel",
  "async-trait",
  "bb8",
@@ -2276,6 +2277,7 @@ dependencies = [
  "error-stack",
  "external_services",
  "masking",
+ "mime",
  "once_cell",
  "redis_interface",
  "router_env",

diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml
index 50e0effd03e0..d5aed36f0641 100644
--- a/crates/drainer/Cargo.toml
+++ b/crates/drainer/Cargo.toml
@@ -13,12 +13,14 @@ kms = ["external_services/kms"]
 vergen = ["router_env/vergen"]
 
 [dependencies]
+actix-web = "4.3.1"
 async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" }
 bb8 = "0.8"
 clap = { version = "4.3.2", default-features = false, features = ["std", "derive", "help", "usage"] }
 config = { version = "0.13.3", features = ["toml"] }
 diesel = { version = "2.1.0", features = ["postgres"] }
 error-stack = "0.3.1"
+mime = "0.3.17"
 once_cell = "1.18.0"
 serde = "1.0.193"
 serde_json = "1.0.108"

diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs
index 3034e849f8a9..fe2c27e879d3 100644
--- a/crates/drainer/src/errors.rs
+++ b/crates/drainer/src/errors.rs
@@ -15,6 +15,14 @@ pub enum DrainerError {
     ParsingError(error_stack::Report),
     #[error("Unexpected error occurred: {0}")]
     UnexpectedError(String),
+    #[error("I/O: {0}")]
+    IoError(std::io::Error),
+}
+
+impl From<std::io::Error> for DrainerError {
+    fn from(err: std::io::Error) -> Self {
+        Self::IoError(err)
+    }
 }
 
 pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;

diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs
new file mode 100644
index 000000000000..cfe5c7736b4d
--- /dev/null
+++ b/crates/drainer/src/health_check.rs
@@ -0,0 +1,209 @@
+use std::sync::Arc;
+
+use actix_web::{web, HttpResponse, Scope};
+
+use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
+use common_utils::errors::CustomResult;
+use diesel_models::{Config, ConfigNew};
+use error_stack::ResultExt;
+use router_env::{instrument, logger, tracing};
+
+use crate::{
+    connection::{pg_connection, redis_connection},
+    services::Store,
+    settings::Settings,
+};
+
+pub struct Health;
+
+impl Health {
+    pub fn server(conf: Settings, store: Arc<Store>) -> Scope {
+        web::scope("health")
+            .app_data(web::Data::new(conf))
+            .app_data(web::Data::new(store))
+            .service(web::resource("").route(web::get().to(health)))
+            .service(web::resource("/deep_check").route(web::post().to(deep_health_check)))
+    }
+}
+
+#[instrument(skip_all)]
+pub async fn health() -> impl actix_web::Responder {
+    logger::info!("Drainer health was called");
+    actix_web::HttpResponse::Ok().body("Drainer health is good")
+}
+
+#[instrument(skip_all)]
+pub async fn deep_health_check(
+    conf: web::Data<Settings>,
+    store: web::Data<Arc<Store>>,
+) -> impl actix_web::Responder {
+    let mut status_code = 200;
+    logger::info!("Deep health check was called");
+
+    logger::debug!("Database health check begin");
+
+    let db_status = match store.health_check_db().await {
+        Ok(_) => "Health is good".to_string(),
+        Err(err) => {
+            status_code = 500;
+            err.to_string()
+        }
+    };
+    logger::debug!("Database health check end");
+
+    logger::debug!("Redis health check begin");
+
+    let redis_status = match store.health_check_redis(&conf).await {
+        Ok(_) => "Health is good".to_string(),
+        Err(err) => {
+            status_code = 500;
+            err.to_string()
+        }
+    };
+
+    logger::debug!("Redis health check end");
+
+    let response = serde_json::to_string(&DrainerHealthCheckResponse {
+        database: db_status,
+        redis: redis_status,
+    })
+    .unwrap_or_default();
+
+    if status_code == 200 {
+        HttpResponse::Ok()
+            .content_type(mime::APPLICATION_JSON)
+            .body(response)
+    } else {
+        HttpResponse::InternalServerError()
+            .content_type(mime::APPLICATION_JSON)
+            .body(response)
+    }
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct DrainerHealthCheckResponse {
+    pub database: String,
+    pub redis: String,
+}
+
+#[async_trait::async_trait]
+pub trait HealthCheckInterface {
+    async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError>;
+    async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError>;
+}
+
+#[async_trait::async_trait]
+impl HealthCheckInterface for Store {
+    async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError> {
+        let conn = pg_connection(&self.master_pool).await;
+
+        conn
+            .transaction_async(|conn| {
+                Box::pin(async move {
+                    let query =
+                        diesel::select(diesel::dsl::sql::<diesel::sql_types::Integer>("1 + 1"));
+                    let _x: i32 = query.get_result_async(&conn).await.map_err(|err| {
+                        logger::error!(read_err=?err,"Error while reading element in the database");
+                        HealthCheckDBError::DBReadError
+                    })?;
+
+                    logger::debug!("Database read was successful");
+
+                    let config = ConfigNew {
+                        key: "test_key".to_string(),
+                        config: "test_value".to_string(),
+                    };
+
+                    config.insert(&conn).await.map_err(|err| {
+                        logger::error!(write_err=?err,"Error while writing to database");
+                        HealthCheckDBError::DBWriteError
+                    })?;
+
+                    logger::debug!("Database write was successful");
+
+                    Config::delete_by_key(&conn, "test_key").await.map_err(|err| {
+                        logger::error!(delete_err=?err,"Error while deleting element in the database");
+                        HealthCheckDBError::DBDeleteError
+                    })?;
+
+                    logger::debug!("Database delete was successful");
+
+                    Ok::<_, HealthCheckDBError>(())
+                })
+            })
+            .await?;
+
+        Ok(())
+    }
+
+    async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError> {
+        let redis_conn = redis_connection(conf).await;
+
+        redis_conn
+            .serialize_and_set_key_with_expiry("test_key", "test_value", 30)
+            .await
+            .change_context(HealthCheckRedisError::SetFailed)?;
+
+        logger::debug!("Redis set_key was successful");
+
+        redis_conn
+            .get_key("test_key")
+            .await
+            .change_context(HealthCheckRedisError::GetFailed)?;
+
+        logger::debug!("Redis get_key was successful");
+
+        redis_conn
+            .delete_key("test_key")
+            .await
+            .change_context(HealthCheckRedisError::DeleteFailed)?;
+
+        logger::debug!("Redis delete_key was successful");
+
+        Ok(())
+    }
+}
+
+#[allow(clippy::enum_variant_names)]
+#[derive(Debug, thiserror::Error)]
+pub enum HealthCheckDBError {
+    #[error("Error while connecting to database")]
+    DBError,
+    #[error("Error while writing to database")]
+    DBWriteError,
+    #[error("Error while reading element in the database")]
+    DBReadError,
+    #[error("Error while deleting element in the database")]
+    DBDeleteError,
+    #[error("Unpredictable error occurred")]
+    UnknownError,
+    #[error("Error in database transaction")]
+    TransactionError,
+}
+
+impl From<diesel::result::Error> for HealthCheckDBError {
+    fn from(error: diesel::result::Error) -> Self {
+        match error {
+            diesel::result::Error::DatabaseError(_, _) => Self::DBError,
+
+            diesel::result::Error::RollbackErrorOnCommit { .. }
+            | diesel::result::Error::RollbackTransaction
+            | diesel::result::Error::AlreadyInTransaction
+            | diesel::result::Error::NotInTransaction
+            | diesel::result::Error::BrokenTransactionManager => Self::TransactionError,
+
+            _ => Self::UnknownError,
+        }
+    }
+}
+
+#[allow(clippy::enum_variant_names)]
+#[derive(Debug, thiserror::Error)]
+pub enum HealthCheckRedisError {
+    #[error("Failed to set key value in Redis")]
+    SetFailed,
+    #[error("Failed to get key value in Redis")]
+    GetFailed,
+    #[error("Failed to delete key value in Redis")]
+    DeleteFailed,
+}
diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs
index abb32c877962..909ae065e265 100644
--- a/crates/drainer/src/lib.rs
+++ b/crates/drainer/src/lib.rs
@@ -1,6 +1,7 @@
 mod connection;
 pub mod errors;
 mod handler;
+mod health_check;
 pub mod logger;
 pub(crate) mod metrics;
 mod query;
@@ -11,6 +12,7 @@ mod types;
 mod utils;
 
 use std::sync::Arc;
+use actix_web::dev::Server;
 use common_utils::signals::get_allowed_signals;
 use diesel_models::kv;
 use error_stack::{IntoReport, ResultExt};
@@ -18,7 +20,10 @@ use router_env::{instrument, tracing};
 use tokio::sync::mpsc;
 
 use crate::{
-    connection::pg_connection, services::Store, settings::DrainerSettings, types::StreamData,
+    connection::pg_connection,
+    services::Store,
+    settings::{DrainerSettings, Settings},
+    types::StreamData,
 };
 
 pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::DrainerResult<()> {
@@ -49,3 +54,18 @@ pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::
 
     Ok(())
 }
+
+pub async fn start_web_server(
+    conf: Settings,
+    store: Arc<Store>,
+) -> Result<Server, errors::DrainerError> {
+    let server = conf.server.clone();
+    let web_server = actix_web::HttpServer::new(move || {
+        actix_web::App::new().service(health_check::Health::server(conf.clone(), store.clone()))
+    })
+    .bind((server.host.as_str(), server.port))?
+ .run(); + let _ = web_server.handle(); + + Ok(web_server) +} diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index 34c1294d55fa..838d50198ebd 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -1,4 +1,6 @@ -use drainer::{errors::DrainerResult, logger::logger, services, settings, start_drainer}; +use drainer::{ + errors::DrainerResult, logger::logger, services, settings, start_drainer, start_web_server, +}; #[tokio::main] async fn main() -> DrainerResult<()> { @@ -24,6 +26,13 @@ async fn main() -> DrainerResult<()> { [router_env::service_name!()], ); + #[allow(clippy::expect_used)] + let web_server = Box::pin(start_web_server(conf.clone(), store.clone())) + .await + .expect("Failed to create the server"); + + tokio::spawn(web_server); + logger::debug!(startup_config=?conf); logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log); diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs index 8101abf5028e..7b0ff5db04b9 100644 --- a/crates/drainer/src/settings.rs +++ b/crates/drainer/src/settings.rs @@ -28,6 +28,7 @@ pub struct CmdLineConf { #[derive(Debug, Deserialize, Clone, Default)] #[serde(default)] pub struct Settings { + pub server: Server, pub master_database: Database, pub redis: redis::RedisSettings, pub log: Log, @@ -58,6 +59,27 @@ pub struct DrainerSettings { pub loop_interval: u32, // in milliseconds } +#[derive(Debug, Default, Deserialize, Clone)] +#[serde(default)] +pub struct Server { + pub port: u16, + pub workers: usize, + pub host: String, + pub request_body_limit: usize, + pub base_url: String, + pub shutdown_timeout: u64, +} + +impl Server { + pub fn validate(&self) -> Result<(), errors::DrainerError> { + common_utils::fp_utils::when(self.host.is_default_or_empty(), || { + Err(errors::DrainerError::ConfigParsingError( + "server host must not be empty".into(), + )) + }) + } +} + impl Default for Database { fn default() -> Self { Self { @@ -165,6 +187,7 @@ impl Settings { } pub fn validate(&self) -> Result<(), errors::DrainerError> { + self.server.validate()?; self.master_database.validate()?; self.redis.validate().map_err(|error| { println!("{error}"); From b4734507b58de702e8dfd908794b042333a75290 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Thu, 18 Jan 2024 18:15:03 +0000 Subject: [PATCH 02/14] chore: run formatter --- crates/drainer/src/health_check.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs index cfe5c7736b4d..8297fa7fe1f3 100644 --- a/crates/drainer/src/health_check.rs +++ b/crates/drainer/src/health_check.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use actix_web::{web, HttpResponse, Scope}; - use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use common_utils::errors::CustomResult; use diesel_models::{Config, ConfigNew}; From 076b0f9292820ac96234cdb71e79ad01429452ed Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Mon, 22 Jan 2024 23:21:01 +0530 Subject: [PATCH 03/14] feat: add stream tests to drainer --- config/development.toml | 1 + crates/drainer/src/health_check.rs | 56 ++++++++++++++++++++++++++++-- crates/drainer/src/settings.rs | 15 +++++--- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/config/development.toml b/config/development.toml index b23f68680e64..645a2e8de7db 100644 --- a/config/development.toml +++ b/config/development.toml @@ -50,6 +50,7 @@ max_feed_count = 200 [server] # HTTP Request body limit. 
Defaults to 32kB +host = "127.0.0.1" request_body_limit = 32768 [secrets] diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs index 8297fa7fe1f3..0b5d1d7aead9 100644 --- a/crates/drainer/src/health_check.rs +++ b/crates/drainer/src/health_check.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use actix_web::{web, HttpResponse, Scope}; +use actix_web::{body::BoxBody, web, HttpResponse, Scope}; use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use common_utils::errors::CustomResult; use diesel_models::{Config, ConfigNew}; @@ -13,6 +13,9 @@ use crate::{ settings::Settings, }; +pub const TEST_STREAM_NAME: &str = "TEST_STREAM_0"; +pub const TEST_STREAM_DATA: &[(&str, &str)] = &[("data", "sample_data")]; + pub struct Health; impl Health { @@ -21,7 +24,7 @@ impl Health { .app_data(web::Data::new(conf)) .app_data(web::Data::new(store)) .service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) + .service(web::resource("/deep_check").route(web::get().to(deep_health_check))) } } @@ -159,6 +162,49 @@ impl HealthCheckInterface for Store { logger::debug!("Redis delete_key was successful"); + redis_conn + .stream_append_entry( + TEST_STREAM_NAME, + &redis_interface::RedisEntryId::AutoGeneratedID, + TEST_STREAM_DATA.to_vec(), + ) + .await + .change_context(HealthCheckRedisError::StreamAppendFailed)?; + + logger::debug!("Stream append succeded"); + + let output = self + .redis_conn + .stream_read_entries(TEST_STREAM_NAME, "0-0", Some(10)) + .await + .change_context(HealthCheckRedisError::StreamReadFailed)?; + logger::debug!("Stream read succeded"); + + let (_, id_to_trim) = output + .get(TEST_STREAM_NAME) + .and_then(|entries| { + entries + .last() + .map(|last_entry| (entries, last_entry.0.clone())) + }) + .ok_or(error_stack::report!( + HealthCheckRedisError::StreamReadFailed + ))?; + logger::debug!("Stream parse succeded"); + + redis_conn + .stream_trim_entries( + TEST_STREAM_NAME, + ( + redis_interface::StreamCapKind::MinID, + redis_interface::StreamCapTrim::Exact, + id_to_trim, + ), + ) + .await + .change_context(HealthCheckRedisError::StreamTrimFailed)?; + logger::debug!("Stream trim succeded"); + Ok(()) } } @@ -205,4 +251,10 @@ pub enum HealthCheckRedisError { GetFailed, #[error("Failed to delete key value in Redis")] DeleteFailed, + #[error("Failed to append data to the stream in Redis")] + StreamAppendFailed, + #[error("Failed to read data from the stream in Redis")] + StreamReadFailed, + #[error("Failed to trim data from the stream in Redis")] + StreamTrimFailed, } diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs index 7b0ff5db04b9..7d1423fc8805 100644 --- a/crates/drainer/src/settings.rs +++ b/crates/drainer/src/settings.rs @@ -59,15 +59,12 @@ pub struct DrainerSettings { pub loop_interval: u32, // in milliseconds } -#[derive(Debug, Default, Deserialize, Clone)] +#[derive(Debug, Deserialize, Clone)] #[serde(default)] pub struct Server { pub port: u16, pub workers: usize, pub host: String, - pub request_body_limit: usize, - pub base_url: String, - pub shutdown_timeout: u64, } impl Server { @@ -106,6 +103,16 @@ impl Default for DrainerSettings { } } +impl Default for Server { + fn default() -> Self { + Self { + host: "127.0.0.1".to_string(), + port: 8080, + workers: 1, + } + } +} + impl Database { fn validate(&self) -> Result<(), errors::DrainerError> { use common_utils::fp_utils::when; From f5a2d70dcf5dba7fdd4e3ec221204d978209e1ab Mon Sep 17 00:00:00 2001 
From: dracarys18 Date: Mon, 22 Jan 2024 23:30:14 +0530 Subject: [PATCH 04/14] fix: warning --- crates/drainer/src/health_check.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs index 0b5d1d7aead9..7fc765cbb8bb 100644 --- a/crates/drainer/src/health_check.rs +++ b/crates/drainer/src/health_check.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use actix_web::{body::BoxBody, web, HttpResponse, Scope}; +use actix_web::{web, HttpResponse, Scope}; use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use common_utils::errors::CustomResult; use diesel_models::{Config, ConfigNew}; From 37fc0a35081603b3e0326761dfa3fcfe466ab266 Mon Sep 17 00:00:00 2001 From: Chethan Date: Tue, 23 Jan 2024 14:54:31 +0530 Subject: [PATCH 05/14] address spell check --- config/development.toml | 1 - crates/drainer/src/health_check.rs | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/config/development.toml b/config/development.toml index 645a2e8de7db..b23f68680e64 100644 --- a/config/development.toml +++ b/config/development.toml @@ -50,7 +50,6 @@ max_feed_count = 200 [server] # HTTP Request body limit. Defaults to 32kB -host = "127.0.0.1" request_body_limit = 32768 [secrets] diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs index 7fc765cbb8bb..c7033033976a 100644 --- a/crates/drainer/src/health_check.rs +++ b/crates/drainer/src/health_check.rs @@ -171,14 +171,14 @@ impl HealthCheckInterface for Store { .await .change_context(HealthCheckRedisError::StreamAppendFailed)?; - logger::debug!("Stream append succeded"); + logger::debug!("Stream append succeeded"); let output = self .redis_conn .stream_read_entries(TEST_STREAM_NAME, "0-0", Some(10)) .await .change_context(HealthCheckRedisError::StreamReadFailed)?; - logger::debug!("Stream read succeded"); + logger::debug!("Stream read succeeded"); let (_, id_to_trim) = output .get(TEST_STREAM_NAME) @@ -190,7 +190,7 @@ impl HealthCheckInterface for Store { .ok_or(error_stack::report!( HealthCheckRedisError::StreamReadFailed ))?; - logger::debug!("Stream parse succeded"); + logger::debug!("Stream parse succeeded"); redis_conn .stream_trim_entries( @@ -203,7 +203,7 @@ impl HealthCheckInterface for Store { ) .await .change_context(HealthCheckRedisError::StreamTrimFailed)?; - logger::debug!("Stream trim succeded"); + logger::debug!("Stream trim succeeded"); Ok(()) } From e66bfb80b75570c9f7e9de48a124f93076ad3415 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Wed, 24 Jan 2024 13:07:42 +0530 Subject: [PATCH 06/14] feat: add deep health check for analytics --- crates/analytics/src/clickhouse.rs | 13 +++++++++++++ crates/analytics/src/health_check.rs | 7 +++++++ crates/analytics/src/lib.rs | 1 + crates/analytics/src/sqlx.rs | 12 ++++++++++++ crates/api_models/src/health_check.rs | 1 + crates/router/src/db/health_check.rs | 27 +++++++++++++++++++++++++++ crates/router/src/db/kafka_store.rs | 6 ++++++ crates/router/src/routes/health.rs | 9 +++++++++ crates/storage_impl/src/errors.rs | 4 ++++ 9 files changed, 80 insertions(+) create mode 100644 crates/analytics/src/health_check.rs diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index f81c29c801c0..bade1b178ab0 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -7,6 +7,7 @@ use router_env::logger; use time::PrimitiveDateTime; use super::{ + health_check::HealthCheck, payments::{ distribution::PaymentDistributionRow, 
filters::FilterRow, metrics::PaymentMetricRow, }, @@ -93,6 +94,18 @@ impl ClickhouseClient { } } +#[async_trait::async_trait] +impl HealthCheck for ClickhouseClient { + async fn deep_health_check( + &self, + ) -> common_utils::errors::CustomResult<(), QueryExecutionError> { + self.execute_query("SELECT 1") + .await + .map(|_| ()) + .change_context(QueryExecutionError::DatabaseError) + } +} + #[async_trait::async_trait] impl AnalyticsDataSource for ClickhouseClient { type Row = serde_json::Value; diff --git a/crates/analytics/src/health_check.rs b/crates/analytics/src/health_check.rs new file mode 100644 index 000000000000..d9f95848723a --- /dev/null +++ b/crates/analytics/src/health_check.rs @@ -0,0 +1,7 @@ +use crate::types::QueryExecutionError; +use common_utils::errors::CustomResult; + +#[async_trait::async_trait] +pub trait HealthCheck { + async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError>; +} diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index 501bd58527c3..a4e925519ceb 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -8,6 +8,7 @@ pub mod refunds; pub mod api_event; pub mod connector_events; +pub mod health_check; pub mod outgoing_webhook_event; pub mod sdk_events; mod sqlx; diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index 7ab8a2aa4bc5..562a3a1f64d1 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -17,6 +17,7 @@ use storage_impl::config::Database; use time::PrimitiveDateTime; use super::{ + health_check::HealthCheck, query::{Aggregate, ToSql, Window}, types::{ AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, LoadRow, QueryExecutionError, @@ -164,6 +165,17 @@ impl AnalyticsDataSource for SqlxClient { .change_context(QueryExecutionError::RowExtractionFailure) } } +#[async_trait::async_trait] +impl HealthCheck for SqlxClient { + async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError> { + sqlx::query("SELECT 1") + .fetch_all(&self.pool) + .await + .map(|_| ()) + .into_report() + .change_context(QueryExecutionError::DatabaseError) + } +} impl<'a> FromRow<'a, PgRow> for super::refunds::metrics::RefundMetricRow { fn from_row(row: &'a PgRow) -> sqlx::Result { diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index d7bb120d0176..529ada52db7e 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -3,4 +3,5 @@ pub struct RouterHealthCheckResponse { pub database: String, pub redis: String, pub locker: String, + pub analytics: String, } diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 73bc2a4321d7..44db5dfaed9d 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -1,3 +1,4 @@ +use analytics::health_check::HealthCheck; use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use diesel_models::ConfigNew; use error_stack::ResultExt; @@ -24,6 +25,10 @@ pub trait HealthCheckInterface { &self, state: &routes::AppState, ) -> CustomResult<(), errors::HealthCheckLockerError>; + async fn health_check_analytics( + &self, + analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError>; } #[async_trait::async_trait] @@ -123,6 +128,22 @@ impl HealthCheckInterface for Store { Ok(()) } + async fn health_check_analytics( + &self, + analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError> { + match analytics { + 
analytics::AnalyticsProvider::Sqlx(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError), + analytics::AnalyticsProvider::Clickhouse(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError), + _ => Ok(()), + } + } } #[async_trait::async_trait] @@ -144,4 +165,10 @@ impl HealthCheckInterface for MockDb { ) -> CustomResult<(), errors::HealthCheckLockerError> { Ok(()) } + async fn health_check_analytics( + &self, + _analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError> { + Ok(()) + } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 8398c153156d..73ab50ff13dd 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -2188,4 +2188,10 @@ impl HealthCheckInterface for KafkaStore { ) -> CustomResult<(), errors::HealthCheckLockerError> { self.diesel_store.health_check_locker(state).await } + async fn health_check_analytics( + &self, + analytics: &analytics::AnalyticsProvider, + ) -> CustomResult<(), errors::HealthCheckDBError> { + self.diesel_store.health_check_analytics(analytics).await + } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index f07b744f7f52..3f963c4ed95f 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -54,12 +54,21 @@ pub async fn deep_health_check(state: web::Data) -> impl actix_we } }; + let analytics_status = match db.health_check_analytics(&state.pool).await { + Ok(_) => "Health is good".to_string(), + Err(err) => { + status_code = 500; + err.to_string() + } + }; + logger::debug!("Locker health check end"); let response = serde_json::to_string(&RouterHealthCheckResponse { database: db_status, redis: redis_status, locker: locker_status, + analytics: analytics_status, }) .unwrap_or_default(); diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index ac3a04e85b2b..2adcdcf8d2e7 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -394,6 +394,10 @@ pub enum HealthCheckDBError { UnknownError, #[error("Error in database transaction")] TransactionError, + #[error("Error while executing query in Sqlx Analytics")] + SqlxAnalyticsError, + #[error("Error while executing query in Clickhouse Analytics")] + ClickhouseAnalyticsError, } impl From for HealthCheckDBError { From b79ea4906b5432f9b7fe6144c8b658736896b16f Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Wed, 24 Jan 2024 07:41:37 +0000 Subject: [PATCH 07/14] chore: run formatter --- crates/analytics/src/health_check.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/analytics/src/health_check.rs b/crates/analytics/src/health_check.rs index d9f95848723a..f566aecf10bd 100644 --- a/crates/analytics/src/health_check.rs +++ b/crates/analytics/src/health_check.rs @@ -1,6 +1,7 @@ -use crate::types::QueryExecutionError; use common_utils::errors::CustomResult; +use crate::types::QueryExecutionError; + #[async_trait::async_trait] pub trait HealthCheck { async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError>; From 6b06075677b132928ddc5cb54ae5fccfea71b549 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Wed, 24 Jan 2024 15:19:39 +0530 Subject: [PATCH 08/14] refactor(health): refactor deep health check --- crates/api_models/src/health_check.rs 
| 10 +- crates/router/src/core.rs | 1 + crates/router/src/core/health_check.rs | 108 +++++++++++++++ crates/router/src/db.rs | 2 +- crates/router/src/db/health_check.rs | 174 +++++-------------------- crates/router/src/db/kafka_store.rs | 26 +--- crates/router/src/routes/app.rs | 2 +- crates/router/src/routes/health.rs | 100 +++++++------- crates/router/src/routes/lock_utils.rs | 2 + crates/router_env/src/logger/types.rs | 2 + 10 files changed, 215 insertions(+), 212 deletions(-) create mode 100644 crates/router/src/core/health_check.rs diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 529ada52db7e..70c4fe96304c 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -1,7 +1,9 @@ #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct RouterHealthCheckResponse { - pub database: String, - pub redis: String, - pub locker: String, - pub analytics: String, + pub database: bool, + pub redis: bool, + pub locker: bool, + pub analytics: bool, } + +impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {} diff --git a/crates/router/src/core.rs b/crates/router/src/core.rs index 5ae4b0be33da..9bdc493e0786 100644 --- a/crates/router/src/core.rs +++ b/crates/router/src/core.rs @@ -17,6 +17,7 @@ pub mod files; #[cfg(feature = "frm")] pub mod fraud_check; pub mod gsm; +pub mod health_check; pub mod locker_migration; pub mod mandate; pub mod metrics; diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs new file mode 100644 index 000000000000..d0ffe7ec3f3d --- /dev/null +++ b/crates/router/src/core/health_check.rs @@ -0,0 +1,108 @@ +use crate::routes::app; + +use analytics::health_check::HealthCheck; +use error_stack::ResultExt; +use router_env::logger; + +use crate::{ + consts::LOCKER_HEALTH_CALL_PATH, + core::errors::{self, CustomResult}, + services::api as services, +}; + +#[async_trait::async_trait] +pub trait HealthCheckInterface { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; + async fn health_check_redis(&self) -> CustomResult<(), errors::HealthCheckRedisError>; + async fn health_check_locker(&self) -> CustomResult<(), errors::HealthCheckLockerError>; + async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError>; +} + +#[async_trait::async_trait] +impl HealthCheckInterface for app::AppState { + async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { + let db = &*self.store; + db.health_check_db().await?; + Ok(()) + } + + async fn health_check_redis(&self) -> CustomResult<(), errors::HealthCheckRedisError> { + let db = &*self.store; + let redis_conn = db + .get_redis_conn() + .change_context(errors::HealthCheckRedisError::RedisConnectionError)?; + + redis_conn + .serialize_and_set_key_with_expiry("test_key", "test_value", 30) + .await + .change_context(errors::HealthCheckRedisError::SetFailed)?; + + logger::debug!("Redis set_key was successful"); + + redis_conn + .get_key("test_key") + .await + .change_context(errors::HealthCheckRedisError::GetFailed)?; + + logger::debug!("Redis get_key was successful"); + + redis_conn + .delete_key("test_key") + .await + .change_context(errors::HealthCheckRedisError::DeleteFailed)?; + + logger::debug!("Redis delete_key was successful"); + + Ok(()) + } + + async fn health_check_locker(&self) -> CustomResult<(), errors::HealthCheckLockerError> { + let locker = &self.conf.locker; + if !locker.mock_locker { + let mut url 
= locker.host_rs.to_owned(); + url.push_str(LOCKER_HEALTH_CALL_PATH); + let request = services::Request::new(services::Method::Get, &url); + services::call_connector_api(self, request) + .await + .change_context(errors::HealthCheckLockerError::FailedToCallLocker)? + .ok(); + } + + logger::debug!("Locker call was successful"); + + Ok(()) + } + async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError> { + let analytics = &self.pool; + match analytics { + analytics::AnalyticsProvider::Sqlx(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError), + analytics::AnalyticsProvider::Clickhouse(client) => client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError), + analytics::AnalyticsProvider::CombinedCkh(sqlx_client, ckh_client) => { + sqlx_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError)?; + ckh_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError) + } + analytics::AnalyticsProvider::CombinedSqlx(sqlx_client, ckh_client) => { + sqlx_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::SqlxAnalyticsError)?; + ckh_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError) + } + } + } +} diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index b9d346b7a71f..549001772464 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -110,7 +110,7 @@ pub trait StorageInterface: + user_role::UserRoleInterface + authorization::AuthorizationInterface + user::sample_data::BatchSampleDataInterface - + health_check::HealthCheckInterface + + health_check::HealthCheckDbInterface + 'static { fn get_scheduler_db(&self) -> Box; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 44db5dfaed9d..74556c93e43c 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -1,174 +1,70 @@ -use analytics::health_check::HealthCheck; -use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; -use diesel_models::ConfigNew; +use async_bb8_diesel::AsyncConnection; use error_stack::ResultExt; -use router_env::logger; -use super::{MockDb, StorageInterface, Store}; +use super::{MockDb, Store}; use crate::{ connection, - consts::LOCKER_HEALTH_CALL_PATH, core::errors::{self, CustomResult}, - routes, - services::api as services, types::storage, }; +use diesel_models::ConfigNew; +use router_env::logger; + +use async_bb8_diesel::AsyncRunQueryDsl; #[async_trait::async_trait] -pub trait HealthCheckInterface { +pub trait HealthCheckDbInterface { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; - async fn health_check_redis( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError>; - async fn health_check_locker( - &self, - state: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError>; - async fn health_check_analytics( - &self, - analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError>; } #[async_trait::async_trait] -impl HealthCheckInterface for Store { +impl HealthCheckDbInterface for Store { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { let conn = connection::pg_connection_write(self) .await .change_context(errors::HealthCheckDBError::DBError)?; - 
let _data = conn - .transaction_async(|conn| { - Box::pin(async move { - let query = - diesel::select(diesel::dsl::sql::("1 + 1")); - let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { - logger::error!(read_err=?err,"Error while reading element in the database"); - errors::HealthCheckDBError::DBReadError - })?; - - logger::debug!("Database read was successful"); - - let config = ConfigNew { - key: "test_key".to_string(), - config: "test_value".to_string(), - }; - - config.insert(&conn).await.map_err(|err| { - logger::error!(write_err=?err,"Error while writing to database"); - errors::HealthCheckDBError::DBWriteError - })?; - - logger::debug!("Database write was successful"); - - storage::Config::delete_by_key(&conn, "test_key").await.map_err(|err| { - logger::error!(delete_err=?err,"Error while deleting element in the database"); - errors::HealthCheckDBError::DBDeleteError - })?; - - logger::debug!("Database delete was successful"); - - Ok::<_, errors::HealthCheckDBError>(()) - }) - }) - .await?; - - Ok(()) - } - - async fn health_check_redis( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - let redis_conn = db - .get_redis_conn() - .change_context(errors::HealthCheckRedisError::RedisConnectionError)?; - - redis_conn - .serialize_and_set_key_with_expiry("test_key", "test_value", 30) - .await - .change_context(errors::HealthCheckRedisError::SetFailed)?; - - logger::debug!("Redis set_key was successful"); + conn.transaction_async(|conn| async move { + let query = diesel::select(diesel::dsl::sql::("1 + 1")); + let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { + logger::error!(read_err=?err,"Error while reading element in the database"); + errors::HealthCheckDBError::DBReadError + })?; - redis_conn - .get_key("test_key") - .await - .change_context(errors::HealthCheckRedisError::GetFailed)?; + logger::debug!("Database read was successful"); - logger::debug!("Redis get_key was successful"); + let config = ConfigNew { + key: "test_key".to_string(), + config: "test_value".to_string(), + }; - redis_conn - .delete_key("test_key") - .await - .change_context(errors::HealthCheckRedisError::DeleteFailed)?; + config.insert(&conn).await.map_err(|err| { + logger::error!(write_err=?err,"Error while writing to database"); + errors::HealthCheckDBError::DBWriteError + })?; - logger::debug!("Redis delete_key was successful"); + logger::debug!("Database write was successful"); - Ok(()) - } - - async fn health_check_locker( - &self, - state: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError> { - let locker = &state.conf.locker; - if !locker.mock_locker { - let mut url = locker.host_rs.to_owned(); - url.push_str(LOCKER_HEALTH_CALL_PATH); - let request = services::Request::new(services::Method::Get, &url); - services::call_connector_api(state, request) + storage::Config::delete_by_key(&conn, "test_key") .await - .change_context(errors::HealthCheckLockerError::FailedToCallLocker)? 
- .ok(); - } + .map_err(|err| { + logger::error!(delete_err=?err,"Error while deleting element in the database"); + errors::HealthCheckDBError::DBDeleteError + })?; + + logger::debug!("Database delete was successful"); - logger::debug!("Locker call was successful"); + Ok::<_, errors::HealthCheckDBError>(()) + }) + .await?; Ok(()) } - async fn health_check_analytics( - &self, - analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError> { - match analytics { - analytics::AnalyticsProvider::Sqlx(client) => client - .deep_health_check() - .await - .change_context(errors::HealthCheckDBError::SqlxAnalyticsError), - analytics::AnalyticsProvider::Clickhouse(client) => client - .deep_health_check() - .await - .change_context(errors::HealthCheckDBError::ClickhouseAnalyticsError), - _ => Ok(()), - } - } } #[async_trait::async_trait] -impl HealthCheckInterface for MockDb { +impl HealthCheckDbInterface for MockDb { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { Ok(()) } - - async fn health_check_redis( - &self, - _: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - Ok(()) - } - - async fn health_check_locker( - &self, - _: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError> { - Ok(()) - } - async fn health_check_analytics( - &self, - _analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError> { - Ok(()) - } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 73ab50ff13dd..d8dc115aba41 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -27,6 +27,7 @@ use super::{ user::{sample_data::BatchSampleDataInterface, UserInterface}, user_role::UserRoleInterface, }; + use crate::{ core::errors::{self, ProcessTrackerError}, db::{ @@ -43,7 +44,7 @@ use crate::{ events::EventInterface, file::FileMetadataInterface, gsm::GsmInterface, - health_check::HealthCheckInterface, + health_check::HealthCheckDbInterface, locker_mock_up::LockerMockUpInterface, mandate::MandateInterface, merchant_account::MerchantAccountInterface, @@ -58,7 +59,6 @@ use crate::{ routing_algorithm::RoutingAlgorithmInterface, MasterKeyInterface, StorageInterface, }, - routes, services::{authentication, kafka::KafkaProducer, Store}, types::{ domain, @@ -2170,28 +2170,8 @@ impl AuthorizationInterface for KafkaStore { } #[async_trait::async_trait] -impl HealthCheckInterface for KafkaStore { +impl HealthCheckDbInterface for KafkaStore { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError> { self.diesel_store.health_check_db().await } - - async fn health_check_redis( - &self, - db: &dyn StorageInterface, - ) -> CustomResult<(), errors::HealthCheckRedisError> { - self.diesel_store.health_check_redis(db).await - } - - async fn health_check_locker( - &self, - state: &routes::AppState, - ) -> CustomResult<(), errors::HealthCheckLockerError> { - self.diesel_store.health_check_locker(state).await - } - async fn health_check_analytics( - &self, - analytics: &analytics::AnalyticsProvider, - ) -> CustomResult<(), errors::HealthCheckDBError> { - self.diesel_store.health_check_analytics(analytics).await - } } diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 4345109a6724..db664900619f 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -260,7 +260,7 @@ impl Health { web::scope("health") .app_data(web::Data::new(state)) 
.service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::post().to(deep_health_check))) + .service(web::resource("/ready").route(web::get().to(deep_health_check))) } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 3f963c4ed95f..2ea36919cb68 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,9 +1,16 @@ -use actix_web::web; +use actix_web::{web, HttpRequest}; use api_models::health_check::RouterHealthCheckResponse; -use router_env::{instrument, logger, tracing}; +use router_env::{instrument, logger, tracing, Flow}; + +use error_stack::ResultExt; use super::app; -use crate::{routes::metrics, services}; +use crate::{ + core::{api_locking, health_check::HealthCheckInterface}, + errors::{self, RouterResponse}, + routes::metrics, + services::{api, authentication as auth}, +}; /// . // #[logger::instrument(skip_all, name = "name1", level = "warn", fields( key1 = "val1" ))] #[instrument(skip_all)] @@ -14,67 +21,72 @@ pub async fn health() -> impl actix_web::Responder { actix_web::HttpResponse::Ok().body("health is good") } -#[instrument(skip_all)] -pub async fn deep_health_check(state: web::Data) -> impl actix_web::Responder { +#[instrument(skip_all, fields(flow = ?Flow::DeepHealthCheck))] +pub async fn deep_health_check( + state: web::Data, + request: HttpRequest, +) -> impl actix_web::Responder { metrics::HEALTH_METRIC.add(&metrics::CONTEXT, 1, &[]); - let db = &*state.store; - let mut status_code = 200; + + let flow = Flow::DeepHealthCheck; + + Box::pin(api::server_wrap( + flow, + state, + &request, + (), + |state, _, _| deep_health_check_func(state), + &auth::NoAuth, + api_locking::LockAction::NotApplicable, + )) + .await +} + +async fn deep_health_check_func(state: app::AppState) -> RouterResponse { logger::info!("Deep health check was called"); logger::debug!("Database health check begin"); - let db_status = match db.health_check_db().await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let db_status = state + .health_check_db() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; + logger::debug!("Database health check end"); logger::debug!("Redis health check begin"); - let redis_status = match db.health_check_redis(db).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let redis_status = state + .health_check_redis() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; logger::debug!("Redis health check end"); logger::debug!("Locker health check begin"); - let locker_status = match db.health_check_locker(&state).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let locker_status = state + .health_check_locker() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; - let analytics_status = match db.health_check_analytics(&state.pool).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let analytics_status = state + .health_check_analytics() + .await + .map(|_| true) + .change_context(errors::ApiErrorResponse::InternalServerError)?; logger::debug!("Locker health check end"); - let response = serde_json::to_string(&RouterHealthCheckResponse { + let response = RouterHealthCheckResponse { 
database: db_status, redis: redis_status, locker: locker_status, analytics: analytics_status, - }) - .unwrap_or_default(); - - if status_code == 200 { - services::http_response_json(response) - } else { - services::http_server_error_json_response(response) - } + }; + + Ok(api::ApplicationResponse::Json(response)) } diff --git a/crates/router/src/routes/lock_utils.rs b/crates/router/src/routes/lock_utils.rs index 1c967222dc7f..a24c5dc66871 100644 --- a/crates/router/src/routes/lock_utils.rs +++ b/crates/router/src/routes/lock_utils.rs @@ -11,6 +11,7 @@ pub enum ApiIdentifier { Configs, Customers, Ephemeral, + Health, Mandates, PaymentMethods, PaymentMethodAuth, @@ -83,6 +84,7 @@ impl From for ApiIdentifier { Flow::EphemeralKeyCreate | Flow::EphemeralKeyDelete => Self::Ephemeral, + Flow::DeepHealthCheck => Self::Health, Flow::MandatesRetrieve | Flow::MandatesRevoke | Flow::MandatesList => Self::Mandates, Flow::PaymentMethodsCreate diff --git a/crates/router_env/src/logger/types.rs b/crates/router_env/src/logger/types.rs index ba323ebc5e3f..dbbc3cdc2772 100644 --- a/crates/router_env/src/logger/types.rs +++ b/crates/router_env/src/logger/types.rs @@ -54,6 +54,8 @@ pub enum Tag { /// API Flow #[derive(Debug, Display, Clone, PartialEq, Eq)] pub enum Flow { + /// Deep health Check + DeepHealthCheck, /// Merchants account create flow. MerchantsAccountCreate, /// Merchants account retrieve flow. From 1bcd77420e67594ffe44ce749af1e9e0c9e6fb88 Mon Sep 17 00:00:00 2001 From: "hyperswitch-bot[bot]" <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Date: Mon, 29 Jan 2024 17:19:47 +0000 Subject: [PATCH 09/14] chore: run formatter --- crates/router/src/core/health_check.rs | 3 +-- crates/router/src/db/health_check.rs | 8 +++----- crates/router/src/db/kafka_store.rs | 1 - crates/router/src/routes/health.rs | 3 +-- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index d0ffe7ec3f3d..e8215dd4bee0 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,5 +1,3 @@ -use crate::routes::app; - use analytics::health_check::HealthCheck; use error_stack::ResultExt; use router_env::logger; @@ -7,6 +5,7 @@ use router_env::logger; use crate::{ consts::LOCKER_HEALTH_CALL_PATH, core::errors::{self, CustomResult}, + routes::app, services::api as services, }; diff --git a/crates/router/src/db/health_check.rs b/crates/router/src/db/health_check.rs index 74556c93e43c..6ebc9dfff5ad 100644 --- a/crates/router/src/db/health_check.rs +++ b/crates/router/src/db/health_check.rs @@ -1,5 +1,7 @@ -use async_bb8_diesel::AsyncConnection; +use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; +use diesel_models::ConfigNew; use error_stack::ResultExt; +use router_env::logger; use super::{MockDb, Store}; use crate::{ @@ -7,10 +9,6 @@ use crate::{ core::errors::{self, CustomResult}, types::storage, }; -use diesel_models::ConfigNew; -use router_env::logger; - -use async_bb8_diesel::AsyncRunQueryDsl; #[async_trait::async_trait] pub trait HealthCheckDbInterface { diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index d3b90f1026d3..665a920bcada 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -27,7 +27,6 @@ use super::{ user::{sample_data::BatchSampleDataInterface, UserInterface}, user_role::UserRoleInterface, }; - use crate::{ core::errors::{self, ProcessTrackerError}, db::{ diff --git 
a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 2ea36919cb68..20b39a44401f 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -1,8 +1,7 @@ use actix_web::{web, HttpRequest}; use api_models::health_check::RouterHealthCheckResponse; -use router_env::{instrument, logger, tracing, Flow}; - use error_stack::ResultExt; +use router_env::{instrument, logger, tracing, Flow}; use super::app; use crate::{ From 3e67fbf6429fb61a222199ea0d178a516730b9c6 Mon Sep 17 00:00:00 2001 From: dracarys18 Date: Tue, 30 Jan 2024 12:07:16 +0530 Subject: [PATCH 10/14] fix: fix health check error --- .../router/src/compatibility/stripe/errors.rs | 3 +- .../src/core/errors/api_error_response.rs | 5 +++ crates/router/src/core/errors/transformers.rs | 5 ++- crates/router/src/routes/health.rs | 34 +++++++++++++------ 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/crates/router/src/compatibility/stripe/errors.rs b/crates/router/src/compatibility/stripe/errors.rs index 63205ea68ca6..759e968125ff 100644 --- a/crates/router/src/compatibility/stripe/errors.rs +++ b/crates/router/src/compatibility/stripe/errors.rs @@ -468,7 +468,8 @@ impl From for StripeErrorCode { errors::ApiErrorResponse::MandateUpdateFailed | errors::ApiErrorResponse::MandateSerializationFailed | errors::ApiErrorResponse::MandateDeserializationFailed - | errors::ApiErrorResponse::InternalServerError => Self::InternalServerError, // not a stripe code + | errors::ApiErrorResponse::InternalServerError + | errors::ApiErrorResponse::HealthCheckError { .. } => Self::InternalServerError, // not a stripe code errors::ApiErrorResponse::ExternalConnectorError { code, message, diff --git a/crates/router/src/core/errors/api_error_response.rs b/crates/router/src/core/errors/api_error_response.rs index 54ec4ec1e295..023e1f4b7fb3 100644 --- a/crates/router/src/core/errors/api_error_response.rs +++ b/crates/router/src/core/errors/api_error_response.rs @@ -238,6 +238,11 @@ pub enum ApiErrorResponse { WebhookInvalidMerchantSecret, #[error(error_type = ErrorType::InvalidRequestError, code = "IR_19", message = "{message}")] CurrencyNotSupported { message: String }, + #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failiing with error: {message}")] + HealthCheckError { + component: &'static str, + message: String, + }, #[error(error_type = ErrorType::InvalidRequestError, code = "IR_24", message = "Merchant connector account is configured with invalid {config}")] InvalidConnectorConfiguration { config: String }, #[error(error_type = ErrorType::ValidationError, code = "HE_01", message = "Failed to convert currency to minor unit")] diff --git a/crates/router/src/core/errors/transformers.rs b/crates/router/src/core/errors/transformers.rs index ff764cafed62..0119335b7c45 100644 --- a/crates/router/src/core/errors/transformers.rs +++ b/crates/router/src/core/errors/transformers.rs @@ -123,7 +123,10 @@ impl ErrorSwitch for ApiErrorRespon }, Self::MandateUpdateFailed | Self::MandateSerializationFailed | Self::MandateDeserializationFailed | Self::InternalServerError => { AER::InternalServerError(ApiError::new("HE", 0, "Something went wrong", None)) - } + }, + Self::HealthCheckError { message,component } => { + AER::InternalServerError(ApiError::new("HE",0,format!("{} health check failed with error: {}",component,message),None)) + }, Self::PayoutFailed { data } => { AER::BadRequest(ApiError::new("CE", 4, "Payout failed while processing with 
connector.", Some(Extra { data: data.clone(), ..Default::default()}))) }, diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 2ea36919cb68..83454ccb590b 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -2,8 +2,6 @@ use actix_web::{web, HttpRequest}; use api_models::health_check::RouterHealthCheckResponse; use router_env::{instrument, logger, tracing, Flow}; -use error_stack::ResultExt; - use super::app; use crate::{ core::{api_locking, health_check::HealthCheckInterface}, @@ -47,11 +45,12 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse RouterResponse RouterResponse Date: Tue, 30 Jan 2024 17:30:07 +0530 Subject: [PATCH 11/14] fix: build --- crates/api_models/Cargo.toml | 1 + crates/api_models/src/health_check.rs | 1 + crates/router/Cargo.toml | 2 +- crates/router/src/core/health_check.rs | 5 +++++ crates/router/src/core/pm_auth.rs | 3 --- crates/router/src/routes/health.rs | 2 ++ 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/api_models/Cargo.toml b/crates/api_models/Cargo.toml index 8cd3ee53f218..1e8e0f47eb94 100644 --- a/crates/api_models/Cargo.toml +++ b/crates/api_models/Cargo.toml @@ -18,6 +18,7 @@ dummy_connector = ["euclid/dummy_connector", "common_enums/dummy_connector"] detailed_errors = [] payouts = [] frm = [] +olap = [] openapi = ["common_enums/openapi"] recon = [] diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 70c4fe96304c..8323f1351346 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -3,6 +3,7 @@ pub struct RouterHealthCheckResponse { pub database: bool, pub redis: bool, pub locker: bool, + #[cfg(feature = "olap")] pub analytics: bool, } diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index e575daf7e7ad..4567b1656d7f 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -17,7 +17,7 @@ email = ["external_services/email", "dep:aws-config", "olap"] frm = [] stripe = ["dep:serde_qs"] release = ["kms", "stripe", "aws_s3", "email", "backwards_compatibility", "business_profile_routing", "accounts_cache", "kv_store", "connector_choice_mca_id", "profile_specific_fallback_routing", "vergen", "recon"] -olap = ["data_models/olap", "storage_impl/olap", "scheduler/olap", "dep:analytics"] +olap = ["data_models/olap", "storage_impl/olap", "scheduler/olap","api_models/olap","dep:analytics"] oltp = ["storage_impl/oltp"] kv_store = ["scheduler/kv_store"] accounts_cache = [] diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index e8215dd4bee0..5f2351ef726e 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,4 +1,6 @@ +#[cfg(feature = "olap")] use analytics::health_check::HealthCheck; + use error_stack::ResultExt; use router_env::logger; @@ -14,6 +16,7 @@ pub trait HealthCheckInterface { async fn health_check_db(&self) -> CustomResult<(), errors::HealthCheckDBError>; async fn health_check_redis(&self) -> CustomResult<(), errors::HealthCheckRedisError>; async fn health_check_locker(&self) -> CustomResult<(), errors::HealthCheckLockerError>; + #[cfg(feature = "olap")] async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError>; } @@ -71,6 +74,8 @@ impl HealthCheckInterface for app::AppState { Ok(()) } + + #[cfg(feature = "olap")] async fn health_check_analytics(&self) -> CustomResult<(), errors::HealthCheckDBError> { 
let analytics = &self.pool; match analytics { diff --git a/crates/router/src/core/pm_auth.rs b/crates/router/src/core/pm_auth.rs index d805925f3183..9f70cc6baeec 100644 --- a/crates/router/src/core/pm_auth.rs +++ b/crates/router/src/core/pm_auth.rs @@ -375,9 +375,6 @@ async fn store_bank_details_in_payment_methods( .await .change_context(ApiErrorResponse::InternalServerError)?; - #[cfg(not(feature = "kms"))] - let pm_auth_key = pm_auth_key; - let mut update_entries: Vec<(storage::PaymentMethod, storage::PaymentMethodUpdate)> = Vec::new(); let mut new_entries: Vec = Vec::new(); diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index 83454ccb590b..89132c3319bf 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -82,6 +82,7 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse RouterResponse Date: Tue, 30 Jan 2024 12:00:44 +0000 Subject: [PATCH 12/14] chore: run formatter --- crates/router/src/core/health_check.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index 5f2351ef726e..6fc038b82e91 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,6 +1,5 @@ #[cfg(feature = "olap")] use analytics::health_check::HealthCheck; - use error_stack::ResultExt; use router_env::logger; From 62bf704d929ac7fbb562fff075fe287752dd9cce Mon Sep 17 00:00:00 2001 From: Chethan Date: Fri, 2 Feb 2024 13:38:09 +0530 Subject: [PATCH 13/14] handle error propagation and response type --- Cargo.lock | 1 + crates/drainer/Cargo.toml | 1 + crates/drainer/src/errors.rs | 18 ++++++++ crates/drainer/src/health_check.rs | 72 +++++++++++++++++------------- crates/drainer/src/main.rs | 5 ++- crates/drainer/src/services.rs | 28 +++++++++++- 6 files changed, 91 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65f65af45a2c..187914ee40ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2346,6 +2346,7 @@ dependencies = [ "mime", "once_cell", "redis_interface", + "reqwest", "router_env", "serde", "serde_json", diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml index b23c33975019..0533bd12dab8 100644 --- a/crates/drainer/Cargo.toml +++ b/crates/drainer/Cargo.toml @@ -23,6 +23,7 @@ diesel = { version = "2.1.0", features = ["postgres"] } error-stack = "0.3.1" mime = "0.3.17" once_cell = "1.18.0" +reqwest = { version = "0.11.18" } serde = "1.0.193" serde_json = "1.0.108" serde_path_to_error = "0.1.14" diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs index fe2c27e879d3..b41758a8a482 100644 --- a/crates/drainer/src/errors.rs +++ b/crates/drainer/src/errors.rs @@ -19,6 +19,14 @@ pub enum DrainerError { IoError(std::io::Error), } +#[derive(Debug, Error, Clone, serde::Serialize)] +pub enum HealthCheckError { + #[error("Database health check is failiing with error: {message}")] + DbError { message: String }, + #[error("Redis health check is failiing with error: {message}")] + RedisError { message: String }, +} + impl From for DrainerError { fn from(err: std::io::Error) -> Self { Self::IoError(err) @@ -38,3 +46,13 @@ impl From> for DrainerError { Self::RedisError(err) } } + +impl actix_web::ResponseError for HealthCheckError { + fn status_code(&self) -> reqwest::StatusCode { + use reqwest::StatusCode; + + match self { + Self::DbError { .. } | Self::RedisError { .. 
} => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs index c7033033976a..5c0327e8975b 100644 --- a/crates/drainer/src/health_check.rs +++ b/crates/drainer/src/health_check.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use actix_web::{web, HttpResponse, Scope}; +use actix_web::{web, Scope}; use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use common_utils::errors::CustomResult; use diesel_models::{Config, ConfigNew}; @@ -9,7 +9,8 @@ use router_env::{instrument, logger, tracing}; use crate::{ connection::{pg_connection, redis_connection}, - services::Store, + errors::HealthCheckError, + services::{self, Store}, settings::Settings, }; @@ -24,7 +25,7 @@ impl Health { .app_data(web::Data::new(conf)) .app_data(web::Data::new(store)) .service(web::resource("").route(web::get().to(health))) - .service(web::resource("/deep_check").route(web::get().to(deep_health_check))) + .service(web::resource("/ready").route(web::get().to(deep_health_check))) } } @@ -39,53 +40,60 @@ pub async fn deep_health_check( conf: web::Data, store: web::Data>, ) -> impl actix_web::Responder { - let mut status_code = 200; + match deep_health_check_func(conf, store).await { + Ok(response) => services::http_response_json( + serde_json::to_string(&response) + .map_err(|err| { + logger::error!(serialization_error=?err); + }) + .unwrap_or_default(), + ), + + Err(err) => services::log_and_return_error_response(err), + } +} + +#[instrument(skip_all)] +pub async fn deep_health_check_func( + conf: web::Data, + store: web::Data>, +) -> Result> { logger::info!("Deep health check was called"); logger::debug!("Database health check begin"); - let db_status = match store.health_check_db().await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let db_status = store.health_check_db().await.map(|_| true).map_err(|err| { + error_stack::report!(HealthCheckError::DbError { + message: err.to_string() + }) + })?; + logger::debug!("Database health check end"); logger::debug!("Redis health check begin"); - let redis_status = match store.health_check_redis(&conf).await { - Ok(_) => "Health is good".to_string(), - Err(err) => { - status_code = 500; - err.to_string() - } - }; + let redis_status = store + .health_check_redis(&conf.into_inner()) + .await + .map(|_| true) + .map_err(|err| { + error_stack::report!(HealthCheckError::RedisError { + message: err.to_string() + }) + })?; logger::debug!("Redis health check end"); - let response = serde_json::to_string(&DrainerHealthCheckResponse { + Ok(DrainerHealthCheckResponse { database: db_status, redis: redis_status, }) - .unwrap_or_default(); - - if status_code == 200 { - HttpResponse::Ok() - .content_type(mime::APPLICATION_JSON) - .body(response) - } else { - HttpResponse::InternalServerError() - .content_type(mime::APPLICATION_JSON) - .body(response) - } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DrainerHealthCheckResponse { - pub database: String, - pub redis: String, + pub database: bool, + pub redis: bool, } #[async_trait::async_trait] diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index 838d50198ebd..e14c674d57f2 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -31,7 +31,10 @@ async fn main() -> DrainerResult<()> { .await .expect("Failed to create the server"); - tokio::spawn(web_server); + tokio::spawn(async move { + let _ = web_server.await; + logger::error!("The health 
check probe stopped working!"); + }); logger::debug!(startup_config=?conf); logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log); diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs index 4393ebb9dc97..0d0b78c0cc66 100644 --- a/crates/drainer/src/services.rs +++ b/crates/drainer/src/services.rs @@ -1,6 +1,12 @@ use std::sync::Arc; -use crate::connection::{diesel_make_pg_pool, PgPool}; +use actix_web::{body, HttpResponse, ResponseError}; +use error_stack::Report; + +use crate::{ + connection::{diesel_make_pg_pool, PgPool}, + logger, +}; #[derive(Clone)] pub struct Store { @@ -45,3 +51,23 @@ impl Store { } } } + +pub fn log_and_return_error_response(error: Report) -> HttpResponse +where + T: error_stack::Context + ResponseError + Clone, +{ + logger::error!(?error); + let body = serde_json::json!({ + "message": error.to_string() + }) + .to_string(); + HttpResponse::InternalServerError() + .content_type(mime::APPLICATION_JSON) + .body(body) +} + +pub fn http_response_json(response: T) -> HttpResponse { + HttpResponse::Ok() + .content_type(mime::APPLICATION_JSON) + .body(response) +} From 6ef54d7911f1e30f5f007d10a946a401273b609d Mon Sep 17 00:00:00 2001 From: Chethan Date: Fri, 2 Feb 2024 15:20:06 +0530 Subject: [PATCH 14/14] address requested changes --- crates/drainer/src/errors.rs | 4 ++-- crates/drainer/src/health_check.rs | 16 ++++++++-------- crates/drainer/src/main.rs | 12 ++++++++---- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs index b41758a8a482..8605ee2ba04e 100644 --- a/crates/drainer/src/errors.rs +++ b/crates/drainer/src/errors.rs @@ -21,9 +21,9 @@ pub enum DrainerError { #[derive(Debug, Error, Clone, serde::Serialize)] pub enum HealthCheckError { - #[error("Database health check is failiing with error: {message}")] + #[error("Database health check is failing with error: {message}")] DbError { message: String }, - #[error("Redis health check is failiing with error: {message}")] + #[error("Redis health check is failing with error: {message}")] RedisError { message: String }, } diff --git a/crates/drainer/src/health_check.rs b/crates/drainer/src/health_check.rs index 5c0327e8975b..33b4a1395a8c 100644 --- a/crates/drainer/src/health_check.rs +++ b/crates/drainer/src/health_check.rs @@ -114,7 +114,7 @@ impl HealthCheckInterface for Store { diesel::select(diesel::dsl::sql::("1 + 1")); let _x: i32 = query.get_result_async(&conn).await.map_err(|err| { logger::error!(read_err=?err,"Error while reading element in the database"); - HealthCheckDBError::DBReadError + HealthCheckDBError::DbReadError })?; logger::debug!("Database read was successful"); @@ -126,14 +126,14 @@ impl HealthCheckInterface for Store { config.insert(&conn).await.map_err(|err| { logger::error!(write_err=?err,"Error while writing to database"); - HealthCheckDBError::DBWriteError + HealthCheckDBError::DbWriteError })?; logger::debug!("Database write was successful"); Config::delete_by_key(&conn, "test_key").await.map_err(|err| { logger::error!(delete_err=?err,"Error while deleting element in the database"); - HealthCheckDBError::DBDeleteError + HealthCheckDBError::DbDeleteError })?; logger::debug!("Database delete was successful"); @@ -221,13 +221,13 @@ impl HealthCheckInterface for Store { #[derive(Debug, thiserror::Error)] pub enum HealthCheckDBError { #[error("Error while connecting to database")] - DBError, + DbError, #[error("Error while writing to database")] - DBWriteError, + DbWriteError, 
#[error("Error while reading element in the database")] - DBReadError, + DbReadError, #[error("Error while deleting element in the database")] - DBDeleteError, + DbDeleteError, #[error("Unpredictable error occurred")] UnknownError, #[error("Error in database transaction")] @@ -237,7 +237,7 @@ pub enum HealthCheckDBError { impl From for HealthCheckDBError { fn from(error: diesel::result::Error) -> Self { match error { - diesel::result::Error::DatabaseError(_, _) => Self::DBError, + diesel::result::Error::DatabaseError(_, _) => Self::DbError, diesel::result::Error::RollbackErrorOnCommit { .. } | diesel::result::Error::RollbackTransaction diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index e14c674d57f2..943a66f67911 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -1,6 +1,7 @@ use drainer::{ errors::DrainerResult, logger::logger, services, settings, start_drainer, start_web_server, }; +use router_env::tracing::Instrument; #[tokio::main] async fn main() -> DrainerResult<()> { @@ -31,10 +32,13 @@ async fn main() -> DrainerResult<()> { .await .expect("Failed to create the server"); - tokio::spawn(async move { - let _ = web_server.await; - logger::error!("The health check probe stopped working!"); - }); + tokio::spawn( + async move { + let _ = web_server.await; + logger::error!("The health check probe stopped working!"); + } + .in_current_span(), + ); logger::debug!(startup_config=?conf); logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log);