diff --git a/crates/api_models/src/enums.rs b/crates/api_models/src/enums.rs index 26e96165ba..10aabb0b59 100644 --- a/crates/api_models/src/enums.rs +++ b/crates/api_models/src/enums.rs @@ -973,3 +973,23 @@ pub struct UnresolvedResponseReason { /// A message to merchant to give hint on next action he/she should do to resolve pub message: String, } + +#[derive( + Debug, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::EnumString, + Clone, + PartialEq, + Eq, + ToSchema, +)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum RetryAction { + /// Payment can be retried from the client side until the payment is successful or payment expires or the attempts(configured by the merchant) for payment are exhausted + ManualRetry, + /// Denotes that the payment is requeued + Requeue, +} diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index 95f3835620..cacff51844 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -278,9 +278,9 @@ pub struct PaymentsRequest { /// Business sub label for the payment pub business_sub_label: Option, - /// If enabled payment can be retried from the client side until the payment is successful or payment expires or the attempts(configured by the merchant) for payment are exhausted. - #[serde(default)] - pub manual_retry: bool, + /// Denotes the retry action + #[schema(value_type = Option)] + pub retry_action: Option, /// Any user defined fields can be passed here. #[schema(value_type = Option, example = r#"{ "udf1": "some-value", "udf2": "some-value" }"#)] diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 1a3057bb97..37f11e93ba 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -40,7 +40,7 @@ use crate::{ services::{self, api::Authenticate}, types::{ self, api, domain, - storage::{self, enums as storage_enums}, + storage::{self, enums as storage_enums, ProcessTrackerExt}, }, utils::{Encode, OptionExt, ValueExt}, }; @@ -137,7 +137,11 @@ where if should_add_task_to_process_tracker(&payment_data) { operation .to_domain()? - .add_task_to_process_tracker(state, &payment_data.payment_attempt) + .add_task_to_process_tracker( + state, + &payment_data.payment_attempt, + validate_result.requeue, + ) .await .map_err(|error| logger::error!(process_tracker_error=?error)) .ok(); @@ -1234,19 +1238,39 @@ pub async fn add_process_sync_task( &payment_attempt.attempt_id, &payment_attempt.merchant_id, ); - let process_tracker_entry = - ::make_process_tracker_new( - process_tracker_id, - task, - runner, - tracking_data, - schedule_time, - )?; + let process_tracker_entry = ::make_process_tracker_new( + process_tracker_id, + task, + runner, + tracking_data, + schedule_time, + )?; db.insert_process(process_tracker_entry).await?; Ok(()) } +pub async fn reset_process_sync_task( + db: &dyn StorageInterface, + payment_attempt: &storage::PaymentAttempt, + schedule_time: time::PrimitiveDateTime, +) -> Result<(), errors::ProcessTrackerError> { + let runner = "PAYMENTS_SYNC_WORKFLOW"; + let task = "PAYMENTS_SYNC"; + let process_tracker_id = pt_utils::get_process_tracker_id( + runner, + task, + &payment_attempt.attempt_id, + &payment_attempt.merchant_id, + ); + let psync_process = db + .find_process_by_id(&process_tracker_id) + .await? + .ok_or(errors::ProcessTrackerError::ProcessFetchingFailed)?; + psync_process.reset(db, schedule_time).await?; + Ok(()) +} + pub fn update_straight_through_routing( payment_data: &mut PaymentData, request_straight_through: serde_json::Value, diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 9752aa44fa..aaaeaba0a3 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -763,6 +763,7 @@ pub async fn add_domain_task_to_pt( operation: &Op, state: &AppState, payment_attempt: &storage::PaymentAttempt, + requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> where Op: std::fmt::Debug, @@ -786,12 +787,21 @@ where match schedule_time { Some(stime) => { - scheduler_metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics - super::add_process_sync_task(&*state.store, payment_attempt, stime) - .await - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed while adding task to process tracker") + if !requeue { + scheduler_metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics + super::add_process_sync_task(&*state.store, payment_attempt, stime) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed while adding task to process tracker") + } else { + scheduler_metrics::TASKS_RESET_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics + super::reset_process_sync_task(&*state.store, payment_attempt, stime) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed while updating task in process tracker") + } } None => Ok(()), } @@ -2155,7 +2165,10 @@ pub fn get_attempt_type( ) -> RouterResult { match payment_intent.status { enums::IntentStatus::Failed => { - if request.manual_retry { + if matches!( + request.retry_action, + Some(api_models::enums::RetryAction::ManualRetry) + ) { match payment_attempt.status { enums::AttemptStatus::Started | enums::AttemptStatus::AuthenticationPending diff --git a/crates/router/src/core/payments/operations.rs b/crates/router/src/core/payments/operations.rs index 7b39d6e447..b0e362280f 100644 --- a/crates/router/src/core/payments/operations.rs +++ b/crates/router/src/core/payments/operations.rs @@ -71,6 +71,7 @@ pub struct ValidateResult<'a> { pub payment_id: api::PaymentIdType, pub mandate_type: Option, pub storage_scheme: enums::MerchantStorageScheme, + pub requeue: bool, } #[allow(clippy::type_complexity)] @@ -119,6 +120,7 @@ pub trait Domain: Send + Sync { &'a self, _db: &'a AppState, _payment_attempt: &storage::PaymentAttempt, + _requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> { Ok(()) } diff --git a/crates/router/src/core/payments/operations/payment_cancel.rs b/crates/router/src/core/payments/operations/payment_cancel.rs index b522be8065..435c5684fb 100644 --- a/crates/router/src/core/payments/operations/payment_cancel.rs +++ b/crates/router/src/core/payments/operations/payment_cancel.rs @@ -235,6 +235,7 @@ impl ValidateRequest for Payment payment_id: api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()), mandate_type: None, storage_scheme: merchant_account.storage_scheme, + requeue: false, }, )) } diff --git a/crates/router/src/core/payments/operations/payment_capture.rs b/crates/router/src/core/payments/operations/payment_capture.rs index 82f6b895db..ceafe45703 100644 --- a/crates/router/src/core/payments/operations/payment_capture.rs +++ b/crates/router/src/core/payments/operations/payment_capture.rs @@ -215,6 +215,7 @@ impl ValidateRequest for Paymen payment_id: api::PaymentIdType::PaymentIntentId(payment_id.to_owned()), mandate_type: None, storage_scheme: merchant_account.storage_scheme, + requeue: false, }, )) } diff --git a/crates/router/src/core/payments/operations/payment_complete_authorize.rs b/crates/router/src/core/payments/operations/payment_complete_authorize.rs index 9f52c47145..1d71f4ba31 100644 --- a/crates/router/src/core/payments/operations/payment_complete_authorize.rs +++ b/crates/router/src/core/payments/operations/payment_complete_authorize.rs @@ -271,6 +271,7 @@ impl Domain for CompleteAuthorize { &'a self, _state: &'a AppState, _payment_attempt: &storage::PaymentAttempt, + _requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> { Ok(()) } @@ -347,6 +348,10 @@ impl ValidateRequest for CompleteAutho payment_id: api::PaymentIdType::PaymentIntentId(payment_id), mandate_type, storage_scheme: merchant_account.storage_scheme, + requeue: matches!( + request.retry_action, + Some(api_models::enums::RetryAction::Requeue) + ), }, )) } diff --git a/crates/router/src/core/payments/operations/payment_confirm.rs b/crates/router/src/core/payments/operations/payment_confirm.rs index 91809e1415..01800b7017 100644 --- a/crates/router/src/core/payments/operations/payment_confirm.rs +++ b/crates/router/src/core/payments/operations/payment_confirm.rs @@ -325,8 +325,9 @@ impl Domain for PaymentConfirm { &'a self, state: &'a AppState, payment_attempt: &storage::PaymentAttempt, + requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> { - helpers::add_domain_task_to_pt(self, state, payment_attempt).await + helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue).await } async fn get_connector<'a>( @@ -525,6 +526,10 @@ impl ValidateRequest for PaymentConfir payment_id: api::PaymentIdType::PaymentIntentId(payment_id), mandate_type, storage_scheme: merchant_account.storage_scheme, + requeue: matches!( + request.retry_action, + Some(api_models::enums::RetryAction::Requeue) + ), }, )) } diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index 2ea7fa1e12..861fc5f18a 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -312,6 +312,7 @@ impl Domain for PaymentCreate { &'a self, _state: &'a AppState, _payment_attempt: &storage::PaymentAttempt, + _requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> { Ok(()) } @@ -488,6 +489,10 @@ impl ValidateRequest for PaymentCreate payment_id: api::PaymentIdType::PaymentIntentId(payment_id), mandate_type, storage_scheme: merchant_account.storage_scheme, + requeue: matches!( + request.retry_action, + Some(api_models::enums::RetryAction::Requeue) + ), }, )) } diff --git a/crates/router/src/core/payments/operations/payment_method_validate.rs b/crates/router/src/core/payments/operations/payment_method_validate.rs index f2bc4abb8f..f0bee3fe4e 100644 --- a/crates/router/src/core/payments/operations/payment_method_validate.rs +++ b/crates/router/src/core/payments/operations/payment_method_validate.rs @@ -56,6 +56,7 @@ impl ValidateRequest for PaymentMethodVa payment_id: api::PaymentIdType::PaymentIntentId(validation_id), mandate_type, storage_scheme: merchant_account.storage_scheme, + requeue: false, }, )) } diff --git a/crates/router/src/core/payments/operations/payment_session.rs b/crates/router/src/core/payments/operations/payment_session.rs index 5589fca3f9..096cf297d0 100644 --- a/crates/router/src/core/payments/operations/payment_session.rs +++ b/crates/router/src/core/payments/operations/payment_session.rs @@ -237,6 +237,7 @@ impl ValidateRequest for Paymen payment_id: api::PaymentIdType::PaymentIntentId(given_payment_id), mandate_type: None, storage_scheme: merchant_account.storage_scheme, + requeue: false, }, )) } diff --git a/crates/router/src/core/payments/operations/payment_start.rs b/crates/router/src/core/payments/operations/payment_start.rs index 5cd2d11c08..9490213a39 100644 --- a/crates/router/src/core/payments/operations/payment_start.rs +++ b/crates/router/src/core/payments/operations/payment_start.rs @@ -202,6 +202,7 @@ impl ValidateRequest for PaymentS payment_id: api::PaymentIdType::PaymentIntentId(payment_id), mandate_type: None, storage_scheme: merchant_account.storage_scheme, + requeue: false, }, )) } diff --git a/crates/router/src/core/payments/operations/payment_status.rs b/crates/router/src/core/payments/operations/payment_status.rs index 5d22bd3b0a..90e655f303 100644 --- a/crates/router/src/core/payments/operations/payment_status.rs +++ b/crates/router/src/core/payments/operations/payment_status.rs @@ -93,8 +93,9 @@ impl Domain for PaymentStatus { &'a self, state: &'a AppState, payment_attempt: &storage::PaymentAttempt, + requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> { - helpers::add_domain_task_to_pt(self, state, payment_attempt).await + helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue).await } async fn get_connector<'a>( @@ -333,6 +334,7 @@ impl ValidateRequest for Payme payment_id: request.resource_id.clone(), mandate_type: None, storage_scheme: merchant_account.storage_scheme, + requeue: false, }, )) } diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index 507f4034e6..f2868a2ede 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -358,6 +358,7 @@ impl Domain for PaymentUpdate { &'a self, _state: &'a AppState, _payment_attempt: &storage::PaymentAttempt, + _requeue: bool, ) -> CustomResult<(), errors::ApiErrorResponse> { Ok(()) } @@ -575,6 +576,10 @@ impl ValidateRequest for PaymentUpdate payment_id: api::PaymentIdType::PaymentIntentId(payment_id), mandate_type, storage_scheme: merchant_account.storage_scheme, + requeue: matches!( + request.retry_action, + Some(api_models::enums::RetryAction::Requeue) + ), }, )) } diff --git a/crates/router/src/openapi.rs b/crates/router/src/openapi.rs index 726d2295c9..40e7d32a1b 100644 --- a/crates/router/src/openapi.rs +++ b/crates/router/src/openapi.rs @@ -152,6 +152,7 @@ Never share your secret api keys. Keep them guarded and secure. api_models::enums::CountryAlpha2, api_models::enums::FrmAction, api_models::enums::FrmPreferredFlowTypes, + api_models::enums::RetryAction, api_models::admin::MerchantConnectorCreate, api_models::admin::MerchantConnectorUpdate, api_models::admin::PrimaryBusinessDetails, diff --git a/crates/router/src/scheduler/metrics.rs b/crates/router/src/scheduler/metrics.rs index 6fae07bba9..7337e3ab4f 100644 --- a/crates/router/src/scheduler/metrics.rs +++ b/crates/router/src/scheduler/metrics.rs @@ -7,6 +7,7 @@ histogram_metric!(CONSUMER_STATS, PT_METER, "CONSUMER_OPS"); counter_metric!(PAYMENT_COUNT, PT_METER); // No. of payments created counter_metric!(TASKS_ADDED_COUNT, PT_METER); // Tasks added to process tracker +counter_metric!(TASKS_RESET_COUNT, PT_METER); // Tasks reset in process tracker for requeue flow counter_metric!(TASKS_PICKED_COUNT, PT_METER); // Tasks picked by counter_metric!(BATCHES_CREATED, PT_METER); // Batches added to stream counter_metric!(BATCHES_CONSUMED, PT_METER); // Batches consumed by consumer diff --git a/crates/router/src/types/storage/process_tracker.rs b/crates/router/src/types/storage/process_tracker.rs index 98b2fa2b20..9e1120745e 100644 --- a/crates/router/src/types/storage/process_tracker.rs +++ b/crates/router/src/types/storage/process_tracker.rs @@ -24,6 +24,12 @@ pub trait ProcessTrackerExt { where T: Serialize; + async fn reset( + self, + db: &dyn StorageInterface, + schedule_time: PrimitiveDateTime, + ) -> Result<(), errors::ProcessTrackerError>; + async fn retry( self, db: &dyn StorageInterface, @@ -72,6 +78,23 @@ impl ProcessTrackerExt for ProcessTracker { }) } + async fn reset( + self, + db: &dyn StorageInterface, + schedule_time: PrimitiveDateTime, + ) -> Result<(), errors::ProcessTrackerError> { + db.update_process_tracker( + self.clone(), + ProcessTrackerUpdate::StatusRetryUpdate { + status: storage_enums::ProcessTrackerStatus::New, + retry_count: 0, + schedule_time, + }, + ) + .await?; + Ok(()) + } + async fn retry( self, db: &dyn StorageInterface,