From 0239c422b6bbf169db49b97080cb711b49921f3e Mon Sep 17 00:00:00 2001
From: Nishant Joshi
Date: Tue, 7 Feb 2023 12:44:24 +0530
Subject: [PATCH] feat: Add graceful shutdown in drainer (#498)

---
 Cargo.lock                              |  3 +
 config/config.example.toml              |  1 +
 crates/common_utils/Cargo.toml          |  3 +
 crates/common_utils/src/lib.rs          |  1 +
 crates/common_utils/src/signals.rs      | 38 +++++++++++++
 crates/drainer/src/errors.rs            |  4 ++
 crates/drainer/src/lib.rs               | 76 +++++++++++++++++++++----
 crates/drainer/src/main.rs              |  9 ++-
 crates/drainer/src/metrics.rs           |  6 +-
 crates/drainer/src/settings.rs          |  2 +
 crates/router/src/scheduler/consumer.rs | 21 +++----
 11 files changed, 138 insertions(+), 26 deletions(-)
 create mode 100644 crates/common_utils/src/signals.rs

diff --git a/Cargo.lock b/Cargo.lock
index ece43bc184..f192fbfe68 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1018,8 +1018,11 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_urlencoded",
+ "signal-hook",
+ "signal-hook-tokio",
  "thiserror",
  "time",
+ "tokio",
 ]
 
 [[package]]
diff --git a/config/config.example.toml b/config/config.example.toml
index 219e5fd761..978f112b2f 100644
--- a/config/config.example.toml
+++ b/config/config.example.toml
@@ -187,3 +187,4 @@ batch_size = 200 # Specifies the batch size the producer will push under a singl
 stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer
 num_partitions = 64 # Specifies the number of partitions the stream will be divided into
 max_read_count = 100 # Specifies the maximum number of entries that would be read from redis stream in one call
+shutdown_interval = 1000 # Specifies the interval (in milliseconds) between checks while waiting for threads to complete execution
diff --git a/crates/common_utils/Cargo.toml b/crates/common_utils/Cargo.toml
index e2f5ad54bf..e041d22d13 100644
--- a/crates/common_utils/Cargo.toml
+++ b/crates/common_utils/Cargo.toml
@@ -20,6 +20,9 @@ ring = "0.16.20"
 serde = { version = "1.0.152", features = ["derive"] }
 serde_json = "1.0.91"
 serde_urlencoded = "0.7.1"
+signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
+signal-hook = "0.3.14"
+tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread"] }
 thiserror = "1.0.38"
 time = { version = "0.3.17", features = ["serde", "serde-well-known", "std"] }
 
diff --git a/crates/common_utils/src/lib.rs b/crates/common_utils/src/lib.rs
index 994375a34d..ed51e6815f 100644
--- a/crates/common_utils/src/lib.rs
+++ b/crates/common_utils/src/lib.rs
@@ -9,6 +9,7 @@ pub mod errors;
 pub mod ext_traits;
 pub mod fp_utils;
 pub mod pii;
+pub mod signals;
 pub mod validation;
 
 /// Date-time utilities.
diff --git a/crates/common_utils/src/signals.rs b/crates/common_utils/src/signals.rs
new file mode 100644
index 0000000000..c354f51dd3
--- /dev/null
+++ b/crates/common_utils/src/signals.rs
@@ -0,0 +1,38 @@
+//! Provide an interface for worker services to handle signals
+
+use futures::StreamExt;
+use router_env::logger;
+pub use tokio::sync::oneshot;
+
+///
+/// This function is meant to run in parallel to the application.
+/// It will send a signal to the receiver when a SIGTERM or SIGINT is received
+///
+pub async fn signal_handler(mut sig: signal_hook_tokio::Signals, sender: oneshot::Sender<()>) {
+    if let Some(signal) = sig.next().await {
+        logger::info!(
+            "Received signal: {:?}",
+            signal_hook::low_level::signal_name(signal)
+        );
+        match signal {
+            signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.send(()) {
+                Ok(_) => {
+                    logger::info!("Request for force shutdown received")
+                }
+                Err(_) => {
+                    logger::error!(
+                        "The receiver is closed, a termination call might already be sent"
+                    )
+                }
+            },
+            _ => {}
+        }
+    }
+}
+
+///
+/// This function is used to generate a list of signals that the signal_handler should listen for
+///
+pub fn get_allowed_signals() -> Result<signal_hook_tokio::Signals, std::io::Error> {
+    signal_hook_tokio::Signals::new([signal_hook::consts::SIGTERM, signal_hook::consts::SIGINT])
+}
diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs
index f46620cd33..7cc73b44aa 100644
--- a/crates/drainer/src/errors.rs
+++ b/crates/drainer/src/errors.rs
@@ -11,6 +11,10 @@ pub enum DrainerError {
     ConfigurationError(config::ConfigError),
     #[error("Metrics initialization error")]
     MetricsError,
+    #[error("Error while configuring signals: {0}")]
+    SignalError(String),
+    #[error("Unexpected error occurred: {0}")]
+    UnexpectedError(String),
 }
 
 pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs
index f6f8fe6fd0..bb7e5d1e2b 100644
--- a/crates/drainer/src/lib.rs
+++ b/crates/drainer/src/lib.rs
@@ -5,37 +5,90 @@ pub(crate) mod metrics;
 pub mod services;
 pub mod settings;
 mod utils;
-use std::sync::Arc;
+use std::sync::{atomic, Arc};
 
+use common_utils::signals::{get_allowed_signals, oneshot};
 pub use env as logger;
-use logger::{instrument, tracing};
+use error_stack::{IntoReport, ResultExt};
 use storage_models::kv;
 
 use crate::{connection::pg_connection, services::Store};
 
-#[instrument(skip(store))]
 pub async fn start_drainer(
     store: Arc<Store>,
     number_of_streams: u8,
     max_read_count: u64,
+    shutdown_interval: u32,
 ) -> errors::DrainerResult<()> {
     let mut stream_index: u8 = 0;
     let mut jobs_picked: u8 = 0;
-    loop {
-        if utils::is_stream_available(stream_index, store.clone()).await {
-            tokio::spawn(drainer_handler(store.clone(), stream_index, max_read_count));
-            jobs_picked += 1;
+
+    let mut shutdown_interval =
+        tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into()));
+
+    let signal =
+        get_allowed_signals()
+            .into_report()
+            .change_context(errors::DrainerError::SignalError(
+                "Failed while getting allowed signals".to_string(),
+            ))?;
+    let (tx, mut rx) = oneshot::channel();
+    let handle = signal.handle();
+    let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
+
+    let active_tasks = Arc::new(atomic::AtomicU64::new(0));
+    'event: loop {
+        match rx.try_recv() {
+            Err(oneshot::error::TryRecvError::Empty) => {
+                if utils::is_stream_available(stream_index, store.clone()).await {
+                    tokio::spawn(drainer_handler(
+                        store.clone(),
+                        stream_index,
+                        max_read_count,
+                        active_tasks.clone(),
+                    ));
+                    jobs_picked += 1;
+                }
+                (stream_index, jobs_picked) =
+                    utils::increment_stream_index((stream_index, jobs_picked), number_of_streams);
+            }
+            Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
+                logger::info!("Awaiting shutdown!");
+                metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]);
+                let shutdown_started = tokio::time::Instant::now();
+                loop {
+                    if active_tasks.load(atomic::Ordering::Acquire) == 0 {
+                        logger::info!("Terminating drainer");
+                        metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]);
+                        let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64;
+                        metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]);
+
+                        break 'event;
+                    }
+                    shutdown_interval.tick().await;
+                }
+            }
         }
-        (stream_index, jobs_picked) =
-            utils::increment_stream_index((stream_index, jobs_picked), number_of_streams);
     }
+    handle.close();
+    task_handle
+        .await
+        .into_report()
+        .change_context(errors::DrainerError::UnexpectedError(
+            "Failed while joining signal handler".to_string(),
+        ))?;
+
+    Ok(())
 }
 
 async fn drainer_handler(
     store: Arc<Store>,
     stream_index: u8,
     max_read_count: u64,
+    active_tasks: Arc<atomic::AtomicU64>,
 ) -> errors::DrainerResult<()> {
+    active_tasks.fetch_add(1, atomic::Ordering::Release);
+
     let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index);
     let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await;
 
@@ -45,7 +98,10 @@ async fn drainer_handler(
     let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index);
 
     //TODO: USE THE RESULT FOR LOGGING
-    utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await
+    let output =
+        utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await;
+    active_tasks.fetch_sub(1, atomic::Ordering::Release);
+    output
 }
 
 async fn drainer(
diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs
index 6c1ba08674..18238c7684 100644
--- a/crates/drainer/src/main.rs
+++ b/crates/drainer/src/main.rs
@@ -18,12 +18,19 @@ async fn main() -> DrainerResult<()> {
     let number_of_streams = store.config.drainer_num_partitions;
     let max_read_count = conf.drainer.max_read_count;
+    let shutdown_intervals = conf.drainer.shutdown_interval;
 
     let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?;
 
     logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log);
 
-    start_drainer(store.clone(), number_of_streams, max_read_count).await?;
+    start_drainer(
+        store.clone(),
+        number_of_streams,
+        max_read_count,
+        shutdown_intervals,
+    )
+    .await?;
 
     store.close().await;
     Ok(())
diff --git a/crates/drainer/src/metrics.rs b/crates/drainer/src/metrics.rs
index d829a60d68..2b1bfcd014 100644
--- a/crates/drainer/src/metrics.rs
+++ b/crates/drainer/src/metrics.rs
@@ -48,12 +48,12 @@ pub(crate) static REDIS_STREAM_READ_TIME: Lazy<Histogram<f64>> =
 pub(crate) static REDIS_STREAM_TRIM_TIME: Lazy<Histogram<f64>> =
     Lazy::new(|| DRAINER_METER.f64_histogram("REDIS_STREAM_TRIM_TIME").init());
 
-pub(crate) static _SHUTDOWN_SIGNAL_RECEIVED: Lazy<Counter<u64>> =
+pub(crate) static SHUTDOWN_SIGNAL_RECEIVED: Lazy<Counter<u64>> =
     Lazy::new(|| DRAINER_METER.u64_counter("SHUTDOWN_SIGNAL_RECEIVED").init());
 
-pub(crate) static _SUCCESSFUL_SHUTDOWN: Lazy<Counter<u64>> =
+pub(crate) static SUCCESSFUL_SHUTDOWN: Lazy<Counter<u64>> =
     Lazy::new(|| DRAINER_METER.u64_counter("SUCCESSFUL_SHUTDOWN").init());
 
 // Time in (ms) milliseconds
-pub(crate) static _CLEANUP_TIME: Lazy<Histogram<f64>> =
+pub(crate) static CLEANUP_TIME: Lazy<Histogram<f64>> =
     Lazy::new(|| DRAINER_METER.f64_histogram("CLEANUP_TIME").init());
diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs
index e26c4cb398..2f616b43d0 100644
--- a/crates/drainer/src/settings.rs
+++ b/crates/drainer/src/settings.rs
@@ -44,6 +44,7 @@ pub struct DrainerSettings {
     pub stream_name: String,
     pub num_partitions: u8,
     pub max_read_count: u64,
+    pub shutdown_interval: u32, // in milliseconds
 }
 
 impl Default for Database {
@@ -65,6 +66,7 @@ impl Default for DrainerSettings {
             stream_name: "DRAINER_STREAM".into(),
             num_partitions: 64,
             max_read_count: 100,
+            shutdown_interval: 1000, // in milliseconds
         }
     }
 }
diff --git a/crates/router/src/scheduler/consumer.rs b/crates/router/src/scheduler/consumer.rs
index 5ec7c5a303..e77e0ad677 100644
--- a/crates/router/src/scheduler/consumer.rs
+++ b/crates/router/src/scheduler/consumer.rs
@@ -5,12 +5,12 @@ use std::{
     sync::{self, atomic},
 };
 
+use common_utils::signals::{get_allowed_signals, oneshot};
 use error_stack::{IntoReport, ResultExt};
 use futures::future;
 use redis_interface::{RedisConnectionPool, RedisEntryId};
 use router_env::{instrument, tracing};
 use time::PrimitiveDateTime;
-use tokio::sync::oneshot;
 use uuid::Uuid;
 
 use super::{
@@ -53,19 +53,16 @@ pub async fn start_consumer(
     ));
 
     let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0));
-    let signal = signal_hook_tokio::Signals::new([
-        signal_hook::consts::SIGTERM,
-        signal_hook::consts::SIGINT,
-    ])
-    .map_err(|error| {
-        logger::error!("Signal Handler Error: {:?}", error);
-        errors::ProcessTrackerError::ConfigurationError
-    })
-    .into_report()
-    .attach_printable("Failed while creating a signals handler")?;
+    let signal = get_allowed_signals()
+        .map_err(|error| {
+            logger::error!("Signal Handler Error: {:?}", error);
+            errors::ProcessTrackerError::ConfigurationError
+        })
+        .into_report()
+        .attach_printable("Failed while creating a signals handler")?;
     let (sx, mut rx) = oneshot::channel();
     let handle = signal.handle();
-    let task_handle = tokio::spawn(pt_utils::signal_handler(signal, sx));
+    let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx));
 
     loop {
         match rx.try_recv() {
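
The shutdown pattern this patch wires through signals.rs and the drainer loop has three pieces: a detached signal-listener task, a oneshot channel polled non-blockingly with try_recv() from the hot loop, and an atomic counter of in-flight jobs that shutdown waits on. The following standalone sketch (not part of the commit) condenses that control flow into one runnable program. It assumes the crates the patch adds (signal-hook 0.3, signal-hook-tokio 0.3 with the "futures-v0_3" feature, futures 0.3, and tokio 1.x with the "macros", "rt-multi-thread", "time", and "sync" features); the simulated batch job and its timings are hypothetical stand-ins for the Redis-stream drain.

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    use futures::StreamExt;
    use tokio::sync::oneshot;

    /// Runs alongside the worker; fires the oneshot on the first SIGTERM/SIGINT.
    async fn signal_handler(mut signals: signal_hook_tokio::Signals, sender: oneshot::Sender<()>) {
        if signals.next().await.is_some() {
            // An Err here only means the worker already stopped listening.
            let _ = sender.send(());
        }
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let signals = signal_hook_tokio::Signals::new([
            signal_hook::consts::SIGTERM,
            signal_hook::consts::SIGINT,
        ])?;
        let handle = signals.handle();
        let (tx, mut rx) = oneshot::channel();
        let signal_task = tokio::spawn(signal_handler(signals, tx));

        let active_tasks = Arc::new(AtomicU64::new(0));
        // Mirrors `shutdown_interval = 1000`: how often to re-check in-flight work.
        let mut shutdown_interval = tokio::time::interval(std::time::Duration::from_millis(1000));

        loop {
            match rx.try_recv() {
                // No shutdown requested yet: keep picking up (simulated) batches.
                Err(oneshot::error::TryRecvError::Empty) => {
                    active_tasks.fetch_add(1, Ordering::Release);
                    let tasks = active_tasks.clone();
                    tokio::spawn(async move {
                        // Hypothetical stand-in for draining one stream batch.
                        tokio::time::sleep(std::time::Duration::from_millis(250)).await;
                        tasks.fetch_sub(1, Ordering::Release);
                    });
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }
                // Shutdown requested: stop accepting work and wait for stragglers.
                Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
                    while active_tasks.load(Ordering::Acquire) != 0 {
                        shutdown_interval.tick().await;
                    }
                    break;
                }
            }
        }

        handle.close(); // Closes the signal stream so the listener task can finish.
        signal_task.await.expect("signal handler task panicked");
        Ok(())
    }

As in the patch, the Release stores on the counter pair with the Acquire load, so observing zero guarantees every spawned batch has finished its cleanup; try_recv() keeps the drain loop from ever blocking on the channel, and handle.close() terminates the signal stream so the listener task can be joined cleanly.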