diff --git a/crates/scheduler/src/consumer.rs b/crates/scheduler/src/consumer.rs index 0889955270..ef0386bec2 100644 --- a/crates/scheduler/src/consumer.rs +++ b/crates/scheduler/src/consumer.rs @@ -61,7 +61,7 @@ pub async fn start_consumer( let handle = signal.handle(); let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx)); - loop { + 'consumer: loop { match rx.try_recv() { Err(mpsc::error::TryRecvError::Empty) => { interval.tick().await; @@ -71,7 +71,7 @@ pub async fn start_consumer( continue; } - tokio::task::spawn(pt_utils::consumer_operation_handler( + pt_utils::consumer_operation_handler( state.clone(), settings.clone(), |err| { @@ -79,19 +79,23 @@ pub async fn start_consumer( }, sync::Arc::clone(&consumer_operation_counter), workflow_selector, - )); + ) + .await; } Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => { logger::debug!("Awaiting shutdown!"); rx.close(); - shutdown_interval.tick().await; - let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); - match active_tasks { - 0 => { - logger::info!("Terminating consumer"); - break; + loop { + shutdown_interval.tick().await; + let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); + logger::error!("{}", active_tasks); + match active_tasks { + 0 => { + logger::info!("Terminating consumer"); + break 'consumer; + } + _ => continue, } - _ => continue, } } } @@ -204,6 +208,7 @@ where T: SchedulerAppState, { tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string()); + logger::info!("{:?}", process.name.as_ref()); let res = workflow_selector .trigger_workflow(&state.clone(), process.clone()) .await; diff --git a/crates/scheduler/src/utils.rs b/crates/scheduler/src/utils.rs index 53f14bd1fb..32fd97fca3 100644 --- a/crates/scheduler/src/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -252,7 +252,7 @@ pub async fn consumer_operation_handler( E: FnOnce(error_stack::Report), T: SchedulerAppState, { - consumer_operation_counter.fetch_add(1, atomic::Ordering::Release); + consumer_operation_counter.fetch_add(1, atomic::Ordering::SeqCst); let start_time = std_time::Instant::now(); match consumer::consumer_operations(&state, &settings, workflow_selector).await { @@ -263,7 +263,7 @@ pub async fn consumer_operation_handler( let duration = end_time.saturating_duration_since(start_time).as_secs_f64(); logger::debug!("Time taken to execute consumer_operation: {}s", duration); - let current_count = consumer_operation_counter.fetch_sub(1, atomic::Ordering::Release); + let current_count = consumer_operation_counter.fetch_sub(1, atomic::Ordering::SeqCst); logger::info!("Current tasks being executed: {}", current_count); }