fix: add graceful shutdown for consumer & router (#428)

Nishant Joshi, 2023-01-20 16:25:56 +05:30, committed by GitHub
parent 30593bd1fd
commit 25d8ec2009
9 changed files with 159 additions and 75 deletions
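
This change adds graceful shutdown to both the router and the scheduler consumer. The actix-web server gets a configurable `shutdown_timeout`, and the consumer installs a SIGTERM/SIGINT handler (via the new `signal-hook` and `signal-hook-tokio` dependencies) that forwards the signal over a `tokio::sync::oneshot` channel; on shutdown the consumer stops scheduling new work and waits for its in-flight task counter to drain to zero before exiting.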

Cargo.lock (generated, 24 changed lines)

@@ -3035,6 +3035,8 @@ dependencies = [
"serde_qs 0.11.0",
"serde_urlencoded",
"serial_test",
"signal-hook",
"signal-hook-tokio",
"storage_models",
"strum",
"thiserror",
@@ -3374,6 +3376,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@@ -3383,6 +3395,18 @@ dependencies = [
"libc",
]
[[package]]
name = "signal-hook-tokio"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e"
dependencies = [
"futures-core",
"libc",
"signal-hook",
"tokio",
]
[[package]]
name = "simd-abstraction"
version = "0.7.1"


@@ -6,9 +6,11 @@
[server]
port = 8080
host = "127.0.0.1"
# Grace period (in seconds) given to the actix-server to stop execution
# For more details: https://actix.rs/docs/server/#graceful-shutdown
shutdown_timeout = 30
# HTTP request body limit. Defaults to 16 KiB
request_body_limit = 16_384
# Proxy server configuration for connecting to payment gateways.
# Don't define the fields if a Proxy isn't needed. Empty strings will cause failure.
[proxy]
@@ -17,32 +19,32 @@ request_body_limit = 16_384
# Main SQL data store credentials
[master_database]
username = "db_user" # DB Username
password = "db_pass" # DB Password
host = "localhost" # DB Host
port = 5432 # DB Port
dbname = "hyperswitch_db" # Name of Database
pool_size = 5 # Number of connections to keep open
username = "db_user" # DB Username
password = "db_pass" # DB Password
host = "localhost" # DB Host
port = 5432 # DB Port
dbname = "hyperswitch_db" # Name of Database
pool_size = 5 # Number of connections to keep open
# Replica SQL data store credentials
[replica_database]
username = "replica_user" # DB Username
password = "replica_pass" # DB Password
host = "localhost" # DB Host
port = 5432 # DB Port
dbname = "hyperswitch_db" # Name of Database
pool_size = 5 # Number of connections to keep open
username = "replica_user" # DB Username
password = "replica_pass" # DB Password
host = "localhost" # DB Host
port = 5432 # DB Port
dbname = "hyperswitch_db" # Name of Database
pool_size = 5 # Number of connections to keep open
# Redis credentials
[redis]
host = "127.0.0.1"
port = 6379
pool_size = 5 # Number of connections to keep open
reconnect_max_attempts = 5 # Maximum number of reconnection attempts to make before failing. Set to 0 to retry forever.
reconnect_delay = 5 # Delay between reconnection attempts, in milliseconds
default_ttl = 300 # Default TTL for entries, in seconds
use_legacy_version = false # RESP protocol for the fred crate (set this to true if using RESPv2 or Redis version < 6)
stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options
# Logging configuration. Logging can be either to file or console or both.
@@ -57,27 +59,27 @@ level = "WARN"
# Logging configuration for console logging
[log.console]
enabled = true # boolean [true or false]
log_format = "default" # Log format. "default" or "json"
# levels can be "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF"
# defaults to "WARN"
level = "DEBUG"
# Telemetry configuration for traces
[log.telemetry]
enabled = false # boolean [true or false]
sampling_rate = 0.1 # decimal rate between 0.0 - 1.0
# This section provides some secret values.
[secrets]
admin_api_key = "test_admin" # admin API key for admin authentication
jwt_secret = "secret" # JWT secret used for user authentication
# Locker settings contain details for accessing a card locker, a
# PCI-compliant storage entity which stores payment method information
# like card details
[locker]
host = "" # Locker host
mock_locker = true # Emulate a locker locally using Postgres
basilisk_host = "" # Basilisk host
@@ -93,7 +95,7 @@ locker_decryption_key2 = "" # private key 2 in pem format, corresponding public
# Refund configuration
[refund]
max_attempts = 10 # Number of refund attempts allowed
max_age = 365 # Max age of a refund in days.
# Validity of an Ephemeral Key in Hours
[eph_key]
@@ -148,7 +150,17 @@ base_url = "https://apis.sandbox.globalpay.com/ucp/"
# This data is used to call respective connectors for wallets and cards
[connectors.supported]
wallets = ["klarna", "braintree", "applepay"]
cards = ["stripe", "adyen", "authorizedotnet", "checkout", "braintree", "cybersource", "shift4", "worldpay", "globalpay"]
cards = [
"stripe",
"adyen",
"authorizedotnet",
"checkout",
"braintree",
"cybersource",
"shift4",
"worldpay",
"globalpay",
]
# Scheduler settings provide a way to modify the behaviour of the scheduler flow.
# They define the stream/queue names and configuration, as well as event selection variables.
@@ -157,15 +169,15 @@ stream = "SCHEDULER_STREAM"
consumer_group = "SCHEDULER_GROUP"
[scheduler.producer]
upper_fetch_limit = 0 # Upper limit for fetching entries from the redis queue (in seconds)
lower_fetch_limit = 1800 # Lower limit for fetching entries from the redis queue (in seconds)
lock_key = "PRODUCER_LOCKING_KEY" # The following keys define the producer lock that is created in redis with
lock_ttl = 160 # the ttl being the expiry (in seconds)
batch_size = 200 # Specifies the batch size the producer will push under a single entry in the redis queue
# Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database
[drainer]
stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer
num_partitions = 64 # Specifies the number of partitions the stream will be divided into
max_read_count = 100 # Specifies the maximum number of entries to read from the redis stream in one call


@@ -63,6 +63,8 @@ serde_json = "1.0.91"
serde_path_to_error = "0.1.9"
serde_qs = { version = "0.11.0", optional = true }
serde_urlencoded = "0.7.1"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
signal-hook = "0.3.14"
strum = { version = "0.24.1", features = ["derive"] }
thiserror = "1.0.38"
time = { version = "0.3.17", features = ["serde", "serde-well-known", "std"] }

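The `futures-v0_3` feature on `signal-hook-tokio` is what makes its `Signals` type implement `futures::Stream`, allowing the consumer below to poll it with `StreamExt::next`. A standalone sketch of that pattern (not the router's code), assuming tokio, futures, signal-hook, and signal-hook-tokio as dependencies:

use futures::StreamExt;

// Minimal sketch: build a `Signals` stream over SIGTERM/SIGINT and await the
// first signal.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut signals = signal_hook_tokio::Signals::new([
        signal_hook::consts::SIGTERM,
        signal_hook::consts::SIGINT,
    ])?;
    let handle = signals.handle();

    if let Some(signal) = signals.next().await {
        println!("received signal {signal}");
    }

    handle.close(); // terminate the stream; subsequent polls yield `None`
    Ok(())
}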

@@ -6,6 +6,7 @@ impl Default for super::settings::Server {
host: "localhost".into(),
request_body_limit: 16 * 1024, // POST request body is limited to 16KiB
base_url: "http://localhost:8080".into(),
shutdown_timeout: 30,
}
}
}


@@ -109,6 +109,7 @@ pub struct Server {
pub host: String,
pub request_body_limit: usize,
pub base_url: String,
pub shutdown_timeout: u64,
}
#[derive(Debug, Deserialize, Clone)]


@@ -120,6 +120,7 @@ pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server
let server = actix_web::HttpServer::new(move || mk_app(state.clone(), request_body_limit))
.bind((server.host.as_str(), server.port))?
.workers(server.workers)
.shutdown_timeout(server.shutdown_timeout)
.run();
Ok((server, app_state))

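For context, actix-web's `shutdown_timeout` is the number of seconds workers are given to finish serving in-flight requests after a stop signal before being force-dropped (actix-web's own default is 30). A minimal, self-contained sketch, not the router's actual `mk_app` factory:

use actix_web::{web, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/health", web::get().to(|| async { "ok" })))
        .bind(("127.0.0.1", 8080))?
        // Mirrors the new `server.shutdown_timeout` setting: seconds allowed
        // for in-flight requests to drain before workers are stopped.
        .shutdown_timeout(30)
        .run()
        .await
}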

@@ -5,11 +5,12 @@ use std::{
sync::{self, atomic},
};
-use error_stack::ResultExt;
+use error_stack::{IntoReport, ResultExt};
use futures::future;
use redis_interface::{RedisConnectionPool, RedisEntryId};
use router_env::{instrument, tracing};
use time::PrimitiveDateTime;
use tokio::sync::oneshot;
use uuid::Uuid;
use super::{
@@ -20,7 +21,7 @@ use crate::{
configs::settings,
core::errors::{self, CustomResult},
db::StorageInterface,
-logger::{error, info},
+logger,
routes::AppState,
scheduler::utils as pt_utils,
types::storage::{self, enums, ProcessTrackerExt},
@@ -47,38 +48,59 @@ pub async fn start_consumer(
     let mut interval =
         tokio::time::interval(Duration::from_millis(options.looper_interval.milliseconds));
+    let mut shutdown_interval = tokio::time::interval(Duration::from_millis(
+        options.readiness.graceful_termination_duration.milliseconds,
+    ));
     let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0));
+    let signal = signal_hook_tokio::Signals::new([
+        signal_hook::consts::SIGTERM,
+        signal_hook::consts::SIGINT,
+    ])
+    .map_err(|error| {
+        logger::error!("Signal Handler Error: {:?}", error);
+        errors::ProcessTrackerError::ConfigurationError
+    })
+    .into_report()
+    .attach_printable("Failed while creating a signals handler")?;
+    let (sx, mut rx) = oneshot::channel();
+    let handle = signal.handle();
+    let task_handle = tokio::spawn(pt_utils::signal_handler(signal, sx));

     loop {
-        interval.tick().await;
-        let is_ready = options.readiness.is_ready;
-        if is_ready {
-            tokio::task::spawn(pt_utils::consumer_operation_handler(
-                state.clone(),
-                options.clone(),
-                settings.clone(),
-                |err| {
-                    error!(%err);
-                },
-                sync::Arc::clone(&consumer_operation_counter),
-            ));
-        } else {
-            tokio::time::interval(Duration::from_millis(
-                options.readiness.graceful_termination_duration.milliseconds,
-            ))
-            .tick()
-            .await;
-            let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire);
-            match active_tasks {
-                0 => {
-                    info!("Terminating consumer");
-                    break;
-                }
-                _ => continue,
-            }
-        }
+        match rx.try_recv() {
+            Err(oneshot::error::TryRecvError::Empty) => {
+                interval.tick().await;
+                tokio::task::spawn(pt_utils::consumer_operation_handler(
+                    state.clone(),
+                    options.clone(),
+                    settings.clone(),
+                    |err| {
+                        logger::error!(%err);
+                    },
+                    sync::Arc::clone(&consumer_operation_counter),
+                ));
+            }
+            Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
+                logger::debug!("Awaiting shutdown!");
+                shutdown_interval.tick().await;
+                let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire);
+                match active_tasks {
+                    0 => {
+                        logger::info!("Terminating consumer");
+                        break;
+                    }
+                    _ => continue,
+                }
+            }
+        }
     }
+    handle.close();
+    task_handle
+        .await
+        .into_report()
+        .change_context(errors::ProcessTrackerError::UnexpectedFlow)?;

     Ok(())
 }
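
The heart of the new loop is `oneshot::Receiver::try_recv`: while the channel is empty the consumer keeps scheduling batches, and once the signal handler fires (or the sender is dropped) it switches to draining. A distilled, runnable sketch of just that control flow, with the actual consumer work replaced by a stub:

use std::time::Duration;
use tokio::sync::oneshot;

async fn consumer_loop(mut rx: oneshot::Receiver<()>) {
    let mut interval = tokio::time::interval(Duration::from_millis(500));
    loop {
        match rx.try_recv() {
            // No shutdown requested yet: wait one tick, then schedule work.
            Err(oneshot::error::TryRecvError::Empty) => {
                interval.tick().await;
                // tokio::task::spawn(run_one_batch());
            }
            // Shutdown requested, or the sender was dropped: drain and exit.
            Ok(()) | Err(oneshot::error::TryRecvError::Closed) => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let worker = tokio::spawn(consumer_loop(rx));
    tx.send(()).ok(); // stands in for the SIGTERM/SIGINT handler
    worker.await.unwrap();
}
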
@@ -98,7 +120,7 @@ pub async fn consumer_operations(
.consumer_group_create(&stream_name, &group_name, &RedisEntryId::AfterLastID)
.await;
if group_created.is_err() {
info!("Consumer group already exists");
logger::info!("Consumer group already exists");
}
let mut tasks = state
@@ -106,6 +128,7 @@
.fetch_consumer_tasks(&stream_name, &group_name, &consumer_name)
.await?;
logger::info!("{} picked {} tasks", consumer_name, tasks.len());
let mut handler = vec![];
for task in tasks.iter_mut() {
@@ -190,14 +213,12 @@ pub async fn run_executor<'a>(
Err(error) => match operation.error_handler(state, process.clone(), error).await {
Ok(_) => (),
Err(error) => {
error!("Failed while handling error");
error!(%error);
logger::error!(%error, "Failed while handling error");
let status = process
.finish_with_status(&*state.store, "GLOBAL_FAILURE".to_string())
.await;
if let Err(err) = status {
error!("Failed while performing database operation: GLOBAL_FAILURE");
error!(%err)
logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE");
}
}
},
@@ -211,13 +232,7 @@ pub async fn some_error_handler<E: fmt::Display>(
process: storage::ProcessTracker,
error: E,
) -> CustomResult<(), errors::ProcessTrackerError> {
-error!(%process.id, "Failed while executing workflow");
-error!(%error);
-error!(
-    pt.name = ?process.name,
-    pt.id = %process.id,
-    "Some error occurred"
-);
+logger::error!(pt.name = ?process.name, pt.id = %process.id, %error, "Failed while executing workflow");
let db: &dyn StorageInterface = &*state.store;
db.process_tracker_update_process_status_by_ids(


@@ -4,8 +4,10 @@ use std::{
};
use error_stack::{report, ResultExt};
use futures::StreamExt;
use redis_interface::{RedisConnectionPool, RedisEntryId};
use router_env::opentelemetry;
use tokio::sync::oneshot;
use uuid::Uuid;
use super::{consumer, metrics, process_data, workflows};
@@ -248,7 +250,7 @@ pub async fn consumer_operation_handler<E>(
// Error handler function
E: FnOnce(error_stack::Report<errors::ProcessTrackerError>),
{
-consumer_operation_counter.fetch_add(1, atomic::Ordering::Relaxed);
+consumer_operation_counter.fetch_add(1, atomic::Ordering::Release);
let start_time = std_time::Instant::now();
match consumer::consumer_operations(&state, &options, &settings).await {
@@ -259,7 +261,8 @@
let duration = end_time.saturating_duration_since(start_time).as_secs_f64();
logger::debug!("Time taken to execute consumer_operation: {}s", duration);
-consumer_operation_counter.fetch_sub(1, atomic::Ordering::Relaxed);
+let current_count = consumer_operation_counter.fetch_sub(1, atomic::Ordering::Release);
+logger::info!("Current tasks being executed: {}", current_count);
}
pub fn runner_from_task(
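
The switch from `Relaxed` to `Release` on the counter updates pairs with the `load(atomic::Ordering::Acquire)` in the consumer's shutdown path: a `Release` write followed by an `Acquire` read of the same atomic establishes a happens-before edge, so once the shutdown check reads zero it also sees the completed work of every task that decremented the counter. Note that `fetch_sub` returns the value before the decrement, so the logged count still includes the task that just finished.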
@@ -355,3 +358,28 @@ where
};
result
}
pub(crate) async fn signal_handler(
mut sig: signal_hook_tokio::Signals,
sender: oneshot::Sender<()>,
) {
if let Some(signal) = sig.next().await {
logger::info!(
"Received signal: {:?}",
signal_hook::low_level::signal_name(signal)
);
match signal {
signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.send(()) {
Ok(_) => {
logger::info!("Request for force shutdown received")
}
Err(_) => {
logger::error!(
"The receiver is closed, a termination call might already be sent"
)
}
},
_ => {}
}
}
}
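
One detail worth noting: `Signals` yields signals indefinitely, so if no SIGTERM/SIGINT ever arrives this task would stay parked on `sig.next()`. That is why `start_consumer` keeps the stream's `Handle` and calls `handle.close()` before awaiting `task_handle`: closing the handle terminates the stream, `next()` returns `None`, and `signal_handler` returns so the join completes.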


@@ -105,7 +105,7 @@ pub async fn get_sync_process_schedule_time(
let mapping = match redis_mapping {
Ok(x) => x,
Err(err) => {
logger::error!("Redis Mapping Error: {}", err);
logger::info!("Redis Mapping Error: {}", err);
process_data::ConnectorPTMapping::default()
}
};