feat(process_tracker): Invoke record back flow in PCR workflow [V2] (#7660)

Co-authored-by: Chikke Srujan <chikke.srujan@Chikke-Srujan-N7WRTY72X7.local>
This commit is contained in:
chikke srujan
2025-04-02 11:59:15 +05:30
committed by GitHub
parent a7f8560a2d
commit 40174b3c62
7 changed files with 237 additions and 34 deletions

View File

@ -1,5 +1,7 @@
#[cfg(feature = "v2")]
use common_enums::enums::PaymentConnectorTransmission;
#[cfg(feature = "v2")]
use common_utils::id_type;
use common_utils::{hashing::HashedString, pii, types::MinorUnit};
use diesel::{
sql_types::{Json, Jsonb},
@ -67,7 +69,15 @@ impl FeatureMetadata {
pub fn get_payment_method_type(&self) -> Option<common_enums::PaymentMethod> {
self.payment_revenue_recovery_metadata
.as_ref()
.map(|rrm| rrm.payment_method_type)
.map(|recovery_metadata| recovery_metadata.payment_method_type)
}
pub fn get_billing_merchant_connector_account_id(
&self,
) -> Option<id_type::MerchantConnectorAccountId> {
self.payment_revenue_recovery_metadata
.as_ref()
.map(|recovery_metadata| recovery_metadata.billing_connector_id.clone())
}
// TODO: Check search_tags for relevant payment method type
@ -150,9 +160,9 @@ pub struct PaymentRevenueRecoveryMetadata {
/// Flag for the payment connector's call
pub payment_connector_transmission: PaymentConnectorTransmission,
/// Billing Connector Id to update the invoices
pub billing_connector_id: common_utils::id_type::MerchantConnectorAccountId,
pub billing_connector_id: id_type::MerchantConnectorAccountId,
/// Payment Connector Id to retry the payments
pub active_attempt_payment_connector_id: common_utils::id_type::MerchantConnectorAccountId,
pub active_attempt_payment_connector_id: id_type::MerchantConnectorAccountId,
/// Billing Connector Payment Details
pub billing_connector_payment_details: BillingConnectorPaymentDetails,
///Payment Method Type

View File

@ -489,14 +489,23 @@ impl PaymentIntent {
pub fn get_connector_customer_id_from_feature_metadata(&self) -> Option<String> {
self.feature_metadata
.as_ref()
.and_then(|fm| fm.payment_revenue_recovery_metadata.as_ref())
.map(|rrm| {
rrm.billing_connector_payment_details
.and_then(|metadata| metadata.payment_revenue_recovery_metadata.as_ref())
.map(|recovery_metadata| {
recovery_metadata
.billing_connector_payment_details
.connector_customer_id
.clone()
})
}
pub fn get_billing_merchant_connector_account_id(
&self,
) -> Option<id_type::MerchantConnectorAccountId> {
self.feature_metadata.as_ref().and_then(|feature_metadata| {
feature_metadata.get_billing_merchant_connector_account_id()
})
}
fn get_request_incremental_authorization_value(
request: &api_models::payments::PaymentsCreateIntentRequest,
) -> CustomResult<

View File

@ -32,6 +32,7 @@ pub trait ConnectorV2:
+ api::ConnectorMandateRevokeV2
+ api::authentication_v2::ExternalAuthenticationV2
+ api::UnifiedAuthenticationServiceV2
+ api::revenue_recovery_v2::RevenueRecoveryV2
{
}
impl<
@ -49,7 +50,8 @@ impl<
+ api::FraudCheckV2
+ api::ConnectorMandateRevokeV2
+ api::authentication_v2::ExternalAuthenticationV2
+ api::UnifiedAuthenticationServiceV2,
+ api::UnifiedAuthenticationServiceV2
+ api::revenue_recovery_v2::RevenueRecoveryV2,
> ConnectorV2 for T
{
}

View File

@ -1,8 +1,12 @@
pub mod transformers;
pub mod types;
use api_models::{payments::PaymentsRetrieveRequest, process_tracker::revenue_recovery};
use api_models::{
payments::{PaymentRevenueRecoveryMetadata, PaymentsRetrieveRequest},
process_tracker::revenue_recovery,
};
use common_utils::{
self,
errors::CustomResult,
ext_traits::{OptionExt, ValueExt},
id_type,
types::keymanager::KeyManagerState,
@ -11,6 +15,9 @@ use diesel_models::process_tracker::business_status;
use error_stack::{self, ResultExt};
use hyperswitch_domain_models::{
api::ApplicationResponse,
behaviour::ReverseConversion,
errors::api_error_response,
merchant_connector_account,
payments::{PaymentIntent, PaymentStatusData},
ApiModelToDieselModelConvertor,
};
@ -42,6 +49,7 @@ pub async fn perform_execute_payment(
pcr_data: &pcr::PcrPaymentData,
_key_manager_state: &KeyManagerState,
payment_intent: &PaymentIntent,
billing_mca: &merchant_connector_account::MerchantConnectorAccount,
) -> Result<(), sch_errors::ProcessTrackerError> {
let db = &*state.store;
@ -80,6 +88,7 @@ pub async fn perform_execute_payment(
execute_task_process,
pcr_data,
&mut pcr_metadata,
billing_mca,
))
.await?;
}
@ -263,7 +272,7 @@ pub async fn retrieve_revenue_recovery_process_tracker(
.find_process_by_id(&process_tracker_id_for_psync)
.await
.map_err(|e| {
logger::error!("Error while retreiving psync task : {:?}", e);
logger::error!("Error while retrieving psync task : {:?}", e);
})
.ok()
.flatten();

View File

@ -1,3 +1,5 @@
use std::marker::PhantomData;
use api_models::{
enums as api_enums,
mandates::RecurringDetails,
@ -6,12 +8,23 @@ use api_models::{
PaymentsUpdateIntentRequest, ProxyPaymentsRequest,
},
};
use common_utils::{self, ext_traits::OptionExt, id_type};
use common_utils::{
self,
ext_traits::{OptionExt, ValueExt},
id_type,
};
use diesel_models::{enums, process_tracker::business_status, types as diesel_types};
use error_stack::{self, ResultExt};
use hyperswitch_domain_models::{
business_profile, merchant_account,
payments::{PaymentConfirmData, PaymentIntent, PaymentIntentData},
business_profile, merchant_connector_account,
payments::{
self as domain_payments, payment_attempt, PaymentConfirmData, PaymentIntent,
PaymentIntentData,
},
router_data_v2::{self, flow_common_types},
router_flow_types,
router_request_types::revenue_recovery as revenue_recovery_request,
router_response_types::revenue_recovery as revenue_recovery_response,
ApiModelToDieselModelConvertor,
};
use time::PrimitiveDateTime;
@ -19,13 +32,16 @@ use time::PrimitiveDateTime;
use crate::{
core::{
errors::{self, RouterResult},
payments::{self, operations::Operation},
payments::{self, helpers, operations::Operation},
revenue_recovery::{self as core_pcr},
},
db::StorageInterface,
logger,
routes::SessionState,
types::{api::payments as api_types, storage, transformers::ForeignInto},
services::{self, connector_integration_interface::RouterDataConversion},
types::{
self, api as api_types, api::payments as payments_types, storage, transformers::ForeignInto,
},
workflows::revenue_recovery::get_schedule_time_to_retry_mit_payments,
};
@ -121,8 +137,8 @@ impl Decision {
pub enum Action {
SyncPayment(id_type::GlobalAttemptId),
RetryPayment(PrimitiveDateTime),
TerminalFailure,
SuccessfulPayment,
TerminalFailure(payment_attempt::PaymentAttempt),
SuccessfulPayment(payment_attempt::PaymentAttempt),
ReviewPayment,
ManualReviewAction,
}
@ -141,13 +157,21 @@ impl Action {
// handle proxy api's response
match response {
Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() {
PcrAttemptStatus::Succeeded => Ok(Self::SuccessfulPayment),
PcrAttemptStatus::Succeeded => Ok(Self::SuccessfulPayment(
payment_data.payment_attempt.clone(),
)),
PcrAttemptStatus::Failed => {
Self::decide_retry_failure_action(db, merchant_id, process.clone()).await
Self::decide_retry_failure_action(
db,
merchant_id,
process.clone(),
&payment_data.payment_attempt,
)
.await
}
PcrAttemptStatus::Processing => {
Ok(Self::SyncPayment(payment_data.payment_attempt.id))
Ok(Self::SyncPayment(payment_data.payment_attempt.id.clone()))
}
PcrAttemptStatus::InvalidStatus(action) => {
logger::info!(?action, "Invalid Payment Status For PCR Payment");
@ -173,6 +197,7 @@ impl Action {
execute_task_process: &storage::ProcessTracker,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
revenue_recovery_metadata: &mut PaymentRevenueRecoveryMetadata,
billing_mca: &merchant_connector_account::MerchantConnectorAccount,
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
match self {
@ -231,8 +256,7 @@ impl Action {
Ok(())
}
Self::TerminalFailure => {
// TODO: Record a failure transaction back to Billing Connector
Self::TerminalFailure(payment_attempt) => {
db.as_scheduler()
.finish_process_with_business_status(
execute_task_process.clone(),
@ -241,10 +265,20 @@ impl Action {
.await
.change_context(errors::RecoveryError::ProcessTrackerFailure)
.attach_printable("Failed to update the process tracker")?;
// Record back to billing connector for terminal status
// TODO: Add support for retrying failed outgoing recordback webhooks
self.record_back_to_billing_connector(
state,
payment_attempt,
payment_intent,
billing_mca,
)
.await
.change_context(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable("Failed to record back the billing connector")?;
Ok(())
}
Self::SuccessfulPayment => {
// TODO: Record a successful transaction back to Billing Connector
Self::SuccessfulPayment(payment_attempt) => {
db.as_scheduler()
.finish_process_with_business_status(
execute_task_process.clone(),
@ -253,6 +287,17 @@ impl Action {
.await
.change_context(errors::RecoveryError::ProcessTrackerFailure)
.attach_printable("Failed to update the process tracker")?;
// Record back to billing connector for terminal status
// TODO: Add support for retrying failed outgoing recordback webhooks
self.record_back_to_billing_connector(
state,
payment_attempt,
payment_intent,
billing_mca,
)
.await
.change_context(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable("Failed to update the process tracker")?;
Ok(())
}
@ -272,17 +317,118 @@ impl Action {
}
}
async fn record_back_to_billing_connector(
&self,
state: &SessionState,
payment_attempt: &payment_attempt::PaymentAttempt,
payment_intent: &PaymentIntent,
billing_mca: &merchant_connector_account::MerchantConnectorAccount,
) -> RecoveryResult<()> {
let connector_name = billing_mca.connector_name.to_string();
let connector_data = api_types::ConnectorData::get_connector_by_name(
&state.conf.connectors,
&connector_name,
api_types::GetToken::Connector,
Some(billing_mca.get_id()),
)
.change_context(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable(
"invalid connector name received in billing merchant connector account",
)?;
let connector_integration: services::BoxedRevenueRecoveryRecordBackInterface<
router_flow_types::RecoveryRecordBack,
revenue_recovery_request::RevenueRecoveryRecordBackRequest,
revenue_recovery_response::RevenueRecoveryRecordBackResponse,
> = connector_data.connector.get_connector_integration();
let router_data = self.construct_recovery_record_back_router_data(
state,
billing_mca,
payment_attempt,
payment_intent,
)?;
let response = services::execute_connector_processing_step(
state,
connector_integration,
&router_data,
payments::CallConnectorAction::Trigger,
None,
)
.await
.change_context(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable("Failed while handling response of record back to billing connector")?;
let record_back_response = match response.response {
Ok(response) => Ok(response),
error @ Err(_) => {
router_env::logger::error!(?error);
Err(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable("Failed while recording back to billing connector")
}
}?;
Ok(())
}
pub fn construct_recovery_record_back_router_data(
&self,
state: &SessionState,
billing_mca: &merchant_connector_account::MerchantConnectorAccount,
payment_attempt: &payment_attempt::PaymentAttempt,
payment_intent: &PaymentIntent,
) -> RecoveryResult<hyperswitch_domain_models::types::RevenueRecoveryRecordBackRouterData> {
let auth_type: types::ConnectorAuthType =
helpers::MerchantConnectorAccountType::DbVal(Box::new(billing_mca.clone()))
.get_connector_account_details()
.parse_value("ConnectorAuthType")
.change_context(errors::RecoveryError::RecordBackToBillingConnectorFailed)?;
let merchant_reference_id = payment_intent
.merchant_reference_id
.clone()
.ok_or(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable(
"Merchant reference id not found while recording back to billing connector",
)?;
let router_data = router_data_v2::RouterDataV2 {
flow: PhantomData::<router_flow_types::RecoveryRecordBack>,
tenant_id: state.tenant.tenant_id.clone(),
resource_common_data: flow_common_types::RevenueRecoveryRecordBackData,
connector_auth_type: auth_type,
request: revenue_recovery_request::RevenueRecoveryRecordBackRequest {
merchant_reference_id,
amount: payment_attempt.get_total_amount(),
currency: payment_intent.amount_details.currency,
payment_method_type: payment_attempt.payment_method_subtype,
attempt_status: payment_attempt.status,
connector_transaction_id: payment_attempt
.connector_payment_id
.as_ref()
.map(|id| common_utils::types::ConnectorTransactionId::TxnId(id.clone())),
},
response: Err(types::ErrorResponse::default()),
};
let old_router_data =
flow_common_types::RevenueRecoveryRecordBackData::to_old_router_data(router_data)
.change_context(errors::RecoveryError::RecordBackToBillingConnectorFailed)
.attach_printable("Cannot construct record back router data")?;
Ok(old_router_data)
}
pub(crate) async fn decide_retry_failure_action(
db: &dyn StorageInterface,
merchant_id: &id_type::MerchantId,
pt: storage::ProcessTracker,
payment_attempt: &payment_attempt::PaymentAttempt,
) -> RecoveryResult<Self> {
let schedule_time =
get_schedule_time_to_retry_mit_payments(db, merchant_id, pt.retry_count + 1).await;
match schedule_time {
Some(schedule_time) => Ok(Self::RetryPayment(schedule_time)),
None => Ok(Self::TerminalFailure),
None => Ok(Self::TerminalFailure(payment_attempt.clone())),
}
}
}
@ -292,7 +438,7 @@ async fn call_proxy_api(
payment_intent: &PaymentIntent,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
revenue_recovery: &PaymentRevenueRecoveryMetadata,
) -> RouterResult<PaymentConfirmData<api_types::Authorize>> {
) -> RouterResult<PaymentConfirmData<payments_types::Authorize>> {
let operation = payments::operations::proxy_payments_intent::PaymentProxyIntent;
let req = ProxyPaymentsRequest {
return_url: None,
@ -325,11 +471,11 @@ async fn call_proxy_api(
.await?;
let (payment_data, _req, _, _) = Box::pin(payments::proxy_for_payments_operation_core::<
api_types::Authorize,
payments_types::Authorize,
_,
_,
_,
PaymentConfirmData<api_types::Authorize>,
PaymentConfirmData<payments_types::Authorize>,
>(
state,
state.get_req_state(),
@ -351,14 +497,14 @@ pub async fn update_payment_intent_api(
global_payment_id: id_type::GlobalPaymentId,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
update_req: PaymentsUpdateIntentRequest,
) -> RouterResult<PaymentIntentData<api_types::PaymentUpdateIntent>> {
) -> RouterResult<PaymentIntentData<payments_types::PaymentUpdateIntent>> {
// TODO : Use api handler instead of calling payments_intent_operation_core
let operation = payments::operations::PaymentUpdateIntent;
let (payment_data, _req, customer) = payments::payments_intent_operation_core::<
api_types::PaymentUpdateIntent,
payments_types::PaymentUpdateIntent,
_,
_,
PaymentIntentData<api_types::PaymentUpdateIntent>,
PaymentIntentData<payments_types::PaymentUpdateIntent>,
>(
state,
state.get_req_state(),

View File

@ -1,7 +1,10 @@
#[cfg(feature = "v2")]
use api_models::payments::PaymentsGetIntentRequest;
#[cfg(feature = "v2")]
use common_utils::ext_traits::{StringExt, ValueExt};
use common_utils::{
ext_traits::{StringExt, ValueExt},
id_type,
};
#[cfg(feature = "v2")]
use error_stack::ResultExt;
#[cfg(feature = "v2")]
@ -11,12 +14,14 @@ use router_env::logger;
use scheduler::{consumer::workflows::ProcessTrackerWorkflow, errors};
#[cfg(feature = "v2")]
use scheduler::{types::process_data, utils as scheduler_utils};
#[cfg(feature = "v2")]
use storage_impl::errors as storage_errors;
#[cfg(feature = "v2")]
use crate::{
core::{
payments,
revenue_recovery::{self as pcr},
admin, payments,
revenue_recovery::{self as pcr, types},
},
db::StorageInterface,
errors::StorageError,
@ -73,6 +78,23 @@ impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
None,
)
.await?;
let store = state.store.as_ref();
let billing_merchant_connector_account_id: id_type::MerchantConnectorAccountId =
payment_data
.payment_intent
.get_billing_merchant_connector_account_id()
.ok_or(errors::ProcessTrackerError::ERecoveryError(
storage_errors::RecoveryError::BillingMerchantConnectorAccountIdNotFound.into(),
))?;
let billing_mca = store
.find_merchant_connector_account_by_id(
key_manager_state,
&billing_merchant_connector_account_id,
&pcr_data.key_store,
)
.await?;
match process.name.as_deref() {
Some("EXECUTE_WORKFLOW") => {
@ -83,6 +105,7 @@ impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
&pcr_data,
key_manager_state,
&payment_data.payment_intent,
&billing_mca,
))
.await
}
@ -136,7 +159,7 @@ pub(crate) async fn extract_data_and_perform_action(
#[cfg(feature = "v2")]
pub(crate) async fn get_schedule_time_to_retry_mit_payments(
db: &dyn StorageInterface,
merchant_id: &common_utils::id_type::MerchantId,
merchant_id: &id_type::MerchantId,
retry_count: i32,
) -> Option<time::PrimitiveDateTime> {
let key = "pt_mapping_pcr_retries";

View File

@ -261,4 +261,8 @@ pub enum RecoveryError {
InvalidTask,
#[error("The Intended data was not found")]
ValueNotFound,
#[error("Failed to update billing connector")]
RecordBackToBillingConnectorFailed,
#[error("Failed to fetch billing connector account id")]
BillingMerchantConnectorAccountIdNotFound,
}