diff --git a/config/config.example.toml b/config/config.example.toml index 962b9293b9..9f6652bc3c 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -170,6 +170,8 @@ cards = [ # It defines the the streams/queues name and configuration as well as event selection variables [scheduler] stream = "SCHEDULER_STREAM" +graceful_shutdown_interval = 60000 # Specifies how much time to wait while re-attempting shutdown for a service (in milliseconds) +loop_interval = 5000 # Specifies how much time to wait before starting the defined behaviour of producer or consumer (in milliseconds) [scheduler.consumer] consumer_group = "SCHEDULER_GROUP" diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 7f3615e2a4..7745221f46 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -38,28 +38,6 @@ async fn start_scheduler( ) -> CustomResult<(), errors::ProcessTrackerError> { use std::str::FromStr; - let options = scheduler::SchedulerOptions { - looper_interval: scheduler::Milliseconds { - milliseconds: 5_000, - }, - db_name: "".to_string(), - cache_name: "".to_string(), - schema_name: "".to_string(), - cache_expiry: scheduler::Milliseconds { - milliseconds: 30_000_000, - }, - runners: vec![], - fetch_limit: 30, - fetch_limit_product_factor: 1, - query_order: "".to_string(), - readiness: scheduler::options::ReadinessOptions { - is_ready: true, - graceful_termination_duration: scheduler::Milliseconds { - milliseconds: 60_000, - }, - }, - }; - #[allow(clippy::expect_used)] let flow = std::env::var(SCHEDULER_FLOW).expect("SCHEDULER_FLOW environment variable not set"); #[allow(clippy::expect_used)] @@ -71,6 +49,5 @@ async fn start_scheduler( .scheduler .clone() .ok_or(errors::ProcessTrackerError::ConfigurationError)?; - scheduler::start_process_tracker(state, Arc::new(options), flow, Arc::new(scheduler_settings)) - .await + scheduler::start_process_tracker(state, flow, 
Arc::new(scheduler_settings)).await } diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index cf4829a7ce..a586945909 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -90,6 +90,8 @@ impl Default for super::settings::SchedulerSettings { stream: "SCHEDULER_STREAM".into(), producer: super::settings::ProducerSettings::default(), consumer: super::settings::ConsumerSettings::default(), + graceful_shutdown_interval: 60000, + loop_interval: 5000, } } } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index ed33c7153e..eb88a48c9d 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -294,6 +294,8 @@ pub struct SchedulerSettings { pub stream: String, pub producer: ProducerSettings, pub consumer: ConsumerSettings, + pub loop_interval: u64, + pub graceful_shutdown_interval: u64, } #[derive(Debug, Clone, Deserialize)] diff --git a/crates/router/src/scheduler.rs b/crates/router/src/scheduler.rs index 18a11e1624..acee091e78 100644 --- a/crates/router/src/scheduler.rs +++ b/crates/router/src/scheduler.rs @@ -19,17 +19,12 @@ use crate::{ pub async fn start_process_tracker( state: &AppState, - options: Arc, scheduler_flow: SchedulerFlow, scheduler_settings: Arc, ) -> CustomResult<(), errors::ProcessTrackerError> { match scheduler_flow { - SchedulerFlow::Producer => { - producer::start_producer(state, Arc::clone(&options), scheduler_settings).await? - } - SchedulerFlow::Consumer => { - consumer::start_consumer(state, Arc::clone(&options), scheduler_settings).await? 
- } + SchedulerFlow::Producer => producer::start_producer(state, scheduler_settings).await?, + SchedulerFlow::Consumer => consumer::start_consumer(state, scheduler_settings).await?, SchedulerFlow::Cleaner => { error!("This flow has not been implemented yet!"); } diff --git a/crates/router/src/scheduler/consumer.rs b/crates/router/src/scheduler/consumer.rs index e77e0ad677..a2e36d0e98 100644 --- a/crates/router/src/scheduler/consumer.rs +++ b/crates/router/src/scheduler/consumer.rs @@ -35,22 +35,19 @@ pub fn valid_business_statuses() -> Vec<&'static str> { #[instrument(skip_all)] pub async fn start_consumer( state: &AppState, - options: sync::Arc, settings: sync::Arc, ) -> CustomResult<(), errors::ProcessTrackerError> { use std::time::Duration; use rand::Rng; - let timeout = rand::thread_rng().gen_range(0..=options.looper_interval.milliseconds); + let timeout = rand::thread_rng().gen_range(0..=settings.loop_interval); tokio::time::sleep(Duration::from_millis(timeout)).await; - let mut interval = - tokio::time::interval(Duration::from_millis(options.looper_interval.milliseconds)); + let mut interval = tokio::time::interval(Duration::from_millis(settings.loop_interval)); - let mut shutdown_interval = tokio::time::interval(Duration::from_millis( - options.readiness.graceful_termination_duration.milliseconds, - )); + let mut shutdown_interval = + tokio::time::interval(Duration::from_millis(settings.graceful_shutdown_interval)); let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0)); let signal = get_allowed_signals() @@ -76,7 +73,6 @@ pub async fn start_consumer( tokio::task::spawn(pt_utils::consumer_operation_handler( state.clone(), - options.clone(), settings.clone(), |err| { logger::error!(%err); @@ -111,7 +107,6 @@ pub async fn start_consumer( #[instrument(skip_all)] pub async fn consumer_operations( state: &AppState, - _options: &super::SchedulerOptions, settings: &settings::SchedulerSettings, ) -> CustomResult<(), errors::ProcessTrackerError> { 
let stream_name = settings.stream.clone(); diff --git a/crates/router/src/scheduler/producer.rs b/crates/router/src/scheduler/producer.rs index 7218bd86d5..4bb61b34bd 100644 --- a/crates/router/src/scheduler/producer.rs +++ b/crates/router/src/scheduler/producer.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use error_stack::{report, ResultExt}; +use common_utils::signals::oneshot; +use error_stack::{report, IntoReport, ResultExt}; use router_env::{instrument, tracing}; use time::Duration; @@ -9,49 +10,70 @@ use crate::{ configs::settings::SchedulerSettings, core::errors::{self, CustomResult}, db::StorageInterface, - logger::{debug, error, info, warn}, + logger::{self, debug, error, warn}, routes::AppState, - scheduler::{utils::*, SchedulerFlow, SchedulerOptions}, + scheduler::{utils::*, SchedulerFlow}, types::storage::{self, enums::ProcessTrackerStatus}, }; #[instrument(skip_all)] pub async fn start_producer( state: &AppState, - options: Arc, scheduler_settings: Arc, ) -> CustomResult<(), errors::ProcessTrackerError> { use rand::Rng; - let timeout = rand::thread_rng().gen_range(0..=options.looper_interval.milliseconds); + let timeout = rand::thread_rng().gen_range(0..=scheduler_settings.loop_interval); tokio::time::sleep(std::time::Duration::from_millis(timeout)).await; let mut interval = tokio::time::interval(std::time::Duration::from_millis( - options.looper_interval.milliseconds, + scheduler_settings.loop_interval, )); - loop { - interval.tick().await; + let mut shutdown_interval = tokio::time::interval(std::time::Duration::from_millis( + scheduler_settings.graceful_shutdown_interval, + )); - let is_ready = options.readiness.is_ready; - if is_ready { - match run_producer_flow(state, &options, &scheduler_settings).await { - Ok(_) => (), - Err(error) => { - // Intentionally not propagating error to caller. - // Any errors that occur in the producer flow must be handled here only, as - // this is the topmost level function which is concerned with the producer flow. 
- error!(%error); + let signal = common_utils::signals::get_allowed_signals() + .map_err(|error| { + logger::error!("Signal Handler Error: {:?}", error); + errors::ProcessTrackerError::ConfigurationError + }) + .into_report() + .attach_printable("Failed while creating a signals handler")?; + let (sx, mut rx) = oneshot::channel(); + let handle = signal.handle(); + let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx)); + + loop { + match rx.try_recv() { + Err(oneshot::error::TryRecvError::Empty) => { + interval.tick().await; + + match run_producer_flow(state, &scheduler_settings).await { + Ok(_) => (), + Err(error) => { + // Intentionally not propagating error to caller. + // Any errors that occur in the producer flow must be handled here only, as + // this is the topmost level function which is concerned with the producer flow. + error!(%error); + } } } - } else { - // Currently the producer workflow isn't parallel and a direct termination - // will not cause any loss of data. - // [#268]: resolving this issue will require a different logic for handling this termination. 
- info!("Terminating producer"); - break; + Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { + logger::debug!("Awaiting shutdown!"); + shutdown_interval.tick().await; + + logger::info!("Terminating producer"); + break; + } } } + handle.close(); + task_handle + .await + .into_report() + .change_context(errors::ProcessTrackerError::UnexpectedFlow)?; Ok(()) } @@ -59,11 +81,10 @@ pub async fn start_producer( #[instrument(skip_all)] pub async fn run_producer_flow( state: &AppState, - op: &SchedulerOptions, settings: &SchedulerSettings, ) -> CustomResult<(), errors::ProcessTrackerError> { lock_acquire_release::<_, _>(state, settings, move || async { - let tasks = fetch_producer_tasks(&*state.store, op, settings).await?; + let tasks = fetch_producer_tasks(&*state.store, settings).await?; debug!("Producer count of tasks {}", tasks.len()); // [#268]: Allow task based segregation of tasks @@ -80,7 +101,6 @@ pub async fn run_producer_flow( #[instrument(skip_all)] pub async fn fetch_producer_tasks( db: &dyn StorageInterface, - _options: &SchedulerOptions, conf: &SchedulerSettings, ) -> CustomResult, errors::ProcessTrackerError> { let upper = conf.producer.upper_fetch_limit; diff --git a/crates/router/src/scheduler/types.rs b/crates/router/src/scheduler/types.rs index 40b94324a1..fa0a752030 100644 --- a/crates/router/src/scheduler/types.rs +++ b/crates/router/src/scheduler/types.rs @@ -1,7 +1,6 @@ pub mod batch; pub mod config; pub mod flow; -pub mod options; pub mod process_data; pub mod state; @@ -9,7 +8,6 @@ pub use self::{ batch::ProcessTrackerBatch, config::SchedulerConfig, flow::SchedulerFlow, - options::{Milliseconds, SchedulerOptions}, process_data::ProcessData, state::{DummyWorkflowState, WorkflowState}, }; diff --git a/crates/router/src/scheduler/types/options.rs b/crates/router/src/scheduler/types/options.rs deleted file mode 100644 index 2c0e966e2d..0000000000 --- a/crates/router/src/scheduler/types/options.rs +++ /dev/null @@ -1,21 +0,0 @@ -pub struct 
Milliseconds { - pub milliseconds: u64, -} - -pub struct SchedulerOptions { - pub looper_interval: Milliseconds, - pub db_name: String, - pub cache_name: String, - pub schema_name: String, - pub cache_expiry: Milliseconds, - pub runners: Vec, - pub fetch_limit: i32, - pub fetch_limit_product_factor: i32, - pub query_order: String, - pub readiness: ReadinessOptions, -} - -pub struct ReadinessOptions { - pub is_ready: bool, - pub graceful_termination_duration: Milliseconds, -} diff --git a/crates/router/src/scheduler/utils.rs b/crates/router/src/scheduler/utils.rs index efa1bc6707..dba7992318 100644 --- a/crates/router/src/scheduler/utils.rs +++ b/crates/router/src/scheduler/utils.rs @@ -243,7 +243,6 @@ pub fn get_time_from_delta(delta: Option) -> Option( state: AppState, - options: sync::Arc, settings: sync::Arc, error_handler_fun: E, consumer_operation_counter: sync::Arc, @@ -254,7 +253,7 @@ pub async fn consumer_operation_handler( consumer_operation_counter.fetch_add(1, atomic::Ordering::Release); let start_time = std_time::Instant::now(); - match consumer::consumer_operations(&state, &options, &settings).await { + match consumer::consumer_operations(&state, &settings).await { Ok(_) => (), Err(err) => error_handler_fun(err), }