refactor(scheduler): remove scheduler options & add graceful shutdown to producer (#840)

Co-authored-by: Arun Raj M <jarnura47@gmail.com>
This commit is contained in:
Nishant Joshi
2023-04-06 15:36:40 +05:30
committed by GitHub
parent b8bcba4e6c
commit 11df843610
10 changed files with 60 additions and 91 deletions

View File

@ -170,6 +170,8 @@ cards = [
# It defines the stream/queue names and configuration, as well as event selection variables
[scheduler]
stream = "SCHEDULER_STREAM"
graceful_shutdown_interval = 60000 # Specifies how much time to wait while re-attempting shutdown for a service (in milliseconds)
loop_interval = 5000 # Specifies how much time to wait before starting the defined behaviour of producer or consumer (in milliseconds)
[scheduler.consumer]
consumer_group = "SCHEDULER_GROUP"

View File

@ -38,28 +38,6 @@ async fn start_scheduler(
) -> CustomResult<(), errors::ProcessTrackerError> {
use std::str::FromStr;
let options = scheduler::SchedulerOptions {
looper_interval: scheduler::Milliseconds {
milliseconds: 5_000,
},
db_name: "".to_string(),
cache_name: "".to_string(),
schema_name: "".to_string(),
cache_expiry: scheduler::Milliseconds {
milliseconds: 30_000_000,
},
runners: vec![],
fetch_limit: 30,
fetch_limit_product_factor: 1,
query_order: "".to_string(),
readiness: scheduler::options::ReadinessOptions {
is_ready: true,
graceful_termination_duration: scheduler::Milliseconds {
milliseconds: 60_000,
},
},
};
#[allow(clippy::expect_used)]
let flow = std::env::var(SCHEDULER_FLOW).expect("SCHEDULER_FLOW environment variable not set");
#[allow(clippy::expect_used)]
@ -71,6 +49,5 @@ async fn start_scheduler(
.scheduler
.clone()
.ok_or(errors::ProcessTrackerError::ConfigurationError)?;
scheduler::start_process_tracker(state, Arc::new(options), flow, Arc::new(scheduler_settings))
.await
scheduler::start_process_tracker(state, flow, Arc::new(scheduler_settings)).await
}

View File

@ -90,6 +90,8 @@ impl Default for super::settings::SchedulerSettings {
stream: "SCHEDULER_STREAM".into(),
producer: super::settings::ProducerSettings::default(),
consumer: super::settings::ConsumerSettings::default(),
graceful_shutdown_interval: 60000,
loop_interval: 5000,
}
}
}

View File

@ -294,6 +294,8 @@ pub struct SchedulerSettings {
pub stream: String,
pub producer: ProducerSettings,
pub consumer: ConsumerSettings,
pub loop_interval: u64,
pub graceful_shutdown_interval: u64,
}
#[derive(Debug, Clone, Deserialize)]

View File

@ -19,17 +19,12 @@ use crate::{
pub async fn start_process_tracker(
state: &AppState,
options: Arc<SchedulerOptions>,
scheduler_flow: SchedulerFlow,
scheduler_settings: Arc<SchedulerSettings>,
) -> CustomResult<(), errors::ProcessTrackerError> {
match scheduler_flow {
SchedulerFlow::Producer => {
producer::start_producer(state, Arc::clone(&options), scheduler_settings).await?
}
SchedulerFlow::Consumer => {
consumer::start_consumer(state, Arc::clone(&options), scheduler_settings).await?
}
SchedulerFlow::Producer => producer::start_producer(state, scheduler_settings).await?,
SchedulerFlow::Consumer => consumer::start_consumer(state, scheduler_settings).await?,
SchedulerFlow::Cleaner => {
error!("This flow has not been implemented yet!");
}

View File

@ -35,22 +35,19 @@ pub fn valid_business_statuses() -> Vec<&'static str> {
#[instrument(skip_all)]
pub async fn start_consumer(
state: &AppState,
options: sync::Arc<super::SchedulerOptions>,
settings: sync::Arc<settings::SchedulerSettings>,
) -> CustomResult<(), errors::ProcessTrackerError> {
use std::time::Duration;
use rand::Rng;
let timeout = rand::thread_rng().gen_range(0..=options.looper_interval.milliseconds);
let timeout = rand::thread_rng().gen_range(0..=settings.loop_interval);
tokio::time::sleep(Duration::from_millis(timeout)).await;
let mut interval =
tokio::time::interval(Duration::from_millis(options.looper_interval.milliseconds));
let mut interval = tokio::time::interval(Duration::from_millis(settings.loop_interval));
let mut shutdown_interval = tokio::time::interval(Duration::from_millis(
options.readiness.graceful_termination_duration.milliseconds,
));
let mut shutdown_interval =
tokio::time::interval(Duration::from_millis(settings.graceful_shutdown_interval));
let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0));
let signal = get_allowed_signals()
@ -76,7 +73,6 @@ pub async fn start_consumer(
tokio::task::spawn(pt_utils::consumer_operation_handler(
state.clone(),
options.clone(),
settings.clone(),
|err| {
logger::error!(%err);
@ -111,7 +107,6 @@ pub async fn start_consumer(
#[instrument(skip_all)]
pub async fn consumer_operations(
state: &AppState,
_options: &super::SchedulerOptions,
settings: &settings::SchedulerSettings,
) -> CustomResult<(), errors::ProcessTrackerError> {
let stream_name = settings.stream.clone();

View File

@ -1,6 +1,7 @@
use std::sync::Arc;
use error_stack::{report, ResultExt};
use common_utils::signals::oneshot;
use error_stack::{report, IntoReport, ResultExt};
use router_env::{instrument, tracing};
use time::Duration;
@ -9,49 +10,70 @@ use crate::{
configs::settings::SchedulerSettings,
core::errors::{self, CustomResult},
db::StorageInterface,
logger::{debug, error, info, warn},
logger::{self, debug, error, warn},
routes::AppState,
scheduler::{utils::*, SchedulerFlow, SchedulerOptions},
scheduler::{utils::*, SchedulerFlow},
types::storage::{self, enums::ProcessTrackerStatus},
};
#[instrument(skip_all)]
pub async fn start_producer(
state: &AppState,
options: Arc<SchedulerOptions>,
scheduler_settings: Arc<SchedulerSettings>,
) -> CustomResult<(), errors::ProcessTrackerError> {
use rand::Rng;
let timeout = rand::thread_rng().gen_range(0..=options.looper_interval.milliseconds);
let timeout = rand::thread_rng().gen_range(0..=scheduler_settings.loop_interval);
tokio::time::sleep(std::time::Duration::from_millis(timeout)).await;
let mut interval = tokio::time::interval(std::time::Duration::from_millis(
options.looper_interval.milliseconds,
scheduler_settings.loop_interval,
));
loop {
interval.tick().await;
let mut shutdown_interval = tokio::time::interval(std::time::Duration::from_millis(
scheduler_settings.graceful_shutdown_interval,
));
let is_ready = options.readiness.is_ready;
if is_ready {
match run_producer_flow(state, &options, &scheduler_settings).await {
Ok(_) => (),
Err(error) => {
// Intentionally not propagating error to caller.
// Any errors that occur in the producer flow must be handled here only, as
// this is the topmost level function which is concerned with the producer flow.
error!(%error);
let signal = common_utils::signals::get_allowed_signals()
.map_err(|error| {
logger::error!("Signal Handler Error: {:?}", error);
errors::ProcessTrackerError::ConfigurationError
})
.into_report()
.attach_printable("Failed while creating a signals handler")?;
let (sx, mut rx) = oneshot::channel();
let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx));
loop {
match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => {
interval.tick().await;
match run_producer_flow(state, &scheduler_settings).await {
Ok(_) => (),
Err(error) => {
// Intentionally not propagating error to caller.
// Any errors that occur in the producer flow must be handled here only, as
// this is the topmost level function which is concerned with the producer flow.
error!(%error);
}
}
}
} else {
// Currently the producer workflow isn't parallel and a direct termination
// will not cause any loss of data.
// [#268]: resolving this issue will require a different logic for handling this termination.
info!("Terminating producer");
break;
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
logger::debug!("Awaiting shutdown!");
shutdown_interval.tick().await;
logger::info!("Terminating consumer");
break;
}
}
}
handle.close();
task_handle
.await
.into_report()
.change_context(errors::ProcessTrackerError::UnexpectedFlow)?;
Ok(())
}
@ -59,11 +81,10 @@ pub async fn start_producer(
#[instrument(skip_all)]
pub async fn run_producer_flow(
state: &AppState,
op: &SchedulerOptions,
settings: &SchedulerSettings,
) -> CustomResult<(), errors::ProcessTrackerError> {
lock_acquire_release::<_, _>(state, settings, move || async {
let tasks = fetch_producer_tasks(&*state.store, op, settings).await?;
let tasks = fetch_producer_tasks(&*state.store, settings).await?;
debug!("Producer count of tasks {}", tasks.len());
// [#268]: Allow task based segregation of tasks
@ -80,7 +101,6 @@ pub async fn run_producer_flow(
#[instrument(skip_all)]
pub async fn fetch_producer_tasks(
db: &dyn StorageInterface,
_options: &SchedulerOptions,
conf: &SchedulerSettings,
) -> CustomResult<Vec<storage::ProcessTracker>, errors::ProcessTrackerError> {
let upper = conf.producer.upper_fetch_limit;

View File

@ -1,7 +1,6 @@
pub mod batch;
pub mod config;
pub mod flow;
pub mod options;
pub mod process_data;
pub mod state;
@ -9,7 +8,6 @@ pub use self::{
batch::ProcessTrackerBatch,
config::SchedulerConfig,
flow::SchedulerFlow,
options::{Milliseconds, SchedulerOptions},
process_data::ProcessData,
state::{DummyWorkflowState, WorkflowState},
};

View File

@ -1,21 +0,0 @@
/// A duration expressed as a count of milliseconds.
///
/// Callers read the inner value directly (e.g. to feed
/// `Duration::from_millis`).
pub struct Milliseconds {
/// Number of milliseconds in the duration.
pub milliseconds: u64,
}
/// Options handed to the scheduler's producer and consumer flows.
///
/// NOTE(review): of these fields, only `looper_interval` and `readiness` are
/// read by the producer/consumer code visible alongside this definition; the
/// remaining fields appear to be placeholders — confirm before relying on them.
pub struct SchedulerOptions {
/// Period of the producer/consumer loop timer; also used as the upper bound
/// of the random delay slept before the loop starts.
pub looper_interval: Milliseconds,
/// Presumably a database name — no visible reader; TODO confirm.
pub db_name: String,
/// Presumably a cache name — no visible reader; TODO confirm.
pub cache_name: String,
/// Presumably a schema name — no visible reader; TODO confirm.
pub schema_name: String,
/// Presumably a cache entry lifetime — no visible reader; TODO confirm.
pub cache_expiry: Milliseconds,
/// Presumably a list of runner names — no visible reader; TODO confirm.
pub runners: Vec<String>,
/// Presumably a cap on fetched tasks — no visible reader; TODO confirm.
pub fetch_limit: i32,
/// Presumably a multiplier applied to `fetch_limit` — no visible reader; TODO confirm.
pub fetch_limit_product_factor: i32,
/// Presumably an ordering clause for task queries — no visible reader; TODO confirm.
pub query_order: String,
/// Readiness flag and shutdown timing for the service.
pub readiness: ReadinessOptions,
}
/// Readiness flag and shutdown timing for a scheduler service.
pub struct ReadinessOptions {
/// While `true` the producer loop runs its flow on every tick; when `false`
/// it logs termination and leaves the loop.
pub is_ready: bool,
/// Period used by the consumer for its shutdown interval timer.
pub graceful_termination_duration: Milliseconds,
}

View File

@ -243,7 +243,6 @@ pub fn get_time_from_delta(delta: Option<i32>) -> Option<time::PrimitiveDateTime
pub async fn consumer_operation_handler<E>(
state: AppState,
options: sync::Arc<super::SchedulerOptions>,
settings: sync::Arc<SchedulerSettings>,
error_handler_fun: E,
consumer_operation_counter: sync::Arc<atomic::AtomicU64>,
@ -254,7 +253,7 @@ pub async fn consumer_operation_handler<E>(
consumer_operation_counter.fetch_add(1, atomic::Ordering::Release);
let start_time = std_time::Instant::now();
match consumer::consumer_operations(&state, &options, &settings).await {
match consumer::consumer_operations(&state, &settings).await {
Ok(_) => (),
Err(err) => error_handler_fun(err),
}