From 25d8ec200931e70b4cd85178e244c78517c330f0 Mon Sep 17 00:00:00 2001 From: Nishant Joshi Date: Fri, 20 Jan 2023 16:25:56 +0530 Subject: [PATCH] fix: add graceful shutdown for consumer & router (#428) --- Cargo.lock | 24 +++++ config/config.example.toml | 78 +++++++++------- crates/router/Cargo.toml | 2 + crates/router/src/configs/defaults.rs | 1 + crates/router/src/configs/settings.rs | 1 + crates/router/src/lib.rs | 1 + crates/router/src/scheduler/consumer.rs | 93 +++++++++++-------- crates/router/src/scheduler/utils.rs | 32 ++++++- .../src/scheduler/workflows/payment_sync.rs | 2 +- 9 files changed, 159 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7bae5e5e7..8937f262af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3035,6 +3035,8 @@ dependencies = [ "serde_qs 0.11.0", "serde_urlencoded", "serial_test", + "signal-hook", + "signal-hook-tokio", "storage_models", "strum", "thiserror", @@ -3374,6 +3376,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -3383,6 +3395,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signal-hook-tokio" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e" +dependencies = [ + "futures-core", + "libc", + "signal-hook", + "tokio", +] + [[package]] name = "simd-abstraction" version = "0.7.1" diff --git a/config/config.example.toml b/config/config.example.toml index ed90f40319..a9e21be5a8 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -6,9 +6,11 @@ [server] port = 8080 host = "127.0.0.1" +# This is the grace time (in seconds) given to the actix-server to stop the execution +# For more details: https://actix.rs/docs/server/#graceful-shutdown +shutdown_timeout = 30 # HTTP Request body limit. Defaults to 16kB request_body_limit = 16_384 - # Proxy server configuration for connecting to payment gateways. # Don't define the fields if a Proxy isn't needed. Empty strings will cause failure. [proxy] @@ -17,32 +19,32 @@ request_body_limit = 16_384 # Main SQL data store credentials [master_database] -username = "db_user" # DB Username -password = "db_pass" # DB Password -host = "localhost" # DB Host -port = 5432 # DB Port -dbname = "hyperswitch_db" # Name of Database -pool_size = 5 # Number of connections to keep open +username = "db_user" # DB Username +password = "db_pass" # DB Password +host = "localhost" # DB Host +port = 5432 # DB Port +dbname = "hyperswitch_db" # Name of Database +pool_size = 5 # Number of connections to keep open # Replica SQL data store credentials [replica_database] -username = "replica_user" # DB Username -password = "replica_pass" # DB Password -host = "localhost" # DB Host -port = 5432 # DB Port -dbname = "hyperswitch_db" # Name of Database -pool_size = 5 # Number of connections to keep open +username = "replica_user" # DB Username +password = "replica_pass" # DB Password +host = "localhost" # DB Host +port = 5432 # DB Port +dbname = "hyperswitch_db" # Name of Database +pool_size = 5 # Number of connections to keep open # Redis credentials [redis] host = "127.0.0.1" port = 6379 -pool_size = 5 # Number of connections to keep open -reconnect_max_attempts = 5 # Maximum number of reconnection attempts to make before failing. Set to 0 to retry forever. -reconnect_delay = 5 # Delay between reconnection attempts, in milliseconds -default_ttl = 300 # Default TTL for entries, in seconds -use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6) -stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options +pool_size = 5 # Number of connections to keep open +reconnect_max_attempts = 5 # Maximum number of reconnection attempts to make before failing. Set to 0 to retry forever. +reconnect_delay = 5 # Delay between reconnection attempts, in milliseconds +default_ttl = 300 # Default TTL for entries, in seconds +use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6) +stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options # Logging configuration. Logging can be either to file or console or both. @@ -57,27 +59,27 @@ level = "WARN" # Logging configuration for console logging [log.console] -enabled = true # boolean [true or false] -log_format = "default" # Log format. "default" or "json" +enabled = true # boolean [true or false] +log_format = "default" # Log format. "default" or "json" # levels can be "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" # defaults to "WARN" level = "DEBUG" # Telemetry configuration for traces [log.telemetry] -enabled = false # boolean [true or false] +enabled = false # boolean [true or false] sampling_rate = 0.1 # decimal rate between 0.0 - 1.0 # This section provides some secret values. [secrets] admin_api_key = "test_admin" # admin API key for admin authentication -jwt_secret = "secret" # JWT secret used for user authentication +jwt_secret = "secret" # JWT secret used for user authentication # Locker settings contain details for accessing a card locker, a # PCI Compliant storage entity which stores payment method information # like card details [locker] -host = "" # Locker host +host = "" # Locker host mock_locker = true # Emulate a locker locally using Postgres basilisk_host = "" #Basilisk host @@ -93,7 +95,7 @@ locker_decryption_key2 = "" # private key 2 in pem format, corresponding public # Refund configuration [refund] max_attempts = 10 # Number of refund attempts allowed -max_age = 365 # Max age of a refund in days. +max_age = 365 # Max age of a refund in days. # Validity of an Ephemeral Key in Hours [eph_key] @@ -148,7 +150,17 @@ base_url = "https://apis.sandbox.globalpay.com/ucp/" # This data is used to call respective connectors for wallets and cards [connectors.supported] wallets = ["klarna", "braintree", "applepay"] -cards = ["stripe", "adyen", "authorizedotnet", "checkout", "braintree", "cybersource", "shift4", "worldpay", "globalpay"] +cards = [ + "stripe", + "adyen", + "authorizedotnet", + "checkout", + "braintree", + "cybersource", + "shift4", + "worldpay", + "globalpay", +] # Scheduler settings provides a point to modify the behaviour of scheduler flow. # It defines the the streams/queues name and configuration as well as event selection variables @@ -157,15 +169,15 @@ stream = "SCHEDULER_STREAM" consumer_group = "SCHEDULER_GROUP" [scheduler.producer] -upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds) -lower_fetch_limit = 1800 # Lower limit for fetching entries from redis queue (in seconds) -lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with -lock_ttl = 160 # the ttl being the expiry (in seconds) +upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds) +lower_fetch_limit = 1800 # Lower limit for fetching entries from redis queue (in seconds) +lock_key = "PRODUCER_LOCKING_KEY" # The following keys defines the producer lock that is created in redis with +lock_ttl = 160 # the ttl being the expiry (in seconds) batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue # Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database [drainer] -stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer -num_partitions = 64 # Specifies the number of partitions the stream will be divided into -max_read_count = 100 # Specifies the maximum number of entries that would be read from redis stream in one call +stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer +num_partitions = 64 # Specifies the number of partitions the stream will be divided into +max_read_count = 100 # Specifies the maximum number of entries that would be read from redis stream in one call diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 2f6fc18120..efbda28704 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -63,6 +63,8 @@ serde_json = "1.0.91" serde_path_to_error = "0.1.9" serde_qs = { version = "0.11.0", optional = true } serde_urlencoded = "0.7.1" +signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"]} +signal-hook = "0.3.14" strum = { version = "0.24.1", features = ["derive"] } thiserror = "1.0.38" time = { version = "0.3.17", features = ["serde", "serde-well-known", "std"] } diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index 44ab064098..4593193ab4 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -6,6 +6,7 @@ impl Default for super::settings::Server { host: "localhost".into(), request_body_limit: 16 * 1024, // POST request body is limited to 16KiB base_url: "http://localhost:8080".into(), + shutdown_timeout: 30, } } } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index a8418b7806..b21f102801 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -109,6 +109,7 @@ pub struct Server { pub host: String, pub request_body_limit: usize, pub base_url: String, + pub shutdown_timeout: u64, } #[derive(Debug, Deserialize, Clone)] diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index c9a69f9760..a2ba872f11 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -120,6 +120,7 @@ pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server let server = actix_web::HttpServer::new(move || mk_app(state.clone(), request_body_limit)) .bind((server.host.as_str(), server.port))? .workers(server.workers) + .shutdown_timeout(server.shutdown_timeout) .run(); Ok((server, app_state)) diff --git a/crates/router/src/scheduler/consumer.rs b/crates/router/src/scheduler/consumer.rs index f84348c641..026eca7d27 100644 --- a/crates/router/src/scheduler/consumer.rs +++ b/crates/router/src/scheduler/consumer.rs @@ -5,11 +5,12 @@ use std::{ sync::{self, atomic}, }; -use error_stack::ResultExt; +use error_stack::{IntoReport, ResultExt}; use futures::future; use redis_interface::{RedisConnectionPool, RedisEntryId}; use router_env::{instrument, tracing}; use time::PrimitiveDateTime; +use tokio::sync::oneshot; use uuid::Uuid; use super::{ @@ -20,7 +21,7 @@ use crate::{ configs::settings, core::errors::{self, CustomResult}, db::StorageInterface, - logger::{error, info}, + logger, routes::AppState, scheduler::utils as pt_utils, types::storage::{self, enums, ProcessTrackerExt}, @@ -47,38 +48,59 @@ pub async fn start_consumer( let mut interval = tokio::time::interval(Duration::from_millis(options.looper_interval.milliseconds)); + let mut shutdown_interval = tokio::time::interval(Duration::from_millis( + options.readiness.graceful_termination_duration.milliseconds, + )); + let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0)); + let signal = signal_hook_tokio::Signals::new([ + signal_hook::consts::SIGTERM, + signal_hook::consts::SIGINT, + ]) + .map_err(|error| { + logger::error!("Signal Handler Error: {:?}", error); + errors::ProcessTrackerError::ConfigurationError + }) + .into_report() + .attach_printable("Failed while creating a signals handler")?; + let (sx, mut rx) = oneshot::channel(); + let handle = signal.handle(); + let task_handle = tokio::spawn(pt_utils::signal_handler(signal, sx)); + loop { - interval.tick().await; + match rx.try_recv() { + Err(oneshot::error::TryRecvError::Empty) => { + interval.tick().await; + tokio::task::spawn(pt_utils::consumer_operation_handler( + state.clone(), + options.clone(), + settings.clone(), + |err| { + logger::error!(%err); + }, + sync::Arc::clone(&consumer_operation_counter), + )); + } + Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { + logger::debug!("Awaiting shutdown!"); + shutdown_interval.tick().await; + let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); - let is_ready = options.readiness.is_ready; - if is_ready { - tokio::task::spawn(pt_utils::consumer_operation_handler( - state.clone(), - options.clone(), - settings.clone(), - |err| { - error!(%err); - }, - sync::Arc::clone(&consumer_operation_counter), - )); - } else { - tokio::time::interval(Duration::from_millis( - options.readiness.graceful_termination_duration.milliseconds, - )) - .tick() - .await; - let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); - - match active_tasks { - 0 => { - info!("Terminating consumer"); - break; + match active_tasks { + 0 => { + logger::info!("Terminating consumer"); + break; + } + _ => continue, } - _ => continue, } } } + handle.close(); + task_handle + .await + .into_report() + .change_context(errors::ProcessTrackerError::UnexpectedFlow)?; Ok(()) } @@ -98,7 +120,7 @@ pub async fn consumer_operations( .consumer_group_create(&stream_name, &group_name, &RedisEntryId::AfterLastID) .await; if group_created.is_err() { - info!("Consumer group already exists"); + logger::info!("Consumer group already exists"); } let mut tasks = state @@ -106,6 +128,7 @@ pub async fn consumer_operations( .fetch_consumer_tasks(&stream_name, &group_name, &consumer_name) .await?; + logger::info!("{} picked {} tasks", consumer_name, tasks.len()); let mut handler = vec![]; for task in tasks.iter_mut() { @@ -190,14 +213,12 @@ pub async fn run_executor<'a>( Err(error) => match operation.error_handler(state, process.clone(), error).await { Ok(_) => (), Err(error) => { - error!("Failed while handling error"); - error!(%error); + logger::error!(%error, "Failed while handling error"); let status = process .finish_with_status(&*state.store, "GLOBAL_FAILURE".to_string()) .await; if let Err(err) = status { - error!("Failed while performing database operation: GLOBAL_FAILURE"); - error!(%err) + logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE"); } } }, @@ -211,13 +232,7 @@ pub async fn some_error_handler( process: storage::ProcessTracker, error: E, ) -> CustomResult<(), errors::ProcessTrackerError> { - error!(%process.id, "Failed while executing workflow"); - error!(%error); - error!( - pt.name = ?process.name, - pt.id = %process.id, - "Some error occurred" - ); + logger::error!(pt.name = ?process.name, pt.id = %process.id, %error, "Failed while executing workflow"); let db: &dyn StorageInterface = &*state.store; db.process_tracker_update_process_status_by_ids( diff --git a/crates/router/src/scheduler/utils.rs b/crates/router/src/scheduler/utils.rs index 799c610c42..9697c78e1e 100644 --- a/crates/router/src/scheduler/utils.rs +++ b/crates/router/src/scheduler/utils.rs @@ -4,8 +4,10 @@ use std::{ }; use error_stack::{report, ResultExt}; +use futures::StreamExt; use redis_interface::{RedisConnectionPool, RedisEntryId}; use router_env::opentelemetry; +use tokio::sync::oneshot; use uuid::Uuid; use super::{consumer, metrics, process_data, workflows}; @@ -248,7 +250,7 @@ pub async fn consumer_operation_handler( // Error handler function E: FnOnce(error_stack::Report), { - consumer_operation_counter.fetch_add(1, atomic::Ordering::Relaxed); + consumer_operation_counter.fetch_add(1, atomic::Ordering::Release); let start_time = std_time::Instant::now(); match consumer::consumer_operations(&state, &options, &settings).await { @@ -259,7 +261,8 @@ pub async fn consumer_operation_handler( let duration = end_time.saturating_duration_since(start_time).as_secs_f64(); logger::debug!("Time taken to execute consumer_operation: {}s", duration); - consumer_operation_counter.fetch_sub(1, atomic::Ordering::Relaxed); + let current_count = consumer_operation_counter.fetch_sub(1, atomic::Ordering::Release); + logger::info!("Current tasks being executed: {}", current_count); } pub fn runner_from_task( @@ -355,3 +358,28 @@ where }; result } + +pub(crate) async fn signal_handler( + mut sig: signal_hook_tokio::Signals, + sender: oneshot::Sender<()>, +) { + if let Some(signal) = sig.next().await { + logger::info!( + "Received signal: {:?}", + signal_hook::low_level::signal_name(signal) + ); + match signal { + signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.send(()) { + Ok(_) => { + logger::info!("Request for force shutdown received") + } + Err(_) => { + logger::error!( + "The receiver is closed, a termination call might already be sent" + ) + } + }, + _ => {} + } + } +} diff --git a/crates/router/src/scheduler/workflows/payment_sync.rs b/crates/router/src/scheduler/workflows/payment_sync.rs index 5f0ca733eb..c23b0ed790 100644 --- a/crates/router/src/scheduler/workflows/payment_sync.rs +++ b/crates/router/src/scheduler/workflows/payment_sync.rs @@ -105,7 +105,7 @@ pub async fn get_sync_process_schedule_time( let mapping = match redis_mapping { Ok(x) => x, Err(err) => { - logger::error!("Redis Mapping Error: {}", err); + logger::info!("Redis Mapping Error: {}", err); process_data::ConnectorPTMapping::default() } };