feat(router): add requeue support for payments and fix duplicate entry error in process tracker for requeued payments (#1567)

This commit is contained in:
Sai Harsha Vardhan
2023-07-03 12:56:01 +05:30
committed by GitHub
parent 7489c870d9
commit b967d23251
18 changed files with 133 additions and 22 deletions

View File

@ -973,3 +973,23 @@ pub struct UnresolvedResponseReason {
/// A message to merchant to give hint on next action he/she should do to resolve /// A message to merchant to give hint on next action he/she should do to resolve
pub message: String, pub message: String,
} }
#[derive(
Debug,
serde::Deserialize,
serde::Serialize,
strum::Display,
strum::EnumString,
Clone,
PartialEq,
Eq,
ToSchema,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum RetryAction {
/// Payment can be retried from the client side until the payment is successful or payment expires or the attempts(configured by the merchant) for payment are exhausted
ManualRetry,
/// Denotes that the payment is requeued
Requeue,
}

View File

@ -278,9 +278,9 @@ pub struct PaymentsRequest {
/// Business sub label for the payment /// Business sub label for the payment
pub business_sub_label: Option<String>, pub business_sub_label: Option<String>,
/// If enabled payment can be retried from the client side until the payment is successful or payment expires or the attempts(configured by the merchant) for payment are exhausted. /// Denotes the retry action
#[serde(default)] #[schema(value_type = Option<RetryAction>)]
pub manual_retry: bool, pub retry_action: Option<api_enums::RetryAction>,
/// Any user defined fields can be passed here. /// Any user defined fields can be passed here.
#[schema(value_type = Option<Object>, example = r#"{ "udf1": "some-value", "udf2": "some-value" }"#)] #[schema(value_type = Option<Object>, example = r#"{ "udf1": "some-value", "udf2": "some-value" }"#)]

View File

@ -40,7 +40,7 @@ use crate::{
services::{self, api::Authenticate}, services::{self, api::Authenticate},
types::{ types::{
self, api, domain, self, api, domain,
storage::{self, enums as storage_enums}, storage::{self, enums as storage_enums, ProcessTrackerExt},
}, },
utils::{Encode, OptionExt, ValueExt}, utils::{Encode, OptionExt, ValueExt},
}; };
@ -137,7 +137,11 @@ where
if should_add_task_to_process_tracker(&payment_data) { if should_add_task_to_process_tracker(&payment_data) {
operation operation
.to_domain()? .to_domain()?
.add_task_to_process_tracker(state, &payment_data.payment_attempt) .add_task_to_process_tracker(
state,
&payment_data.payment_attempt,
validate_result.requeue,
)
.await .await
.map_err(|error| logger::error!(process_tracker_error=?error)) .map_err(|error| logger::error!(process_tracker_error=?error))
.ok(); .ok();
@ -1234,8 +1238,7 @@ pub async fn add_process_sync_task(
&payment_attempt.attempt_id, &payment_attempt.attempt_id,
&payment_attempt.merchant_id, &payment_attempt.merchant_id,
); );
let process_tracker_entry = let process_tracker_entry = <storage::ProcessTracker>::make_process_tracker_new(
<storage::ProcessTracker as storage::ProcessTrackerExt>::make_process_tracker_new(
process_tracker_id, process_tracker_id,
task, task,
runner, runner,
@ -1247,6 +1250,27 @@ pub async fn add_process_sync_task(
Ok(()) Ok(())
} }
pub async fn reset_process_sync_task(
db: &dyn StorageInterface,
payment_attempt: &storage::PaymentAttempt,
schedule_time: time::PrimitiveDateTime,
) -> Result<(), errors::ProcessTrackerError> {
let runner = "PAYMENTS_SYNC_WORKFLOW";
let task = "PAYMENTS_SYNC";
let process_tracker_id = pt_utils::get_process_tracker_id(
runner,
task,
&payment_attempt.attempt_id,
&payment_attempt.merchant_id,
);
let psync_process = db
.find_process_by_id(&process_tracker_id)
.await?
.ok_or(errors::ProcessTrackerError::ProcessFetchingFailed)?;
psync_process.reset(db, schedule_time).await?;
Ok(())
}
pub fn update_straight_through_routing<F>( pub fn update_straight_through_routing<F>(
payment_data: &mut PaymentData<F>, payment_data: &mut PaymentData<F>,
request_straight_through: serde_json::Value, request_straight_through: serde_json::Value,

View File

@ -763,6 +763,7 @@ pub async fn add_domain_task_to_pt<Op>(
operation: &Op, operation: &Op,
state: &AppState, state: &AppState,
payment_attempt: &storage::PaymentAttempt, payment_attempt: &storage::PaymentAttempt,
requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> ) -> CustomResult<(), errors::ApiErrorResponse>
where where
Op: std::fmt::Debug, Op: std::fmt::Debug,
@ -786,12 +787,21 @@ where
match schedule_time { match schedule_time {
Some(stime) => { Some(stime) => {
if !requeue {
scheduler_metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics scheduler_metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics
super::add_process_sync_task(&*state.store, payment_attempt, stime) super::add_process_sync_task(&*state.store, payment_attempt, stime)
.await .await
.into_report() .into_report()
.change_context(errors::ApiErrorResponse::InternalServerError) .change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while adding task to process tracker") .attach_printable("Failed while adding task to process tracker")
} else {
scheduler_metrics::TASKS_RESET_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics
super::reset_process_sync_task(&*state.store, payment_attempt, stime)
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while updating task in process tracker")
}
} }
None => Ok(()), None => Ok(()),
} }
@ -2155,7 +2165,10 @@ pub fn get_attempt_type(
) -> RouterResult<AttemptType> { ) -> RouterResult<AttemptType> {
match payment_intent.status { match payment_intent.status {
enums::IntentStatus::Failed => { enums::IntentStatus::Failed => {
if request.manual_retry { if matches!(
request.retry_action,
Some(api_models::enums::RetryAction::ManualRetry)
) {
match payment_attempt.status { match payment_attempt.status {
enums::AttemptStatus::Started enums::AttemptStatus::Started
| enums::AttemptStatus::AuthenticationPending | enums::AttemptStatus::AuthenticationPending

View File

@ -71,6 +71,7 @@ pub struct ValidateResult<'a> {
pub payment_id: api::PaymentIdType, pub payment_id: api::PaymentIdType,
pub mandate_type: Option<api::MandateTransactionType>, pub mandate_type: Option<api::MandateTransactionType>,
pub storage_scheme: enums::MerchantStorageScheme, pub storage_scheme: enums::MerchantStorageScheme,
pub requeue: bool,
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -119,6 +120,7 @@ pub trait Domain<F: Clone, R>: Send + Sync {
&'a self, &'a self,
_db: &'a AppState, _db: &'a AppState,
_payment_attempt: &storage::PaymentAttempt, _payment_attempt: &storage::PaymentAttempt,
_requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> { ) -> CustomResult<(), errors::ApiErrorResponse> {
Ok(()) Ok(())
} }

View File

@ -235,6 +235,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsCancelRequest> for Payment
payment_id: api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()), payment_id: api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()),
mandate_type: None, mandate_type: None,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: false,
}, },
)) ))
} }

View File

@ -215,6 +215,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsCaptureRequest> for Paymen
payment_id: api::PaymentIdType::PaymentIntentId(payment_id.to_owned()), payment_id: api::PaymentIdType::PaymentIntentId(payment_id.to_owned()),
mandate_type: None, mandate_type: None,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: false,
}, },
)) ))
} }

