feat: add deep health check for drainer (#3396)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: dracarys18 <karthikey.hegde@juspay.in>
This commit is contained in:
Chethan Rao
2024-02-02 16:29:35 +05:30
committed by GitHub
parent 91519d8462
commit 63c383f5a2
9 changed files with 396 additions and 3 deletions

3
Cargo.lock generated
View File

@ -2331,6 +2331,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
name = "drainer"
version = "0.1.0"
dependencies = [
"actix-web",
"async-bb8-diesel",
"async-trait",
"bb8",
@ -2342,8 +2343,10 @@ dependencies = [
"error-stack",
"external_services",
"masking",
"mime",
"once_cell",
"redis_interface",
"reqwest",
"router_env",
"serde",
"serde_json",

View File

@ -9,6 +9,7 @@ pub struct RouterHealthCheckResponse {
}
impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchedulerHealthCheckResponse {
pub database: bool,

View File

@ -14,13 +14,16 @@ hashicorp-vault = ["external_services/hashicorp-vault"]
vergen = ["router_env/vergen"]
[dependencies]
actix-web = "4.3.1"
async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" }
bb8 = "0.8"
clap = { version = "4.3.2", default-features = false, features = ["std", "derive", "help", "usage"] }
config = { version = "0.13.3", features = ["toml"] }
diesel = { version = "2.1.0", features = ["postgres"] }
error-stack = "0.3.1"
mime = "0.3.17"
once_cell = "1.18.0"
reqwest = { version = "0.11.18" }
serde = "1.0.193"
serde_json = "1.0.108"
serde_path_to_error = "0.1.14"

View File

@ -15,6 +15,22 @@ pub enum DrainerError {
ParsingError(error_stack::Report<common_utils::errors::ParsingError>),
#[error("Unexpected error occurred: {0}")]
UnexpectedError(String),
#[error("I/O: {0}")]
IoError(std::io::Error),
}
#[derive(Debug, Error, Clone, serde::Serialize)]
pub enum HealthCheckError {
#[error("Database health check is failing with error: {message}")]
DbError { message: String },
#[error("Redis health check is failing with error: {message}")]
RedisError { message: String },
}
impl From<std::io::Error> for DrainerError {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
@ -30,3 +46,13 @@ impl From<error_stack::Report<redis::errors::RedisError>> for DrainerError {
Self::RedisError(err)
}
}
impl actix_web::ResponseError for HealthCheckError {
fn status_code(&self) -> reqwest::StatusCode {
use reqwest::StatusCode;
match self {
Self::DbError { .. } | Self::RedisError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}

View File

@ -0,0 +1,268 @@
use std::sync::Arc;
use actix_web::{web, Scope};
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
use common_utils::errors::CustomResult;
use diesel_models::{Config, ConfigNew};
use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};
use crate::{
connection::{pg_connection, redis_connection},
errors::HealthCheckError,
services::{self, Store},
settings::Settings,
};
pub const TEST_STREAM_NAME: &str = "TEST_STREAM_0";
pub const TEST_STREAM_DATA: &[(&str, &str)] = &[("data", "sample_data")];
pub struct Health;
impl Health {
pub fn server(conf: Settings, store: Arc<Store>) -> Scope {
web::scope("health")
.app_data(web::Data::new(conf))
.app_data(web::Data::new(store))
.service(web::resource("").route(web::get().to(health)))
.service(web::resource("/ready").route(web::get().to(deep_health_check)))
}
}
#[instrument(skip_all)]
pub async fn health() -> impl actix_web::Responder {
logger::info!("Drainer health was called");
actix_web::HttpResponse::Ok().body("Drainer health is good")
}
#[instrument(skip_all)]
pub async fn deep_health_check(
conf: web::Data<Settings>,
store: web::Data<Arc<Store>>,
) -> impl actix_web::Responder {
match deep_health_check_func(conf, store).await {
Ok(response) => services::http_response_json(
serde_json::to_string(&response)
.map_err(|err| {
logger::error!(serialization_error=?err);
})
.unwrap_or_default(),
),
Err(err) => services::log_and_return_error_response(err),
}
}
#[instrument(skip_all)]
pub async fn deep_health_check_func(
conf: web::Data<Settings>,
store: web::Data<Arc<Store>>,
) -> Result<DrainerHealthCheckResponse, error_stack::Report<HealthCheckError>> {
logger::info!("Deep health check was called");
logger::debug!("Database health check begin");
let db_status = store.health_check_db().await.map(|_| true).map_err(|err| {
error_stack::report!(HealthCheckError::DbError {
message: err.to_string()
})
})?;
logger::debug!("Database health check end");
logger::debug!("Redis health check begin");
let redis_status = store
.health_check_redis(&conf.into_inner())
.await
.map(|_| true)
.map_err(|err| {
error_stack::report!(HealthCheckError::RedisError {
message: err.to_string()
})
})?;
logger::debug!("Redis health check end");
Ok(DrainerHealthCheckResponse {
database: db_status,
redis: redis_status,
})
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DrainerHealthCheckResponse {
pub database: bool,
pub redis: bool,
}
#[async_trait::async_trait]
pub trait HealthCheckInterface {
async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError>;
async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError>;
}
#[async_trait::async_trait]
impl HealthCheckInterface for Store {
async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError> {
let conn = pg_connection(&self.master_pool).await;
conn
.transaction_async(|conn| {
Box::pin(async move {
let query =
diesel::select(diesel::dsl::sql::<diesel::sql_types::Integer>("1 + 1"));
let _x: i32 = query.get_result_async(&conn).await.map_err(|err| {
logger::error!(read_err=?err,"Error while reading element in the database");
HealthCheckDBError::DbReadError
})?;
logger::debug!("Database read was successful");
let config = ConfigNew {
key: "test_key".to_string(),
config: "test_value".to_string(),
};
config.insert(&conn).await.map_err(|err| {
logger::error!(write_err=?err,"Error while writing to database");
HealthCheckDBError::DbWriteError
})?;
logger::debug!("Database write was successful");
Config::delete_by_key(&conn, "test_key").await.map_err(|err| {
logger::error!(delete_err=?err,"Error while deleting element in the database");
HealthCheckDBError::DbDeleteError
})?;
logger::debug!("Database delete was successful");
Ok::<_, HealthCheckDBError>(())
})
})
.await?;
Ok(())
}
async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError> {
let redis_conn = redis_connection(conf).await;
redis_conn
.serialize_and_set_key_with_expiry("test_key", "test_value", 30)
.await
.change_context(HealthCheckRedisError::SetFailed)?;
logger::debug!("Redis set_key was successful");
redis_conn
.get_key("test_key")
.await
.change_context(HealthCheckRedisError::GetFailed)?;
logger::debug!("Redis get_key was successful");
redis_conn
.delete_key("test_key")
.await
.change_context(HealthCheckRedisError::DeleteFailed)?;
logger::debug!("Redis delete_key was successful");
redis_conn
.stream_append_entry(
TEST_STREAM_NAME,
&redis_interface::RedisEntryId::AutoGeneratedID,
TEST_STREAM_DATA.to_vec(),
)
.await
.change_context(HealthCheckRedisError::StreamAppendFailed)?;
logger::debug!("Stream append succeeded");
let output = self
.redis_conn
.stream_read_entries(TEST_STREAM_NAME, "0-0", Some(10))
.await
.change_context(HealthCheckRedisError::StreamReadFailed)?;
logger::debug!("Stream read succeeded");
let (_, id_to_trim) = output
.get(TEST_STREAM_NAME)
.and_then(|entries| {
entries
.last()
.map(|last_entry| (entries, last_entry.0.clone()))
})
.ok_or(error_stack::report!(
HealthCheckRedisError::StreamReadFailed
))?;
logger::debug!("Stream parse succeeded");
redis_conn
.stream_trim_entries(
TEST_STREAM_NAME,
(
redis_interface::StreamCapKind::MinID,
redis_interface::StreamCapTrim::Exact,
id_to_trim,
),
)
.await
.change_context(HealthCheckRedisError::StreamTrimFailed)?;
logger::debug!("Stream trim succeeded");
Ok(())
}
}
#[allow(clippy::enum_variant_names)]
#[derive(Debug, thiserror::Error)]
pub enum HealthCheckDBError {
#[error("Error while connecting to database")]
DbError,
#[error("Error while writing to database")]
DbWriteError,
#[error("Error while reading element in the database")]
DbReadError,
#[error("Error while deleting element in the database")]
DbDeleteError,
#[error("Unpredictable error occurred")]
UnknownError,
#[error("Error in database transaction")]
TransactionError,
}
impl From<diesel::result::Error> for HealthCheckDBError {
fn from(error: diesel::result::Error) -> Self {
match error {
diesel::result::Error::DatabaseError(_, _) => Self::DbError,
diesel::result::Error::RollbackErrorOnCommit { .. }
| diesel::result::Error::RollbackTransaction
| diesel::result::Error::AlreadyInTransaction
| diesel::result::Error::NotInTransaction
| diesel::result::Error::BrokenTransactionManager => Self::TransactionError,
_ => Self::UnknownError,
}
}
}
#[allow(clippy::enum_variant_names)]
#[derive(Debug, thiserror::Error)]
pub enum HealthCheckRedisError {
#[error("Failed to set key value in Redis")]
SetFailed,
#[error("Failed to get key value in Redis")]
GetFailed,
#[error("Failed to delete key value in Redis")]
DeleteFailed,
#[error("Failed to append data to the stream in Redis")]
StreamAppendFailed,
#[error("Failed to read data from the stream in Redis")]
StreamReadFailed,
#[error("Failed to trim data from the stream in Redis")]
StreamTrimFailed,
}

View File

@ -1,6 +1,7 @@
mod connection;
pub mod errors;
mod handler;
mod health_check;
pub mod logger;
pub(crate) mod metrics;
mod query;
@ -11,6 +12,7 @@ mod types;
mod utils;
use std::sync::Arc;
use actix_web::dev::Server;
use common_utils::signals::get_allowed_signals;
use diesel_models::kv;
use error_stack::{IntoReport, ResultExt};
@ -18,7 +20,10 @@ use router_env::{instrument, tracing};
use tokio::sync::mpsc;
use crate::{
connection::pg_connection, services::Store, settings::DrainerSettings, types::StreamData,
connection::pg_connection,
services::Store,
settings::{DrainerSettings, Settings},
types::StreamData,
};
pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::DrainerResult<()> {
@ -49,3 +54,18 @@ pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::
Ok(())
}
pub async fn start_web_server(
conf: Settings,
store: Arc<Store>,
) -> Result<Server, errors::DrainerError> {
let server = conf.server.clone();
let web_server = actix_web::HttpServer::new(move || {
actix_web::App::new().service(health_check::Health::server(conf.clone(), store.clone()))
})
.bind((server.host.as_str(), server.port))?
.run();
let _ = web_server.handle();
Ok(web_server)
}

View File

@ -1,4 +1,7 @@
use drainer::{errors::DrainerResult, logger::logger, services, settings, start_drainer};
use drainer::{
errors::DrainerResult, logger::logger, services, settings, start_drainer, start_web_server,
};
use router_env::tracing::Instrument;
#[tokio::main]
async fn main() -> DrainerResult<()> {
@ -24,6 +27,19 @@ async fn main() -> DrainerResult<()> {
[router_env::service_name!()],
);
#[allow(clippy::expect_used)]
let web_server = Box::pin(start_web_server(conf.clone(), store.clone()))
.await
.expect("Failed to create the server");
tokio::spawn(
async move {
let _ = web_server.await;
logger::error!("The health check probe stopped working!");
}
.in_current_span(),
);
logger::debug!(startup_config=?conf);
logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log);

View File

@ -1,6 +1,12 @@
use std::sync::Arc;
use crate::connection::{diesel_make_pg_pool, PgPool};
use actix_web::{body, HttpResponse, ResponseError};
use error_stack::Report;
use crate::{
connection::{diesel_make_pg_pool, PgPool},
logger,
};
#[derive(Clone)]
pub struct Store {
@ -45,3 +51,23 @@ impl Store {
}
}
}
pub fn log_and_return_error_response<T>(error: Report<T>) -> HttpResponse
where
T: error_stack::Context + ResponseError + Clone,
{
logger::error!(?error);
let body = serde_json::json!({
"message": error.to_string()
})
.to_string();
HttpResponse::InternalServerError()
.content_type(mime::APPLICATION_JSON)
.body(body)
}
pub fn http_response_json<T: body::MessageBody + 'static>(response: T) -> HttpResponse {
HttpResponse::Ok()
.content_type(mime::APPLICATION_JSON)
.body(response)
}

View File

@ -30,6 +30,7 @@ pub struct CmdLineConf {
#[derive(Debug, Deserialize, Clone, Default)]
#[serde(default)]
pub struct Settings {
pub server: Server,
pub master_database: Database,
pub redis: redis::RedisSettings,
pub log: Log,
@ -62,6 +63,24 @@ pub struct DrainerSettings {
pub loop_interval: u32, // in milliseconds
}
#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct Server {
pub port: u16,
pub workers: usize,
pub host: String,
}
impl Server {
pub fn validate(&self) -> Result<(), errors::DrainerError> {
common_utils::fp_utils::when(self.host.is_default_or_empty(), || {
Err(errors::DrainerError::ConfigParsingError(
"server host must not be empty".into(),
))
})
}
}
impl Default for Database {
fn default() -> Self {
Self {
@ -88,6 +107,16 @@ impl Default for DrainerSettings {
}
}
impl Default for Server {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 8080,
workers: 1,
}
}
}
impl Database {
fn validate(&self) -> Result<(), errors::DrainerError> {
use common_utils::fp_utils::when;
@ -169,6 +198,7 @@ impl Settings {
}
pub fn validate(&self) -> Result<(), errors::DrainerError> {
self.server.validate()?;
self.master_database.validate()?;
self.redis.validate().map_err(|error| {
println!("{error}");