diff --git a/Cargo.lock b/Cargo.lock index a1aabc89a0..f0334ce9cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5526,6 +5526,7 @@ dependencies = [ "external_services", "futures 0.3.28", "masking", + "num_cpus", "once_cell", "rand 0.8.5", "redis_interface", diff --git a/config/config.example.toml b/config/config.example.toml index 007c671e9e..87999f0e9e 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -298,6 +298,12 @@ lower_fetch_limit = 1800 # Lower limit for fetching entries from redis lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with lock_ttl = 160 # the ttl being the expiry (in seconds) +# Scheduler server configuration +[scheduler.server] +port = 3000 # Port on which the server will listen for incoming requests +host = "127.0.0.1" # Host IP address to bind the server to +workers = 1 # Number of actix workers to handle incoming requests concurrently + batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue # Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database diff --git a/config/deployments/scheduler/consumer.toml b/config/deployments/scheduler/consumer.toml index 907e3b8297..cdd6055266 100644 --- a/config/deployments/scheduler/consumer.toml +++ b/config/deployments/scheduler/consumer.toml @@ -9,3 +9,9 @@ stream = "scheduler_stream" [scheduler.consumer] consumer_group = "scheduler_group" disabled = false # This flag decides if the consumer should actively consume task + +# Scheduler server configuration +[scheduler.server] +port = 3000 # Port on which the server will listen for incoming requests +host = "127.0.0.1" # Host IP address to bind the server to +workers = 1 # Number of actix workers to handle incoming requests concurrently diff --git a/config/deployments/scheduler/producer.toml b/config/deployments/scheduler/producer.toml index 579466a23c..9cbaee96f0 100644 --- a/config/deployments/scheduler/producer.toml +++ b/config/deployments/scheduler/producer.toml @@ -12,3 +12,9 @@ lock_key = "producer_locking_key" # The following keys defines the producer lock lock_ttl = 160 # the ttl being the expiry (in seconds) lower_fetch_limit = 900 # Lower limit for fetching entries from redis queue (in seconds) upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds)0 + +# Scheduler server configuration +[scheduler.server] +port = 3000 # Port on which the server will listen for incoming requests +host = "127.0.0.1" # Host IP address to bind the server to +workers = 1 # Number of actix workers to handle incoming requests concurrently diff --git a/config/development.toml b/config/development.toml index 584bdf751a..20abb7bd6f 100644 --- a/config/development.toml +++ b/config/development.toml @@ -228,6 +228,11 @@ stream = "SCHEDULER_STREAM" disabled = false consumer_group = "SCHEDULER_GROUP" +[scheduler.server] +port = 3000 +host = "127.0.0.1" +workers = 1 + [email] sender_email = "example@example.com" aws_region = "" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index fba91dc125..e6dc01afa7 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -227,6 +227,11 @@ stream = "SCHEDULER_STREAM" disabled = false consumer_group = "SCHEDULER_GROUP" +[scheduler.server] +port = 3000 +host = "127.0.0.1" +workers = 1 + #tokenization configuration which describe token lifetime and payment method for specific connector [tokenization] stripe = { long_lived_token = false, payment_method = "wallet", payment_method_type = { type = "disable_only", list = "google_pay" } } diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 8323f13513..eab971b5fe 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -8,3 +8,10 @@ pub struct RouterHealthCheckResponse { } impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {} +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SchedulerHealthCheckResponse { + pub database: bool, + pub redis: bool, +} + +impl common_utils::events::ApiEventMetric for SchedulerHealthCheckResponse {} diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index b800ecb897..5f98cd8801 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -1,21 +1,29 @@ #![recursion_limit = "256"] use std::{str::FromStr, sync::Arc}; +use actix_web::{dev::Server, web, Scope}; +use api_models::health_check::SchedulerHealthCheckResponse; use common_utils::ext_traits::{OptionExt, StringExt}; use diesel_models::process_tracker as storage; use error_stack::ResultExt; use router::{ configs::settings::{CmdLineConf, Settings}, - core::errors::{self, CustomResult}, - logger, routes, services, + core::{ + errors::{self, CustomResult}, + health_check::HealthCheckInterface, + }, + logger, routes, + services::{self, api}, types::storage::ProcessTrackerExt, workflows, }; +use router_env::{instrument, tracing}; use scheduler::{ consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError, workflows::ProcessTrackerWorkflows, SchedulerAppState, }; use serde::{Deserialize, Serialize}; +use storage_impl::errors::ApplicationError; use strum::EnumString; use tokio::sync::{mpsc, oneshot}; @@ -68,6 +76,19 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { [router_env::service_name!()], ); + #[allow(clippy::expect_used)] + let web_server = Box::pin(start_web_server( + state.clone(), + scheduler_flow_str.to_string(), + )) + .await + .expect("Failed to create the server"); + + tokio::spawn(async move { + let _ = web_server.await; + logger::error!("The health check probe stopped working!"); + }); + logger::debug!(startup_config=?state.conf); start_scheduler(&state, scheduler_flow, (tx, rx)).await?; @@ -76,6 +97,106 @@ async fn main() -> CustomResult<(), ProcessTrackerError> { Ok(()) } +pub async fn start_web_server( + state: routes::AppState, + service: String, +) -> errors::ApplicationResult { + let server = state + .conf + .scheduler + .as_ref() + .ok_or(ApplicationError::InvalidConfigurationValueError( + "Scheduler server is invalidly configured".into(), + ))? + .server + .clone(); + + let web_server = actix_web::HttpServer::new(move || { + actix_web::App::new().service(Health::server(state.clone(), service.clone())) + }) + .bind((server.host.as_str(), server.port))? + .workers(server.workers) + .run(); + let _ = web_server.handle(); + + Ok(web_server) +} + +pub struct Health; + +impl Health { + pub fn server(state: routes::AppState, service: String) -> Scope { + web::scope("health") + .app_data(web::Data::new(state)) + .app_data(web::Data::new(service)) + .service(web::resource("").route(web::get().to(health))) + .service(web::resource("/ready").route(web::get().to(deep_health_check))) + } +} + +#[instrument(skip_all)] +pub async fn health() -> impl actix_web::Responder { + logger::info!("Scheduler health was called"); + actix_web::HttpResponse::Ok().body("Scheduler health is good") +} +#[instrument(skip_all)] +pub async fn deep_health_check( + state: web::Data, + service: web::Data, +) -> impl actix_web::Responder { + let report = deep_health_check_func(state, service).await; + match report { + Ok(response) => services::http_response_json( + serde_json::to_string(&response) + .map_err(|err| { + logger::error!(serialization_error=?err); + }) + .unwrap_or_default(), + ), + Err(err) => api::log_and_return_error_response(err), + } +} +#[instrument(skip_all)] +pub async fn deep_health_check_func( + state: web::Data, + service: web::Data, +) -> errors::RouterResult { + logger::info!("{} deep health check was called", service.into_inner()); + + logger::debug!("Database health check begin"); + + let db_status = state.health_check_db().await.map(|_| true).map_err(|err| { + error_stack::report!(errors::ApiErrorResponse::HealthCheckError { + component: "Database", + message: err.to_string() + }) + })?; + + logger::debug!("Database health check end"); + + logger::debug!("Redis health check begin"); + + let redis_status = state + .health_check_redis() + .await + .map(|_| true) + .map_err(|err| { + error_stack::report!(errors::ApiErrorResponse::HealthCheckError { + component: "Redis", + message: err.to_string() + }) + })?; + + logger::debug!("Redis health check end"); + + let response = SchedulerHealthCheckResponse { + database: db_status, + redis: redis_status, + }; + + Ok(response) +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[strum(serialize_all = "SCREAMING_SNAKE_CASE")] diff --git a/crates/router/src/core/errors/api_error_response.rs b/crates/router/src/core/errors/api_error_response.rs index 023e1f4b7f..e83483b081 100644 --- a/crates/router/src/core/errors/api_error_response.rs +++ b/crates/router/src/core/errors/api_error_response.rs @@ -238,7 +238,7 @@ pub enum ApiErrorResponse { WebhookInvalidMerchantSecret, #[error(error_type = ErrorType::InvalidRequestError, code = "IR_19", message = "{message}")] CurrencyNotSupported { message: String }, - #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failiing with error: {message}")] + #[error(error_type = ErrorType::ServerNotAvailable, code= "HE_00", message = "{component} health check is failing with error: {message}")] HealthCheckError { component: &'static str, message: String, diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml index fe090552ed..7d4a7821fc 100644 --- a/crates/scheduler/Cargo.toml +++ b/crates/scheduler/Cargo.toml @@ -13,6 +13,7 @@ kv_store = [] async-trait = "0.1.68" error-stack = "0.3.1" futures = "0.3.28" +num_cpus = "1.15.0" once_cell = "1.18.0" rand = "0.8.5" serde = "1.0.193" diff --git a/crates/scheduler/src/configs/defaults.rs b/crates/scheduler/src/configs/defaults.rs index 25eb19e24f..d17c20829e 100644 --- a/crates/scheduler/src/configs/defaults.rs +++ b/crates/scheduler/src/configs/defaults.rs @@ -6,6 +6,7 @@ impl Default for super::settings::SchedulerSettings { consumer: super::settings::ConsumerSettings::default(), graceful_shutdown_interval: 60000, loop_interval: 5000, + server: super::settings::Server::default(), } } } @@ -30,3 +31,13 @@ impl Default for super::settings::ConsumerSettings { } } } + +impl Default for super::settings::Server { + fn default() -> Self { + Self { + port: 8080, + workers: num_cpus::get_physical(), + host: "localhost".into(), + } + } +} diff --git a/crates/scheduler/src/configs/settings.rs b/crates/scheduler/src/configs/settings.rs index 56a9f4079a..723ef81e70 100644 --- a/crates/scheduler/src/configs/settings.rs +++ b/crates/scheduler/src/configs/settings.rs @@ -15,6 +15,15 @@ pub struct SchedulerSettings { pub consumer: ConsumerSettings, pub loop_interval: u64, pub graceful_shutdown_interval: u64, + pub server: Server, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(default)] +pub struct Server { + pub port: u16, + pub workers: usize, + pub host: String, } #[derive(Debug, Clone, Deserialize)] diff --git a/crates/scheduler/src/configs/validations.rs b/crates/scheduler/src/configs/validations.rs index e9f6621b2a..06052f9ff6 100644 --- a/crates/scheduler/src/configs/validations.rs +++ b/crates/scheduler/src/configs/validations.rs @@ -19,6 +19,8 @@ impl super::settings::SchedulerSettings { self.producer.validate()?; + self.server.validate()?; + Ok(()) } } @@ -32,3 +34,13 @@ impl super::settings::ProducerSettings { }) } } + +impl super::settings::Server { + pub fn validate(&self) -> Result<(), ApplicationError> { + common_utils::fp_utils::when(self.host.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "server host must not be empty".into(), + )) + }) + } +}