View File

@ -271,6 +271,7 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for CompleteAuthorize {
&'a self, &'a self,
_state: &'a AppState, _state: &'a AppState,
_payment_attempt: &storage::PaymentAttempt, _payment_attempt: &storage::PaymentAttempt,
_requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> { ) -> CustomResult<(), errors::ApiErrorResponse> {
Ok(()) Ok(())
} }
@ -347,6 +348,10 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for CompleteAutho
payment_id: api::PaymentIdType::PaymentIntentId(payment_id), payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type, mandate_type,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: matches!(
request.retry_action,
Some(api_models::enums::RetryAction::Requeue)
),
}, },
)) ))
} }

View File

@ -325,8 +325,9 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentConfirm {
&'a self, &'a self,
state: &'a AppState, state: &'a AppState,
payment_attempt: &storage::PaymentAttempt, payment_attempt: &storage::PaymentAttempt,
requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> { ) -> CustomResult<(), errors::ApiErrorResponse> {
helpers::add_domain_task_to_pt(self, state, payment_attempt).await helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue).await
} }
async fn get_connector<'a>( async fn get_connector<'a>(
@ -525,6 +526,10 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentConfir
payment_id: api::PaymentIdType::PaymentIntentId(payment_id), payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type, mandate_type,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: matches!(
request.retry_action,
Some(api_models::enums::RetryAction::Requeue)
),
}, },
)) ))
} }

View File

@ -312,6 +312,7 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentCreate {
&'a self, &'a self,
_state: &'a AppState, _state: &'a AppState,
_payment_attempt: &storage::PaymentAttempt, _payment_attempt: &storage::PaymentAttempt,
_requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> { ) -> CustomResult<(), errors::ApiErrorResponse> {
Ok(()) Ok(())
} }
@ -488,6 +489,10 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentCreate
payment_id: api::PaymentIdType::PaymentIntentId(payment_id), payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type, mandate_type,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: matches!(
request.retry_action,
Some(api_models::enums::RetryAction::Requeue)
),
}, },
)) ))
} }

View File

@ -56,6 +56,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::VerifyRequest> for PaymentMethodVa
payment_id: api::PaymentIdType::PaymentIntentId(validation_id), payment_id: api::PaymentIdType::PaymentIntentId(validation_id),
mandate_type, mandate_type,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: false,
}, },
)) ))
} }

