Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add deep health check for drainer #3396

Merged
merged 20 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d1a4744
feat: add deep health check for drainer
Chethan-rao Jan 18, 2024
b473450
chore: run formatter
hyperswitch-bot[bot] Jan 18, 2024
076b0f9
feat: add stream tests to drainer
dracarys18 Jan 22, 2024
f5a2d70
fix: warning
dracarys18 Jan 22, 2024
37fc0a3
address spell check
Chethan-rao Jan 23, 2024
e66bfb8
feat: add deep health check for analytics
dracarys18 Jan 24, 2024
b79ea49
chore: run formatter
hyperswitch-bot[bot] Jan 24, 2024
6b06075
refactor(health): refactor deep health check
dracarys18 Jan 24, 2024
7b0f7e3
Merge branch 'add_analytics_health' of github.com:juspay/hyperswitch …
dracarys18 Jan 29, 2024
bea745a
Merge branch 'main' of github.com:juspay/hyperswitch into add_analyti…
dracarys18 Jan 29, 2024
1bcd774
chore: run formatter
hyperswitch-bot[bot] Jan 29, 2024
3e67fbf
fix: fix health check error
dracarys18 Jan 30, 2024
a82dfea
Merge branch 'add_analytics_health' of github.com:juspay/hyperswitch …
dracarys18 Jan 30, 2024
39c0db8
fix: build
dracarys18 Jan 30, 2024
b0a65e8
chore: run formatter
hyperswitch-bot[bot] Jan 30, 2024
1fa6b7a
Merge branch 'main' of github.com:juspay/hyperswitch into deep-health…
Chethan-rao Jan 30, 2024
92f7831
Merge branch 'add_analytics_health' of github.com:juspay/hyperswitch …
Chethan-rao Jan 30, 2024
62bf704
handle error propagation and response type
Chethan-rao Feb 2, 2024
3355150
Merge branch 'main' of github.com:juspay/hyperswitch into deep-health…
Chethan-rao Feb 2, 2024
6ef54d7
address requested changes
Chethan-rao Feb 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/drainer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ kms = ["external_services/kms"]
vergen = ["router_env/vergen"]

[dependencies]
actix-web = "4.3.1"
async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" }
bb8 = "0.8"
clap = { version = "4.3.2", default-features = false, features = ["std", "derive", "help", "usage"] }
config = { version = "0.13.3", features = ["toml"] }
diesel = { version = "2.1.0", features = ["postgres"] }
error-stack = "0.3.1"
mime = "0.3.17"
once_cell = "1.18.0"
serde = "1.0.193"
serde_json = "1.0.108"
Expand Down
8 changes: 8 additions & 0 deletions crates/drainer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ pub enum DrainerError {
ParsingError(error_stack::Report<common_utils::errors::ParsingError>),
#[error("Unexpected error occurred: {0}")]
UnexpectedError(String),
#[error("I/O: {0}")]
IoError(std::io::Error),
}

impl From<std::io::Error> for DrainerError {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}

pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
Expand Down
260 changes: 260 additions & 0 deletions crates/drainer/src/health_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use std::sync::Arc;

use actix_web::{web, HttpResponse, Scope};
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
use common_utils::errors::CustomResult;
use diesel_models::{Config, ConfigNew};
use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};

use crate::{
connection::{pg_connection, redis_connection},
services::Store,
settings::Settings,
};

pub const TEST_STREAM_NAME: &str = "TEST_STREAM_0";
pub const TEST_STREAM_DATA: &[(&str, &str)] = &[("data", "sample_data")];

pub struct Health;

impl Health {
pub fn server(conf: Settings, store: Arc<Store>) -> Scope {
web::scope("health")
.app_data(web::Data::new(conf))
.app_data(web::Data::new(store))
.service(web::resource("").route(web::get().to(health)))
.service(web::resource("/deep_check").route(web::get().to(deep_health_check)))
}
}

#[instrument(skip_all)]
pub async fn health() -> impl actix_web::Responder {
logger::info!("Drainer health was called");
actix_web::HttpResponse::Ok().body("Drainer health is good")
}

