feat: add deep health check for scheduler (#3304)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: dracarys18 <karthikey.hegde@juspay.in>
Author: Chethan Rao
Date: 2024-02-01 16:44:13 +05:30
Committed by: GitHub
Parent: 7cf6c8c0b9
Commit: 170e10cb8e
13 changed files with 193 additions and 3 deletions

Cargo.lock (generated)

@@ -5526,6 +5526,7 @@ dependencies = [
"external_services",
"futures 0.3.28",
"masking",
"num_cpus",
"once_cell",
"rand 0.8.5",
"redis_interface",


@@ -298,6 +298,12 @@ lower_fetch_limit = 1800 # Lower limit for fetching entries from redis
lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with
lock_ttl = 160 # the ttl being the expiry (in seconds)
# Scheduler server configuration
[scheduler.server]
port = 3000 # Port on which the server will listen for incoming requests
host = "127.0.0.1" # Host IP address to bind the server to
workers = 1 # Number of actix workers to handle incoming requests concurrently
batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue
# Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database
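For reference, a minimal standalone sketch of how this `[scheduler.server]` table maps onto a settings struct, assuming the `serde` and `toml` crates; the actual crate loads it through its own settings machinery (see the `Server` struct further down in this diff).

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Root {
    scheduler: Scheduler,
}

#[derive(Debug, Deserialize)]
struct Scheduler {
    server: ServerConfig,
}

// Illustrative mirror of the scheduler's `Server` settings.
#[derive(Debug, Deserialize)]
struct ServerConfig {
    port: u16,
    host: String,
    workers: usize,
}

fn main() {
    let raw = r#"
        [scheduler.server]
        port = 3000
        host = "127.0.0.1"
        workers = 1
    "#;
    // Fails if the table or any key is missing or has the wrong type.
    let parsed: Root = toml::from_str(raw).expect("invalid [scheduler.server] config");
    println!("{:?}", parsed.scheduler.server);
}
```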


@@ -9,3 +9,9 @@ stream = "scheduler_stream"
[scheduler.consumer]
consumer_group = "scheduler_group"
disabled = false # This flag decides if the consumer should actively consume task
# Scheduler server configuration
[scheduler.server]
port = 3000 # Port on which the server will listen for incoming requests
host = "127.0.0.1" # Host IP address to bind the server to
workers = 1 # Number of actix workers to handle incoming requests concurrently


@@ -12,3 +12,9 @@ lock_key = "producer_locking_key" # The following keys defines the producer lock
lock_ttl = 160 # the ttl being the expiry (in seconds)
lower_fetch_limit = 900 # Lower limit for fetching entries from redis queue (in seconds)
upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds)0
# Scheduler server configuration
[scheduler.server]
port = 3000 # Port on which the server will listen for incoming requests
host = "127.0.0.1" # Host IP address to bind the server to
workers = 1 # Number of actix workers to handle incoming requests concurrently


@@ -228,6 +228,11 @@ stream = "SCHEDULER_STREAM"
disabled = false
consumer_group = "SCHEDULER_GROUP"
[scheduler.server]
port = 3000
host = "127.0.0.1"
workers = 1
[email]
sender_email = "example@example.com"
aws_region = ""


@@ -227,6 +227,11 @@ stream = "SCHEDULER_STREAM"
disabled = false
consumer_group = "SCHEDULER_GROUP"
[scheduler.server]
port = 3000
host = "127.0.0.1"
workers = 1
#tokenization configuration which describe token lifetime and payment method for specific connector
[tokenization]
stripe = { long_lived_token = false, payment_method = "wallet", payment_method_type = { type = "disable_only", list = "google_pay" } }


@@ -8,3 +8,10 @@ pub struct RouterHealthCheckResponse {
}
impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchedulerHealthCheckResponse {
pub database: bool,
pub redis: bool,
}
impl common_utils::events::ApiEventMetric for SchedulerHealthCheckResponse {}
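For clarity, this is the JSON shape the new readiness endpoint serializes; a self-contained sketch using a local mirror of `SchedulerHealthCheckResponse` (the real type lives in `api_models::health_check`).

```rust
use serde::Serialize;

// Local mirror of api_models::health_check::SchedulerHealthCheckResponse.
#[derive(Serialize)]
struct SchedulerHealthCheckResponse {
    database: bool,
    redis: bool,
}

fn main() {
    let report = SchedulerHealthCheckResponse { database: true, redis: true };
    // Prints: {"database":true,"redis":true}
    println!("{}", serde_json::to_string(&report).unwrap());
}
```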


@@ -1,21 +1,29 @@
#![recursion_limit = "256"]
use std::{str::FromStr, sync::Arc};
use actix_web::{dev::Server, web, Scope};
use api_models::health_check::SchedulerHealthCheckResponse;
use common_utils::ext_traits::{OptionExt, StringExt};
use diesel_models::process_tracker as storage;
use error_stack::ResultExt;
use router::{
configs::settings::{CmdLineConf, Settings},
core::errors::{self, CustomResult},
logger, routes, services,
core::{
errors::{self, CustomResult},
health_check::HealthCheckInterface,
},
logger, routes,
services::{self, api},
types::storage::ProcessTrackerExt,
workflows,
};
use router_env::{instrument, tracing};
use scheduler::{
consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError,
workflows::ProcessTrackerWorkflows, SchedulerAppState,
};
use serde::{Deserialize, Serialize};
use storage_impl::errors::ApplicationError;
use strum::EnumString;
use tokio::sync::{mpsc, oneshot};
@@ -68,6 +76,19 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
[router_env::service_name!()],
);
#[allow(clippy::expect_used)]
let web_server = Box::pin(start_web_server(
state.clone(),
scheduler_flow_str.to_string(),
))
.await
.expect("Failed to create the server");
tokio::spawn(async move {
let _ = web_server.await;
logger::error!("The health check probe stopped working!");
});
logger::debug!(startup_config=?state.conf);
start_scheduler(&state, scheduler_flow, (tx, rx)).await?;
@@ -76,6 +97,106 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
Ok(())
}
pub async fn start_web_server(
state: routes::AppState,
service: String,
) -> errors::ApplicationResult<Server> {
let server = state
.conf
.scheduler
.as_ref()
.ok_or(ApplicationError::InvalidConfigurationValueError(
"Scheduler server is invalidly configured".into(),
))?
.server
.clone();
let web_server = actix_web::HttpServer::new(move || {
actix_web::App::new().service(Health::server(state.clone(), service.clone()))
})
.bind((server.host.as_str(), server.port))?
.workers(server.workers)
.run();
let _ = web_server.handle();
Ok(web_server)
}
pub struct Health;
impl Health {
pub fn server(state: routes::AppState, service: String) -> Scope {
web::scope("health")
.app_data(web::Data::new(state))
.app_data(web::Data::new(service))
.service(web::resource("").route(web::get().to(health)))
.service(web::resource("/ready").route(web::get().to(deep_health_check)))
}
}
#[instrument(skip_all)]
pub async fn health() -> impl actix_web::Responder {
logger::info!("Scheduler health was called");
actix_web::HttpResponse::Ok().body("Scheduler health is good")
}
#[instrument(skip_all)]
pub async fn deep_health_check(
state: web::Data<routes::AppState>,
service: web::Data<String>,
) -> impl actix_web::Responder {
let report = deep_health_check_func(state, service).await;
match report {
Ok(response) => services::http_response_json(
serde_json::to_string(&response)
.map_err(|err| {
logger::error!(serialization_error=?err);
})
.unwrap_or_default(),
),
Err(err) => api::log_and_return_error_response(err),
}
}
#[instrument(skip_all)]
pub async fn deep_health_check_func(
state: web::Data<routes::AppState>,
service: web::Data<String>,
) -> errors::RouterResult<SchedulerHealthCheckResponse> {
logger::info!("{} deep health check was called", service.into_inner());
logger::debug!("Database health check begin");
let db_status = state.health_check_db().await.map(|_| true).map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
component: "Database",
message: err.to_string()
})
})?;
logger::debug!("Database health check end");
logger::debug!("Redis health check begin");
let redis_status = state
.health_check_redis()
.await
.map(|_| true)
.map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
component: "Redis",
message: err.to_string()
})
})?;
logger::debug!("Redis health check end");
let response = SchedulerHealthCheckResponse {
database: db_status,
redis: redis_status,
};
Ok(response)
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
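A hypothetical client-side check of the two routes registered above, assuming the scheduler web server is bound to the default `127.0.0.1:3000` and that `reqwest` and `tokio` are available in the probing crate; it is not part of this change. Note that `deep_health_check_func` runs the database and Redis checks sequentially and returns early on the first failure, so the readiness body is only produced when both dependencies are reachable.

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Liveness probe: plain-text body "Scheduler health is good".
    let live = reqwest::get("http://127.0.0.1:3000/health").await?.text().await?;
    println!("liveness: {live}");

    // Readiness probe: JSON from deep_health_check_func, e.g. {"database":true,"redis":true}.
    let ready = reqwest::get("http://127.0.0.1:3000/health/ready")
        .await?
        .text()
        .await?;
    println!("readiness: {ready}");
    Ok(())
}
```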


@@ -238,7 +238,7 @@ pub enum ApiErrorResponse {
WebhookInvalidMerchantSecret,
#[error(error_type = ErrorType::InvalidRequestError, code = "IR_19", message = "{message}")]
CurrencyNotSupported { message: String },
#[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failiing with error: {message}")]
#[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failing with error: {message}")]
HealthCheckError {
component: &'static str,
message: String,


@@ -13,6 +13,7 @@ kv_store = []
async-trait = "0.1.68"
error-stack = "0.3.1"
futures = "0.3.28"
num_cpus = "1.15.0"
once_cell = "1.18.0"
rand = "0.8.5"
serde = "1.0.193"


@@ -6,6 +6,7 @@ impl Default for super::settings::SchedulerSettings {
consumer: super::settings::ConsumerSettings::default(),
graceful_shutdown_interval: 60000,
loop_interval: 5000,
server: super::settings::Server::default(),
}
}
}
@@ -30,3 +31,13 @@ impl Default for super::settings::ConsumerSettings {
}
}
}
impl Default for super::settings::Server {
fn default() -> Self {
Self {
port: 8080,
workers: num_cpus::get_physical(),
host: "localhost".into(),
}
}
}


@@ -15,6 +15,15 @@ pub struct SchedulerSettings {
pub consumer: ConsumerSettings,
pub loop_interval: u64,
pub graceful_shutdown_interval: u64,
pub server: Server,
}
#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct Server {
pub port: u16,
pub workers: usize,
pub host: String,
}
#[derive(Debug, Clone, Deserialize)]
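Because the struct carries container-level `#[serde(default)]`, any key omitted from `[scheduler.server]` is filled in from the `Default` impl shown earlier in this diff. A standalone sketch of that behaviour, assuming the `toml` crate and with the physical-CPU default replaced by a constant so the snippet has no extra dependencies:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(default)]
struct Server {
    port: u16,
    workers: usize,
    host: String,
}

impl Default for Server {
    fn default() -> Self {
        Self {
            port: 8080,
            // Stand-in for num_cpus::get_physical() to keep the sketch dependency-free.
            workers: 4,
            host: "localhost".into(),
        }
    }
}

fn main() {
    // Only `port` is supplied; `workers` and `host` fall back to Default.
    let partial: Server = toml::from_str("port = 3000").expect("invalid config");
    println!("{partial:?}"); // Server { port: 3000, workers: 4, host: "localhost" }
}
```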


@@ -19,6 +19,8 @@ impl super::settings::SchedulerSettings {
self.producer.validate()?;
self.server.validate()?;
Ok(())
}
}
@@ -32,3 +34,13 @@ impl super::settings::ProducerSettings {
})
}
}
impl super::settings::Server {
pub fn validate(&self) -> Result<(), ApplicationError> {
common_utils::fp_utils::when(self.host.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"server host must not be empty".into(),
))
})
}
}
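As a standalone illustration of the guard above: an empty host should be rejected before the web server is started. This sketch reimplements the check with plain `std` instead of `common_utils::fp_utils::when` and `is_default_or_empty`, so it is an analogue rather than the crate's actual code.

```rust
#[derive(Debug)]
struct InvalidConfigurationValueError(String);

#[allow(dead_code)]
struct Server {
    port: u16,
    workers: usize,
    host: String,
}

impl Server {
    fn validate(&self) -> Result<(), InvalidConfigurationValueError> {
        // Mirrors the intent of `when(self.host.is_default_or_empty(), ...)`.
        if self.host.trim().is_empty() {
            return Err(InvalidConfigurationValueError(
                "server host must not be empty".into(),
            ));
        }
        Ok(())
    }
}

fn main() {
    let bad = Server { port: 3000, workers: 1, host: String::new() };
    assert!(bad.validate().is_err());

    let good = Server { port: 3000, workers: 1, host: "127.0.0.1".into() };
    assert!(good.validate().is_ok());
    println!("host validation behaves as expected");
}
```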