feat(reveue_recovery): Add support for multiple retry algorithms in revenue recovery workflow (#7915)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Shankar Singh C <83439957+ShankarSinghC@users.noreply.github.com>
This commit is contained in:
Amisha Prabhat
2025-05-20 18:02:47 +05:30
committed by GitHub
parent 6e08edca39
commit 151b57fa10
18 changed files with 220 additions and 63 deletions

View File

@ -1066,6 +1066,10 @@ url = "http://localhost:8080" # Open Router URL
[billing_connectors_invoice_sync]
billing_connectors_which_requires_invoice_sync_call = "recurly" # List of billing connectors which has invoice sync api call
[revenue_recovery]
monitoring_threshold_in_seconds = 2592000 # 30*24*60*60 secs , threshold for monitoring the retry system
retry_algorithm_type = "cascading" # type of retry algorithm
[clone_connector_allowlist]
merchant_ids = "merchant_ids" # Comma-separated list of allowed merchant IDs
connector_names = "connector_names" # Comma-separated list of allowed connector names

View File

@ -731,5 +731,10 @@ billing_connectors_which_require_payment_sync = "stripebilling, recurly"
[billing_connectors_invoice_sync]
billing_connectors_which_requires_invoice_sync_call = "recurly"
[revenue_recovery]
monitoring_threshold_in_seconds = 2592000
retry_algorithm_type = "cascading"
[authentication_providers]
click_to_pay = {connector_list = "adyen, cybersource"}

View File

@ -741,3 +741,8 @@ billing_connectors_which_requires_invoice_sync_call = "recurly"
[authentication_providers]
click_to_pay = {connector_list = "adyen, cybersource"}
[revenue_recovery]
monitoring_threshold_in_seconds = 2592000
retry_algorithm_type = "cascading"

View File

@ -748,3 +748,7 @@ billing_connectors_which_requires_invoice_sync_call = "recurly"
[authentication_providers]
click_to_pay = {connector_list = "adyen, cybersource"}
[revenue_recovery]
monitoring_threshold_in_seconds = 2592000
retry_algorithm_type = "cascading"

View File

@ -1163,6 +1163,10 @@ click_to_pay = {connector_list = "adyen, cybersource"}
enabled = false
url = "http://localhost:8080"
[revenue_recovery]
monitoring_threshold_in_seconds = 2592000
retry_algorithm_type = "cascading"
[clone_connector_allowlist]
merchant_ids = "merchant_123, merchant_234" # Comma-separated list of allowed merchant IDs
connector_names = "stripe, adyen" # Comma-separated list of allowed connector names

View File

@ -1053,6 +1053,10 @@ enabled = true
[authentication_providers]
click_to_pay = {connector_list = "adyen, cybersource"}
[revenue_recovery]
monitoring_threshold_in_seconds = 2592000 # threshold for monitoring the retry system
retry_algorithm_type = "cascading" # type of retry algorithm
[clone_connector_allowlist]
merchant_ids = "merchant_123, merchant_234" # Comma-separated list of allowed merchant IDs
connector_names = "stripe, adyen" # Comma-separated list of allowed connector names

View File

@ -216,6 +216,7 @@ pub enum CardDiscovery {
Clone,
Copy,
Debug,
Default,
Hash,
Eq,
PartialEq,
@ -230,6 +231,7 @@ pub enum CardDiscovery {
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum RevenueRecoveryAlgorithmType {
#[default]
Monitoring,
Smart,
Cascading,

View File

@ -5,6 +5,7 @@ use common_types::primitive_wrappers;
use common_utils::{encryption::Encryption, pii};
use diesel::{AsChangeset, Identifiable, Insertable, Queryable, Selectable};
use masking::Secret;
use time::Duration;
#[cfg(feature = "v1")]
use crate::schema::business_profile;
@ -796,4 +797,12 @@ pub struct RevenueRecoveryAlgorithmData {
pub monitoring_configured_timestamp: time::PrimitiveDateTime,
}
impl RevenueRecoveryAlgorithmData {
pub fn has_exceeded_monitoring_threshold(&self, monitoring_threshold_in_seconds: i64) -> bool {
let total_threshold_time = self.monitoring_configured_timestamp
+ Duration::seconds(monitoring_threshold_in_seconds);
common_utils::date_time::now() >= total_threshold_time
}
}
common_utils::impl_to_sql_from_sql_json!(RevenueRecoveryAlgorithmData);

View File

@ -536,6 +536,8 @@ pub(crate) async fn fetch_raw_secrets(
platform: conf.platform,
authentication_providers: conf.authentication_providers,
open_router: conf.open_router,
#[cfg(feature = "v2")]
revenue_recovery: conf.revenue_recovery,
debit_routing_config: conf.debit_routing_config,
clone_connector_allowlist: conf.clone_connector_allowlist,
}

View File

@ -43,6 +43,8 @@ use storage_impl::config::QueueStrategy;
#[cfg(feature = "olap")]
use crate::analytics::{AnalyticsConfig, AnalyticsProvider};
#[cfg(feature = "v2")]
use crate::types::storage::revenue_recovery;
use crate::{
configs,
core::errors::{ApplicationError, ApplicationResult},
@ -51,7 +53,6 @@ use crate::{
routes::app,
AppState,
};
pub const REQUIRED_FIELDS_CONFIG_FILE: &str = "payment_required_fields_v2.toml";
#[derive(clap::Parser, Default)]
@ -154,6 +155,8 @@ pub struct Settings<S: SecretState> {
pub platform: Platform,
pub authentication_providers: AuthenticationProviders,
pub open_router: OpenRouter,
#[cfg(feature = "v2")]
pub revenue_recovery: revenue_recovery::RevenueRecoverySettings,
pub clone_connector_allowlist: Option<CloneConnectorAllowlistConfig>,
}

View File

@ -2998,8 +2998,9 @@ pub async fn create_connector(
#[cfg(feature = "v2")]
if req.connector_type == common_enums::ConnectorType::BillingProcessor {
update_revenue_recovery_algorithm_under_profile(
business_profile.clone(),
let profile_wrapper = ProfileWrapper::new(business_profile.clone());
profile_wrapper
.update_revenue_recovery_algorithm_under_profile(
store,
key_manager_state,
merchant_context.get_merchant_key_store(),
@ -4706,16 +4707,15 @@ impl ProfileWrapper {
.attach_printable("Failed to update routing algorithm ref in business profile")?;
Ok(())
}
}
#[cfg(feature = "v2")]
pub async fn update_revenue_recovery_algorithm_under_profile(
profile: domain::Profile,
self,
db: &dyn StorageInterface,
key_manager_state: &KeyManagerState,
merchant_key_store: &domain::MerchantKeyStore,
revenue_recovery_retry_algorithm_type: common_enums::RevenueRecoveryAlgorithmType,
) -> RouterResult<()> {
let recovery_algorithm_data = diesel_models::business_profile::RevenueRecoveryAlgorithmData {
let recovery_algorithm_data =
diesel_models::business_profile::RevenueRecoveryAlgorithmData {
monitoring_configured_timestamp: date_time::now(),
};
let profile_update = domain::ProfileUpdate::RevenueRecoveryAlgorithmUpdate {
@ -4726,14 +4726,17 @@ pub async fn update_revenue_recovery_algorithm_under_profile(
db.update_profile_by_profile_id(
key_manager_state,
merchant_key_store,
profile,
self.profile,
profile_update,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update revenue recovery retry algorithm in business profile")?;
.attach_printable(
"Failed to update revenue recovery retry algorithm in business profile",
)?;
Ok(())
}
}
pub async fn extended_card_info_toggle(
state: SessionState,

View File

@ -483,6 +483,10 @@ pub enum RevenueRecoveryError {
RetryCountFetchFailed,
#[error("Failed to get the billing threshold retry count")]
BillingThresholdRetryCountFetchFailed,
#[error("Failed to get the retry algorithm type")]
RetryAlgorithmTypeNotFound,
#[error("Failed to update the retry algorithm type")]
RetryAlgorithmUpdationFailed,
#[error("Failed to create the revenue recovery attempt data")]
RevenueRecoveryAttemptDataCreateFailed,
}

View File

@ -31,7 +31,7 @@ pub async fn files_create_core(
.get_string_repr(),
file_id
);
let file_new = diesel_models::file::FileMetadataNew {
let file_new: diesel_models::FileMetadataNew = diesel_models::file::FileMetadataNew {
file_id: file_id.clone(),
merchant_id: merchant_context.get_merchant_account().get_id().clone(),
file_name: create_file_request.file_name.clone(),

View File

@ -7,7 +7,7 @@ use common_utils::{
ext_traits::{OptionExt, ValueExt},
id_type,
};
use diesel_models::process_tracker::business_status;
use diesel_models::{enums as diesel_enum, process_tracker::business_status};
use error_stack::{self, ResultExt};
use hyperswitch_domain_models::{payments::PaymentIntent, ApiModelToDieselModelConvertor};
use scheduler::errors as sch_errors;
@ -135,6 +135,7 @@ pub async fn perform_execute_payment(
revenue_recovery_payment_data.profile.get_id().clone(),
attempt_id.clone(),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
tracking_data.revenue_recovery_retry,
)
.await?;
@ -196,6 +197,7 @@ pub async fn perform_execute_payment(
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn insert_psync_pcr_task_to_pt(
billing_mca_id: id_type::MerchantConnectorAccountId,
db: &dyn StorageInterface,
@ -204,6 +206,7 @@ async fn insert_psync_pcr_task_to_pt(
profile_id: id_type::ProfileId,
payment_attempt_id: id_type::GlobalAttemptId,
runner: storage::ProcessTrackerRunner,
revenue_recovery_retry: diesel_enum::RevenueRecoveryAlgorithmType,
) -> RouterResult<storage::ProcessTracker> {
let task = PSYNC_WORKFLOW;
let process_tracker_id = payment_attempt_id.get_psync_revenue_recovery_id(task, runner);
@ -214,6 +217,7 @@ async fn insert_psync_pcr_task_to_pt(
merchant_id,
profile_id,
payment_attempt_id,
revenue_recovery_retry,
};
let tag = ["REVENUE_RECOVERY"];
let process_tracker_entry = storage::ProcessTrackerNew::new(

View File

@ -310,6 +310,7 @@ impl Action {
db,
merchant_id,
process.clone(),
revenue_recovery_payment_data,
&payment_data.payment_attempt,
)
.await
@ -354,6 +355,7 @@ impl Action {
revenue_recovery_payment_data.profile.get_id().to_owned(),
payment_attempt.id.clone(),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
revenue_recovery_payment_data.retry_algorithm,
)
.await
.change_context(errors::RecoveryError::ProcessTrackerFailure)
@ -478,13 +480,17 @@ impl Action {
pub async fn payment_sync_call(
state: &SessionState,
pcr_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
global_payment_id: &id_type::GlobalPaymentId,
process: &storage::ProcessTracker,
payment_attempt: payment_attempt::PaymentAttempt,
) -> RecoveryResult<Self> {
let response =
revenue_recovery_core::api::call_psync_api(state, global_payment_id, pcr_data).await;
let response = revenue_recovery_core::api::call_psync_api(
state,
global_payment_id,
revenue_recovery_payment_data,
)
.await;
let db = &*state.store;
match response {
Ok(_payment_data) => match payment_attempt.status.foreign_into() {
@ -494,8 +500,9 @@ impl Action {
RevenueRecoveryPaymentsAttemptStatus::Failed => {
Self::decide_retry_failure_action(
db,
pcr_data.merchant_account.get_id(),
revenue_recovery_payment_data.merchant_account.get_id(),
process.clone(),
revenue_recovery_payment_data,
&payment_attempt,
)
.await
@ -688,10 +695,14 @@ impl Action {
db: &dyn StorageInterface,
merchant_id: &id_type::MerchantId,
pt: storage::ProcessTracker,
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
payment_attempt: &payment_attempt::PaymentAttempt,
) -> RecoveryResult<Self> {
let schedule_time =
get_schedule_time_to_retry_mit_payments(db, merchant_id, pt.retry_count + 1).await;
let next_retry_count = pt.retry_count + 1;
let schedule_time = revenue_recovery_payment_data
.get_schedule_time_based_on_retry_type(db, merchant_id, next_retry_count)
.await;
match schedule_time {
Some(schedule_time) => Ok(Self::RetryPayment(schedule_time)),

View File

@ -1,6 +1,6 @@
use std::{marker::PhantomData, str::FromStr};
use api_models::{payments as api_payments, webhooks};
use api_models::{enums as api_enums, payments as api_payments, webhooks};
use common_utils::{
ext_traits::{AsyncExt, ValueExt},
id_type,
@ -17,6 +17,7 @@ use router_env::{instrument, tracing};
use crate::{
core::{
admin,
errors::{self, CustomResult},
payments::{self, helpers},
},
@ -54,7 +55,7 @@ pub async fn recovery_incoming_webhook_flow(
))
})?;
let connector = api_models::enums::Connector::from_str(connector_name)
let connector = api_enums::Connector::from_str(connector_name)
.change_context(errors::RevenueRecoveryError::InvoiceWebhookProcessingFailed)
.attach_printable_lazy(|| format!("unable to parse connector name {connector_name:?}"))?;
@ -155,6 +156,21 @@ pub async fn recovery_incoming_webhook_flow(
match action {
revenue_recovery::RecoveryAction::CancelInvoice => todo!(),
revenue_recovery::RecoveryAction::ScheduleFailedPayment => {
let recovery_algorithm_type = business_profile
.revenue_recovery_retry_algorithm_type
.ok_or(report!(
errors::RevenueRecoveryError::RetryAlgorithmTypeNotFound
))?;
match recovery_algorithm_type {
api_enums::RevenueRecoveryAlgorithmType::Monitoring => {
handle_monitoring_threshold(
&state,
&business_profile,
merchant_context.get_merchant_key_store(),
)
.await
}
revenue_recovery_retry_type => {
handle_schedule_failed_payment(
&billing_connector_account,
intent_retry_count,
@ -166,9 +182,12 @@ pub async fn recovery_incoming_webhook_flow(
recovery_intent_from_payment_attempt,
),
&business_profile,
revenue_recovery_retry_type,
)
.await
}
}
}
revenue_recovery::RecoveryAction::SuccessPaymentExternal => {
// Need to add recovery stop flow for this scenario
router_env::logger::info!("Payment has been succeeded via external system");
@ -195,6 +214,38 @@ pub async fn recovery_incoming_webhook_flow(
}
}
async fn handle_monitoring_threshold(
state: &SessionState,
business_profile: &domain::Profile,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
let db = &*state.store;
let key_manager_state = &(state).into();
let monitoring_threshold_config = state.conf.revenue_recovery.monitoring_threshold_in_seconds;
let retry_algorithm_type = state.conf.revenue_recovery.retry_algorithm_type;
let revenue_recovery_retry_algorithm = business_profile
.revenue_recovery_retry_algorithm_data
.clone()
.ok_or(report!(
errors::RevenueRecoveryError::RetryAlgorithmTypeNotFound
))?;
if revenue_recovery_retry_algorithm
.has_exceeded_monitoring_threshold(monitoring_threshold_config)
{
let profile_wrapper = admin::ProfileWrapper::new(business_profile.clone());
profile_wrapper
.update_revenue_recovery_algorithm_under_profile(
db,
key_manager_state,
key_store,
retry_algorithm_type,
)
.await
.change_context(errors::RevenueRecoveryError::RetryAlgorithmUpdationFailed)?;
}
Ok(webhooks::WebhookResponseTracker::NoEffect)
}
#[allow(clippy::too_many_arguments)]
async fn handle_schedule_failed_payment(
billing_connector_account: &domain::MerchantConnectorAccount,
intent_retry_count: u16,
@ -206,6 +257,7 @@ async fn handle_schedule_failed_payment(
revenue_recovery::RecoveryPaymentIntent,
),
business_profile: &domain::Profile,
revenue_recovery_retry: api_enums::RevenueRecoveryAlgorithmType,
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
let (recovery_attempt_from_payment_attempt, recovery_intent_from_payment_attempt) =
payment_attempt_with_recovery_intent;
@ -230,6 +282,7 @@ async fn handle_schedule_failed_payment(
.as_ref()
.map(|attempt| attempt.attempt_id.clone()),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
revenue_recovery_retry,
)
.await
})
@ -697,6 +750,7 @@ impl RevenueRecoveryAttempt {
intent_retry_count: u16,
payment_attempt_id: Option<id_type::GlobalAttemptId>,
runner: storage::ProcessTrackerRunner,
revenue_recovery_retry: api_enums::RevenueRecoveryAlgorithmType,
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
let task = "EXECUTE_WORKFLOW";
@ -731,6 +785,7 @@ impl RevenueRecoveryAttempt {
merchant_id,
profile_id,
payment_attempt_id,
revenue_recovery_retry,
};
let tag = ["PCR"];

View File

@ -1,17 +1,21 @@
use std::fmt::Debug;
use common_enums::enums;
use common_utils::id_type;
use hyperswitch_domain_models::{
business_profile, merchant_account, merchant_connector_account, merchant_key_store,
};
use router_env::logger;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
use crate::{db::StorageInterface, workflows::revenue_recovery};
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RevenueRecoveryWorkflowTrackingData {
pub merchant_id: id_type::MerchantId,
pub profile_id: id_type::ProfileId,
pub global_payment_id: id_type::GlobalPaymentId,
pub payment_attempt_id: id_type::GlobalAttemptId,
pub billing_mca_id: id_type::MerchantConnectorAccountId,
pub revenue_recovery_retry: enums::RevenueRecoveryAlgorithmType,
}
#[derive(Debug, Clone)]
@ -20,4 +24,37 @@ pub struct RevenueRecoveryPaymentData {
pub profile: business_profile::Profile,
pub key_store: merchant_key_store::MerchantKeyStore,
pub billing_mca: merchant_connector_account::MerchantConnectorAccount,
pub retry_algorithm: enums::RevenueRecoveryAlgorithmType,
}
impl RevenueRecoveryPaymentData {
pub async fn get_schedule_time_based_on_retry_type(
&self,
db: &dyn StorageInterface,
merchant_id: &id_type::MerchantId,
retry_count: i32,
) -> Option<time::PrimitiveDateTime> {
match self.retry_algorithm {
enums::RevenueRecoveryAlgorithmType::Monitoring => {
logger::error!("Monitoring type found for Revenue Recovery retry payment");
None
}
enums::RevenueRecoveryAlgorithmType::Cascading => {
revenue_recovery::get_schedule_time_to_retry_mit_payments(
db,
merchant_id,
retry_count,
)
.await
}
enums::RevenueRecoveryAlgorithmType::Smart => {
// TODO: Integrate the smart retry call to return back a schedule time
None
}
}
}
}
#[derive(Debug, serde::Deserialize, Clone, Default)]
pub struct RevenueRecoverySettings {
pub monitoring_threshold_in_seconds: i64,
pub retry_algorithm_type: enums::RevenueRecoveryAlgorithmType,
}

View File

@ -20,8 +20,8 @@ use storage_impl::errors as storage_errors;
#[cfg(feature = "v2")]
use crate::{
core::{
admin, payments,
revenue_recovery::{self as pcr, types},
payments,
revenue_recovery::{self as pcr},
},
db::StorageInterface,
errors::StorageError,
@ -155,6 +155,7 @@ pub(crate) async fn extract_data_and_perform_action(
profile,
key_store,
billing_mca,
retry_algorithm: tracking_data.revenue_recovery_retry,
};
Ok(pcr_payment_data)
}