#[instrument(skip_all)]
pub async fn deep_health_check(
conf: web::Data<Settings>,
store: web::Data<Arc<Store>>,
) -> impl actix_web::Responder {
let mut status_code = 200;
logger::info!("Deep health check was called");

logger::debug!("Database health check begin");

let db_status = match store.health_check_db().await {
Ok(_) => "Health is good".to_string(),
Err(err) => {
status_code = 500;
err.to_string()
}
};
logger::debug!("Database health check end");

logger::debug!("Redis health check begin");

let redis_status = match store.health_check_redis(&conf).await {
Ok(_) => "Health is good".to_string(),
Err(err) => {
status_code = 500;
err.to_string()
}
};

logger::debug!("Redis health check end");

let response = serde_json::to_string(&DrainerHealthCheckResponse {
database: db_status,
redis: redis_status,
})
.unwrap_or_default();

if status_code == 200 {
HttpResponse::Ok()
.content_type(mime::APPLICATION_JSON)
.body(response)
} else {
HttpResponse::InternalServerError()
.content_type(mime::APPLICATION_JSON)
.body(response)
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DrainerHealthCheckResponse {
pub database: String,
pub redis: String,
}

#[async_trait::async_trait]
pub trait HealthCheckInterface {
async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError>;
async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError>;
}

#[async_trait::async_trait]
impl HealthCheckInterface for Store {
async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError> {
let conn = pg_connection(&self.master_pool).await;

conn
.transaction_async(|conn| {
Box::pin(async move {
let query =
diesel::select(diesel::dsl::sql::<diesel::sql_types::Integer>("1 + 1"));
let _x: i32 = query.get_result_async(&conn).await.map_err(|err| {
logger::error!(read_err=?err,"Error while reading element in the database");
HealthCheckDBError::DBReadError
})?;

logger::debug!("Database read was successful");

let config = ConfigNew {
key: "test_key".to_string(),
config: "test_value".to_string(),
};

config.insert(&conn).await.map_err(|err| {
logger::error!(write_err=?err,"Error while writing to database");
HealthCheckDBError::DBWriteError
})?;

logger::debug!("Database write was successful");

Config::delete_by_key(&conn, "test_key").await.map_err(|err| {
logger::error!(delete_err=?err,"Error while deleting element in the database");
HealthCheckDBError::DBDeleteError
})?;

logger::debug!("Database delete was successful");

Ok::<_, HealthCheckDBError>(())
})
})
.await?;

Ok(())
}

async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError> {
let redis_conn = redis_connection(conf).await;

redis_conn
.serialize_and_set_key_with_expiry("test_key", "test_value", 30)
.await
.change_context(HealthCheckRedisError::SetFailed)?;

logger::debug!("Redis set_key was successful");

redis_conn
.get_key("test_key")
.await
.change_context(HealthCheckRedisError::GetFailed)?;

logger::debug!("Redis get_key was successful");

redis_conn
.delete_key("test_key")
.await
.change_context(HealthCheckRedisError::DeleteFailed)?;

logger::debug!("Redis delete_key was successful");

redis_conn
.stream_append_entry(
TEST_STREAM_NAME,
&redis_interface::RedisEntryId::AutoGeneratedID,
TEST_STREAM_DATA.to_vec(),
)
.await
.change_context(HealthCheckRedisError::StreamAppendFailed)?;

logger::debug!("Stream append succeeded");

let output = self
.redis_conn
.stream_read_entries(TEST_STREAM_NAME, "0-0", Some(10))
.await
.change_context(HealthCheckRedisError::StreamReadFailed)?;
logger::debug!("Stream read succeeded");

let (_, id_to_trim) = output
.get(TEST_STREAM_NAME)
.and_then(|entries| {
entries
.last()
.map(|last_entry| (entries, last_entry.0.clone()))
})
.ok_or(error_stack::report!(
HealthCheckRedisError::StreamReadFailed
))?;
logger::debug!("Stream parse succeeded");

redis_conn
.stream_trim_entries(
TEST_STREAM_NAME,
(
redis_interface::StreamCapKind::MinID,
redis_interface::StreamCapTrim::Exact,
id_to_trim,
),
)
.await
.change_context(HealthCheckRedisError::StreamTrimFailed)?;
logger::debug!("Stream trim succeeded");

Ok(())
}
}

#[allow(clippy::enum_variant_names)]
#[derive(Debug, thiserror::Error)]
pub enum HealthCheckDBError {
#[error("Error while connecting to database")]
DBError,
#[error("Error while writing to database")]
DBWriteError,
#[error("Error while reading element in the database")]
DBReadError,
#[error("Error while deleting element in the database")]
DBDeleteError,
Chethan-rao marked this conversation as resolved.
Show resolved Hide resolved
#[error("Unpredictable error occurred")]
UnknownError,
#[error("Error in database transaction")]
TransactionError,
}

impl From<diesel::result::Error> for HealthCheckDBError {
fn from(error: diesel::result::Error) -> Self {
match error {
diesel::result::Error::DatabaseError(_, _) => Self::DBError,

diesel::result::Error::RollbackErrorOnCommit { .. }
| diesel::result::Error::RollbackTransaction
| diesel::result::Error::AlreadyInTransaction
| diesel::result::Error::NotInTransaction
| diesel::result::Error::BrokenTransactionManager => Self::TransactionError,

_ => Self::UnknownError,
}
}
}

#[allow(clippy::enum_variant_names)]
#[derive(Debug, thiserror::Error)]
pub enum HealthCheckRedisError {
#[error("Failed to set key value in Redis")]
SetFailed,
#[error("Failed to get key value in Redis")]
GetFailed,
#[error("Failed to delete key value in Redis")]
DeleteFailed,
#[error("Failed to append data to the stream in Redis")]
StreamAppendFailed,
#[error("Failed to read data from the stream in Redis")]
StreamReadFailed,
#[error("Failed to trim data from the stream in Redis")]
StreamTrimFailed,
}
22 changes: 21 additions & 1 deletion crates/drainer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod connection;
pub mod errors;
mod handler;
mod health_check;
pub mod logger;
pub(crate) mod metrics;
mod query;
Expand All @@ -11,14 +12,18 @@ mod types;
mod utils;
use std::sync::Arc;

use actix_web::dev::Server;
use common_utils::signals::get_allowed_signals;
use diesel_models::kv;
use error_stack::{IntoReport, ResultExt};
use router_env::{instrument, tracing};
use tokio::sync::mpsc;

use crate::{
connection::pg_connection, services::Store, settings::DrainerSettings, types::StreamData,
connection::pg_connection,
services::Store,
settings::{DrainerSettings, Settings},
types::StreamData,
};

pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::DrainerResult<()> {
Expand Down Expand Up @@ -49,3 +54,18 @@ pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::

Ok(())
}

pub async fn start_web_server(
conf: Settings,
store: Arc<Store>,
) -> Result<Server, errors::DrainerError> {
let server = conf.server.clone();
let web_server = actix_web::HttpServer::new(move || {
actix_web::App::new().service(health_check::Health::server(conf.clone(), store.clone()))
})
.bind((server.host.as_str(), server.port))?
.run();
let _ = web_server.handle();

Ok(web_server)
}
11 changes: 10 additions & 1 deletion crates/drainer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use drainer::{errors::DrainerResult, logger::logger, services, settings, start_drainer};
use drainer::{
errors::DrainerResult, logger::logger, services, settings, start_drainer, start_web_server,
};

#[tokio::main]
async fn main() -> DrainerResult<()> {
Expand All @@ -24,6 +26,13 @@ async fn main() -> DrainerResult<()> {
[router_env::service_name!()],
);

#[allow(clippy::expect_used)]
let web_server = Box::pin(start_web_server(conf.clone(), store.clone()))
.await
.expect("Failed to create the server");

tokio::spawn(web_server);

logger::debug!(startup_config=?conf);
logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log);

Expand Down
Loading
Loading