fix(router): fix retry_count and add validation for process_tracker (#7614)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Chikke Srujan <chikke.srujan@Chikke-Srujan-N7WRTY72X7.local>
This commit is contained in:
Aditya Chaurasia
2025-04-09 14:57:57 +05:30
committed by GitHub
parent b475171dd2
commit 6ef71051f6
20 changed files with 401 additions and 111 deletions

View File

@ -500,4 +500,8 @@ pub enum RevenueRecoveryError {
ProcessTrackerResponseError,
#[error("Billing connector psync call failed")]
BillingConnectorPaymentsSyncFailed,
#[error("Failed to get the retry count for payment intent")]
RetryCountFetchFailed,
#[error("Failed to get the billing threshold retry count")]
BillingThresholdRetryCountFetchFailed,
}

View File

@ -1738,7 +1738,7 @@ pub async fn record_attempt_core(
payment_id: id_type::GlobalPaymentId,
header_payload: HeaderPayload,
platform_merchant_account: Option<domain::MerchantAccount>,
) -> RouterResponse<api_models::payments::PaymentAttemptResponse> {
) -> RouterResponse<api_models::payments::PaymentAttemptRecordResponse> {
tracing::Span::current().record("merchant_id", merchant_account.get_id().get_string_repr());
let operation: &operations::payment_attempt_record::PaymentAttemptRecord =

View File

@ -235,7 +235,6 @@ impl<F: Clone + Sync> UpdateTracker<F, PaymentAttemptRecordData<F>, PaymentsAtte
{
status: common_enums::IntentStatus::from(payment_data.payment_attempt.status),
feature_metadata: Box::new(feature_metadata),
active_attempt_id: payment_data.payment_attempt.id.clone(),
updated_by: storage_scheme.to_string(),
};
payment_data.payment_intent = state

View File

@ -1825,6 +1825,42 @@ where
}
}
#[cfg(feature = "v2")]
impl<F> GenerateResponse<api_models::payments::PaymentAttemptRecordResponse>
for hyperswitch_domain_models::payments::PaymentAttemptRecordData<F>
where
F: Clone,
{
fn generate_response(
self,
_state: &SessionState,
_connector_http_status_code: Option<u16>,
_external_latency: Option<u128>,
_is_latency_header_enabled: Option<bool>,
_merchant_account: &domain::MerchantAccount,
_profile: &domain::Profile,
) -> RouterResponse<api_models::payments::PaymentAttemptRecordResponse> {
let payment_attempt = self.payment_attempt;
let payment_intent = self.payment_intent;
let response = api_models::payments::PaymentAttemptRecordResponse {
id: payment_attempt.get_id().to_owned(),
status: payment_attempt.status,
payment_intent_feature_metadata: payment_intent
.feature_metadata
.as_ref()
.map(api_models::payments::FeatureMetadata::foreign_from),
payment_attempt_feature_metadata: payment_attempt
.feature_metadata
.as_ref()
.map(api_models::payments::PaymentAttemptFeatureMetadata::foreign_from),
};
Ok(services::ApplicationResponse::JsonWithHeaders((
response,
vec![],
)))
}
}
#[cfg(feature = "v1")]
impl<F, Op, D> ToResponse<F, D, Op> for api::PaymentsPostSessionTokensResponse
where
@ -3935,6 +3971,17 @@ impl ForeignFrom<diesel_models::types::RecurringPaymentIntervalUnit>
}
}
impl ForeignFrom<diesel_models::types::RedirectResponse>
for api_models::payments::RedirectResponse
{
fn foreign_from(redirect_res: diesel_models::types::RedirectResponse) -> Self {
Self {
param: redirect_res.param,
json_payload: redirect_res.json_payload,
}
}
}
#[cfg(feature = "v1")]
impl<F: Clone> TryFrom<PaymentAdditionalData<'_, F>> for types::SetupMandateRequestData {
type Error = error_stack::Report<errors::ApiErrorResponse>;
@ -4435,6 +4482,18 @@ impl ForeignFrom<&hyperswitch_domain_models::payments::payment_attempt::AttemptA
}
}
#[cfg(feature = "v2")]
impl ForeignFrom<&diesel_models::types::BillingConnectorPaymentDetails>
for api_models::payments::BillingConnectorPaymentDetails
{
fn foreign_from(metadata: &diesel_models::types::BillingConnectorPaymentDetails) -> Self {
Self {
payment_processor_token: metadata.payment_processor_token.clone(),
connector_customer_id: metadata.connector_customer_id.clone(),
}
}
}
#[cfg(feature = "v2")]
impl ForeignFrom<&hyperswitch_domain_models::payments::payment_attempt::ErrorDetails>
for api_models::payments::ErrorDetails
@ -4472,6 +4531,51 @@ impl
}
}
#[cfg(feature = "v2")]
impl ForeignFrom<&diesel_models::types::FeatureMetadata> for api_models::payments::FeatureMetadata {
fn foreign_from(feature_metadata: &diesel_models::types::FeatureMetadata) -> Self {
let revenue_recovery = feature_metadata
.payment_revenue_recovery_metadata
.as_ref()
.map(|payment_revenue_recovery_metadata| {
api_models::payments::PaymentRevenueRecoveryMetadata {
total_retry_count: payment_revenue_recovery_metadata.total_retry_count,
payment_connector_transmission: Some(
payment_revenue_recovery_metadata.payment_connector_transmission,
),
connector: payment_revenue_recovery_metadata.connector,
billing_connector_id: payment_revenue_recovery_metadata
.billing_connector_id
.clone(),
active_attempt_payment_connector_id: payment_revenue_recovery_metadata
.active_attempt_payment_connector_id
.clone(),
payment_method_type: payment_revenue_recovery_metadata.payment_method_type,
payment_method_subtype: payment_revenue_recovery_metadata
.payment_method_subtype,
billing_connector_payment_details:
api_models::payments::BillingConnectorPaymentDetails::foreign_from(
&payment_revenue_recovery_metadata.billing_connector_payment_details,
),
}
});
let apple_pay_details = feature_metadata
.apple_pay_recurring_details
.clone()
.map(api_models::payments::ApplePayRecurringDetails::foreign_from);
let redirect_res = feature_metadata
.redirect_response
.clone()
.map(api_models::payments::RedirectResponse::foreign_from);
Self {
payment_revenue_recovery_metadata: revenue_recovery,
apple_pay_recurring_details: apple_pay_details,
redirect_response: redirect_res,
search_tags: feature_metadata.search_tags.clone(),
}
}
}
#[cfg(feature = "v2")]
impl ForeignFrom<hyperswitch_domain_models::payments::AmountDetails>
for api_models::payments::AmountDetailsResponse

