chore: fix channel handling for consumer workflow loop (#3223)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Joshi
2024-01-03 17:42:05 +05:30
committed by GitHub
parent 46e84a6b0c
commit 51e1fac556
2 changed files with 17 additions and 12 deletions

View File

@ -61,7 +61,7 @@ pub async fn start_consumer<T: SchedulerAppState + 'static>(
let handle = signal.handle(); let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx)); let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
loop { 'consumer: loop {
match rx.try_recv() { match rx.try_recv() {
Err(mpsc::error::TryRecvError::Empty) => { Err(mpsc::error::TryRecvError::Empty) => {
interval.tick().await; interval.tick().await;
@ -71,7 +71,7 @@ pub async fn start_consumer<T: SchedulerAppState + 'static>(
continue; continue;
} }
tokio::task::spawn(pt_utils::consumer_operation_handler( pt_utils::consumer_operation_handler(
state.clone(), state.clone(),
settings.clone(), settings.clone(),
|err| { |err| {
@ -79,19 +79,23 @@ pub async fn start_consumer<T: SchedulerAppState + 'static>(
}, },
sync::Arc::clone(&consumer_operation_counter), sync::Arc::clone(&consumer_operation_counter),
workflow_selector, workflow_selector,
)); )
.await;
} }
Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => { Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
logger::debug!("Awaiting shutdown!"); logger::debug!("Awaiting shutdown!");
rx.close(); rx.close();
shutdown_interval.tick().await; loop {
let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); shutdown_interval.tick().await;
match active_tasks { let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire);
0 => { logger::error!("{}", active_tasks);
logger::info!("Terminating consumer"); match active_tasks {
break; 0 => {
logger::info!("Terminating consumer");
break 'consumer;
}
_ => continue,
} }
_ => continue,
} }
} }
} }
@ -204,6 +208,7 @@ where
T: SchedulerAppState, T: SchedulerAppState,
{ {
tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string()); tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string());
logger::info!("{:?}", process.name.as_ref());
let res = workflow_selector let res = workflow_selector
.trigger_workflow(&state.clone(), process.clone()) .trigger_workflow(&state.clone(), process.clone())
.await; .await;

View File

@ -252,7 +252,7 @@ pub async fn consumer_operation_handler<E, T: Send + Sync + 'static>(
E: FnOnce(error_stack::Report<errors::ProcessTrackerError>), E: FnOnce(error_stack::Report<errors::ProcessTrackerError>),
T: SchedulerAppState, T: SchedulerAppState,
{ {
consumer_operation_counter.fetch_add(1, atomic::Ordering::Release); consumer_operation_counter.fetch_add(1, atomic::Ordering::SeqCst);
let start_time = std_time::Instant::now(); let start_time = std_time::Instant::now();
match consumer::consumer_operations(&state, &settings, workflow_selector).await { match consumer::consumer_operations(&state, &settings, workflow_selector).await {
@ -263,7 +263,7 @@ pub async fn consumer_operation_handler<E, T: Send + Sync + 'static>(
let duration = end_time.saturating_duration_since(start_time).as_secs_f64(); let duration = end_time.saturating_duration_since(start_time).as_secs_f64();
logger::debug!("Time taken to execute consumer_operation: {}s", duration); logger::debug!("Time taken to execute consumer_operation: {}s", duration);
let current_count = consumer_operation_counter.fetch_sub(1, atomic::Ordering::Release); let current_count = consumer_operation_counter.fetch_sub(1, atomic::Ordering::SeqCst);
logger::info!("Current tasks being executed: {}", current_count); logger::info!("Current tasks being executed: {}", current_count);
} }