diff --git a/crates/diesel_models/src/process_tracker.rs b/crates/diesel_models/src/process_tracker.rs
index 9e649279f2..76e280d6bc 100644
--- a/crates/diesel_models/src/process_tracker.rs
+++ b/crates/diesel_models/src/process_tracker.rs
@@ -1,8 +1,10 @@
+use common_utils::ext_traits::Encode;
 use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
+use error_stack::ResultExt;
 use serde::{Deserialize, Serialize};
 use time::PrimitiveDateTime;
-use crate::{enums as storage_enums, schema::process_tracker};
+use crate::{enums as storage_enums, errors, schema::process_tracker, StorageResult};
 #[derive(
     Clone,
@@ -37,6 +39,13 @@ pub struct ProcessTracker {
     pub updated_at: PrimitiveDateTime,
 }
+impl ProcessTracker {
+    #[inline(always)]
+    pub fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool {
+        valid_statuses.iter().any(|&x| x == self.business_status)
+    }
+}
+
 #[derive(Clone, Debug, Insertable, router_derive::DebugAsDisplay)]
 #[diesel(table_name = process_tracker)]
 pub struct ProcessTrackerNew {
@@ -55,6 +64,42 @@ pub struct ProcessTrackerNew {
     pub updated_at: PrimitiveDateTime,
 }
+impl ProcessTrackerNew {
+    pub fn new<T>(
+        process_tracker_id: impl Into<String>,
+        task: impl Into<String>,
+        runner: ProcessTrackerRunner,
+        tag: impl IntoIterator<Item = impl Into<String>>,
+        tracking_data: T,
+        schedule_time: PrimitiveDateTime,
+    ) -> StorageResult<Self>
+    where
+        T: Serialize + std::fmt::Debug,
+    {
+        const BUSINESS_STATUS_PENDING: &str = "Pending";
+
+        let current_time = common_utils::date_time::now();
+        Ok(Self {
+            id: process_tracker_id.into(),
+            name: Some(task.into()),
+            tag: tag.into_iter().map(Into::into).collect(),
+            runner: Some(runner.to_string()),
+            retry_count: 0,
+            schedule_time: Some(schedule_time),
+            rule: String::new(),
+            tracking_data: tracking_data
+                .encode_to_value()
+                .change_context(errors::DatabaseError::Others)
+                .attach_printable("Failed to serialize process tracker tracking data")?,
+            business_status: String::from(BUSINESS_STATUS_PENDING),
+            status: storage_enums::ProcessTrackerStatus::New,
+            event: vec![],
+            created_at: current_time,
+            updated_at: current_time,
+        })
+    }
+}
+
 #[derive(Debug)]
 pub enum ProcessTrackerUpdate {
     Update {
@@ -165,3 +210,39 @@ pub struct ProcessData {
     cache_name: String,
     process_tracker: ProcessTracker,
 }
+
+#[derive(
+    serde::Serialize,
+    serde::Deserialize,
+    Clone,
+    Copy,
+    Debug,
+    PartialEq,
+    Eq,
+    strum::EnumString,
+    strum::Display,
+)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
+pub enum ProcessTrackerRunner {
+    PaymentsSyncWorkflow,
+    RefundWorkflowRouter,
+    DeleteTokenizeDataWorkflow,
+    ApiKeyExpiryWorkflow,
+}
+
+#[cfg(test)]
+mod tests {
+    #![allow(clippy::unwrap_used)]
+    use common_utils::ext_traits::StringExt;
+
+    use super::ProcessTrackerRunner;
+
+    #[test]
+    fn test_enum_to_string() {
+        let string_format = "PAYMENTS_SYNC_WORKFLOW".to_string();
+        let enum_format: ProcessTrackerRunner =
+            string_format.parse_enum("ProcessTrackerRunner").unwrap();
+        assert_eq!(enum_format, ProcessTrackerRunner::PaymentsSyncWorkflow);
+    }
+}
diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs
index 5d376a4d7b..70d24f0493 100644
--- a/crates/redis_interface/src/commands.rs
+++ b/crates/redis_interface/src/commands.rs
@@ -596,7 +596,12 @@ impl super::RedisConnectionPool {
             None => self.pool.xread_map(count, block, streams, ids).await,
         }
         .into_report()
-        .change_context(errors::RedisError::StreamReadFailed)
+        .map_err(|err| match err.current_context().kind() {
+            RedisErrorKind::NotFound | RedisErrorKind::Parse => {
+                err.change_context(errors::RedisError::StreamEmptyOrNotAvailable)
+            }
+            _ => err.change_context(errors::RedisError::StreamReadFailed),
+        })
     }
     // Consumer Group API
diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs
index caa69ea139..c2877535da 100644
--- a/crates/router/src/bin/scheduler.rs
+++ b/crates/router/src/bin/scheduler.rs
@@ -14,7 +14,6 @@ use router::{
     },
     logger, routes,
     services::{self, api},
-    types::storage::ProcessTrackerExt,
     workflows,
 };
 use router_env::{instrument, tracing};
@@ -22,9 +21,7 @@ use scheduler::{
     consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError,
     workflows::ProcessTrackerWorkflows, SchedulerAppState,
 };
-use serde::{Deserialize, Serialize};
 use storage_impl::errors::ApplicationError;
-use strum::EnumString;
 use tokio::sync::{mpsc, oneshot};
 const SCHEDULER_FLOW: &str = "SCHEDULER_FLOW";
@@ -209,17 +206,6 @@ pub async fn deep_health_check_func(
     Ok(response)
 }
-#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)]
-#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
-#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
-pub enum PTRunner {
-    PaymentsSyncWorkflow,
-    RefundWorkflowRouter,
-    DeleteTokenizeDataWorkflow,
-    #[cfg(feature = "email")]
-    ApiKeyExpiryWorkflow,
-}
-
 #[derive(Debug, Copy, Clone)]
 pub struct WorkflowRunner;
@@ -229,25 +215,51 @@ impl ProcessTrackerWorkflows<routes::AppState> for WorkflowRunner {
         &'a self,
         state: &'a routes::AppState,
         process: storage::ProcessTracker,
-    ) -> Result<(), ProcessTrackerError> {
-        let runner = process.runner.clone().get_required_value("runner")?;
-        let runner: Option<PTRunner> = runner.parse_enum("PTRunner").ok();
-        let operation: Box<dyn ProcessTrackerWorkflow<routes::AppState>> = match runner {
-            Some(PTRunner::PaymentsSyncWorkflow) => {
-                Box::new(workflows::payment_sync::PaymentsSyncWorkflow)
+    ) -> CustomResult<(), ProcessTrackerError> {
+        let runner = process
+            .runner
+            .clone()
+            .get_required_value("runner")
+            .change_context(ProcessTrackerError::MissingRequiredField)
+            .attach_printable("Missing runner field in process information")?;
+        let runner: storage::ProcessTrackerRunner = runner
+            .parse_enum("ProcessTrackerRunner")
+            .change_context(ProcessTrackerError::UnexpectedFlow)
+            .attach_printable("Failed to parse workflow runner name")?;
+
+        let get_operation = |runner: storage::ProcessTrackerRunner| -> CustomResult<
+            Box<dyn ProcessTrackerWorkflow<routes::AppState>>,
+            ProcessTrackerError,
+        > {
+            match runner {
+                storage::ProcessTrackerRunner::PaymentsSyncWorkflow => {
+                    Ok(Box::new(workflows::payment_sync::PaymentsSyncWorkflow))
+                }
+                storage::ProcessTrackerRunner::RefundWorkflowRouter => {
+                    Ok(Box::new(workflows::refund_router::RefundWorkflowRouter))
+                }
+                storage::ProcessTrackerRunner::DeleteTokenizeDataWorkflow => Ok(Box::new(
+                    workflows::tokenized_data::DeleteTokenizeDataWorkflow,
+                )),
+                storage::ProcessTrackerRunner::ApiKeyExpiryWorkflow => {
+                    #[cfg(feature = "email")]
+                    {
+                        Ok(Box::new(workflows::api_key_expiry::ApiKeyExpiryWorkflow))
+                    }
+
+                    #[cfg(not(feature = "email"))]
+                    {
+                        Err(error_stack::report!(ProcessTrackerError::UnexpectedFlow))
+                            .attach_printable(
+                                "Cannot run API key expiry workflow when email feature is disabled",
+                            )
+                    }
+                }
+            }
             }
-            Some(PTRunner::RefundWorkflowRouter) => {
-                Box::new(workflows::refund_router::RefundWorkflowRouter)
-            }
-            Some(PTRunner::DeleteTokenizeDataWorkflow) => {
-                Box::new(workflows::tokenized_data::DeleteTokenizeDataWorkflow)
-            }
-            #[cfg(feature = "email")]
-            Some(PTRunner::ApiKeyExpiryWorkflow) => {
-
Box::new(workflows::api_key_expiry::ApiKeyExpiryWorkflow) - } - _ => Err(ProcessTrackerError::UnexpectedFlow)?, }; + + let operation = get_operation(runner)?; + let app_state = &state.clone(); let output = operation.execute_workflow(app_state, process.clone()).await; match output { @@ -259,11 +271,10 @@ impl ProcessTrackerWorkflows for WorkflowRunner { Ok(_) => (), Err(error) => { logger::error!(%error, "Failed while handling error"); - let status = process - .finish_with_status( - state.get_db().as_scheduler(), - "GLOBAL_FAILURE".to_string(), - ) + let status = state + .get_db() + .as_scheduler() + .finish_process_with_business_status(process, "GLOBAL_FAILURE".to_string()) .await; if let Err(err) = status { logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE"); @@ -294,18 +305,3 @@ async fn start_scheduler( ) .await } - -#[cfg(test)] -mod workflow_tests { - #![allow(clippy::unwrap_used)] - use common_utils::ext_traits::StringExt; - - use super::PTRunner; - - #[test] - fn test_enum_to_string() { - let string_format = "PAYMENTS_SYNC_WORKFLOW".to_string(); - let enum_format: PTRunner = string_format.parse_enum("PTRunner").unwrap(); - assert_eq!(enum_format, PTRunner::PaymentsSyncWorkflow) - } -} diff --git a/crates/router/src/configs/validations.rs b/crates/router/src/configs/validations.rs index 21ef4037d8..851b7ba757 100644 --- a/crates/router/src/configs/validations.rs +++ b/crates/router/src/configs/validations.rs @@ -156,18 +156,27 @@ impl super::settings::ApiKeys { use common_utils::fp_utils::when; #[cfg(feature = "aws_kms")] - return when(self.kms_encrypted_hash_key.is_default_or_empty(), || { + when(self.kms_encrypted_hash_key.is_default_or_empty(), || { Err(ApplicationError::InvalidConfigurationValueError( "API key hashing key must not be empty when KMS feature is enabled".into(), )) - }); + })?; #[cfg(not(feature = "aws_kms"))] when(self.hash_key.is_empty(), || { Err(ApplicationError::InvalidConfigurationValueError( "API key hashing key must not be empty".into(), )) - }) + })?; + + #[cfg(feature = "email")] + when(self.expiry_reminder_days.is_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "API key expiry reminder days must not be empty".into(), + )) + })?; + + Ok(()) } } diff --git a/crates/router/src/core/api_keys.rs b/crates/router/src/core/api_keys.rs index 39212ab381..b3f8931cde 100644 --- a/crates/router/src/core/api_keys.rs +++ b/crates/router/src/core/api_keys.rs @@ -11,8 +11,6 @@ use masking::ExposeInterface; use masking::{PeekInterface, StrongSecret}; use router_env::{instrument, tracing}; -#[cfg(feature = "email")] -use crate::types::storage::enums; use crate::{ configs::settings, consts, @@ -28,7 +26,8 @@ const API_KEY_EXPIRY_TAG: &str = "API_KEY"; #[cfg(feature = "email")] const API_KEY_EXPIRY_NAME: &str = "API_KEY_EXPIRY"; #[cfg(feature = "email")] -const API_KEY_EXPIRY_RUNNER: &str = "API_KEY_EXPIRY_WORKFLOW"; +const API_KEY_EXPIRY_RUNNER: diesel_models::ProcessTrackerRunner = + diesel_models::ProcessTrackerRunner::ApiKeyExpiryWorkflow; #[cfg(feature = "aws_kms")] use external_services::aws_kms::decrypt::AwsKmsDecrypt; @@ -245,15 +244,16 @@ pub async fn add_api_key_expiry_task( api_key.expires_at.map(|expires_at| { expires_at.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day))) }) - }); + }) + .ok_or(errors::ApiErrorResponse::InternalServerError) + .into_report() + .attach_printable("Failed to obtain initial process tracker schedule time")?; - if let Some(schedule_time) = schedule_time { - if 
schedule_time <= current_time { - return Ok(()); - } + if schedule_time <= current_time { + return Ok(()); } - let api_key_expiry_tracker = &storage::ApiKeyExpiryTrackingData { + let api_key_expiry_tracker = storage::ApiKeyExpiryTrackingData { key_id: api_key.key_id.clone(), merchant_id: api_key.merchant_id.clone(), // We need API key expiry too, because we need to decide on the schedule_time in @@ -261,30 +261,18 @@ pub async fn add_api_key_expiry_task( api_key_expiry: api_key.expires_at, expiry_reminder_days: expiry_reminder_days.clone(), }; - let api_key_expiry_workflow_model = serde_json::to_value(api_key_expiry_tracker) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable_lazy(|| { - format!("unable to serialize API key expiry tracker: {api_key_expiry_tracker:?}") - })?; - let process_tracker_entry = storage::ProcessTrackerNew { - id: generate_task_id_for_api_key_expiry_workflow(api_key.key_id.as_str()), - name: Some(String::from(API_KEY_EXPIRY_NAME)), - tag: vec![String::from(API_KEY_EXPIRY_TAG)], - runner: Some(String::from(API_KEY_EXPIRY_RUNNER)), - // Retry count specifies, number of times the current process (email) has been retried. - // It also acts as an index of expiry_reminder_days vector - retry_count: 0, + let process_tracker_id = generate_task_id_for_api_key_expiry_workflow(api_key.key_id.as_str()); + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + API_KEY_EXPIRY_NAME, + API_KEY_EXPIRY_RUNNER, + [API_KEY_EXPIRY_TAG], + api_key_expiry_tracker, schedule_time, - rule: String::new(), - tracking_data: api_key_expiry_workflow_model, - business_status: String::from("Pending"), - status: enums::ProcessTrackerStatus::New, - event: vec![], - created_at: current_time, - updated_at: current_time, - }; + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct API key expiry process tracker task")?; store .insert_process(process_tracker_entry) @@ -293,7 +281,7 @@ pub async fn add_api_key_expiry_task( .attach_printable_lazy(|| { format!( "Failed while inserting API key expiry reminder to process_tracker: api_key_id: {}", - api_key_expiry_tracker.key_id + api_key.key_id ) })?; metrics::TASKS_ADDED_COUNT.add( diff --git a/crates/router/src/core/payment_methods/vault.rs b/crates/router/src/core/payment_methods/vault.rs index b547027a1c..622fed0738 100644 --- a/crates/router/src/core/payment_methods/vault.rs +++ b/crates/router/src/core/payment_methods/vault.rs @@ -17,7 +17,7 @@ use crate::{ routes::metrics, types::{ api, domain, - storage::{self, enums, ProcessTrackerExt}, + storage::{self, enums}, }, utils::StringExt, }; @@ -998,33 +998,31 @@ pub async fn add_delete_tokenized_data_task( lookup_key: &str, pm: enums::PaymentMethod, ) -> RouterResult<()> { - let runner = "DELETE_TOKENIZE_DATA_WORKFLOW"; - let current_time = common_utils::date_time::now(); - let tracking_data = serde_json::to_value(storage::TokenizeCoreWorkflow { + let runner = storage::ProcessTrackerRunner::DeleteTokenizeDataWorkflow; + let process_tracker_id = format!("{runner}_{lookup_key}"); + let task = runner.to_string(); + let tag = ["BASILISK-V3"]; + let tracking_data = storage::TokenizeCoreWorkflow { lookup_key: lookup_key.to_owned(), pm, - }) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable_lazy(|| format!("unable to convert into value {lookup_key:?}"))?; - - let schedule_time = get_delete_tokenize_schedule_time(db, &pm, 
0).await; - - let process_tracker_entry = storage::ProcessTrackerNew { - id: format!("{runner}_{lookup_key}"), - name: Some(String::from(runner)), - tag: vec![String::from("BASILISK-V3")], - runner: Some(String::from(runner)), - retry_count: 0, - schedule_time, - rule: String::new(), - tracking_data, - business_status: String::from("Pending"), - status: enums::ProcessTrackerStatus::New, - event: vec![], - created_at: current_time, - updated_at: current_time, }; + let schedule_time = get_delete_tokenize_schedule_time(db, &pm, 0) + .await + .ok_or(errors::ApiErrorResponse::InternalServerError) + .into_report() + .attach_printable("Failed to obtain initial process tracker schedule time")?; + + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + &task, + runner, + tag, + tracking_data, + schedule_time, + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct delete tokenized data process tracker task")?; + let response = db.insert_process(process_tracker_entry).await; response.map(|_| ()).or_else(|err| { if err.current_context().is_db_unique_violation() { @@ -1056,10 +1054,11 @@ pub async fn start_tokenize_data_workflow( Ok(()) => { logger::info!("Card From locker deleted Successfully"); //mark task as finished - let id = tokenize_tracker.id.clone(); - tokenize_tracker - .clone() - .finish_with_status(db.as_scheduler(), format!("COMPLETED_BY_PT_{id}")) + db.as_scheduler() + .finish_process_with_business_status( + tokenize_tracker.clone(), + "COMPLETED_BY_PT".to_string(), + ) .await?; } Err(err) => { @@ -1104,7 +1103,11 @@ pub async fn retry_delete_tokenize( match schedule_time { Some(s_time) => { - let retry_schedule = pt.retry(db.as_scheduler(), s_time).await; + let retry_schedule = db + .as_scheduler() + .retry_process(pt, s_time) + .await + .map_err(Into::into); metrics::TASKS_RESET_COUNT.add( &metrics::CONTEXT, 1, @@ -1115,10 +1118,11 @@ pub async fn retry_delete_tokenize( ); retry_schedule } - None => { - pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) - .await - } + None => db + .as_scheduler() + .finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string()) + .await + .map_err(Into::into), } } diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 7348a2b8f0..050b0dd4b7 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -25,7 +25,7 @@ use redis_interface::errors::RedisError; use router_env::{instrument, tracing}; #[cfg(feature = "olap")] use router_types::transformers::ForeignFrom; -use scheduler::{db::process_tracker::ProcessTrackerExt, errors as sch_errors, utils as pt_utils}; +use scheduler::utils as pt_utils; use time; pub use self::operations::{ @@ -2356,28 +2356,32 @@ pub async fn add_process_sync_task( db: &dyn StorageInterface, payment_attempt: &storage::PaymentAttempt, schedule_time: time::PrimitiveDateTime, -) -> Result<(), sch_errors::ProcessTrackerError> { +) -> CustomResult<(), errors::StorageError> { let tracking_data = api::PaymentsRetrieveRequest { force_sync: true, merchant_id: Some(payment_attempt.merchant_id.clone()), resource_id: api::PaymentIdType::PaymentAttemptId(payment_attempt.attempt_id.clone()), ..Default::default() }; - let runner = "PAYMENTS_SYNC_WORKFLOW"; + let runner = storage::ProcessTrackerRunner::PaymentsSyncWorkflow; let task = "PAYMENTS_SYNC"; + let tag = ["SYNC", "PAYMENT"]; let process_tracker_id = pt_utils::get_process_tracker_id( runner, task, 
&payment_attempt.attempt_id, &payment_attempt.merchant_id, ); - let process_tracker_entry = ::make_process_tracker_new( + let process_tracker_entry = storage::ProcessTrackerNew::new( process_tracker_id, task, runner, + tag, tracking_data, schedule_time, - )?; + ) + .map_err(errors::StorageError::from) + .into_report()?; db.insert_process(process_tracker_entry).await?; Ok(()) @@ -2388,7 +2392,7 @@ pub async fn reset_process_sync_task( payment_attempt: &storage::PaymentAttempt, schedule_time: time::PrimitiveDateTime, ) -> Result<(), errors::ProcessTrackerError> { - let runner = "PAYMENTS_SYNC_WORKFLOW"; + let runner = storage::ProcessTrackerRunner::PaymentsSyncWorkflow; let task = "PAYMENTS_SYNC"; let process_tracker_id = pt_utils::get_process_tracker_id( runner, @@ -2400,8 +2404,8 @@ pub async fn reset_process_sync_task( .find_process_by_id(&process_tracker_id) .await? .ok_or(errors::ProcessTrackerError::ProcessFetchingFailed)?; - psync_process - .reset(db.as_scheduler(), schedule_time) + db.as_scheduler() + .reset_process(psync_process, schedule_time) .await?; Ok(()) } diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 764752ba90..b3330f82f7 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -1032,7 +1032,6 @@ where ); super::add_process_sync_task(&*state.store, payment_attempt, stime) .await - .into_report() .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed while adding task to process tracker") } else { diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index 4b1c33296e..06030262fb 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -19,7 +19,7 @@ use crate::{ self, api::{self, refunds}, domain, - storage::{self, enums, ProcessTrackerExt}, + storage::{self, enums}, transformers::{ForeignFrom, ForeignInto}, }, utils::{self, OptionExt}, @@ -798,9 +798,9 @@ pub async fn schedule_refund_execution( ) -> RouterResult { // refunds::RefundResponse> { let db = &*state.store; - let runner = "REFUND_WORKFLOW_ROUTER"; + let runner = storage::ProcessTrackerRunner::RefundWorkflowRouter; let task = "EXECUTE_REFUND"; - let task_id = format!("{}_{}_{}", runner, task, refund.internal_reference_id); + let task_id = format!("{runner}_{task}_{}", refund.internal_reference_id); let refund_process = db .find_process_by_id(&task_id) @@ -909,10 +909,13 @@ pub async fn sync_refund_with_gateway_workflow( ]; match response.refund_status { status if terminal_status.contains(&status) => { - let id = refund_tracker.id.clone(); - refund_tracker - .clone() - .finish_with_status(state.store.as_scheduler(), format!("COMPLETED_BY_PT_{id}")) + state + .store + .as_scheduler() + .finish_process_with_business_status( + refund_tracker.clone(), + "COMPLETED_BY_PT".to_string(), + ) .await? 
} _ => { @@ -1020,18 +1023,29 @@ pub async fn trigger_refund_execute_workflow( None, ) .await?; - add_refund_sync_task(db, &updated_refund, "REFUND_WORKFLOW_ROUTER").await?; + add_refund_sync_task( + db, + &updated_refund, + storage::ProcessTrackerRunner::RefundWorkflowRouter, + ) + .await?; } (true, enums::RefundStatus::Pending) => { // create sync task - add_refund_sync_task(db, &refund, "REFUND_WORKFLOW_ROUTER").await?; + add_refund_sync_task( + db, + &refund, + storage::ProcessTrackerRunner::RefundWorkflowRouter, + ) + .await?; } (_, _) => { //mark task as finished - let id = refund_tracker.id.clone(); - refund_tracker - .clone() - .finish_with_status(db.as_scheduler(), format!("COMPLETED_BY_PT_{id}")) + db.as_scheduler() + .finish_process_with_business_status( + refund_tracker.clone(), + "COMPLETED_BY_PT".to_string(), + ) .await?; } }; @@ -1054,29 +1068,23 @@ pub fn refund_to_refund_core_workflow_model( pub async fn add_refund_sync_task( db: &dyn db::StorageInterface, refund: &storage::Refund, - runner: &str, + runner: storage::ProcessTrackerRunner, ) -> RouterResult { - let current_time = common_utils::date_time::now(); - let refund_workflow_model = serde_json::to_value(refund_to_refund_core_workflow_model(refund)) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable_lazy(|| format!("unable to convert into value {:?}", &refund))?; let task = "SYNC_REFUND"; - let process_tracker_entry = storage::ProcessTrackerNew { - id: format!("{}_{}_{}", runner, task, refund.internal_reference_id), - name: Some(String::from(task)), - tag: vec![String::from("REFUND")], - runner: Some(String::from(runner)), - retry_count: 0, - schedule_time: Some(common_utils::date_time::now()), - rule: String::new(), - tracking_data: refund_workflow_model, - business_status: String::from("Pending"), - status: enums::ProcessTrackerStatus::New, - event: vec![], - created_at: current_time, - updated_at: current_time, - }; + let process_tracker_id = format!("{runner}_{task}_{}", refund.internal_reference_id); + let schedule_time = common_utils::date_time::now(); + let refund_workflow_tracking_data = refund_to_refund_core_workflow_model(refund); + let tag = ["REFUND"]; + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + refund_workflow_tracking_data, + schedule_time, + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct refund sync process tracker task")?; let response = db .insert_process(process_tracker_entry) @@ -1101,29 +1109,23 @@ pub async fn add_refund_sync_task( pub async fn add_refund_execute_task( db: &dyn db::StorageInterface, refund: &storage::Refund, - runner: &str, + runner: storage::ProcessTrackerRunner, ) -> RouterResult { let task = "EXECUTE_REFUND"; - let current_time = common_utils::date_time::now(); - let refund_workflow_model = serde_json::to_value(refund_to_refund_core_workflow_model(refund)) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable_lazy(|| format!("unable to convert into value {:?}", &refund))?; - let process_tracker_entry = storage::ProcessTrackerNew { - id: format!("{}_{}_{}", runner, task, refund.internal_reference_id), - name: Some(String::from(task)), - tag: vec![String::from("REFUND")], - runner: Some(String::from(runner)), - retry_count: 0, - schedule_time: Some(common_utils::date_time::now()), - rule: String::new(), - tracking_data: refund_workflow_model, - 
business_status: String::from("Pending"), - status: enums::ProcessTrackerStatus::New, - event: vec![], - created_at: current_time, - updated_at: current_time, - }; + let process_tracker_id = format!("{runner}_{task}_{}", refund.internal_reference_id); + let tag = ["REFUND"]; + let schedule_time = common_utils::date_time::now(); + let refund_workflow_tracking_data = refund_to_refund_core_workflow_model(refund); + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + refund_workflow_tracking_data, + schedule_time, + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct refund execute process tracker task")?; let response = db .insert_process(process_tracker_entry) @@ -1177,7 +1179,11 @@ pub async fn retry_refund_sync_task( match schedule_time { Some(s_time) => { - let retry_schedule = pt.retry(db.as_scheduler(), s_time).await; + let retry_schedule = db + .as_scheduler() + .retry_process(pt, s_time) + .await + .map_err(Into::into); metrics::TASKS_RESET_COUNT.add( &metrics::CONTEXT, 1, @@ -1185,9 +1191,10 @@ pub async fn retry_refund_sync_task( ); retry_schedule } - None => { - pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) - .await - } + None => db + .as_scheduler() + .finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string()) + .await + .map_err(Into::into), } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 213417b131..78b95544ca 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -1396,15 +1396,6 @@ impl ProcessTrackerInterface for KafkaStore { .process_tracker_update_process_status_by_ids(task_ids, task_update) .await } - async fn update_process_tracker( - &self, - this: storage::ProcessTracker, - process: storage::ProcessTrackerUpdate, - ) -> CustomResult { - self.diesel_store - .update_process_tracker(this, process) - .await - } async fn insert_process( &self, @@ -1413,6 +1404,32 @@ impl ProcessTrackerInterface for KafkaStore { self.diesel_store.insert_process(new).await } + async fn reset_process( + &self, + this: storage::ProcessTracker, + schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError> { + self.diesel_store.reset_process(this, schedule_time).await + } + + async fn retry_process( + &self, + this: storage::ProcessTracker, + schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError> { + self.diesel_store.retry_process(this, schedule_time).await + } + + async fn finish_process_with_business_status( + &self, + this: storage::ProcessTracker, + business_status: String, + ) -> CustomResult<(), errors::StorageError> { + self.diesel_store + .finish_process_with_business_status(this, business_status) + .await + } + async fn find_processes_by_time_status( &self, time_lower_limit: PrimitiveDateTime, diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index 01decc89c4..27fd6db8e7 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -43,7 +43,9 @@ pub use data_models::payments::{ payment_intent::{PaymentIntentNew, PaymentIntentUpdate}, PaymentIntent, }; -pub use diesel_models::{ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate}; +pub use diesel_models::{ + ProcessTracker, ProcessTrackerNew, ProcessTrackerRunner, ProcessTrackerUpdate, +}; pub use scheduler::db::process_tracker; pub use self::{ diff --git 
a/crates/router/src/workflows/api_key_expiry.rs b/crates/router/src/workflows/api_key_expiry.rs index b9830c4ebc..e9e1f32370 100644 --- a/crates/router/src/workflows/api_key_expiry.rs +++ b/crates/router/src/workflows/api_key_expiry.rs @@ -8,11 +8,7 @@ use crate::{ logger::error, routes::{metrics, AppState}, services::email::types::ApiKeyExpiryReminder, - types::{ - api, - domain::UserEmail, - storage::{self, ProcessTrackerExt}, - }, + types::{api, domain::UserEmail, storage}, utils::OptionExt, }; @@ -90,8 +86,10 @@ impl ProcessTrackerWorkflow for ApiKeyExpiryWorkflow { == i32::try_from(tracking_data.expiry_reminder_days.len() - 1) .map_err(|_| errors::ProcessTrackerError::TypeConversionError)? { - process - .finish_with_status(state.get_db().as_scheduler(), "COMPLETED_BY_PT".to_string()) + state + .get_db() + .as_scheduler() + .finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string()) .await? } // If tasks are remaining that has to be scheduled diff --git a/crates/router/src/workflows/payment_sync.rs b/crates/router/src/workflows/payment_sync.rs index b2296e17f7..92057b0889 100644 --- a/crates/router/src/workflows/payment_sync.rs +++ b/crates/router/src/workflows/payment_sync.rs @@ -3,7 +3,6 @@ use error_stack::ResultExt; use router_env::logger; use scheduler::{ consumer::{self, types::process_data, workflows::ProcessTrackerWorkflow}, - db::process_tracker::ProcessTrackerExt, errors as sch_errors, utils as scheduler_utils, SchedulerAppState, }; @@ -91,12 +90,10 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { ]; match &payment_data.payment_attempt.status { status if terminal_status.contains(status) => { - let id = process.id.clone(); - process - .finish_with_status( - state.get_db().as_scheduler(), - format!("COMPLETED_BY_PT_{id}"), - ) + state + .get_db() + .as_scheduler() + .finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string()) .await? 
} _ => { @@ -274,11 +271,12 @@ pub async fn retry_sync_task( match schedule_time { Some(s_time) => { - pt.retry(db.as_scheduler(), s_time).await?; + db.as_scheduler().retry_process(pt, s_time).await?; Ok(false) } None => { - pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) + db.as_scheduler() + .finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string()) .await?; Ok(true) } diff --git a/crates/scheduler/src/consumer.rs b/crates/scheduler/src/consumer.rs index ccc943afba..e069db28da 100644 --- a/crates/scheduler/src/consumer.rs +++ b/crates/scheduler/src/consumer.rs @@ -18,9 +18,8 @@ use uuid::Uuid; use super::env::logger; pub use super::workflows::ProcessTrackerWorkflow; use crate::{ - configs::settings::SchedulerSettings, - db::process_tracker::{ProcessTrackerExt, ProcessTrackerInterface}, - errors, metrics, utils as pt_utils, SchedulerAppState, SchedulerInterface, + configs::settings::SchedulerSettings, db::process_tracker::ProcessTrackerInterface, errors, + metrics, utils as pt_utils, SchedulerAppState, SchedulerInterface, }; // Valid consumer business statuses @@ -59,7 +58,7 @@ pub async fn start_consumer( let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0)); let signal = get_allowed_signals() .map_err(|error| { - logger::error!("Signal Handler Error: {:?}", error); + logger::error!(?error, "Signal Handler Error"); errors::ProcessTrackerError::ConfigurationError }) .into_report() @@ -80,8 +79,8 @@ pub async fn start_consumer( pt_utils::consumer_operation_handler( state.clone(), settings.clone(), - |err| { - logger::error!(%err); + |error| { + logger::error!(?error, "Failed to perform consumer operation"); }, sync::Arc::clone(&consumer_operation_counter), workflow_selector, @@ -94,7 +93,7 @@ pub async fn start_consumer( loop { shutdown_interval.tick().await; let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); - logger::error!("{}", active_tasks); + logger::info!("Active tasks: {active_tasks}"); match active_tasks { 0 => { logger::info!("Terminating consumer"); @@ -130,7 +129,7 @@ pub async fn consumer_operations( .consumer_group_create(&stream_name, &group_name, &RedisEntryId::AfterLastID) .await; if group_created.is_err() { - logger::info!("Consumer group already exists"); + logger::info!("Consumer group {group_name} already exists"); } let mut tasks = state @@ -148,7 +147,7 @@ pub async fn consumer_operations( pt_utils::add_histogram_metrics(&pickup_time, task, &stream_name); metrics::TASK_CONSUMED.add(&metrics::CONTEXT, 1, &[]); - // let runner = workflow_selector(task)?.ok_or(errors::ProcessTrackerError::UnexpectedFlow)?; + handler.push(tokio::task::spawn(start_workflow( state.clone(), task.clone(), @@ -171,6 +170,11 @@ pub async fn fetch_consumer_tasks( ) -> CustomResult, errors::ProcessTrackerError> { let batches = pt_utils::get_batches(redis_conn, stream_name, group_name, consumer_name).await?; + // Returning early to avoid execution of database queries when `batches` is empty + if batches.is_empty() { + return Ok(Vec::new()); + } + let mut tasks = batches.into_iter().fold(Vec::new(), |mut acc, batch| { acc.extend_from_slice( batch @@ -209,15 +213,20 @@ pub async fn start_workflow( process: storage::ProcessTracker, _pickup_time: PrimitiveDateTime, workflow_selector: impl workflows::ProcessTrackerWorkflows + 'static + std::fmt::Debug, -) -> Result<(), errors::ProcessTrackerError> +) -> CustomResult<(), errors::ProcessTrackerError> where T: SchedulerAppState, { 
tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string()); - logger::info!("{:?}", process.name.as_ref()); + logger::info!(pt.name=?process.name, pt.id=%process.id); + let res = workflow_selector .trigger_workflow(&state.clone(), process.clone()) - .await; + .await + .map_err(|error| { + logger::error!(?error, "Failed to trigger workflow"); + error + }); metrics::TASK_PROCESSED.add(&metrics::CONTEXT, 1, &[]); res } @@ -228,7 +237,7 @@ pub async fn consumer_error_handler( process: storage::ProcessTracker, error: errors::ProcessTrackerError, ) -> CustomResult<(), errors::ProcessTrackerError> { - logger::error!(pt.name = ?process.name, pt.id = %process.id, ?error, "ERROR: Failed while executing workflow"); + logger::error!(pt.name=?process.name, pt.id=%process.id, ?error, "Failed to execute workflow"); state .process_tracker_update_process_status_by_ids( diff --git a/crates/scheduler/src/consumer/workflows.rs b/crates/scheduler/src/consumer/workflows.rs index 3b897347be..3e8a4c8e50 100644 --- a/crates/scheduler/src/consumer/workflows.rs +++ b/crates/scheduler/src/consumer/workflows.rs @@ -1,8 +1,9 @@ use async_trait::async_trait; use common_utils::errors::CustomResult; pub use diesel_models::process_tracker as storage; +use router_env::logger; -use crate::{db::process_tracker::ProcessTrackerExt, errors, SchedulerAppState}; +use crate::{errors, SchedulerAppState}; pub type WorkflowSelectorFn = fn(&storage::ProcessTracker) -> Result<(), errors::ProcessTrackerError>; @@ -14,15 +15,16 @@ pub trait ProcessTrackerWorkflows: Send + Sync { &'a self, _state: &'a T, _process: storage::ProcessTracker, - ) -> Result<(), errors::ProcessTrackerError> { + ) -> CustomResult<(), errors::ProcessTrackerError> { Err(errors::ProcessTrackerError::NotImplemented)? } + async fn execute_workflow<'a>( &'a self, operation: Box>, state: &'a T, process: storage::ProcessTracker, - ) -> Result<(), errors::ProcessTrackerError> + ) -> CustomResult<(), errors::ProcessTrackerError> where T: SchedulerAppState, { @@ -35,16 +37,18 @@ pub trait ProcessTrackerWorkflows: Send + Sync { .await { Ok(_) => (), - Err(_error) => { - // logger::error!(%error, "Failed while handling error"); - let status = process - .finish_with_status( - state.get_db().as_scheduler(), - "GLOBAL_FAILURE".to_string(), - ) + Err(error) => { + logger::error!( + ?error, + "Failed to handle process tracker workflow execution error" + ); + let status = app_state + .get_db() + .as_scheduler() + .finish_process_with_business_status(process, "GLOBAL_FAILURE".to_string()) .await; - if let Err(_err) = status { - // logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE"); + if let Err(error) = status { + logger::error!(?error, "Failed to update process business status"); } } }, @@ -55,7 +59,7 @@ pub trait ProcessTrackerWorkflows: Send + Sync { #[async_trait] pub trait ProcessTrackerWorkflow: Send + Sync { - // The core execution of the workflow + /// The core execution of the workflow async fn execute_workflow<'a>( &'a self, _state: &'a T, @@ -63,9 +67,11 @@ pub trait ProcessTrackerWorkflow: Send + Sync { ) -> Result<(), errors::ProcessTrackerError> { Err(errors::ProcessTrackerError::NotImplemented)? 
} - // Callback function after successful execution of the `execute_workflow` + + /// Callback function after successful execution of the `execute_workflow` async fn success_handler<'a>(&'a self, _state: &'a T, _process: storage::ProcessTracker) {} - // Callback function after error received from `execute_workflow` + + /// Callback function after error received from `execute_workflow` async fn error_handler<'a>( &'a self, _state: &'a T, @@ -75,18 +81,3 @@ pub trait ProcessTrackerWorkflow: Send + Sync { Err(errors::ProcessTrackerError::NotImplemented)? } } - -// #[cfg(test)] -// mod workflow_tests { -// #![allow(clippy::unwrap_used)] -// use common_utils::ext_traits::StringExt; - -// use super::PTRunner; - -// #[test] -// fn test_enum_to_string() { -// let string_format = "PAYMENTS_SYNC_WORKFLOW".to_string(); -// let enum_format: PTRunner = string_format.parse_enum("PTRunner").unwrap(); -// assert_eq!(enum_format, PTRunner::PaymentsSyncWorkflow) -// } -// } diff --git a/crates/scheduler/src/db/process_tracker.rs b/crates/scheduler/src/db/process_tracker.rs index 728c146a64..ac20d4b293 100644 --- a/crates/scheduler/src/db/process_tracker.rs +++ b/crates/scheduler/src/db/process_tracker.rs @@ -2,11 +2,10 @@ use common_utils::errors::CustomResult; pub use diesel_models as storage; use diesel_models::enums as storage_enums; use error_stack::{IntoReport, ResultExt}; -use serde::Serialize; use storage_impl::{connection, errors, mock_db::MockDb}; use time::PrimitiveDateTime; -use crate::{errors as sch_errors, metrics, scheduler::Store, SchedulerInterface}; +use crate::{metrics, scheduler::Store}; #[async_trait::async_trait] pub trait ProcessTrackerInterface: Send + Sync + 'static { @@ -32,17 +31,30 @@ pub trait ProcessTrackerInterface: Send + Sync + 'static { task_ids: Vec, task_update: storage::ProcessTrackerUpdate, ) -> CustomResult; - async fn update_process_tracker( - &self, - this: storage::ProcessTracker, - process: storage::ProcessTrackerUpdate, - ) -> CustomResult; async fn insert_process( &self, new: storage::ProcessTrackerNew, ) -> CustomResult; + async fn reset_process( + &self, + this: storage::ProcessTracker, + schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError>; + + async fn retry_process( + &self, + this: storage::ProcessTracker, + schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError>; + + async fn finish_process_with_business_status( + &self, + this: storage::ProcessTracker, + business_status: String, + ) -> CustomResult<(), errors::StorageError>; + async fn find_processes_by_time_status( &self, time_lower_limit: PrimitiveDateTime, @@ -120,16 +132,58 @@ impl ProcessTrackerInterface for Store { .into_report() } - async fn update_process_tracker( + async fn reset_process( &self, this: storage::ProcessTracker, - process: storage::ProcessTrackerUpdate, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - this.update(&conn, process) - .await - .map_err(Into::into) - .into_report() + schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError> { + self.update_process( + this, + storage::ProcessTrackerUpdate::StatusRetryUpdate { + status: storage_enums::ProcessTrackerStatus::New, + retry_count: 0, + schedule_time, + }, + ) + .await?; + Ok(()) + } + + async fn retry_process( + &self, + this: storage::ProcessTracker, + schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError> { + metrics::TASK_RETRIED.add(&metrics::CONTEXT, 1, &[]); + let retry_count = 
this.retry_count + 1; + self.update_process( + this, + storage::ProcessTrackerUpdate::StatusRetryUpdate { + status: storage_enums::ProcessTrackerStatus::Pending, + retry_count, + schedule_time, + }, + ) + .await?; + Ok(()) + } + + async fn finish_process_with_business_status( + &self, + this: storage::ProcessTracker, + business_status: String, + ) -> CustomResult<(), errors::StorageError> { + self.update_process( + this, + storage::ProcessTrackerUpdate::StatusUpdate { + status: storage_enums::ProcessTrackerStatus::Finish, + business_status: Some(business_status), + }, + ) + .await + .attach_printable("Failed to update business status of process")?; + metrics::TASK_FINISHED.add(&metrics::CONTEXT, 1, &[]); + Ok(()) } async fn process_tracker_update_process_status_by_ids( @@ -215,11 +269,29 @@ impl ProcessTrackerInterface for MockDb { Err(errors::StorageError::MockDbError)? } - async fn update_process_tracker( + async fn reset_process( &self, _this: storage::ProcessTracker, - _process: storage::ProcessTrackerUpdate, - ) -> CustomResult { + _schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError> { + // [#172]: Implement function for `MockDb` + Err(errors::StorageError::MockDbError)? + } + + async fn retry_process( + &self, + _this: storage::ProcessTracker, + _schedule_time: PrimitiveDateTime, + ) -> CustomResult<(), errors::StorageError> { + // [#172]: Implement function for `MockDb` + Err(errors::StorageError::MockDbError)? + } + + async fn finish_process_with_business_status( + &self, + _this: storage::ProcessTracker, + _business_status: String, + ) -> CustomResult<(), errors::StorageError> { // [#172]: Implement function for `MockDb` Err(errors::StorageError::MockDbError)? } @@ -233,125 +305,3 @@ impl ProcessTrackerInterface for MockDb { Err(errors::StorageError::MockDbError)? 
} } - -#[async_trait::async_trait] -pub trait ProcessTrackerExt { - fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool; - - fn make_process_tracker_new<'a, T>( - process_tracker_id: String, - task: &'a str, - runner: &'a str, - tracking_data: T, - schedule_time: PrimitiveDateTime, - ) -> Result - where - T: Serialize; - - async fn reset( - self, - db: &dyn SchedulerInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), sch_errors::ProcessTrackerError>; - - async fn retry( - self, - db: &dyn SchedulerInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), sch_errors::ProcessTrackerError>; - - async fn finish_with_status( - self, - db: &dyn SchedulerInterface, - status: String, - ) -> Result<(), sch_errors::ProcessTrackerError>; -} - -#[async_trait::async_trait] -impl ProcessTrackerExt for storage::ProcessTracker { - fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool { - valid_statuses.iter().any(|x| x == &self.business_status) - } - - fn make_process_tracker_new<'a, T>( - process_tracker_id: String, - task: &'a str, - runner: &'a str, - tracking_data: T, - schedule_time: PrimitiveDateTime, - ) -> Result - where - T: Serialize, - { - let current_time = common_utils::date_time::now(); - Ok(storage::ProcessTrackerNew { - id: process_tracker_id, - name: Some(String::from(task)), - tag: vec![String::from("SYNC"), String::from("PAYMENT")], - runner: Some(String::from(runner)), - retry_count: 0, - schedule_time: Some(schedule_time), - rule: String::new(), - tracking_data: serde_json::to_value(tracking_data) - .map_err(|_| sch_errors::ProcessTrackerError::SerializationFailed)?, - business_status: String::from("Pending"), - status: storage_enums::ProcessTrackerStatus::New, - event: vec![], - created_at: current_time, - updated_at: current_time, - }) - } - - async fn reset( - self, - db: &dyn SchedulerInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), sch_errors::ProcessTrackerError> { - db.update_process_tracker( - self.clone(), - storage::ProcessTrackerUpdate::StatusRetryUpdate { - status: storage_enums::ProcessTrackerStatus::New, - retry_count: 0, - schedule_time, - }, - ) - .await?; - Ok(()) - } - - async fn retry( - self, - db: &dyn SchedulerInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), sch_errors::ProcessTrackerError> { - metrics::TASK_RETRIED.add(&metrics::CONTEXT, 1, &[]); - db.update_process_tracker( - self.clone(), - storage::ProcessTrackerUpdate::StatusRetryUpdate { - status: storage_enums::ProcessTrackerStatus::Pending, - retry_count: self.retry_count + 1, - schedule_time, - }, - ) - .await?; - Ok(()) - } - - async fn finish_with_status( - self, - db: &dyn SchedulerInterface, - status: String, - ) -> Result<(), sch_errors::ProcessTrackerError> { - db.update_process( - self, - storage::ProcessTrackerUpdate::StatusUpdate { - status: storage_enums::ProcessTrackerStatus::Finish, - business_status: Some(status), - }, - ) - .await - .attach_printable("Failed while updating status of the process")?; - metrics::TASK_FINISHED.add(&metrics::CONTEXT, 1, &[]); - Ok(()) - } -} diff --git a/crates/scheduler/src/utils.rs b/crates/scheduler/src/utils.rs index f6a340e9d5..174efd637e 100644 --- a/crates/scheduler/src/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -8,7 +8,7 @@ use diesel_models::enums::{self, ProcessTrackerStatus}; pub use diesel_models::process_tracker as storage; use error_stack::{report, ResultExt}; use redis_interface::{RedisConnectionPool, RedisEntryId}; -use router_env::opentelemetry; +use 
router_env::{instrument, opentelemetry, tracing}; use uuid::Uuid; use super::{ @@ -178,7 +178,7 @@ pub async fn get_batches( group_name: &str, consumer_name: &str, ) -> CustomResult, errors::ProcessTrackerError> { - let response = conn + let response = match conn .stream_read_with_options( stream_name, RedisEntryId::UndeliveredEntryID, @@ -188,10 +188,20 @@ pub async fn get_batches( Some((group_name, consumer_name)), ) .await - .map_err(|error| { - logger::warn!(%error, "Warning: finding batch in stream"); - error.change_context(errors::ProcessTrackerError::BatchNotFound) - })?; + { + Ok(response) => response, + Err(error) => { + if let redis_interface::errors::RedisError::StreamEmptyOrNotAvailable = + error.current_context() + { + logger::debug!("No batches processed as stream is empty"); + return Ok(Vec::new()); + } else { + return Err(error.change_context(errors::ProcessTrackerError::BatchNotFound)); + } + } + }; + metrics::BATCHES_CONSUMED.add(&metrics::CONTEXT, 1, &[]); let (batches, entry_ids): (Vec>, Vec>) = response.into_values().map(|entries| { @@ -217,13 +227,13 @@ pub async fn get_batches( conn.stream_acknowledge_entries(stream_name, group_name, entry_ids.clone()) .await .map_err(|error| { - logger::error!(%error, "Error acknowledging batch in stream"); + logger::error!(?error, "Error acknowledging batch in stream"); error.change_context(errors::ProcessTrackerError::BatchUpdateFailed) })?; conn.stream_delete_entries(stream_name, entry_ids.clone()) .await .map_err(|error| { - logger::error!(%error, "Error deleting batch from stream"); + logger::error!(?error, "Error deleting batch from stream"); error.change_context(errors::ProcessTrackerError::BatchDeleteFailed) })?; @@ -231,7 +241,7 @@ pub async fn get_batches( } pub fn get_process_tracker_id<'a>( - runner: &'a str, + runner: storage::ProcessTrackerRunner, task_name: &'a str, txn_id: &'a str, merchant_id: &'a str, @@ -243,6 +253,7 @@ pub fn get_time_from_delta(delta: Option) -> Option( state: T, settings: sync::Arc,
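
The diff replaces the stringly-typed runner values ("PAYMENTS_SYNC_WORKFLOW", "REFUND_WORKFLOW_ROUTER", ...) with the new ProcessTrackerRunner enum, relying on strum's EnumString and Display derives for the SCREAMING_SNAKE_CASE round-trip. The standalone sketch below (assuming the strum crate with its derive feature; the main harness is illustrative only) shows the round-trip behaviour the enum depends on:

use strum::{Display, EnumString};

#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumString, Display)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
enum ProcessTrackerRunner {
    PaymentsSyncWorkflow,
    RefundWorkflowRouter,
    DeleteTokenizeDataWorkflow,
    ApiKeyExpiryWorkflow,
}

fn main() {
    // The Display output is what ends up in the `runner` column and in task ids
    // built as `format!("{runner}_{task}_{id}")` in the refund and sync tasks.
    let runner = ProcessTrackerRunner::PaymentsSyncWorkflow;
    assert_eq!(runner.to_string(), "PAYMENTS_SYNC_WORKFLOW");

    // EnumString provides the reverse mapping used when a task is picked up from
    // the database and dispatched to a workflow.
    let parsed: ProcessTrackerRunner = "REFUND_WORKFLOW_ROUTER".parse().unwrap();
    assert_eq!(parsed, ProcessTrackerRunner::RefundWorkflowRouter);
}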
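ProcessTrackerNew::new centralises construction of process tracker entries: callers pass anything convertible into String for the id, task name and tags, plus a serializable tracking-data payload, and the constructor handles serialization and the default "Pending"/New bookkeeping that each call site previously spelled out by hand. The sketch below illustrates that pattern with simplified stand-in types; new_task, the trimmed-down ProcessTrackerNew struct and RefundCoreWorkflow are illustrative only, not the crate's real definitions:

use serde::Serialize;

#[derive(Debug)]
struct ProcessTrackerNew {
    id: String,
    name: Option<String>,
    tag: Vec<String>,
    tracking_data: serde_json::Value,
    business_status: String,
}

// Generic over anything String-convertible, mirroring the signature in the diff.
fn new_task<T: Serialize>(
    id: impl Into<String>,
    name: impl Into<String>,
    tag: impl IntoIterator<Item = impl Into<String>>,
    tracking_data: T,
) -> Result<ProcessTrackerNew, serde_json::Error> {
    Ok(ProcessTrackerNew {
        id: id.into(),
        name: Some(name.into()),
        tag: tag.into_iter().map(Into::into).collect(),
        // Serialization failures surface here, at construction time.
        tracking_data: serde_json::to_value(tracking_data)?,
        business_status: "Pending".to_string(),
    })
}

#[derive(Debug, Serialize)]
struct RefundCoreWorkflow {
    refund_internal_reference_id: String,
}

fn main() -> Result<(), serde_json::Error> {
    let entry = new_task(
        "REFUND_WORKFLOW_ROUTER_SYNC_REFUND_ref_123",
        "SYNC_REFUND",
        ["REFUND"],
        RefundCoreWorkflow {
            refund_internal_reference_id: "ref_123".to_string(),
        },
    )?;
    println!("{entry:?}");
    Ok(())
}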
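On the Redis side, stream_read_with_options now distinguishes "the stream is empty or does not exist yet" from a genuine read failure, and get_batches short-circuits to an empty batch list in the former case instead of reporting BatchNotFound. A minimal sketch of that mapping, using hypothetical stand-in error enums rather than the crate's RedisError and the underlying client's error kind:

#[derive(Debug, PartialEq)]
enum LowLevelErrorKind {
    NotFound,
    Parse,
    Io,
}

#[derive(Debug, PartialEq)]
enum RedisError {
    StreamEmptyOrNotAvailable,
    StreamReadFailed,
}

// "Not found" and "nothing to parse" become a benign empty-stream variant;
// everything else stays a hard read failure.
fn map_stream_read_error(kind: LowLevelErrorKind) -> RedisError {
    match kind {
        LowLevelErrorKind::NotFound | LowLevelErrorKind::Parse => {
            RedisError::StreamEmptyOrNotAvailable
        }
        _ => RedisError::StreamReadFailed,
    }
}

fn main() {
    assert_eq!(
        map_stream_read_error(LowLevelErrorKind::NotFound),
        RedisError::StreamEmptyOrNotAvailable
    );
    assert_eq!(
        map_stream_read_error(LowLevelErrorKind::Io),
        RedisError::StreamReadFailed
    );
    // In get_batches, the empty-stream variant is handled by returning Ok(Vec::new())
    // rather than escalating to ProcessTrackerError::BatchNotFound.
    println!("error mapping behaves as expected");
}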
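The largest structural change is dropping the ProcessTrackerExt extension trait: reset, retry and finish_with_status become reset_process, retry_process and finish_process_with_business_status on ProcessTrackerInterface, so the Diesel store, the Kafka wrapper and the mock DB expose one surface and callers go through db.as_scheduler(). A simplified, self-contained sketch of that shape follows; the types and error handling are stand-ins (async-trait and tokio assumed), not the real trait, which also carries the retry and reset variants and error_stack-based error types:

use async_trait::async_trait;

#[derive(Clone, Debug)]
struct ProcessTracker {
    id: String,
    business_status: String,
}

#[async_trait]
trait ProcessTrackerInterface: Send + Sync {
    async fn update_business_status(
        &self,
        this: ProcessTracker,
        business_status: String,
    ) -> Result<ProcessTracker, String>;

    // Callers now write `db.finish_process_with_business_status(task, status)`
    // instead of `task.finish_with_status(db, status)`; the behaviour is defined
    // once here in terms of the lower-level update.
    async fn finish_process_with_business_status(
        &self,
        this: ProcessTracker,
        business_status: String,
    ) -> Result<(), String> {
        self.update_business_status(this, business_status).await?;
        Ok(())
    }
}

struct InMemoryStore;

#[async_trait]
impl ProcessTrackerInterface for InMemoryStore {
    async fn update_business_status(
        &self,
        this: ProcessTracker,
        business_status: String,
    ) -> Result<ProcessTracker, String> {
        let mut updated = this;
        updated.business_status = business_status;
        Ok(updated)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let db = InMemoryStore;
    let task = ProcessTracker {
        id: "PAYMENTS_SYNC_WORKFLOW_PAYMENTS_SYNC_attempt_1_merchant_1".to_string(),
        business_status: "Pending".to_string(),
    };
    println!("finishing task {}", task.id);
    db.finish_process_with_business_status(task, "COMPLETED_BY_PT".to_string())
        .await
}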