mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
feat(revenue_recovery): Add redis-based payment processor token tracking for revenue recovery (#8846)
Co-authored-by: Aniket Burman <aniket.burman@Aniket-Burman-JDXHW2PH34.local> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: chikke srujan <121822803+srujanchikke@users.noreply.github.com> Co-authored-by: Aditya Chaurasia <113281443+AdityaKumaar21@users.noreply.github.com>
This commit is contained in:
@ -496,4 +496,6 @@ pub enum RevenueRecoveryError {
|
||||
RetryAlgorithmUpdationFailed,
|
||||
#[error("Failed to create the revenue recovery attempt data")]
|
||||
RevenueRecoveryAttemptDataCreateFailed,
|
||||
#[error("Failed to insert the revenue recovery payment method data in redis")]
|
||||
RevenueRecoveryRedisInsertFailed,
|
||||
}
|
||||
|
||||
@ -5508,7 +5508,10 @@ impl ForeignFrom<&hyperswitch_domain_models::payments::payment_attempt::PaymentA
|
||||
created_at: attempt.created_at,
|
||||
modified_at: attempt.modified_at,
|
||||
cancellation_reason: attempt.cancellation_reason.clone(),
|
||||
payment_token: attempt.payment_token.clone(),
|
||||
payment_token: attempt
|
||||
.connector_token_details
|
||||
.as_ref()
|
||||
.and_then(|details| details.connector_mandate_id.clone()),
|
||||
connector_metadata: attempt.connector_metadata.clone(),
|
||||
payment_experience: attempt.payment_experience,
|
||||
payment_method_type: attempt.payment_method_type,
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use actix_web::{web, Responder};
|
||||
use api_models::payments as payments_api;
|
||||
use api_models::{payments as payments_api, payments as api_payments};
|
||||
use common_utils::id_type;
|
||||
use error_stack::{report, FutureExt, ResultExt};
|
||||
use hyperswitch_domain_models::{
|
||||
|
||||
@ -107,10 +107,25 @@ impl ForeignFrom<&api_models::payments::RecoveryPaymentsCreate>
|
||||
retry_count: None,
|
||||
invoice_next_billing_time: None,
|
||||
invoice_billing_started_at_time: data.billing_started_at,
|
||||
card_network: card_info
|
||||
.as_ref()
|
||||
.and_then(|info| info.card_network.clone()),
|
||||
card_isin: card_info.as_ref().and_then(|info| info.card_isin.clone()),
|
||||
card_info: card_info
|
||||
.cloned()
|
||||
.unwrap_or(api_models::payments::AdditionalCardInfo {
|
||||
card_issuer: None,
|
||||
card_network: None,
|
||||
card_type: None,
|
||||
card_issuing_country: None,
|
||||
bank_code: None,
|
||||
last4: None,
|
||||
card_isin: None,
|
||||
card_extended_bin: None,
|
||||
card_exp_month: None,
|
||||
card_exp_year: None,
|
||||
card_holder_name: None,
|
||||
payment_checks: None,
|
||||
authentication_data: None,
|
||||
is_regulated: None,
|
||||
signature_network: None,
|
||||
}),
|
||||
charge_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::{marker::PhantomData, str::FromStr};
|
||||
use std::{collections::HashMap, marker::PhantomData, str::FromStr};
|
||||
|
||||
use api_models::{enums as api_enums, payments as api_payments, webhooks};
|
||||
use common_utils::{
|
||||
@ -30,11 +30,19 @@ use crate::{
|
||||
connector_integration_interface::{self, RouterDataConversion},
|
||||
},
|
||||
types::{
|
||||
self, api, domain, storage::revenue_recovery as storage_churn_recovery,
|
||||
self, api, domain,
|
||||
storage::{
|
||||
revenue_recovery as storage_revenue_recovery,
|
||||
revenue_recovery_redis_operation::{
|
||||
PaymentProcessorTokenDetails, PaymentProcessorTokenStatus, RedisTokenManager,
|
||||
},
|
||||
},
|
||||
transformers::ForeignFrom,
|
||||
},
|
||||
workflows::revenue_recovery as revenue_recovery_flow,
|
||||
};
|
||||
#[cfg(feature = "v2")]
|
||||
pub const REVENUE_RECOVERY: &str = "revenue_recovery";
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
@ -617,14 +625,15 @@ impl RevenueRecoveryAttempt {
|
||||
errors::RevenueRecoveryError,
|
||||
> {
|
||||
let payment_connector_id = payment_connector_account.as_ref().map(|account: &hyperswitch_domain_models::merchant_connector_account::MerchantConnectorAccount| account.id.clone());
|
||||
let payment_connector_name = payment_connector_account
|
||||
.as_ref()
|
||||
.map(|account| account.connector_name);
|
||||
let request_payload: api_payments::PaymentsAttemptRecordRequest = self
|
||||
.create_payment_record_request(
|
||||
state,
|
||||
billing_connector_account_id,
|
||||
payment_connector_id,
|
||||
payment_connector_account
|
||||
.as_ref()
|
||||
.map(|account| account.connector_name),
|
||||
payment_connector_name,
|
||||
common_enums::TriggeredBy::External,
|
||||
)
|
||||
.await?;
|
||||
@ -685,6 +694,16 @@ impl RevenueRecoveryAttempt {
|
||||
|
||||
let response = (recovery_attempt, updated_recovery_intent);
|
||||
|
||||
self.store_payment_processor_tokens_in_redis(state, &response.0, payment_connector_name)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
router_env::logger::error!(
|
||||
"Failed to store payment processor tokens in Redis: {:?}",
|
||||
e
|
||||
);
|
||||
errors::RevenueRecoveryError::RevenueRecoveryRedisInsertFailed
|
||||
})?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
@ -709,6 +728,7 @@ impl RevenueRecoveryAttempt {
|
||||
};
|
||||
|
||||
let card_info = revenue_recovery_attempt_data
|
||||
.card_info
|
||||
.card_isin
|
||||
.clone()
|
||||
.async_and_then(|isin| async move {
|
||||
@ -755,7 +775,7 @@ impl RevenueRecoveryAttempt {
|
||||
invoice_billing_started_at_time: revenue_recovery_attempt_data
|
||||
.invoice_billing_started_at_time,
|
||||
triggered_by,
|
||||
card_network: revenue_recovery_attempt_data.card_network.clone(),
|
||||
card_network: revenue_recovery_attempt_data.card_info.card_network.clone(),
|
||||
card_issuer,
|
||||
})
|
||||
}
|
||||
@ -899,7 +919,7 @@ impl RevenueRecoveryAttempt {
|
||||
.attach_printable("payment attempt id is required for pcr workflow tracking")?;
|
||||
|
||||
let execute_workflow_tracking_data =
|
||||
storage_churn_recovery::RevenueRecoveryWorkflowTrackingData {
|
||||
storage_revenue_recovery::RevenueRecoveryWorkflowTrackingData {
|
||||
billing_mca_id: billing_mca_id.clone(),
|
||||
global_payment_id: payment_id.clone(),
|
||||
merchant_id,
|
||||
@ -934,6 +954,77 @@ impl RevenueRecoveryAttempt {
|
||||
status: payment_intent.status,
|
||||
})
|
||||
}
|
||||
|
||||
/// Store payment processor tokens in Redis for retry management
|
||||
async fn store_payment_processor_tokens_in_redis(
|
||||
&self,
|
||||
state: &SessionState,
|
||||
recovery_attempt: &revenue_recovery::RecoveryPaymentAttempt,
|
||||
payment_connector_name: Option<common_enums::connector_enums::Connector>,
|
||||
) -> CustomResult<(), errors::RevenueRecoveryError> {
|
||||
let revenue_recovery_attempt_data = &self.0;
|
||||
let error_code = revenue_recovery_attempt_data.error_code.clone();
|
||||
let error_message = revenue_recovery_attempt_data.error_message.clone();
|
||||
let connector_name = payment_connector_name
|
||||
.ok_or(errors::RevenueRecoveryError::TransactionWebhookProcessingFailed)
|
||||
.attach_printable("unable to derive payment connector")?
|
||||
.to_string();
|
||||
|
||||
let gsm_record = helpers::get_gsm_record(
|
||||
state,
|
||||
error_code.clone(),
|
||||
error_message,
|
||||
connector_name,
|
||||
REVENUE_RECOVERY.to_string(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let is_hard_decline = gsm_record
|
||||
.and_then(|record| record.error_category)
|
||||
.map(|category| category == common_enums::ErrorCategory::HardDecline)
|
||||
.unwrap_or(false);
|
||||
|
||||
// Extract required fields from the revenue recovery attempt data
|
||||
let connector_customer_id = revenue_recovery_attempt_data.connector_customer_id.clone();
|
||||
|
||||
let attempt_id = recovery_attempt.attempt_id.clone();
|
||||
let token_unit = PaymentProcessorTokenStatus {
|
||||
error_code,
|
||||
inserted_by_attempt_id: attempt_id.clone(),
|
||||
daily_retry_history: HashMap::from([(recovery_attempt.created_at.date(), 1)]),
|
||||
scheduled_at: None,
|
||||
is_hard_decline: Some(is_hard_decline),
|
||||
payment_processor_token_details: PaymentProcessorTokenDetails {
|
||||
payment_processor_token: revenue_recovery_attempt_data
|
||||
.processor_payment_method_token
|
||||
.clone(),
|
||||
expiry_month: revenue_recovery_attempt_data
|
||||
.card_info
|
||||
.card_exp_month
|
||||
.clone(),
|
||||
expiry_year: revenue_recovery_attempt_data
|
||||
.card_info
|
||||
.card_exp_year
|
||||
.clone(),
|
||||
card_issuer: revenue_recovery_attempt_data.card_info.card_issuer.clone(),
|
||||
last_four_digits: revenue_recovery_attempt_data.card_info.last4.clone(),
|
||||
card_network: revenue_recovery_attempt_data.card_info.card_network.clone(),
|
||||
card_type: revenue_recovery_attempt_data.card_info.card_type.clone(),
|
||||
},
|
||||
};
|
||||
|
||||
// Make the Redis call to store tokens
|
||||
RedisTokenManager::upsert_payment_processor_token(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
token_unit,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::RevenueRecoveryError::RevenueRecoveryRedisInsertFailed)
|
||||
.attach_printable("Failed to store payment processor tokens in Redis")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BillingConnectorPaymentsSyncResponseData(
|
||||
|
||||
@ -36,6 +36,8 @@ pub mod payouts;
|
||||
pub mod refund;
|
||||
#[cfg(feature = "v2")]
|
||||
pub mod revenue_recovery;
|
||||
#[cfg(feature = "v2")]
|
||||
pub mod revenue_recovery_redis_operation;
|
||||
pub mod reverse_lookup;
|
||||
pub mod role;
|
||||
pub mod routing_algorithm;
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
use std::fmt::Debug;
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
|
||||
use common_enums::enums;
|
||||
use common_enums::enums::{self, CardNetwork};
|
||||
use common_utils::{date_time, ext_traits::ValueExt, id_type};
|
||||
use error_stack::ResultExt;
|
||||
use external_services::grpc_client::{self as external_grpc_client, GrpcHeaders};
|
||||
use hyperswitch_domain_models::{
|
||||
business_profile, merchant_account, merchant_connector_account, merchant_key_store,
|
||||
@ -10,6 +11,7 @@ use hyperswitch_domain_models::{
|
||||
};
|
||||
use masking::PeekInterface;
|
||||
use router_env::logger;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{db::StorageInterface, routes::SessionState, workflows::revenue_recovery};
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
@ -53,20 +55,7 @@ impl RevenueRecoveryPaymentData {
|
||||
)
|
||||
.await
|
||||
}
|
||||
enums::RevenueRecoveryAlgorithmType::Smart => {
|
||||
if is_hard_decline {
|
||||
None
|
||||
} else {
|
||||
// TODO: Integrate the smart retry call to return back a schedule time
|
||||
revenue_recovery::get_schedule_time_for_smart_retry(
|
||||
state,
|
||||
payment_attempt,
|
||||
payment_intent,
|
||||
retry_count,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
enums::RevenueRecoveryAlgorithmType::Smart => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -76,6 +65,8 @@ pub struct RevenueRecoverySettings {
|
||||
pub monitoring_threshold_in_seconds: i64,
|
||||
pub retry_algorithm_type: enums::RevenueRecoveryAlgorithmType,
|
||||
pub recovery_timestamp: RecoveryTimestamp,
|
||||
pub card_config: RetryLimitsConfig,
|
||||
pub redis_ttl_in_seconds: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, Clone)]
|
||||
@ -90,3 +81,28 @@ impl Default for RecoveryTimestamp {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, Clone, Default)]
|
||||
pub struct RetryLimitsConfig(pub HashMap<CardNetwork, NetworkRetryConfig>);
|
||||
|
||||
#[derive(Debug, serde::Deserialize, Clone, Default)]
|
||||
pub struct NetworkRetryConfig {
|
||||
pub max_retries_per_day: i32,
|
||||
pub max_retry_count_for_thirty_day: i32,
|
||||
}
|
||||
|
||||
impl RetryLimitsConfig {
|
||||
pub fn get_network_config(&self, network: Option<CardNetwork>) -> &NetworkRetryConfig {
|
||||
// Hardcoded fallback default config
|
||||
static DEFAULT_CONFIG: NetworkRetryConfig = NetworkRetryConfig {
|
||||
max_retries_per_day: 20,
|
||||
max_retry_count_for_thirty_day: 20,
|
||||
};
|
||||
|
||||
if let Some(net) = network {
|
||||
self.0.get(&net).unwrap_or(&DEFAULT_CONFIG)
|
||||
} else {
|
||||
self.0.get(&CardNetwork::Visa).unwrap_or(&DEFAULT_CONFIG)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,655 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_enums::enums::CardNetwork;
|
||||
use common_utils::{date_time, errors::CustomResult, id_type};
|
||||
use error_stack::ResultExt;
|
||||
use masking::Secret;
|
||||
use redis_interface::{DelReply, SetnxReply};
|
||||
use router_env::{instrument, tracing};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::{Date, Duration, OffsetDateTime, PrimitiveDateTime};
|
||||
|
||||
use crate::{db::errors, SessionState};
|
||||
|
||||
// Constants for retry window management
|
||||
const RETRY_WINDOW_DAYS: i32 = 30;
|
||||
const INITIAL_RETRY_COUNT: i32 = 0;
|
||||
|
||||
/// Payment processor token details including card information
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct PaymentProcessorTokenDetails {
|
||||
pub payment_processor_token: String,
|
||||
pub expiry_month: Option<Secret<String>>,
|
||||
pub expiry_year: Option<Secret<String>>,
|
||||
pub card_issuer: Option<String>,
|
||||
pub last_four_digits: Option<String>,
|
||||
pub card_network: Option<CardNetwork>,
|
||||
pub card_type: Option<String>,
|
||||
}
|
||||
|
||||
/// Represents the status and retry history of a payment processor token
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PaymentProcessorTokenStatus {
|
||||
/// Payment processor token details including card information and token ID
|
||||
pub payment_processor_token_details: PaymentProcessorTokenDetails,
|
||||
/// Payment intent ID that originally inserted this token
|
||||
pub inserted_by_attempt_id: id_type::GlobalAttemptId,
|
||||
/// Error code associated with the token failure
|
||||
pub error_code: Option<String>,
|
||||
/// Daily retry count history for the last 30 days (date -> retry_count)
|
||||
pub daily_retry_history: HashMap<Date, i32>,
|
||||
/// Scheduled time for the next retry attempt
|
||||
pub scheduled_at: Option<PrimitiveDateTime>,
|
||||
/// Indicates if the token is a hard decline (no retries allowed)
|
||||
pub is_hard_decline: Option<bool>,
|
||||
}
|
||||
|
||||
/// Token retry availability information with detailed wait times
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TokenRetryInfo {
|
||||
pub monthly_wait_hours: i64, // Hours to wait for 30-day limit reset
|
||||
pub daily_wait_hours: i64, // Hours to wait for daily limit reset
|
||||
pub total_30_day_retries: i32, // Current total retry count in 30-day window
|
||||
}
|
||||
|
||||
/// Complete token information with retry limits and wait times
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PaymentProcessorTokenWithRetryInfo {
|
||||
/// The complete token status information
|
||||
pub token_status: PaymentProcessorTokenStatus,
|
||||
/// Hours to wait before next retry attempt (max of daily/monthly wait)
|
||||
pub retry_wait_time_hours: i64,
|
||||
/// Number of retries remaining in the 30-day rolling window
|
||||
pub monthly_retry_remaining: i32,
|
||||
}
|
||||
|
||||
/// Redis-based token management struct
|
||||
pub struct RedisTokenManager;
|
||||
|
||||
impl RedisTokenManager {
|
||||
/// Lock connector customer
|
||||
#[instrument(skip_all)]
|
||||
pub async fn lock_connector_customer_status(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
payment_id: &id_type::GlobalPaymentId,
|
||||
) -> CustomResult<bool, errors::StorageError> {
|
||||
let redis_conn =
|
||||
state
|
||||
.store
|
||||
.get_redis_conn()
|
||||
.change_context(errors::StorageError::RedisError(
|
||||
errors::RedisError::RedisConnectionError.into(),
|
||||
))?;
|
||||
|
||||
let lock_key = format!("customer:{connector_customer_id}:status");
|
||||
let seconds = &state.conf.revenue_recovery.redis_ttl_in_seconds;
|
||||
|
||||
let result: bool = match redis_conn
|
||||
.set_key_if_not_exists_with_expiry(
|
||||
&lock_key.into(),
|
||||
payment_id.get_string_repr(),
|
||||
Some(*seconds),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp == SetnxReply::KeySet,
|
||||
Err(error) => {
|
||||
tracing::error!(operation = "lock_stream", err = ?error);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
payment_id = payment_id.get_string_repr(),
|
||||
lock_acquired = %result,
|
||||
"Connector customer lock attempt"
|
||||
);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Unlock connector customer status
|
||||
#[instrument(skip_all)]
|
||||
pub async fn unlock_connector_customer_status(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
) -> CustomResult<bool, errors::StorageError> {
|
||||
let redis_conn =
|
||||
state
|
||||
.store
|
||||
.get_redis_conn()
|
||||
.change_context(errors::StorageError::RedisError(
|
||||
errors::RedisError::RedisConnectionError.into(),
|
||||
))?;
|
||||
|
||||
let lock_key = format!("customer:{connector_customer_id}:status");
|
||||
|
||||
match redis_conn.delete_key(&lock_key.into()).await {
|
||||
Ok(DelReply::KeyDeleted) => {
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Connector customer unlocked"
|
||||
);
|
||||
Ok(true)
|
||||
}
|
||||
Ok(DelReply::KeyNotDeleted) => {
|
||||
tracing::debug!("Tried to unlock a stream which is already unlocked");
|
||||
Ok(false)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(?err, "Failed to delete lock key");
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all payment processor tokens for a connector customer
|
||||
#[instrument(skip_all)]
|
||||
pub async fn get_connector_customer_payment_processor_tokens(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
) -> CustomResult<HashMap<String, PaymentProcessorTokenStatus>, errors::StorageError> {
|
||||
let redis_conn =
|
||||
state
|
||||
.store
|
||||
.get_redis_conn()
|
||||
.change_context(errors::StorageError::RedisError(
|
||||
errors::RedisError::RedisConnectionError.into(),
|
||||
))?;
|
||||
let tokens_key = format!("customer:{connector_customer_id}:tokens");
|
||||
|
||||
let get_hash_err =
|
||||
errors::StorageError::RedisError(errors::RedisError::GetHashFieldFailed.into());
|
||||
|
||||
let payment_processor_tokens: HashMap<String, String> = redis_conn
|
||||
.get_hash_fields(&tokens_key.into())
|
||||
.await
|
||||
.change_context(get_hash_err)?;
|
||||
|
||||
// build the result map using iterator adapters (explicit match preserved for logging)
|
||||
let payment_processor_token_info_map: HashMap<String, PaymentProcessorTokenStatus> =
|
||||
payment_processor_tokens
|
||||
.into_iter()
|
||||
.filter_map(|(token_id, token_data)| {
|
||||
match serde_json::from_str::<PaymentProcessorTokenStatus>(&token_data) {
|
||||
Ok(token_status) => Some((token_id, token_status)),
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
connector_customer_id = %connector_customer_id,
|
||||
token_id = %token_id,
|
||||
error = %err,
|
||||
"Failed to deserialize token data, skipping",
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Fetched payment processor tokens",
|
||||
);
|
||||
|
||||
Ok(payment_processor_token_info_map)
|
||||
}
|
||||
|
||||
/// Update connector customer payment processor tokens or add if doesn't exist
|
||||
#[instrument(skip_all)]
|
||||
pub async fn update_or_add_connector_customer_payment_processor_tokens(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
payment_processor_token_info_map: HashMap<String, PaymentProcessorTokenStatus>,
|
||||
) -> CustomResult<(), errors::StorageError> {
|
||||
let redis_conn =
|
||||
state
|
||||
.store
|
||||
.get_redis_conn()
|
||||
.change_context(errors::StorageError::RedisError(
|
||||
errors::RedisError::RedisConnectionError.into(),
|
||||
))?;
|
||||
let tokens_key = format!("customer:{connector_customer_id}:tokens");
|
||||
|
||||
// allocate capacity up-front to avoid rehashing
|
||||
let mut serialized_payment_processor_tokens: HashMap<String, String> =
|
||||
HashMap::with_capacity(payment_processor_token_info_map.len());
|
||||
|
||||
// serialize all tokens, preserving explicit error handling and attachable diagnostics
|
||||
for (payment_processor_token_id, payment_processor_token_status) in
|
||||
payment_processor_token_info_map
|
||||
{
|
||||
let serialized = serde_json::to_string(&payment_processor_token_status)
|
||||
.change_context(errors::StorageError::SerializationFailed)
|
||||
.attach_printable("Failed to serialize token status")?;
|
||||
|
||||
serialized_payment_processor_tokens.insert(payment_processor_token_id, serialized);
|
||||
}
|
||||
let seconds = &state.conf.revenue_recovery.redis_ttl_in_seconds;
|
||||
|
||||
// Update or add tokens
|
||||
redis_conn
|
||||
.set_hash_fields(
|
||||
&tokens_key.into(),
|
||||
serialized_payment_processor_tokens,
|
||||
Some(*seconds),
|
||||
)
|
||||
.await
|
||||
.change_context(errors::StorageError::RedisError(
|
||||
errors::RedisError::SetHashFieldFailed.into(),
|
||||
))?;
|
||||
|
||||
tracing::info!(
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"Successfully updated or added customer tokens",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get current date in `yyyy-mm-dd` format.
|
||||
pub fn get_current_date() -> String {
|
||||
let today = date_time::now().date();
|
||||
|
||||
let (year, month, day) = (today.year(), today.month(), today.day());
|
||||
|
||||
format!("{year:04}-{month:02}-{day:02}",)
|
||||
}
|
||||
|
||||
/// Normalize retry window to exactly `RETRY_WINDOW_DAYS` days (today to `RETRY_WINDOW_DAYS - 1` days ago).
|
||||
pub fn normalize_retry_window(
|
||||
payment_processor_token: &mut PaymentProcessorTokenStatus,
|
||||
today: Date,
|
||||
) {
|
||||
let mut normalized_retry_history: HashMap<Date, i32> = HashMap::new();
|
||||
|
||||
for days_ago in 0..RETRY_WINDOW_DAYS {
|
||||
let date = today - Duration::days(days_ago.into());
|
||||
|
||||
payment_processor_token
|
||||
.daily_retry_history
|
||||
.get(&date)
|
||||
.map(|&retry_count| {
|
||||
normalized_retry_history.insert(date, retry_count);
|
||||
});
|
||||
}
|
||||
|
||||
payment_processor_token.daily_retry_history = normalized_retry_history;
|
||||
}
|
||||
|
||||
/// Get all payment processor tokens with retry information and wait times.
|
||||
pub fn get_tokens_with_retry_metadata(
|
||||
state: &SessionState,
|
||||
payment_processor_token_info_map: &HashMap<String, PaymentProcessorTokenStatus>,
|
||||
) -> HashMap<String, PaymentProcessorTokenWithRetryInfo> {
|
||||
let today = OffsetDateTime::now_utc().date();
|
||||
let card_config = &state.conf.revenue_recovery.card_config;
|
||||
|
||||
let mut result: HashMap<String, PaymentProcessorTokenWithRetryInfo> =
|
||||
HashMap::with_capacity(payment_processor_token_info_map.len());
|
||||
|
||||
for (payment_processor_token_id, payment_processor_token_status) in
|
||||
payment_processor_token_info_map.iter()
|
||||
{
|
||||
let card_network = payment_processor_token_status
|
||||
.payment_processor_token_details
|
||||
.card_network
|
||||
.clone();
|
||||
|
||||
// Calculate retry information.
|
||||
let retry_info = Self::payment_processor_token_retry_info(
|
||||
state,
|
||||
payment_processor_token_status,
|
||||
today,
|
||||
card_network.clone(),
|
||||
);
|
||||
|
||||
// Determine the wait time (max of monthly and daily wait hours).
|
||||
let retry_wait_time_hours = retry_info
|
||||
.monthly_wait_hours
|
||||
.max(retry_info.daily_wait_hours);
|
||||
|
||||
// Obtain network-specific limits and compute remaining monthly retries.
|
||||
let card_network_config = card_config.get_network_config(card_network);
|
||||
|
||||
let monthly_retry_remaining = card_network_config
|
||||
.max_retry_count_for_thirty_day
|
||||
.saturating_sub(retry_info.total_30_day_retries);
|
||||
|
||||
// Build the per-token result struct.
|
||||
let token_with_retry_info = PaymentProcessorTokenWithRetryInfo {
|
||||
token_status: payment_processor_token_status.clone(),
|
||||
retry_wait_time_hours,
|
||||
monthly_retry_remaining,
|
||||
};
|
||||
|
||||
result.insert(payment_processor_token_id.clone(), token_with_retry_info);
|
||||
}
|
||||
tracing::debug!("Fetched payment processor tokens with retry metadata",);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Sum retries over exactly the last 30 days
|
||||
fn calculate_total_30_day_retries(token: &PaymentProcessorTokenStatus, today: Date) -> i32 {
|
||||
(0..RETRY_WINDOW_DAYS)
|
||||
.map(|i| {
|
||||
let date = today - Duration::days(i.into());
|
||||
token
|
||||
.daily_retry_history
|
||||
.get(&date)
|
||||
.copied()
|
||||
.unwrap_or(INITIAL_RETRY_COUNT)
|
||||
})
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Calculate wait hours
|
||||
fn calculate_wait_hours(target_date: Date, now: OffsetDateTime) -> i64 {
|
||||
let expiry_time = target_date.midnight().assume_utc();
|
||||
(expiry_time - now).whole_hours().max(0)
|
||||
}
|
||||
|
||||
/// Calculate retry counts for exactly the last 30 days
|
||||
pub fn payment_processor_token_retry_info(
|
||||
state: &SessionState,
|
||||
token: &PaymentProcessorTokenStatus,
|
||||
today: Date,
|
||||
network_type: Option<CardNetwork>,
|
||||
) -> TokenRetryInfo {
|
||||
let card_config = &state.conf.revenue_recovery.card_config;
|
||||
let card_network_config = card_config.get_network_config(network_type);
|
||||
|
||||
let now = OffsetDateTime::now_utc();
|
||||
|
||||
let total_30_day_retries = Self::calculate_total_30_day_retries(token, today);
|
||||
|
||||
let monthly_wait_hours =
|
||||
if total_30_day_retries >= card_network_config.max_retry_count_for_thirty_day {
|
||||
(0..RETRY_WINDOW_DAYS)
|
||||
.map(|i| today - Duration::days(i.into()))
|
||||
.find(|date| token.daily_retry_history.get(date).copied().unwrap_or(0) > 0)
|
||||
.map(|date| Self::calculate_wait_hours(date + Duration::days(31), now))
|
||||
.unwrap_or(0)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let today_retries = token
|
||||
.daily_retry_history
|
||||
.get(&today)
|
||||
.copied()
|
||||
.unwrap_or(INITIAL_RETRY_COUNT);
|
||||
|
||||
let daily_wait_hours = if today_retries >= card_network_config.max_retries_per_day {
|
||||
Self::calculate_wait_hours(today + Duration::days(1), now)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
TokenRetryInfo {
|
||||
monthly_wait_hours,
|
||||
daily_wait_hours,
|
||||
total_30_day_retries,
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert payment processor token
|
||||
#[instrument(skip_all)]
|
||||
pub async fn upsert_payment_processor_token(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
token_data: PaymentProcessorTokenStatus,
|
||||
) -> CustomResult<bool, errors::StorageError> {
|
||||
let mut token_map =
|
||||
Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id)
|
||||
.await?;
|
||||
|
||||
let token_id = token_data
|
||||
.payment_processor_token_details
|
||||
.payment_processor_token
|
||||
.clone();
|
||||
|
||||
let was_existing = token_map.contains_key(&token_id);
|
||||
|
||||
let error_code = token_data.error_code.clone();
|
||||
let today = OffsetDateTime::now_utc().date();
|
||||
|
||||
token_map
|
||||
.get_mut(&token_id)
|
||||
.map(|existing_token| {
|
||||
error_code.map(|err| existing_token.error_code = Some(err));
|
||||
|
||||
Self::normalize_retry_window(existing_token, today);
|
||||
|
||||
for (date, &value) in &token_data.daily_retry_history {
|
||||
existing_token
|
||||
.daily_retry_history
|
||||
.entry(*date)
|
||||
.and_modify(|v| *v += value)
|
||||
.or_insert(value);
|
||||
}
|
||||
})
|
||||
.or_else(|| {
|
||||
token_map.insert(token_id.clone(), token_data);
|
||||
None
|
||||
});
|
||||
|
||||
Self::update_or_add_connector_customer_payment_processor_tokens(
|
||||
state,
|
||||
connector_customer_id,
|
||||
token_map,
|
||||
)
|
||||
.await?;
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Upsert payment processor tokens",
|
||||
);
|
||||
|
||||
Ok(!was_existing)
|
||||
}
|
||||
|
||||
// Update payment processor token error code with billing connector response
|
||||
#[instrument(skip_all)]
|
||||
pub async fn update_payment_processor_token_error_code_from_process_tracker(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
error_code: &Option<String>,
|
||||
is_hard_decline: &Option<bool>,
|
||||
) -> CustomResult<bool, errors::StorageError> {
|
||||
let today = OffsetDateTime::now_utc().date();
|
||||
let updated_token =
|
||||
Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id)
|
||||
.await?
|
||||
.values()
|
||||
.find(|status| status.scheduled_at.is_some())
|
||||
.map(|status| PaymentProcessorTokenStatus {
|
||||
payment_processor_token_details: status.payment_processor_token_details.clone(),
|
||||
inserted_by_attempt_id: status.inserted_by_attempt_id.clone(),
|
||||
error_code: error_code.clone(),
|
||||
daily_retry_history: status.daily_retry_history.clone(),
|
||||
scheduled_at: None,
|
||||
is_hard_decline: *is_hard_decline,
|
||||
});
|
||||
|
||||
match updated_token {
|
||||
Some(mut token) => {
|
||||
Self::normalize_retry_window(&mut token, today);
|
||||
|
||||
match token.error_code {
|
||||
None => token.daily_retry_history.clear(),
|
||||
Some(_) => {
|
||||
let current_count = token
|
||||
.daily_retry_history
|
||||
.get(&today)
|
||||
.copied()
|
||||
.unwrap_or(INITIAL_RETRY_COUNT);
|
||||
token.daily_retry_history.insert(today, current_count + 1);
|
||||
}
|
||||
}
|
||||
|
||||
let mut tokens_map = HashMap::new();
|
||||
tokens_map.insert(
|
||||
token
|
||||
.payment_processor_token_details
|
||||
.payment_processor_token
|
||||
.clone(),
|
||||
token.clone(),
|
||||
);
|
||||
|
||||
Self::update_or_add_connector_customer_payment_processor_tokens(
|
||||
state,
|
||||
connector_customer_id,
|
||||
tokens_map,
|
||||
)
|
||||
.await?;
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Updated payment processor tokens with error code",
|
||||
);
|
||||
Ok(true)
|
||||
}
|
||||
None => {
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"No Token found with scheduled time to update error code",
|
||||
);
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update payment processor token schedule time
|
||||
#[instrument(skip_all)]
|
||||
pub async fn update_payment_processor_token_schedule_time(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
payment_processor_token: &str,
|
||||
schedule_time: Option<PrimitiveDateTime>,
|
||||
) -> CustomResult<bool, errors::StorageError> {
|
||||
let updated_token =
|
||||
Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id)
|
||||
.await?
|
||||
.values()
|
||||
.find(|status| {
|
||||
status
|
||||
.payment_processor_token_details
|
||||
.payment_processor_token
|
||||
== payment_processor_token
|
||||
})
|
||||
.map(|status| PaymentProcessorTokenStatus {
|
||||
payment_processor_token_details: status.payment_processor_token_details.clone(),
|
||||
inserted_by_attempt_id: status.inserted_by_attempt_id.clone(),
|
||||
error_code: status.error_code.clone(),
|
||||
daily_retry_history: status.daily_retry_history.clone(),
|
||||
scheduled_at: schedule_time,
|
||||
is_hard_decline: status.is_hard_decline,
|
||||
});
|
||||
|
||||
match updated_token {
|
||||
Some(token) => {
|
||||
let mut tokens_map = HashMap::new();
|
||||
tokens_map.insert(
|
||||
token
|
||||
.payment_processor_token_details
|
||||
.payment_processor_token
|
||||
.clone(),
|
||||
token.clone(),
|
||||
);
|
||||
Self::update_or_add_connector_customer_payment_processor_tokens(
|
||||
state,
|
||||
connector_customer_id,
|
||||
tokens_map,
|
||||
)
|
||||
.await?;
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Updated payment processor tokens with schedule time",
|
||||
);
|
||||
Ok(true)
|
||||
}
|
||||
None => {
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"payment processor tokens with not found",
|
||||
);
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get payment processor token with schedule time
|
||||
#[instrument(skip_all)]
|
||||
pub async fn get_payment_processor_token_with_schedule_time(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
) -> CustomResult<Option<PaymentProcessorTokenStatus>, errors::StorageError> {
|
||||
let tokens =
|
||||
Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id)
|
||||
.await?;
|
||||
|
||||
let scheduled_token = tokens
|
||||
.values()
|
||||
.find(|status| status.scheduled_at.is_some())
|
||||
.cloned();
|
||||
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Fetched payment processor token with schedule time",
|
||||
);
|
||||
|
||||
Ok(scheduled_token)
|
||||
}
|
||||
|
||||
// Get payment processor token with max retry remaining for cascading retry algorithm
|
||||
#[instrument(skip_all)]
|
||||
pub async fn get_token_with_max_retry_remaining(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
) -> CustomResult<Option<PaymentProcessorTokenWithRetryInfo>, errors::StorageError> {
|
||||
// Get all tokens for the customer
|
||||
let tokens_map =
|
||||
Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id)
|
||||
.await?;
|
||||
|
||||
// Tokens with retry metadata
|
||||
let tokens_with_retry = Self::get_tokens_with_retry_metadata(state, &tokens_map);
|
||||
|
||||
// Find the token with max retry remaining
|
||||
let max_retry_token = tokens_with_retry
|
||||
.into_iter()
|
||||
.filter(|(_, token_info)| !token_info.token_status.is_hard_decline.unwrap_or(false))
|
||||
.max_by_key(|(_, token_info)| token_info.monthly_retry_remaining)
|
||||
.map(|(_, token_info)| token_info);
|
||||
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
"Fetched payment processor token with max retry remaining",
|
||||
);
|
||||
|
||||
Ok(max_retry_token)
|
||||
}
|
||||
|
||||
// Check if all tokens are hard declined or no token found for the customer
|
||||
#[instrument(skip_all)]
|
||||
pub async fn are_all_tokens_hard_declined(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
) -> CustomResult<bool, errors::StorageError> {
|
||||
let tokens_map =
|
||||
Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id)
|
||||
.await?;
|
||||
let all_hard_declined = tokens_map.is_empty()
|
||||
&& tokens_map
|
||||
.values()
|
||||
.all(|token| token.is_hard_decline.unwrap_or(false));
|
||||
|
||||
tracing::debug!(
|
||||
connector_customer_id = connector_customer_id,
|
||||
all_hard_declined,
|
||||
"Checked if all tokens are hard declined or no token found for the customer",
|
||||
);
|
||||
|
||||
Ok(all_hard_declined)
|
||||
}
|
||||
}
|
||||
@ -1,14 +1,19 @@
|
||||
#[cfg(feature = "v2")]
|
||||
use api_models::payments::PaymentsGetIntentRequest;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
use api_models::{enums::RevenueRecoveryAlgorithmType, payments::PaymentsGetIntentRequest};
|
||||
#[cfg(feature = "v2")]
|
||||
use common_utils::{
|
||||
errors::CustomResult,
|
||||
ext_traits::AsyncExt,
|
||||
ext_traits::{StringExt, ValueExt},
|
||||
id_type,
|
||||
};
|
||||
#[cfg(feature = "v2")]
|
||||
use diesel_models::types::BillingConnectorPaymentMethodDetails;
|
||||
#[cfg(feature = "v2")]
|
||||
use error_stack::ResultExt;
|
||||
use error_stack::{Report, ResultExt};
|
||||
#[cfg(all(feature = "revenue_recovery", feature = "v2"))]
|
||||
use external_services::{
|
||||
date_time, grpc_client::revenue_recovery::recovery_decider_client as external_grpc_client,
|
||||
@ -16,21 +21,37 @@ use external_services::{
|
||||
#[cfg(feature = "v2")]
|
||||
use hyperswitch_domain_models::{
|
||||
payment_method_data::PaymentMethodData,
|
||||
payments::{
|
||||
payment_attempt::PaymentAttempt, PaymentConfirmData, PaymentIntent, PaymentIntentData,
|
||||
},
|
||||
payments::{payment_attempt, PaymentConfirmData, PaymentIntent, PaymentIntentData},
|
||||
router_flow_types,
|
||||
router_flow_types::Authorize,
|
||||
};
|
||||
#[cfg(feature = "v2")]
|
||||
use masking::{ExposeInterface, PeekInterface, Secret};
|
||||
#[cfg(feature = "v2")]
|
||||
use router_env::logger;
|
||||
use router_env::{logger, tracing};
|
||||
use scheduler::{consumer::workflows::ProcessTrackerWorkflow, errors};
|
||||
#[cfg(feature = "v2")]
|
||||
use scheduler::{types::process_data, utils as scheduler_utils};
|
||||
#[cfg(feature = "v2")]
|
||||
use storage_impl::errors as storage_errors;
|
||||
#[cfg(feature = "v2")]
|
||||
use time::Date;
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::core::payments::operations;
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::routes::app::ReqState;
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::services;
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::types::storage::{
|
||||
revenue_recovery::RetryLimitsConfig,
|
||||
revenue_recovery_redis_operation::{
|
||||
PaymentProcessorTokenStatus, PaymentProcessorTokenWithRetryInfo, RedisTokenManager,
|
||||
},
|
||||
};
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::workflows::revenue_recovery::pcr::api;
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::{
|
||||
core::{
|
||||
@ -42,11 +63,16 @@ use crate::{
|
||||
types::{
|
||||
api::{self as api_types},
|
||||
domain,
|
||||
storage::revenue_recovery as pcr_storage_types,
|
||||
storage::{
|
||||
revenue_recovery as pcr_storage_types,
|
||||
revenue_recovery_redis_operation::PaymentProcessorTokenDetails,
|
||||
},
|
||||
},
|
||||
};
|
||||
use crate::{routes::SessionState, types::storage};
|
||||
pub struct ExecutePcrWorkflow;
|
||||
#[cfg(feature = "v2")]
|
||||
pub const REVENUE_RECOVERY: &str = "revenue_recovery";
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
|
||||
@ -218,21 +244,21 @@ pub(crate) async fn get_schedule_time_to_retry_mit_payments(
|
||||
#[cfg(feature = "v2")]
|
||||
pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
state: &SessionState,
|
||||
payment_attempt: &PaymentAttempt,
|
||||
payment_intent: &PaymentIntent,
|
||||
retry_count: i32,
|
||||
) -> Option<time::PrimitiveDateTime> {
|
||||
let first_error_message = match payment_attempt.error.as_ref() {
|
||||
Some(error) => error.message.clone(),
|
||||
None => {
|
||||
logger::error!(
|
||||
payment_intent_id = ?payment_intent.get_id(),
|
||||
attempt_id = ?payment_attempt.get_id(),
|
||||
"Payment attempt error information not found - cannot proceed with smart retry"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
retry_after_time: Option<prost_types::Timestamp>,
|
||||
token_with_retry_info: &PaymentProcessorTokenWithRetryInfo,
|
||||
) -> Result<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
||||
let card_config = &state.conf.revenue_recovery.card_config;
|
||||
|
||||
// Not populating it right now
|
||||
let first_error_message = "None".to_string();
|
||||
let retry_count_left = token_with_retry_info.monthly_retry_remaining;
|
||||
let pg_error_code = token_with_retry_info.token_status.error_code.clone();
|
||||
|
||||
let card_info = token_with_retry_info
|
||||
.token_status
|
||||
.payment_processor_token_details
|
||||
.clone();
|
||||
|
||||
let billing_state = payment_intent
|
||||
.billing_address
|
||||
@ -241,54 +267,19 @@ pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
.and_then(|details| details.state.as_ref())
|
||||
.cloned();
|
||||
|
||||
// Check if payment_method_data itself is None
|
||||
if payment_attempt.payment_method_data.is_none() {
|
||||
logger::debug!(
|
||||
payment_intent_id = ?payment_intent.get_id(),
|
||||
attempt_id = ?payment_attempt.get_id(),
|
||||
message = "payment_attempt.payment_method_data is None"
|
||||
);
|
||||
}
|
||||
|
||||
let billing_connector_payment_method_details = payment_intent
|
||||
.feature_metadata
|
||||
.as_ref()
|
||||
.and_then(|revenue_recovery_data| {
|
||||
revenue_recovery_data
|
||||
.payment_revenue_recovery_metadata
|
||||
.as_ref()
|
||||
})
|
||||
.and_then(|payment_metadata| {
|
||||
payment_metadata
|
||||
.billing_connector_payment_method_details
|
||||
.as_ref()
|
||||
});
|
||||
|
||||
let revenue_recovery_metadata = payment_intent
|
||||
.feature_metadata
|
||||
.as_ref()
|
||||
.and_then(|metadata| metadata.payment_revenue_recovery_metadata.as_ref());
|
||||
|
||||
let card_network_str = billing_connector_payment_method_details
|
||||
.and_then(|details| match details {
|
||||
BillingConnectorPaymentMethodDetails::Card(card_info) => card_info.card_network.clone(),
|
||||
})
|
||||
.map(|cn| cn.to_string());
|
||||
let card_network = card_info.card_network.clone();
|
||||
let total_retry_count_within_network = card_config.get_network_config(card_network.clone());
|
||||
|
||||
let card_issuer_str =
|
||||
billing_connector_payment_method_details.and_then(|details| match details {
|
||||
BillingConnectorPaymentMethodDetails::Card(card_info) => card_info.card_issuer.clone(),
|
||||
});
|
||||
let card_network_str = card_network.map(|network| network.to_string());
|
||||
|
||||
let card_funding_str = payment_intent
|
||||
.feature_metadata
|
||||
.as_ref()
|
||||
.and_then(|revenue_recovery_data| {
|
||||
revenue_recovery_data
|
||||
.payment_revenue_recovery_metadata
|
||||
.as_ref()
|
||||
})
|
||||
.map(|payment_metadata| payment_metadata.payment_method_subtype.to_string());
|
||||
let card_issuer_str = card_info.card_issuer.clone();
|
||||
|
||||
let card_funding_str = card_info.card_type.clone();
|
||||
|
||||
let start_time_primitive = payment_intent.created_at;
|
||||
let recovery_timestamp_config = &state.conf.revenue_recovery.recovery_timestamp;
|
||||
@ -296,6 +287,7 @@ pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
let modified_start_time_primitive = start_time_primitive.saturating_add(time::Duration::hours(
|
||||
recovery_timestamp_config.initial_timestamp_in_hours,
|
||||
));
|
||||
|
||||
let start_time_proto = date_time::convert_to_prost_timestamp(modified_start_time_primitive);
|
||||
|
||||
let merchant_id = Some(payment_intent.merchant_id.get_string_repr().to_string());
|
||||
@ -321,33 +313,6 @@ pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
.and_then(|details| details.city.as_ref())
|
||||
.cloned();
|
||||
|
||||
let attempt_currency = Some(payment_intent.amount_details.currency.to_string());
|
||||
let attempt_status = Some(payment_attempt.status.to_string());
|
||||
let attempt_amount = Some(
|
||||
payment_attempt
|
||||
.amount_details
|
||||
.get_net_amount()
|
||||
.get_amount_as_i64(),
|
||||
);
|
||||
let attempt_response_time = Some(date_time::convert_to_prost_timestamp(
|
||||
payment_attempt.created_at,
|
||||
));
|
||||
let payment_method_type = Some(payment_attempt.payment_method_type.to_string());
|
||||
let payment_gateway = payment_attempt.connector.clone();
|
||||
|
||||
let pg_error_code = payment_attempt
|
||||
.error
|
||||
.as_ref()
|
||||
.map(|error| error.code.clone());
|
||||
let network_advice_code = payment_attempt
|
||||
.error
|
||||
.as_ref()
|
||||
.and_then(|error| error.network_advice_code.clone());
|
||||
let network_error_code = payment_attempt
|
||||
.error
|
||||
.as_ref()
|
||||
.and_then(|error| error.network_decline_code.clone());
|
||||
|
||||
let first_pg_error_code = revenue_recovery_metadata
|
||||
.and_then(|metadata| metadata.first_payment_attempt_pg_error_code.clone());
|
||||
let first_network_advice_code = revenue_recovery_metadata
|
||||
@ -365,29 +330,37 @@ pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
card_funding: card_funding_str,
|
||||
card_network: card_network_str,
|
||||
card_issuer: card_issuer_str,
|
||||
invoice_start_time: start_time_proto,
|
||||
retry_count: Some(retry_count.into()),
|
||||
invoice_start_time: Some(start_time_proto),
|
||||
retry_count: Some(
|
||||
(total_retry_count_within_network.max_retry_count_for_thirty_day - retry_count_left)
|
||||
.into(),
|
||||
),
|
||||
merchant_id,
|
||||
invoice_amount,
|
||||
invoice_currency,
|
||||
invoice_due_date,
|
||||
billing_country,
|
||||
billing_city,
|
||||
attempt_currency,
|
||||
attempt_status,
|
||||
attempt_amount,
|
||||
attempt_currency: None,
|
||||
attempt_status: None,
|
||||
attempt_amount: None,
|
||||
pg_error_code,
|
||||
network_advice_code,
|
||||
network_error_code,
|
||||
network_advice_code: None,
|
||||
network_error_code: None,
|
||||
first_pg_error_code,
|
||||
first_network_advice_code,
|
||||
first_network_error_code,
|
||||
attempt_response_time,
|
||||
payment_method_type,
|
||||
payment_gateway,
|
||||
retry_count_left: None,
|
||||
attempt_response_time: None,
|
||||
payment_method_type: None,
|
||||
payment_gateway: None,
|
||||
retry_count_left: Some(retry_count_left.into()),
|
||||
total_retry_count_within_network: Some(
|
||||
total_retry_count_within_network
|
||||
.max_retry_count_for_thirty_day
|
||||
.into(),
|
||||
),
|
||||
first_error_msg_time: None,
|
||||
wait_time: None,
|
||||
wait_time: retry_after_time,
|
||||
};
|
||||
|
||||
if let Some(mut client) = state.grpc_client.recovery_decider_client.clone() {
|
||||
@ -395,7 +368,7 @@ pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
.decide_on_retry(decider_request.into(), state.get_recovery_grpc_headers())
|
||||
.await
|
||||
{
|
||||
Ok(grpc_response) => grpc_response
|
||||
Ok(grpc_response) => Ok(grpc_response
|
||||
.retry_flag
|
||||
.then_some(())
|
||||
.and(grpc_response.retry_time)
|
||||
@ -409,16 +382,16 @@ pub(crate) async fn get_schedule_time_for_smart_retry(
|
||||
None // If conversion fails, treat as no valid retry time
|
||||
}
|
||||
}
|
||||
}),
|
||||
})),
|
||||
|
||||
Err(e) => {
|
||||
logger::error!("Recovery decider gRPC call failed: {e:?}");
|
||||
None
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger::debug!("Recovery decider client is not configured");
|
||||
None
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@ -430,7 +403,7 @@ struct InternalDeciderRequest {
|
||||
card_funding: Option<String>,
|
||||
card_network: Option<String>,
|
||||
card_issuer: Option<String>,
|
||||
invoice_start_time: prost_types::Timestamp,
|
||||
invoice_start_time: Option<prost_types::Timestamp>,
|
||||
retry_count: Option<i64>,
|
||||
merchant_id: Option<String>,
|
||||
invoice_amount: Option<i64>,
|
||||
@ -451,6 +424,7 @@ struct InternalDeciderRequest {
|
||||
payment_method_type: Option<String>,
|
||||
payment_gateway: Option<String>,
|
||||
retry_count_left: Option<i64>,
|
||||
total_retry_count_within_network: Option<i64>,
|
||||
first_error_msg_time: Option<prost_types::Timestamp>,
|
||||
wait_time: Option<prost_types::Timestamp>,
|
||||
}
|
||||
@ -464,7 +438,7 @@ impl From<InternalDeciderRequest> for external_grpc_client::DeciderRequest {
|
||||
card_funding: internal_request.card_funding,
|
||||
card_network: internal_request.card_network,
|
||||
card_issuer: internal_request.card_issuer,
|
||||
invoice_start_time: Some(internal_request.invoice_start_time),
|
||||
invoice_start_time: internal_request.invoice_start_time,
|
||||
retry_count: internal_request.retry_count,
|
||||
merchant_id: internal_request.merchant_id,
|
||||
invoice_amount: internal_request.invoice_amount,
|
||||
@ -485,8 +459,302 @@ impl From<InternalDeciderRequest> for external_grpc_client::DeciderRequest {
|
||||
payment_method_type: internal_request.payment_method_type,
|
||||
payment_gateway: internal_request.payment_gateway,
|
||||
retry_count_left: internal_request.retry_count_left,
|
||||
total_retry_count_within_network: internal_request.total_retry_count_within_network,
|
||||
first_error_msg_time: internal_request.first_error_msg_time,
|
||||
wait_time: internal_request.wait_time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ScheduledToken {
|
||||
pub token_details: PaymentProcessorTokenDetails,
|
||||
pub schedule_time: time::PrimitiveDateTime,
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
pub async fn get_token_with_schedule_time_based_on_retry_alogrithm_type(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
payment_intent: &PaymentIntent,
|
||||
retry_algorithm_type: RevenueRecoveryAlgorithmType,
|
||||
retry_count: i32,
|
||||
) -> CustomResult<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
||||
let mut scheduled_time = None;
|
||||
|
||||
match retry_algorithm_type {
|
||||
RevenueRecoveryAlgorithmType::Monitoring => {
|
||||
logger::error!("Monitoring type found for Revenue Recovery retry payment");
|
||||
}
|
||||
|
||||
RevenueRecoveryAlgorithmType::Cascading => {
|
||||
let time = get_schedule_time_to_retry_mit_payments(
|
||||
state.store.as_ref(),
|
||||
&payment_intent.merchant_id,
|
||||
retry_count,
|
||||
)
|
||||
.await
|
||||
.ok_or(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
|
||||
scheduled_time = Some(time);
|
||||
|
||||
let token =
|
||||
RedisTokenManager::get_token_with_max_retry_remaining(state, connector_customer_id)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
|
||||
match token {
|
||||
Some(token) => {
|
||||
RedisTokenManager::update_payment_processor_token_schedule_time(
|
||||
state,
|
||||
connector_customer_id,
|
||||
&token
|
||||
.token_status
|
||||
.payment_processor_token_details
|
||||
.payment_processor_token,
|
||||
scheduled_time,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
|
||||
logger::debug!("PSP token available for cascading retry");
|
||||
}
|
||||
None => {
|
||||
logger::debug!("No PSP token available for cascading retry");
|
||||
scheduled_time = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RevenueRecoveryAlgorithmType::Smart => {
|
||||
scheduled_time = get_best_psp_token_available_for_smart_retry(
|
||||
state,
|
||||
connector_customer_id,
|
||||
payment_intent,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(scheduled_time)
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
pub async fn get_best_psp_token_available_for_smart_retry(
|
||||
state: &SessionState,
|
||||
connector_customer_id: &str,
|
||||
payment_intent: &PaymentIntent,
|
||||
) -> CustomResult<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
||||
// Lock using payment_id
|
||||
let locked = RedisTokenManager::lock_connector_customer_status(
|
||||
state,
|
||||
connector_customer_id,
|
||||
&payment_intent.id,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::ERedisError(
|
||||
errors::RedisError::RedisConnectionError.into(),
|
||||
))?;
|
||||
|
||||
match !locked {
|
||||
true => Ok(None),
|
||||
|
||||
false => {
|
||||
// Get existing tokens from Redis
|
||||
let existing_tokens =
|
||||
RedisTokenManager::get_connector_customer_payment_processor_tokens(
|
||||
state,
|
||||
connector_customer_id,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::ERedisError(
|
||||
errors::RedisError::RedisConnectionError.into(),
|
||||
))?;
|
||||
|
||||
// TODO: Insert into payment_intent_feature_metadata (DB operation)
|
||||
|
||||
let result = RedisTokenManager::get_tokens_with_retry_metadata(state, &existing_tokens);
|
||||
|
||||
let best_token_time = call_decider_for_payment_processor_tokens_select_closet_time(
|
||||
state,
|
||||
&result,
|
||||
payment_intent,
|
||||
connector_customer_id,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
|
||||
Ok(best_token_time)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
pub async fn calculate_smart_retry_time(
|
||||
state: &SessionState,
|
||||
payment_intent: &PaymentIntent,
|
||||
token_with_retry_info: &PaymentProcessorTokenWithRetryInfo,
|
||||
) -> Result<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
||||
let wait_hours = token_with_retry_info.retry_wait_time_hours;
|
||||
let current_time = time::OffsetDateTime::now_utc();
|
||||
let future_time = current_time + time::Duration::hours(wait_hours);
|
||||
|
||||
// Timestamp after which retry can be done without penalty
|
||||
let future_timestamp = Some(prost_types::Timestamp {
|
||||
seconds: future_time.unix_timestamp(),
|
||||
nanos: 0,
|
||||
});
|
||||
|
||||
get_schedule_time_for_smart_retry(
|
||||
state,
|
||||
payment_intent,
|
||||
future_timestamp,
|
||||
token_with_retry_info,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
async fn process_token_for_retry(
|
||||
state: &SessionState,
|
||||
token_with_retry_info: &PaymentProcessorTokenWithRetryInfo,
|
||||
payment_intent: &PaymentIntent,
|
||||
) -> Result<Option<ScheduledToken>, errors::ProcessTrackerError> {
|
||||
let token_status: &PaymentProcessorTokenStatus = &token_with_retry_info.token_status;
|
||||
let inserted_by_attempt_id = &token_status.inserted_by_attempt_id;
|
||||
|
||||
let skip = token_status.is_hard_decline.unwrap_or(false);
|
||||
|
||||
match skip {
|
||||
true => {
|
||||
logger::info!(
|
||||
"Skipping decider call due to hard decline for attempt_id: {}",
|
||||
inserted_by_attempt_id.get_string_repr()
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
false => {
|
||||
let schedule_time =
|
||||
calculate_smart_retry_time(state, payment_intent, token_with_retry_info).await?;
|
||||
Ok(schedule_time.map(|schedule_time| ScheduledToken {
|
||||
token_details: token_status.payment_processor_token_details.clone(),
|
||||
schedule_time,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn call_decider_for_payment_processor_tokens_select_closet_time(
|
||||
state: &SessionState,
|
||||
processor_tokens: &HashMap<String, PaymentProcessorTokenWithRetryInfo>,
|
||||
payment_intent: &PaymentIntent,
|
||||
connector_customer_id: &str,
|
||||
) -> CustomResult<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
||||
tracing::debug!("Filtered payment attempts based on payment tokens",);
|
||||
let mut tokens_with_schedule_time: Vec<ScheduledToken> = Vec::new();
|
||||
|
||||
for token_with_retry_info in processor_tokens.values() {
|
||||
let token_details = &token_with_retry_info
|
||||
.token_status
|
||||
.payment_processor_token_details;
|
||||
let error_code = token_with_retry_info.token_status.error_code.clone();
|
||||
|
||||
match error_code {
|
||||
None => {
|
||||
let utc_schedule_time =
|
||||
time::OffsetDateTime::now_utc() + time::Duration::minutes(1);
|
||||
let schedule_time = time::PrimitiveDateTime::new(
|
||||
utc_schedule_time.date(),
|
||||
utc_schedule_time.time(),
|
||||
);
|
||||
tokens_with_schedule_time = vec![ScheduledToken {
|
||||
token_details: token_details.clone(),
|
||||
schedule_time,
|
||||
}];
|
||||
tracing::debug!(
|
||||
"Found payment processor token with no error code scheduling it for {schedule_time}",
|
||||
);
|
||||
break;
|
||||
}
|
||||
Some(_) => {
|
||||
process_token_for_retry(state, token_with_retry_info, payment_intent)
|
||||
.await?
|
||||
.map(|token_with_schedule_time| {
|
||||
tokens_with_schedule_time.push(token_with_schedule_time)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let best_token = tokens_with_schedule_time
|
||||
.iter()
|
||||
.min_by_key(|token| token.schedule_time)
|
||||
.cloned();
|
||||
|
||||
match best_token {
|
||||
None => {
|
||||
RedisTokenManager::unlock_connector_customer_status(state, connector_customer_id)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
tracing::debug!("No payment processor tokens available for scheduling");
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
Some(token) => {
|
||||
tracing::debug!("Found payment processor token with least schedule time");
|
||||
|
||||
RedisTokenManager::update_payment_processor_token_schedule_time(
|
||||
state,
|
||||
connector_customer_id,
|
||||
&token.token_details.payment_processor_token,
|
||||
Some(token.schedule_time),
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||
|
||||
Ok(Some(token.schedule_time))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
pub async fn check_hard_decline(
|
||||
state: &SessionState,
|
||||
payment_attempt: &payment_attempt::PaymentAttempt,
|
||||
) -> Result<bool, error_stack::Report<storage_impl::errors::RecoveryError>> {
|
||||
let error_message = payment_attempt
|
||||
.error
|
||||
.as_ref()
|
||||
.map(|details| details.message.clone());
|
||||
|
||||
let error_code = payment_attempt
|
||||
.error
|
||||
.as_ref()
|
||||
.map(|details| details.code.clone());
|
||||
|
||||
let connector_name = payment_attempt
|
||||
.connector
|
||||
.clone()
|
||||
.ok_or(storage_impl::errors::RecoveryError::ValueNotFound)
|
||||
.attach_printable("unable to derive payment connector from payment attempt")?;
|
||||
|
||||
let gsm_record = payments::helpers::get_gsm_record(
|
||||
state,
|
||||
error_code,
|
||||
error_message,
|
||||
connector_name,
|
||||
REVENUE_RECOVERY.to_string(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let is_hard_decline = gsm_record
|
||||
.and_then(|record| record.error_category)
|
||||
.map(|category| category == common_enums::ErrorCategory::HardDecline)
|
||||
.unwrap_or(false);
|
||||
|
||||
Ok(is_hard_decline)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user