diff --git a/crates/router/src/core/payments/operations.rs b/crates/router/src/core/payments/operations.rs index b0ad19a91f..1b403b8615 100644 --- a/crates/router/src/core/payments/operations.rs +++ b/crates/router/src/core/payments/operations.rs @@ -212,7 +212,7 @@ where payment_attempt: &storage::PaymentAttempt, ) -> CustomResult<(), errors::ApiErrorResponse> { if helpers::check_if_operation_confirm(self) { - metrics::JOB_COUNT.add(1, &[]); // Metrics + metrics::TASKS_ADDED_COUNT.add(1, &[]); // Metrics let schedule_time = payment_sync::get_sync_process_schedule_time( &payment_attempt.connector, diff --git a/crates/router/src/scheduler/consumer.rs b/crates/router/src/scheduler/consumer.rs index eb4398bf67..41f12853cd 100644 --- a/crates/router/src/scheduler/consumer.rs +++ b/crates/router/src/scheduler/consumer.rs @@ -206,6 +206,7 @@ pub async fn run_executor<'a>( } }, }; + metrics::TASK_PROCESSED.add(1, &[]); } #[instrument(skip_all)] diff --git a/crates/router/src/scheduler/metrics.rs b/crates/router/src/scheduler/metrics.rs index fb3a2f2781..7967c0cbc9 100644 --- a/crates/router/src/scheduler/metrics.rs +++ b/crates/router/src/scheduler/metrics.rs @@ -18,8 +18,12 @@ macro_rules! create_counter { }; } -create_counter!(PAYMENT_COUNT, PT_METER); -create_counter!(JOB_COUNT, PT_METER); -create_counter!(BATCHES_CREATED, PT_METER); -create_counter!(BATCHES_CONSUMED, PT_METER); -create_counter!(TASK_CONSUMED, PT_METER); +create_counter!(PAYMENT_COUNT, PT_METER); // No. of payments created +create_counter!(TASKS_ADDED_COUNT, PT_METER); // Tasks added to process tracker +create_counter!(TASKS_PICKED_COUNT, PT_METER); // Tasks picked by +create_counter!(BATCHES_CREATED, PT_METER); // Batches added to stream +create_counter!(BATCHES_CONSUMED, PT_METER); // Batches consumed by consumer +create_counter!(TASK_CONSUMED, PT_METER); // Tasks consumed by consumer +create_counter!(TASK_PROCESSED, PT_METER); // Tasks completed processing +create_counter!(TASK_FINISHED, PT_METER); // Tasks finished +create_counter!(TASK_RETRIED, PT_METER); // Tasks added for retries diff --git a/crates/router/src/scheduler/producer.rs b/crates/router/src/scheduler/producer.rs index b3f6b1f7db..0c4ab2ca81 100644 --- a/crates/router/src/scheduler/producer.rs +++ b/crates/router/src/scheduler/producer.rs @@ -4,6 +4,7 @@ use error_stack::{report, ResultExt}; use router_env::{tracing, tracing::instrument}; use time::Duration; +use super::metrics; use crate::{ configs::settings::SchedulerSettings, core::errors::{self, CustomResult}, @@ -15,25 +16,6 @@ use crate::{ utils, }; -//TODO: move to env -pub fn fetch_upper_limit() -> i64 { - 0 -} - -pub fn fetch_lower_limit() -> i64 { - 1800 -} - -pub fn producer_lock_key() -> &'static str { - "PRODUCER_LOCKING_KEY" -} - -pub fn producer_lock_ttl() -> i64 { - // ttl_offset = config.scheduler_lock_offset.or_else(60); - // (scheduler_looper_interval / 100) + (ttl_offset) - 160 //seconds -} - #[instrument(skip_all)] pub async fn start_producer( state: &AppState, @@ -80,7 +62,7 @@ pub async fn run_producer_flow( settings: &SchedulerSettings, ) -> CustomResult<(), errors::ProcessTrackerError> { let tag = "PRODUCER_LOCK"; - let lock_key = producer_lock_key(); + let lock_key = &settings.producer.lock_key; let lock_val = "LOCKED"; let ttl = settings.producer.lock_ttl; @@ -143,5 +125,6 @@ pub async fn fetch_producer_tasks( } new_tasks.append(&mut pending_tasks); + metrics::TASKS_PICKED_COUNT.add(new_tasks.len() as u64, &[]); Ok(new_tasks) } diff --git a/crates/router/src/types/storage/process_tracker.rs b/crates/router/src/types/storage/process_tracker.rs index 03219fdd7b..fa9ac368ba 100644 --- a/crates/router/src/types/storage/process_tracker.rs +++ b/crates/router/src/types/storage/process_tracker.rs @@ -5,7 +5,9 @@ use error_stack::ResultExt; use serde::{Deserialize, Serialize}; use time::PrimitiveDateTime; -use crate::{core::errors, db::Db, schema::process_tracker, types::storage::enums, utils}; +use crate::{ + core::errors, db, scheduler::metrics, schema::process_tracker, types::storage::enums, utils, +}; #[derive( Clone, @@ -72,9 +74,10 @@ impl ProcessTracker { pub async fn retry( self, - db: &dyn Db, + db: &dyn db::Db, schedule_time: PrimitiveDateTime, ) -> Result<(), errors::ProcessTrackerError> { + metrics::TASK_RETRIED.add(1, &[]); db.update_process_tracker( self.clone(), ProcessTrackerUpdate::StatusRetryUpdate { @@ -89,7 +92,7 @@ impl ProcessTracker { pub async fn finish_with_status( self, - db: &dyn Db, + db: &dyn db::Db, status: String, ) -> Result<(), errors::ProcessTrackerError> { db.update_process( @@ -101,6 +104,7 @@ impl ProcessTracker { ) .await .attach_printable("Failed while updating status of the process")?; + metrics::TASK_FINISHED.add(1, &[]); Ok(()) } }