View File

@ -63,7 +63,9 @@ pub async fn perform_execute_payment(
let decision = pcr_types::Decision::get_decision_based_on_params(
state,
payment_intent.status,
pcr_metadata.payment_connector_transmission,
pcr_metadata
.payment_connector_transmission
.unwrap_or_default(),
payment_intent.active_attempt_id.clone(),
pcr_data,
&tracking_data.global_payment_id,
@ -115,6 +117,7 @@ pub async fn perform_execute_payment(
None => {
// insert new psync task
insert_psync_pcr_task(
billing_mca.get_id().clone(),
db,
pcr_data.merchant_account.get_id().clone(),
payment_intent.get_id().clone(),
@ -147,6 +150,7 @@ pub async fn perform_execute_payment(
}
async fn insert_psync_pcr_task(
billing_mca_id: id_type::MerchantConnectorAccountId,
db: &dyn StorageInterface,
merchant_id: id_type::MerchantId,
payment_id: id_type::GlobalPaymentId,
@ -158,6 +162,7 @@ async fn insert_psync_pcr_task(
let process_tracker_id = payment_attempt_id.get_psync_revenue_recovery_id(task, runner);
let schedule_time = common_utils::date_time::now();
let psync_workflow_tracking_data = pcr::PcrWorkflowTrackingData {
billing_mca_id,
global_payment_id: payment_id,
merchant_id,
profile_id,

View File

@ -203,6 +203,7 @@ impl Action {
match self {
Self::SyncPayment(attempt_id) => {
core_pcr::insert_psync_pcr_task(
billing_mca.get_id().clone(),
db,
pcr_data.merchant_account.get_id().to_owned(),
payment_intent.id.clone(),

View File

@ -110,41 +110,58 @@ pub async fn recovery_incoming_webhook_flow(
.await?;
let is_event_recovery_transaction_event = event_type.is_recovery_transaction_event();
let (recovery_attempt_from_payment_attempt, recovery_intent_from_payment_attempt) =
RevenueRecoveryAttempt::get_recovery_payment_attempt(
is_event_recovery_transaction_event,
&billing_connector_account,
&state,
&key_store,
connector_enum,
&req_state,
billing_connector_payment_details.as_ref(),
request_details,
&merchant_account,
&business_profile,
&payment_intent,
)
.await?;
let payment_attempt = RevenueRecoveryAttempt::get_recovery_payment_attempt(
is_event_recovery_transaction_event,
&billing_connector_account,
&state,
&key_store,
connector_enum,
&req_state,
billing_connector_payment_details.as_ref(),
request_details,
&merchant_account,
&business_profile,
&payment_intent,
)
.await?;
let attempt_triggered_by = payment_attempt
let attempt_triggered_by = recovery_attempt_from_payment_attempt
.as_ref()
.and_then(revenue_recovery::RecoveryPaymentAttempt::get_attempt_triggered_by);
.and_then(|attempt| attempt.get_attempt_triggered_by());
let action = revenue_recovery::RecoveryAction::get_action(event_type, attempt_triggered_by);
let mca_retry_threshold = billing_connector_account
.get_retry_threshold()
.ok_or(report!(
errors::RevenueRecoveryError::BillingThresholdRetryCountFetchFailed
))?;
let intent_retry_count = recovery_intent_from_payment_attempt
.feature_metadata
.as_ref()
.and_then(|metadata| metadata.get_retry_count())
.ok_or(report!(errors::RevenueRecoveryError::RetryCountFetchFailed))?;
router_env::logger::info!("Intent retry count: {:?}", intent_retry_count);
match action {
revenue_recovery::RecoveryAction::CancelInvoice => todo!(),
revenue_recovery::RecoveryAction::ScheduleFailedPayment => {
Ok(RevenueRecoveryAttempt::insert_execute_pcr_task(
&*state.store,
merchant_account.get_id().to_owned(),
payment_intent,
business_profile.get_id().to_owned(),
payment_attempt.map(|attempt| attempt.attempt_id.clone()),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
handle_schedule_failed_payment(
&billing_connector_account,
intent_retry_count,
mca_retry_threshold,
&state,
&merchant_account,
&(
recovery_attempt_from_payment_attempt,
recovery_intent_from_payment_attempt,
),
&business_profile,
)
.await
.change_context(errors::RevenueRecoveryError::InvoiceWebhookProcessingFailed)?)
}
revenue_recovery::RecoveryAction::SuccessPaymentExternal => {
// Need to add recovery stop flow for this scenario
@ -171,6 +188,48 @@ pub async fn recovery_incoming_webhook_flow(
}
}
}
async fn handle_schedule_failed_payment(
billing_connector_account: &domain::MerchantConnectorAccount,
intent_retry_count: u16,
mca_retry_threshold: u16,
state: &SessionState,
merchant_account: &domain::MerchantAccount,
payment_attempt_with_recovery_intent: &(
Option<revenue_recovery::RecoveryPaymentAttempt>,
revenue_recovery::RecoveryPaymentIntent,
),
business_profile: &domain::Profile,
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
let (recovery_attempt_from_payment_attempt, recovery_intent_from_payment_attempt) =
payment_attempt_with_recovery_intent;
(intent_retry_count <= mca_retry_threshold)
.then(|| {
router_env::logger::error!(
"Payment retry count {} is less than threshold {}",
intent_retry_count,
mca_retry_threshold
);
Ok(webhooks::WebhookResponseTracker::NoEffect)
})
.async_unwrap_or_else(|| async {
RevenueRecoveryAttempt::insert_execute_pcr_task(
&billing_connector_account.get_id(),
&*state.store,
merchant_account.get_id().to_owned(),
recovery_intent_from_payment_attempt.clone(),
business_profile.get_id().to_owned(),
intent_retry_count,
recovery_attempt_from_payment_attempt
.as_ref()
.map(|attempt| attempt.attempt_id.clone()),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
)
.await
})
.await
}
#[derive(Debug)]
pub struct RevenueRecoveryInvoice(revenue_recovery::RevenueRecoveryInvoiceData);
#[derive(Debug)]
@ -333,9 +392,14 @@ impl RevenueRecoveryAttempt {
merchant_account: &domain::MerchantAccount,
profile: &domain::Profile,
key_store: &domain::MerchantKeyStore,
payment_id: id_type::GlobalPaymentId,
) -> CustomResult<Option<revenue_recovery::RecoveryPaymentAttempt>, errors::RevenueRecoveryError>
{
payment_intent: &revenue_recovery::RecoveryPaymentIntent,
) -> CustomResult<
Option<(
revenue_recovery::RecoveryPaymentAttempt,
revenue_recovery::RecoveryPaymentIntent,
)>,
errors::RevenueRecoveryError,
> {
let attempt_response = Box::pin(payments::payments_core::<
router_flow_types::payments::PSync,
api_payments::PaymentsResponse,
@ -357,7 +421,7 @@ impl RevenueRecoveryAttempt {
expand_attempts: true,
param: None,
},
payment_id.clone(),
payment_intent.payment_id.clone(),
payments::CallConnectorAction::Avoid,
hyperswitch_domain_models::payments::HeaderPayload::default(),
))
@ -380,7 +444,10 @@ impl RevenueRecoveryAttempt {
attempt_status: attempt_res.status.to_owned(),
feature_metadata: attempt_res.feature_metadata.to_owned(),
});
Ok(payment_attempt)
// If we have an attempt, combine it with payment_intent in a tuple.
let res_with_payment_intent_and_attempt =
payment_attempt.map(|attempt| (attempt, (*payment_intent).clone()));
Ok(res_with_payment_intent_and_attempt)
}
Ok(_) => Err(errors::RevenueRecoveryError::PaymentAttemptFetchFailed)
.attach_printable("Unexpected response from payment intent core"),
@ -401,10 +468,16 @@ impl RevenueRecoveryAttempt {
merchant_account: &domain::MerchantAccount,
profile: &domain::Profile,
key_store: &domain::MerchantKeyStore,
payment_id: id_type::GlobalPaymentId,
payment_intent: &revenue_recovery::RecoveryPaymentIntent,
billing_connector_account_id: &id_type::MerchantConnectorAccountId,
payment_connector_account: Option<domain::MerchantConnectorAccount>,
) -> CustomResult<revenue_recovery::RecoveryPaymentAttempt, errors::RevenueRecoveryError> {
) -> CustomResult<
(
revenue_recovery::RecoveryPaymentAttempt,
revenue_recovery::RecoveryPaymentIntent,
),
errors::RevenueRecoveryError,
> {
let request_payload = self
.create_payment_record_request(billing_connector_account_id, payment_connector_account);
let attempt_response = Box::pin(payments::record_attempt_core(
@ -414,19 +487,26 @@ impl RevenueRecoveryAttempt {
profile.clone(),
key_store.clone(),
request_payload,
payment_id.clone(),
payment_intent.payment_id.clone(),
hyperswitch_domain_models::payments::HeaderPayload::default(),
None,
))
.await;
let response = match attempt_response {
let (recovery_attempt, updated_recovery_intent) = match attempt_response {
Ok(services::ApplicationResponse::JsonWithHeaders((attempt_response, _))) => {
Ok(revenue_recovery::RecoveryPaymentAttempt {
attempt_id: attempt_response.id,
attempt_status: attempt_response.status,
feature_metadata: attempt_response.feature_metadata,
})
Ok((
revenue_recovery::RecoveryPaymentAttempt {
attempt_id: attempt_response.id.clone(),
attempt_status: attempt_response.status,
feature_metadata: attempt_response.payment_attempt_feature_metadata,
},
revenue_recovery::RecoveryPaymentIntent {
payment_id: payment_intent.payment_id.clone(),
status: attempt_response.status.into(), // Using status from attempt_response
feature_metadata: attempt_response.payment_intent_feature_metadata, // Using feature_metadata from attempt_response
},
))
}
Ok(_) => Err(errors::RevenueRecoveryError::PaymentAttemptFetchFailed)
.attach_printable("Unexpected response from record attempt core"),
@ -436,6 +516,9 @@ impl RevenueRecoveryAttempt {
.attach_printable("failed to record attempt in recovery webhook flow")
}
}?;
let response = (recovery_attempt, updated_recovery_intent);
Ok(response)
}
@ -516,11 +599,15 @@ impl RevenueRecoveryAttempt {
merchant_account: &domain::MerchantAccount,
business_profile: &domain::Profile,
payment_intent: &revenue_recovery::RecoveryPaymentIntent,
) -> CustomResult<Option<revenue_recovery::RecoveryPaymentAttempt>, errors::RevenueRecoveryError>
{
let recovery_payment_attempt = match is_recovery_transaction_event {
) -> CustomResult<
(
Option<revenue_recovery::RecoveryPaymentAttempt>,
revenue_recovery::RecoveryPaymentIntent,
),
errors::RevenueRecoveryError,
> {
let payment_attempt_with_recovery_intent = match is_recovery_transaction_event {
true => {
// Checks whether we have data in recovery_details , If its there then it will use the data and convert it into required from or else fetches from Incoming webhook
let invoice_transaction_details = Self::get_recovery_invoice_transaction_details(
connector_enum,
request_details,
@ -536,46 +623,49 @@ impl RevenueRecoveryAttempt {
)
.await?;
Some(
invoice_transaction_details
.get_payment_attempt(
state,
req_state,
merchant_account,
business_profile,
key_store,
payment_intent.payment_id.clone(),
)
.await
.transpose()
.async_unwrap_or_else(|| async {
invoice_transaction_details
.record_payment_attempt(
state,
req_state,
merchant_account,
business_profile,
key_store,
payment_intent.payment_id.clone(),
&billing_connector_account.id,
payment_merchant_connector_account,
)
.await
})
.await?,
)
let (payment_attempt, updated_payment_intent) = invoice_transaction_details
.get_payment_attempt(
state,
req_state,
merchant_account,
business_profile,
key_store,
payment_intent,
)
.await
.transpose()
.async_unwrap_or_else(|| async {
invoice_transaction_details
.record_payment_attempt(
state,
req_state,
merchant_account,
business_profile,
key_store,
payment_intent,
&billing_connector_account.get_id(),
payment_merchant_connector_account,
)
.await
})
.await?;
(Some(payment_attempt), updated_payment_intent)
}
false => None,
false => (None, payment_intent.clone()),
};
Ok(recovery_payment_attempt)
Ok(payment_attempt_with_recovery_intent)
}
#[allow(clippy::too_many_arguments)]
async fn insert_execute_pcr_task(
billing_mca_id: &id_type::MerchantConnectorAccountId,
db: &dyn StorageInterface,
merchant_id: id_type::MerchantId,
payment_intent: revenue_recovery::RecoveryPaymentIntent,
profile_id: id_type::ProfileId,
intent_retry_count: u16,
payment_attempt_id: Option<id_type::GlobalAttemptId>,
runner: storage::ProcessTrackerRunner,
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
@ -585,23 +675,16 @@ impl RevenueRecoveryAttempt {
let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr());
let total_retry_count = payment_intent
.feature_metadata
.and_then(|feature_metadata| feature_metadata.get_retry_count())
.unwrap_or(0);
let schedule_time = revenue_recovery_flow::get_schedule_time_to_retry_mit_payments(
db,
&merchant_id,
(total_retry_count + 1).into(),
(intent_retry_count + 1).into(),
)
.await
.map_or_else(
|| {
Err(
report!(errors::RevenueRecoveryError::ScheduleTimeFetchFailed)
.attach_printable("Failed to get schedule time for pcr workflow"),
)
Err(errors::RevenueRecoveryError::ScheduleTimeFetchFailed)
.attach_printable("Failed to get schedule time for pcr workflow")
},
Ok, // Simply returns `time` wrapped in `Ok`
)?;
@ -613,6 +696,7 @@ impl RevenueRecoveryAttempt {
.attach_printable("payment attempt id is required for pcr workflow tracking")?;
let execute_workflow_tracking_data = storage_churn_recovery::PcrWorkflowTrackingData {
billing_mca_id: billing_mca_id.clone(),
global_payment_id: payment_id.clone(),
merchant_id,
profile_id,
@ -627,7 +711,7 @@ impl RevenueRecoveryAttempt {
runner,
tag,
execute_workflow_tracking_data,
Some(total_retry_count.into()),
Some(intent_retry_count.into()),
schedule_time,
common_enums::ApiVersion::V2,
)

View File

@ -8,6 +8,7 @@ pub struct PcrWorkflowTrackingData {
pub profile_id: id_type::ProfileId,
pub global_payment_id: id_type::GlobalPaymentId,
pub payment_attempt_id: id_type::GlobalAttemptId,
pub billing_mca_id: id_type::MerchantConnectorAccountId,
}
#[derive(Debug, Clone)]