View File

@ -237,6 +237,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsSessionRequest> for Paymen
payment_id: api::PaymentIdType::PaymentIntentId(given_payment_id), payment_id: api::PaymentIdType::PaymentIntentId(given_payment_id),
mandate_type: None, mandate_type: None,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: false,
}, },
)) ))
} }

View File

@ -202,6 +202,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsStartRequest> for PaymentS
payment_id: api::PaymentIdType::PaymentIntentId(payment_id), payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type: None, mandate_type: None,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: false,
}, },
)) ))
} }

View File

@ -93,8 +93,9 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentStatus {
&'a self, &'a self,
state: &'a AppState, state: &'a AppState,
payment_attempt: &storage::PaymentAttempt, payment_attempt: &storage::PaymentAttempt,
requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> { ) -> CustomResult<(), errors::ApiErrorResponse> {
helpers::add_domain_task_to_pt(self, state, payment_attempt).await helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue).await
} }
async fn get_connector<'a>( async fn get_connector<'a>(
@ -333,6 +334,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRetrieveRequest> for Payme
payment_id: request.resource_id.clone(), payment_id: request.resource_id.clone(),
mandate_type: None, mandate_type: None,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: false,
}, },
)) ))
} }

View File

@ -358,6 +358,7 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentUpdate {
&'a self, &'a self,
_state: &'a AppState, _state: &'a AppState,
_payment_attempt: &storage::PaymentAttempt, _payment_attempt: &storage::PaymentAttempt,
_requeue: bool,
) -> CustomResult<(), errors::ApiErrorResponse> { ) -> CustomResult<(), errors::ApiErrorResponse> {
Ok(()) Ok(())
} }
@ -575,6 +576,10 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentUpdate
payment_id: api::PaymentIdType::PaymentIntentId(payment_id), payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type, mandate_type,
storage_scheme: merchant_account.storage_scheme, storage_scheme: merchant_account.storage_scheme,
requeue: matches!(
request.retry_action,
Some(api_models::enums::RetryAction::Requeue)
),
}, },
)) ))
} }

View File

@ -152,6 +152,7 @@ Never share your secret api keys. Keep them guarded and secure.
api_models::enums::CountryAlpha2, api_models::enums::CountryAlpha2,
api_models::enums::FrmAction, api_models::enums::FrmAction,
api_models::enums::FrmPreferredFlowTypes, api_models::enums::FrmPreferredFlowTypes,
api_models::enums::RetryAction,
api_models::admin::MerchantConnectorCreate, api_models::admin::MerchantConnectorCreate,
api_models::admin::MerchantConnectorUpdate, api_models::admin::MerchantConnectorUpdate,
api_models::admin::PrimaryBusinessDetails, api_models::admin::PrimaryBusinessDetails,

View File

@ -7,6 +7,7 @@ histogram_metric!(CONSUMER_STATS, PT_METER, "CONSUMER_OPS");
counter_metric!(PAYMENT_COUNT, PT_METER); // No. of payments created counter_metric!(PAYMENT_COUNT, PT_METER); // No. of payments created
counter_metric!(TASKS_ADDED_COUNT, PT_METER); // Tasks added to process tracker counter_metric!(TASKS_ADDED_COUNT, PT_METER); // Tasks added to process tracker
counter_metric!(TASKS_RESET_COUNT, PT_METER); // Tasks reset in process tracker for requeue flow
counter_metric!(TASKS_PICKED_COUNT, PT_METER); // Tasks picked by counter_metric!(TASKS_PICKED_COUNT, PT_METER); // Tasks picked by
counter_metric!(BATCHES_CREATED, PT_METER); // Batches added to stream counter_metric!(BATCHES_CREATED, PT_METER); // Batches added to stream
counter_metric!(BATCHES_CONSUMED, PT_METER); // Batches consumed by consumer counter_metric!(BATCHES_CONSUMED, PT_METER); // Batches consumed by consumer

View File

@ -24,6 +24,12 @@ pub trait ProcessTrackerExt {
where where
T: Serialize; T: Serialize;
async fn reset(
self,
db: &dyn StorageInterface,
schedule_time: PrimitiveDateTime,
) -> Result<(), errors::ProcessTrackerError>;
async fn retry( async fn retry(
self, self,
db: &dyn StorageInterface, db: &dyn StorageInterface,
@ -72,6 +78,23 @@ impl ProcessTrackerExt for ProcessTracker {
}) })
} }
async fn reset(
self,
db: &dyn StorageInterface,
schedule_time: PrimitiveDateTime,
) -> Result<(), errors::ProcessTrackerError> {
db.update_process_tracker(
self.clone(),
ProcessTrackerUpdate::StatusRetryUpdate {
status: storage_enums::ProcessTrackerStatus::New,
retry_count: 0,
schedule_time,
},
)
.await?;
Ok(())
}
async fn retry( async fn retry(
self, self,
db: &dyn StorageInterface, db: &dyn StorageInterface,