mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-30 17:47:54 +08:00
Merge branch 'main' of https://github.com/juspay/hyperswitch into ucs-authorizedotnet-fix
This commit is contained in:
@ -529,7 +529,7 @@ pub async fn perform_calculate_workflow(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// 2. Get best available token
|
// 2. Get best available token
|
||||||
let best_time_to_schedule =
|
let payment_processor_token_response =
|
||||||
match revenue_recovery_workflow::get_token_with_schedule_time_based_on_retry_algorithm_type(
|
match revenue_recovery_workflow::get_token_with_schedule_time_based_on_retry_algorithm_type(
|
||||||
state,
|
state,
|
||||||
&connector_customer_id,
|
&connector_customer_id,
|
||||||
@ -546,12 +546,14 @@ pub async fn perform_calculate_workflow(
|
|||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
"Failed to get best PSP token"
|
"Failed to get best PSP token"
|
||||||
);
|
);
|
||||||
None
|
revenue_recovery_workflow::PaymentProcessorTokenResponse::None
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match best_time_to_schedule {
|
match payment_processor_token_response {
|
||||||
Some(scheduled_time) => {
|
revenue_recovery_workflow::PaymentProcessorTokenResponse::ScheduledTime {
|
||||||
|
scheduled_time,
|
||||||
|
} => {
|
||||||
logger::info!(
|
logger::info!(
|
||||||
process_id = %process.id,
|
process_id = %process.id,
|
||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
@ -602,29 +604,15 @@ pub async fn perform_calculate_workflow(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
None => {
|
revenue_recovery_workflow::PaymentProcessorTokenResponse::NextAvailableTime {
|
||||||
let scheduled_token = match storage::revenue_recovery_redis_operation::
|
next_available_time,
|
||||||
RedisTokenManager::get_payment_processor_token_with_schedule_time(state, &connector_customer_id)
|
} => {
|
||||||
.await {
|
// Update scheduled time to next_available_time + Buffer
|
||||||
Ok(scheduled_token_opt) => scheduled_token_opt,
|
// here next_available_time is the wait time
|
||||||
Err(e) => {
|
|
||||||
logger::error!(
|
|
||||||
error = ?e,
|
|
||||||
connector_customer_id = %connector_customer_id,
|
|
||||||
"Failed to get PSP token status"
|
|
||||||
);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match scheduled_token {
|
|
||||||
Some(scheduled_token) => {
|
|
||||||
// Update scheduled time to scheduled time + 15 minutes
|
|
||||||
// here scheduled_time is the wait time 15 minutes is a buffer time that we are adding
|
|
||||||
logger::info!(
|
logger::info!(
|
||||||
process_id = %process.id,
|
process_id = %process.id,
|
||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
"No token but time available, rescheduling for scheduled time + 15 mins"
|
"No token but time available, rescheduling for scheduled time "
|
||||||
);
|
);
|
||||||
|
|
||||||
update_calculate_job_schedule_time(
|
update_calculate_job_schedule_time(
|
||||||
@ -637,23 +625,13 @@ pub async fn perform_calculate_workflow(
|
|||||||
.recovery_timestamp
|
.recovery_timestamp
|
||||||
.job_schedule_buffer_time_in_seconds,
|
.job_schedule_buffer_time_in_seconds,
|
||||||
),
|
),
|
||||||
scheduled_token.scheduled_at,
|
Some(next_available_time),
|
||||||
&connector_customer_id,
|
&connector_customer_id,
|
||||||
|
retry_algorithm_type,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
None => {
|
revenue_recovery_workflow::PaymentProcessorTokenResponse::None => {
|
||||||
let hard_decline_flag = storage::revenue_recovery_redis_operation::
|
|
||||||
RedisTokenManager::are_all_tokens_hard_declined(
|
|
||||||
state,
|
|
||||||
&connector_customer_id
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
match hard_decline_flag {
|
|
||||||
false => {
|
|
||||||
logger::info!(
|
logger::info!(
|
||||||
process_id = %process.id,
|
process_id = %process.id,
|
||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
@ -672,15 +650,16 @@ pub async fn perform_calculate_workflow(
|
|||||||
),
|
),
|
||||||
Some(common_utils::date_time::now()),
|
Some(common_utils::date_time::now()),
|
||||||
&connector_customer_id,
|
&connector_customer_id,
|
||||||
|
retry_algorithm_type,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
true => {
|
revenue_recovery_workflow::PaymentProcessorTokenResponse::HardDecline => {
|
||||||
// Finish calculate workflow with CALCULATE_WORKFLOW_FINISH
|
// Finish calculate workflow with CALCULATE_WORKFLOW_FINISH
|
||||||
logger::info!(
|
logger::info!(
|
||||||
process_id = %process.id,
|
process_id = %process.id,
|
||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
"No token available, finishing CALCULATE_WORKFLOW"
|
"Token/Tokens is/are Hard decline, finishing CALCULATE_WORKFLOW"
|
||||||
);
|
);
|
||||||
|
|
||||||
db.as_scheduler()
|
db.as_scheduler()
|
||||||
@ -707,10 +686,6 @@ pub async fn perform_calculate_workflow(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let _outgoing_webhook = event_type.and_then(|event_kind| {
|
let _outgoing_webhook = event_type.and_then(|event_kind| {
|
||||||
payments_response.map(|resp| Some((event_kind, resp)))
|
payments_response.map(|resp| Some((event_kind, resp)))
|
||||||
@ -749,6 +724,7 @@ async fn update_calculate_job_schedule_time(
|
|||||||
additional_time: time::Duration,
|
additional_time: time::Duration,
|
||||||
base_time: Option<time::PrimitiveDateTime>,
|
base_time: Option<time::PrimitiveDateTime>,
|
||||||
connector_customer_id: &str,
|
connector_customer_id: &str,
|
||||||
|
retry_algorithm_type: common_enums::RevenueRecoveryAlgorithmType,
|
||||||
) -> Result<(), sch_errors::ProcessTrackerError> {
|
) -> Result<(), sch_errors::ProcessTrackerError> {
|
||||||
let now = common_utils::date_time::now();
|
let now = common_utils::date_time::now();
|
||||||
|
|
||||||
@ -759,11 +735,22 @@ async fn update_calculate_job_schedule_time(
|
|||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
"Rescheduling Calculate Job at "
|
"Rescheduling Calculate Job at "
|
||||||
);
|
);
|
||||||
|
let mut old_tracking_data: pcr::RevenueRecoveryWorkflowTrackingData =
|
||||||
|
serde_json::from_value(process.tracking_data.clone())
|
||||||
|
.change_context(errors::RecoveryError::ValueNotFound)
|
||||||
|
.attach_printable("Failed to deserialize the tracking data from process tracker")?;
|
||||||
|
|
||||||
|
old_tracking_data.revenue_recovery_retry = retry_algorithm_type;
|
||||||
|
|
||||||
|
let tracking_data = serde_json::to_value(old_tracking_data)
|
||||||
|
.change_context(errors::RecoveryError::ValueNotFound)
|
||||||
|
.attach_printable("Failed to serialize the tracking data for process tracker")?;
|
||||||
|
|
||||||
let pt_update = storage::ProcessTrackerUpdate::Update {
|
let pt_update = storage::ProcessTrackerUpdate::Update {
|
||||||
name: Some("CALCULATE_WORKFLOW".to_string()),
|
name: Some("CALCULATE_WORKFLOW".to_string()),
|
||||||
retry_count: Some(process.clone().retry_count),
|
retry_count: Some(process.clone().retry_count),
|
||||||
schedule_time: Some(new_schedule_time),
|
schedule_time: Some(new_schedule_time),
|
||||||
tracking_data: Some(process.clone().tracking_data),
|
tracking_data: Some(tracking_data),
|
||||||
business_status: Some(String::from(business_status::PENDING)),
|
business_status: Some(String::from(business_status::PENDING)),
|
||||||
status: Some(common_enums::ProcessTrackerStatus::Pending),
|
status: Some(common_enums::ProcessTrackerStatus::Pending),
|
||||||
updated_at: Some(common_utils::date_time::now()),
|
updated_at: Some(common_utils::date_time::now()),
|
||||||
|
|||||||
@ -176,8 +176,7 @@ impl RevenueRecoveryPaymentsAttemptStatus {
|
|||||||
state,
|
state,
|
||||||
&connector_customer_id,
|
&connector_customer_id,
|
||||||
&None,
|
&None,
|
||||||
// Since this is succeeded payment attempt, 'is_hard_decine' will be false.
|
&None,
|
||||||
&Some(false),
|
|
||||||
used_token.as_deref(),
|
used_token.as_deref(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -493,19 +492,12 @@ impl Action {
|
|||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
let is_hard_decline = revenue_recovery::check_hard_decline(
|
|
||||||
state,
|
|
||||||
&payment_data.payment_attempt,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
// update the status of token in redis
|
// update the status of token in redis
|
||||||
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||||
state,
|
state,
|
||||||
&connector_customer_id,
|
&connector_customer_id,
|
||||||
&None,
|
&None,
|
||||||
&is_hard_decline,
|
&None,
|
||||||
Some(&scheduled_token.payment_processor_token_details.payment_processor_token),
|
Some(&scheduled_token.payment_processor_token_details.payment_processor_token),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -659,7 +651,7 @@ impl Action {
|
|||||||
logger::info!(
|
logger::info!(
|
||||||
process_id = %process.id,
|
process_id = %process.id,
|
||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
"No token available, finishing CALCULATE_WORKFLOW"
|
"No token available, finishing EXECUTE_WORKFLOW"
|
||||||
);
|
);
|
||||||
|
|
||||||
state
|
state
|
||||||
@ -671,12 +663,12 @@ impl Action {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
||||||
.attach_printable("Failed to finish CALCULATE_WORKFLOW")?;
|
.attach_printable("Failed to finish EXECUTE_WORKFLOW")?;
|
||||||
|
|
||||||
logger::info!(
|
logger::info!(
|
||||||
process_id = %process.id,
|
process_id = %process.id,
|
||||||
connector_customer_id = %connector_customer_id,
|
connector_customer_id = %connector_customer_id,
|
||||||
"CALCULATE_WORKFLOW finished successfully"
|
"EXECUTE_WORKFLOW finished successfully"
|
||||||
);
|
);
|
||||||
Ok(Self::TerminalFailure(payment_attempt.clone()))
|
Ok(Self::TerminalFailure(payment_attempt.clone()))
|
||||||
}
|
}
|
||||||
@ -854,8 +846,7 @@ impl Action {
|
|||||||
state,
|
state,
|
||||||
&connector_customer_id,
|
&connector_customer_id,
|
||||||
&None,
|
&None,
|
||||||
// Since this is succeeded, 'hard_decine' will be false.
|
&None,
|
||||||
&Some(false),
|
|
||||||
used_token.as_deref(),
|
used_token.as_deref(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -1144,9 +1135,19 @@ pub async fn reopen_calculate_workflow_on_payment_failure(
|
|||||||
.change_context(errors::RecoveryError::ValueNotFound)
|
.change_context(errors::RecoveryError::ValueNotFound)
|
||||||
.attach_printable("Failed to deserialize the tracking data from process tracker")?;
|
.attach_printable("Failed to deserialize the tracking data from process tracker")?;
|
||||||
|
|
||||||
|
let retry_algorithm_type = profile
|
||||||
|
.revenue_recovery_retry_algorithm_type
|
||||||
|
.filter(|retry_type| *retry_type != common_enums::RevenueRecoveryAlgorithmType::Monitoring) // ignore Monitoring
|
||||||
|
.unwrap_or(old_tracking_data.revenue_recovery_retry);
|
||||||
|
|
||||||
let new_tracking_data = pcr::RevenueRecoveryWorkflowTrackingData {
|
let new_tracking_data = pcr::RevenueRecoveryWorkflowTrackingData {
|
||||||
payment_attempt_id: latest_attempt_id.clone(),
|
payment_attempt_id: latest_attempt_id.clone(),
|
||||||
..old_tracking_data
|
revenue_recovery_retry: retry_algorithm_type,
|
||||||
|
merchant_id: old_tracking_data.merchant_id.clone(),
|
||||||
|
profile_id: old_tracking_data.profile_id.clone(),
|
||||||
|
global_payment_id: old_tracking_data.global_payment_id.clone(),
|
||||||
|
billing_mca_id: old_tracking_data.billing_mca_id.clone(),
|
||||||
|
invoice_scheduled_time: old_tracking_data.invoice_scheduled_time,
|
||||||
};
|
};
|
||||||
|
|
||||||
let tracking_data = serde_json::to_value(new_tracking_data)
|
let tracking_data = serde_json::to_value(new_tracking_data)
|
||||||
|
|||||||
@ -908,7 +908,7 @@ impl RevenueRecoveryAttempt {
|
|||||||
payment_connector_name: Option<common_enums::connector_enums::Connector>,
|
payment_connector_name: Option<common_enums::connector_enums::Connector>,
|
||||||
) -> CustomResult<(), errors::RevenueRecoveryError> {
|
) -> CustomResult<(), errors::RevenueRecoveryError> {
|
||||||
let revenue_recovery_attempt_data = &self.0;
|
let revenue_recovery_attempt_data = &self.0;
|
||||||
let error_code = revenue_recovery_attempt_data.error_code.clone();
|
let error_code = recovery_attempt.error_code.clone();
|
||||||
let error_message = revenue_recovery_attempt_data.error_message.clone();
|
let error_message = revenue_recovery_attempt_data.error_message.clone();
|
||||||
let connector_name = payment_connector_name
|
let connector_name = payment_connector_name
|
||||||
.ok_or(errors::RevenueRecoveryError::TransactionWebhookProcessingFailed)
|
.ok_or(errors::RevenueRecoveryError::TransactionWebhookProcessingFailed)
|
||||||
@ -939,6 +939,7 @@ impl RevenueRecoveryAttempt {
|
|||||||
daily_retry_history: HashMap::from([(recovery_attempt.created_at.date(), 1)]),
|
daily_retry_history: HashMap::from([(recovery_attempt.created_at.date(), 1)]),
|
||||||
scheduled_at: None,
|
scheduled_at: None,
|
||||||
is_hard_decline: Some(is_hard_decline),
|
is_hard_decline: Some(is_hard_decline),
|
||||||
|
modified_at: Some(recovery_attempt.created_at),
|
||||||
payment_processor_token_details: PaymentProcessorTokenDetails {
|
payment_processor_token_details: PaymentProcessorTokenDetails {
|
||||||
payment_processor_token: revenue_recovery_attempt_data
|
payment_processor_token: revenue_recovery_attempt_data
|
||||||
.processor_payment_method_token
|
.processor_payment_method_token
|
||||||
|
|||||||
@ -43,6 +43,8 @@ pub struct PaymentProcessorTokenStatus {
|
|||||||
pub scheduled_at: Option<PrimitiveDateTime>,
|
pub scheduled_at: Option<PrimitiveDateTime>,
|
||||||
/// Indicates if the token is a hard decline (no retries allowed)
|
/// Indicates if the token is a hard decline (no retries allowed)
|
||||||
pub is_hard_decline: Option<bool>,
|
pub is_hard_decline: Option<bool>,
|
||||||
|
/// Timestamp of the last modification to this token status
|
||||||
|
pub modified_at: Option<PrimitiveDateTime>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Token retry availability information with detailed wait times
|
/// Token retry availability information with detailed wait times
|
||||||
@ -415,11 +417,19 @@ impl RedisTokenManager {
|
|||||||
|
|
||||||
let monthly_wait_hours =
|
let monthly_wait_hours =
|
||||||
if total_30_day_retries >= card_network_config.max_retry_count_for_thirty_day {
|
if total_30_day_retries >= card_network_config.max_retry_count_for_thirty_day {
|
||||||
|
let mut accumulated_retries = 0;
|
||||||
|
|
||||||
|
// Iterate from most recent to oldest
|
||||||
(0..RETRY_WINDOW_DAYS)
|
(0..RETRY_WINDOW_DAYS)
|
||||||
.rev()
|
.map(|days_ago| today - Duration::days(days_ago.into()))
|
||||||
.map(|i| today - Duration::days(i.into()))
|
.find(|date| {
|
||||||
.find(|date| token.daily_retry_history.get(date).copied().unwrap_or(0) > 0)
|
let retries = token.daily_retry_history.get(date).copied().unwrap_or(0);
|
||||||
.map(|date| Self::calculate_wait_hours(date + Duration::days(31), now))
|
accumulated_retries += retries;
|
||||||
|
accumulated_retries >= card_network_config.max_retry_count_for_thirty_day
|
||||||
|
})
|
||||||
|
.map(|breach_date| {
|
||||||
|
Self::calculate_wait_hours(breach_date + Duration::days(31), now)
|
||||||
|
})
|
||||||
.unwrap_or(0)
|
.unwrap_or(0)
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
@ -463,13 +473,14 @@ impl RedisTokenManager {
|
|||||||
let was_existing = token_map.contains_key(&token_id);
|
let was_existing = token_map.contains_key(&token_id);
|
||||||
|
|
||||||
let error_code = token_data.error_code.clone();
|
let error_code = token_data.error_code.clone();
|
||||||
|
|
||||||
|
let modified_at = token_data.modified_at;
|
||||||
|
|
||||||
let today = OffsetDateTime::now_utc().date();
|
let today = OffsetDateTime::now_utc().date();
|
||||||
|
|
||||||
token_map
|
token_map
|
||||||
.get_mut(&token_id)
|
.get_mut(&token_id)
|
||||||
.map(|existing_token| {
|
.map(|existing_token| {
|
||||||
error_code.map(|err| existing_token.error_code = Some(err));
|
|
||||||
|
|
||||||
Self::normalize_retry_window(existing_token, today);
|
Self::normalize_retry_window(existing_token, today);
|
||||||
|
|
||||||
for (date, &value) in &token_data.daily_retry_history {
|
for (date, &value) in &token_data.daily_retry_history {
|
||||||
@ -479,6 +490,12 @@ impl RedisTokenManager {
|
|||||||
.and_modify(|v| *v += value)
|
.and_modify(|v| *v += value)
|
||||||
.or_insert(value);
|
.or_insert(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(existing_token.modified_at < modified_at).then(|| {
|
||||||
|
existing_token.modified_at = modified_at;
|
||||||
|
error_code.map(|err| existing_token.error_code = Some(err));
|
||||||
|
existing_token.is_hard_decline = token_data.is_hard_decline;
|
||||||
|
});
|
||||||
})
|
})
|
||||||
.or_else(|| {
|
.or_else(|| {
|
||||||
token_map.insert(token_id.clone(), token_data);
|
token_map.insert(token_id.clone(), token_data);
|
||||||
@ -529,6 +546,10 @@ impl RedisTokenManager {
|
|||||||
daily_retry_history: status.daily_retry_history.clone(),
|
daily_retry_history: status.daily_retry_history.clone(),
|
||||||
scheduled_at: None,
|
scheduled_at: None,
|
||||||
is_hard_decline: *is_hard_decline,
|
is_hard_decline: *is_hard_decline,
|
||||||
|
modified_at: Some(PrimitiveDateTime::new(
|
||||||
|
OffsetDateTime::now_utc().date(),
|
||||||
|
OffsetDateTime::now_utc().time(),
|
||||||
|
)),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
None => None,
|
None => None,
|
||||||
@ -606,6 +627,10 @@ impl RedisTokenManager {
|
|||||||
daily_retry_history: status.daily_retry_history.clone(),
|
daily_retry_history: status.daily_retry_history.clone(),
|
||||||
scheduled_at: schedule_time,
|
scheduled_at: schedule_time,
|
||||||
is_hard_decline: status.is_hard_decline,
|
is_hard_decline: status.is_hard_decline,
|
||||||
|
modified_at: Some(PrimitiveDateTime::new(
|
||||||
|
OffsetDateTime::now_utc().date(),
|
||||||
|
OffsetDateTime::now_utc().time(),
|
||||||
|
)),
|
||||||
});
|
});
|
||||||
|
|
||||||
match updated_token {
|
match updated_token {
|
||||||
@ -905,6 +930,11 @@ impl RedisTokenManager {
|
|||||||
.unwrap_or(Some(existing_scheduled_at)) // No cutoff provided, keep existing value
|
.unwrap_or(Some(existing_scheduled_at)) // No cutoff provided, keep existing value
|
||||||
});
|
});
|
||||||
|
|
||||||
|
existing_token.modified_at = Some(PrimitiveDateTime::new(
|
||||||
|
OffsetDateTime::now_utc().date(),
|
||||||
|
OffsetDateTime::now_utc().time(),
|
||||||
|
));
|
||||||
|
|
||||||
// Save the updated token map back to Redis
|
// Save the updated token map back to Redis
|
||||||
Self::update_or_add_connector_customer_payment_processor_tokens(
|
Self::update_or_add_connector_customer_payment_processor_tokens(
|
||||||
state,
|
state,
|
||||||
@ -920,4 +950,16 @@ impl RedisTokenManager {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
pub async fn get_payment_processor_metadata_for_connector_customer(
|
||||||
|
state: &SessionState,
|
||||||
|
customer_id: &str,
|
||||||
|
) -> CustomResult<HashMap<String, PaymentProcessorTokenWithRetryInfo>, errors::StorageError>
|
||||||
|
{
|
||||||
|
let token_map =
|
||||||
|
Self::get_connector_customer_payment_processor_tokens(state, customer_id).await?;
|
||||||
|
|
||||||
|
let token_data = Self::get_tokens_with_retry_metadata(state, &token_map);
|
||||||
|
|
||||||
|
Ok(token_data)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -329,7 +329,7 @@ pub async fn recovery_retry_sync_task(
|
|||||||
|
|
||||||
connector_customer_id
|
connector_customer_id
|
||||||
.async_map(|id| async move {
|
.async_map(|id| async move {
|
||||||
let _ = update_token_expiry_based_on_schedule_time(state, &id, Some(s_time))
|
let _ = update_token_expiry_based_on_schedule_time(state, &id, s_time)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
logger::error!(
|
logger::error!(
|
||||||
|
|||||||
@ -525,7 +525,7 @@ pub fn calculate_difference_in_seconds(scheduled_time: time::PrimitiveDateTime)
|
|||||||
pub async fn update_token_expiry_based_on_schedule_time(
|
pub async fn update_token_expiry_based_on_schedule_time(
|
||||||
state: &SessionState,
|
state: &SessionState,
|
||||||
connector_customer_id: &str,
|
connector_customer_id: &str,
|
||||||
delayed_schedule_time: Option<time::PrimitiveDateTime>,
|
delayed_schedule_time: time::PrimitiveDateTime,
|
||||||
) -> CustomResult<(), errors::ProcessTrackerError> {
|
) -> CustomResult<(), errors::ProcessTrackerError> {
|
||||||
let expiry_buffer = state
|
let expiry_buffer = state
|
||||||
.conf
|
.conf
|
||||||
@ -533,9 +533,7 @@ pub async fn update_token_expiry_based_on_schedule_time(
|
|||||||
.recovery_timestamp
|
.recovery_timestamp
|
||||||
.redis_ttl_buffer_in_seconds;
|
.redis_ttl_buffer_in_seconds;
|
||||||
|
|
||||||
delayed_schedule_time
|
let expiry_time = calculate_difference_in_seconds(delayed_schedule_time) + expiry_buffer;
|
||||||
.async_map(|t| async move {
|
|
||||||
let expiry_time = calculate_difference_in_seconds(t) + expiry_buffer;
|
|
||||||
RedisTokenManager::update_connector_customer_lock_ttl(
|
RedisTokenManager::update_connector_customer_lock_ttl(
|
||||||
state,
|
state,
|
||||||
connector_customer_id,
|
connector_customer_id,
|
||||||
@ -544,14 +542,31 @@ pub async fn update_token_expiry_based_on_schedule_time(
|
|||||||
.await
|
.await
|
||||||
.change_context(errors::ProcessTrackerError::ERedisError(
|
.change_context(errors::ProcessTrackerError::ERedisError(
|
||||||
errors::RedisError::RedisConnectionError.into(),
|
errors::RedisError::RedisConnectionError.into(),
|
||||||
))
|
));
|
||||||
})
|
|
||||||
.await
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "v2")]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PaymentProcessorTokenResponse {
|
||||||
|
/// Token HardDecline
|
||||||
|
HardDecline,
|
||||||
|
|
||||||
|
/// Token can be retried at this specific time
|
||||||
|
ScheduledTime {
|
||||||
|
scheduled_time: time::PrimitiveDateTime,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Token locked or unavailable, next attempt possible
|
||||||
|
NextAvailableTime {
|
||||||
|
next_available_time: time::PrimitiveDateTime,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// No retry info available / nothing to do yet
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "v2")]
|
#[cfg(feature = "v2")]
|
||||||
pub async fn get_token_with_schedule_time_based_on_retry_algorithm_type(
|
pub async fn get_token_with_schedule_time_based_on_retry_algorithm_type(
|
||||||
state: &SessionState,
|
state: &SessionState,
|
||||||
@ -559,9 +574,8 @@ pub async fn get_token_with_schedule_time_based_on_retry_algorithm_type(
|
|||||||
payment_intent: &PaymentIntent,
|
payment_intent: &PaymentIntent,
|
||||||
retry_algorithm_type: RevenueRecoveryAlgorithmType,
|
retry_algorithm_type: RevenueRecoveryAlgorithmType,
|
||||||
retry_count: i32,
|
retry_count: i32,
|
||||||
) -> CustomResult<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
) -> CustomResult<PaymentProcessorTokenResponse, errors::ProcessTrackerError> {
|
||||||
let mut scheduled_time = None;
|
let mut payment_processor_token_response = PaymentProcessorTokenResponse::None;
|
||||||
|
|
||||||
match retry_algorithm_type {
|
match retry_algorithm_type {
|
||||||
RevenueRecoveryAlgorithmType::Monitoring => {
|
RevenueRecoveryAlgorithmType::Monitoring => {
|
||||||
logger::error!("Monitoring type found for Revenue Recovery retry payment");
|
logger::error!("Monitoring type found for Revenue Recovery retry payment");
|
||||||
@ -576,11 +590,67 @@ pub async fn get_token_with_schedule_time_based_on_retry_algorithm_type(
|
|||||||
.await
|
.await
|
||||||
.ok_or(errors::ProcessTrackerError::EApiErrorResponse)?;
|
.ok_or(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||||
|
|
||||||
scheduled_time = Some(time);
|
let payment_processor_token = payment_intent
|
||||||
|
.feature_metadata
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|metadata| metadata.payment_revenue_recovery_metadata.as_ref())
|
||||||
|
.map(|recovery_metadata| {
|
||||||
|
recovery_metadata
|
||||||
|
.billing_connector_payment_details
|
||||||
|
.payment_processor_token
|
||||||
|
.clone()
|
||||||
|
});
|
||||||
|
|
||||||
|
let payment_processor_tokens_details =
|
||||||
|
RedisTokenManager::get_payment_processor_metadata_for_connector_customer(
|
||||||
|
state,
|
||||||
|
connector_customer_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(errors::ProcessTrackerError::ERedisError(
|
||||||
|
errors::RedisError::RedisConnectionError.into(),
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// Get the token info from redis
|
||||||
|
let payment_processor_tokens_details_with_retry_info = payment_processor_token
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|t| payment_processor_tokens_details.get(t));
|
||||||
|
|
||||||
|
// If payment_processor_tokens_details_with_retry_info is None, then no schedule time
|
||||||
|
match payment_processor_tokens_details_with_retry_info {
|
||||||
|
None => {
|
||||||
|
payment_processor_token_response = PaymentProcessorTokenResponse::None;
|
||||||
|
logger::debug!("No payment processor token found for cascading retry");
|
||||||
|
}
|
||||||
|
Some(payment_token) => {
|
||||||
|
if payment_token.token_status.is_hard_decline.unwrap_or(false) {
|
||||||
|
payment_processor_token_response =
|
||||||
|
PaymentProcessorTokenResponse::HardDecline;
|
||||||
|
} else if payment_token.retry_wait_time_hours > 0 {
|
||||||
|
let utc_schedule_time: time::OffsetDateTime =
|
||||||
|
time::OffsetDateTime::now_utc()
|
||||||
|
+ time::Duration::hours(payment_token.retry_wait_time_hours);
|
||||||
|
let next_available_time = time::PrimitiveDateTime::new(
|
||||||
|
utc_schedule_time.date(),
|
||||||
|
utc_schedule_time.time(),
|
||||||
|
);
|
||||||
|
|
||||||
|
payment_processor_token_response =
|
||||||
|
PaymentProcessorTokenResponse::NextAvailableTime {
|
||||||
|
next_available_time,
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
payment_processor_token_response =
|
||||||
|
PaymentProcessorTokenResponse::ScheduledTime {
|
||||||
|
scheduled_time: time,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RevenueRecoveryAlgorithmType::Smart => {
|
RevenueRecoveryAlgorithmType::Smart => {
|
||||||
scheduled_time = get_best_psp_token_available_for_smart_retry(
|
payment_processor_token_response = get_best_psp_token_available_for_smart_retry(
|
||||||
state,
|
state,
|
||||||
connector_customer_id,
|
connector_customer_id,
|
||||||
payment_intent,
|
payment_intent,
|
||||||
@ -589,17 +659,40 @@ pub async fn get_token_with_schedule_time_based_on_retry_algorithm_type(
|
|||||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let delayed_schedule_time =
|
|
||||||
scheduled_time.map(|time| add_random_delay_to_schedule_time(state, time));
|
|
||||||
|
|
||||||
let _ = update_token_expiry_based_on_schedule_time(
|
match &mut payment_processor_token_response {
|
||||||
|
PaymentProcessorTokenResponse::HardDecline => {
|
||||||
|
logger::debug!("Token is hard declined");
|
||||||
|
}
|
||||||
|
|
||||||
|
PaymentProcessorTokenResponse::ScheduledTime { scheduled_time } => {
|
||||||
|
// Add random delay to schedule time
|
||||||
|
*scheduled_time = add_random_delay_to_schedule_time(state, *scheduled_time);
|
||||||
|
|
||||||
|
// Log the scheduled retry time at debug level
|
||||||
|
logger::info!("Retry scheduled at {:?}", scheduled_time);
|
||||||
|
|
||||||
|
// Update token expiry based on schedule time
|
||||||
|
update_token_expiry_based_on_schedule_time(
|
||||||
state,
|
state,
|
||||||
connector_customer_id,
|
connector_customer_id,
|
||||||
delayed_schedule_time,
|
*scheduled_time,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(delayed_schedule_time)
|
PaymentProcessorTokenResponse::NextAvailableTime {
|
||||||
|
next_available_time,
|
||||||
|
} => {
|
||||||
|
logger::info!("Next available retry at {:?}", next_available_time);
|
||||||
|
}
|
||||||
|
|
||||||
|
PaymentProcessorTokenResponse::None => {
|
||||||
|
logger::debug!("No retry info available");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(payment_processor_token_response)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "v2")]
|
#[cfg(feature = "v2")]
|
||||||
@ -607,7 +700,7 @@ pub async fn get_best_psp_token_available_for_smart_retry(
|
|||||||
state: &SessionState,
|
state: &SessionState,
|
||||||
connector_customer_id: &str,
|
connector_customer_id: &str,
|
||||||
payment_intent: &PaymentIntent,
|
payment_intent: &PaymentIntent,
|
||||||
) -> CustomResult<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
) -> CustomResult<PaymentProcessorTokenResponse, errors::ProcessTrackerError> {
|
||||||
// Lock using payment_id
|
// Lock using payment_id
|
||||||
let locked = RedisTokenManager::lock_connector_customer_status(
|
let locked = RedisTokenManager::lock_connector_customer_status(
|
||||||
state,
|
state,
|
||||||
@ -619,10 +712,48 @@ pub async fn get_best_psp_token_available_for_smart_retry(
|
|||||||
errors::RedisError::RedisConnectionError.into(),
|
errors::RedisError::RedisConnectionError.into(),
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
match !locked {
|
match locked {
|
||||||
true => Ok(None),
|
|
||||||
|
|
||||||
false => {
|
false => {
|
||||||
|
let token_details =
|
||||||
|
RedisTokenManager::get_payment_processor_metadata_for_connector_customer(
|
||||||
|
state,
|
||||||
|
connector_customer_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(errors::ProcessTrackerError::ERedisError(
|
||||||
|
errors::RedisError::RedisConnectionError.into(),
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// Check token with schedule time in Redis
|
||||||
|
let token_info_with_schedule_time = token_details
|
||||||
|
.values()
|
||||||
|
.find(|info| info.token_status.scheduled_at.is_some());
|
||||||
|
|
||||||
|
// Check for hard decline if info is none
|
||||||
|
let hard_decline_status = token_details
|
||||||
|
.values()
|
||||||
|
.all(|token| token.token_status.is_hard_decline.unwrap_or(false));
|
||||||
|
|
||||||
|
let mut payment_processor_token_response = PaymentProcessorTokenResponse::None;
|
||||||
|
|
||||||
|
if hard_decline_status {
|
||||||
|
payment_processor_token_response = PaymentProcessorTokenResponse::HardDecline;
|
||||||
|
} else {
|
||||||
|
payment_processor_token_response = match token_info_with_schedule_time
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|t| t.token_status.scheduled_at)
|
||||||
|
{
|
||||||
|
Some(scheduled_time) => PaymentProcessorTokenResponse::NextAvailableTime {
|
||||||
|
next_available_time: scheduled_time,
|
||||||
|
},
|
||||||
|
None => PaymentProcessorTokenResponse::None,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(payment_processor_token_response)
|
||||||
|
}
|
||||||
|
|
||||||
|
true => {
|
||||||
// Get existing tokens from Redis
|
// Get existing tokens from Redis
|
||||||
let existing_tokens =
|
let existing_tokens =
|
||||||
RedisTokenManager::get_connector_customer_payment_processor_tokens(
|
RedisTokenManager::get_connector_customer_payment_processor_tokens(
|
||||||
@ -634,11 +765,10 @@ pub async fn get_best_psp_token_available_for_smart_retry(
|
|||||||
errors::RedisError::RedisConnectionError.into(),
|
errors::RedisError::RedisConnectionError.into(),
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
// TODO: Insert into payment_intent_feature_metadata (DB operation)
|
|
||||||
|
|
||||||
let result = RedisTokenManager::get_tokens_with_retry_metadata(state, &existing_tokens);
|
let result = RedisTokenManager::get_tokens_with_retry_metadata(state, &existing_tokens);
|
||||||
|
|
||||||
let best_token_time = call_decider_for_payment_processor_tokens_select_closet_time(
|
let payment_processor_token_response =
|
||||||
|
call_decider_for_payment_processor_tokens_select_closest_time(
|
||||||
state,
|
state,
|
||||||
&result,
|
&result,
|
||||||
payment_intent,
|
payment_intent,
|
||||||
@ -647,7 +777,7 @@ pub async fn get_best_psp_token_available_for_smart_retry(
|
|||||||
.await
|
.await
|
||||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||||
|
|
||||||
Ok(best_token_time)
|
Ok(payment_processor_token_response)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -709,40 +839,42 @@ async fn process_token_for_retry(
|
|||||||
|
|
||||||
#[cfg(feature = "v2")]
|
#[cfg(feature = "v2")]
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn call_decider_for_payment_processor_tokens_select_closet_time(
|
pub async fn call_decider_for_payment_processor_tokens_select_closest_time(
|
||||||
state: &SessionState,
|
state: &SessionState,
|
||||||
processor_tokens: &HashMap<String, PaymentProcessorTokenWithRetryInfo>,
|
processor_tokens: &HashMap<String, PaymentProcessorTokenWithRetryInfo>,
|
||||||
payment_intent: &PaymentIntent,
|
payment_intent: &PaymentIntent,
|
||||||
connector_customer_id: &str,
|
connector_customer_id: &str,
|
||||||
) -> CustomResult<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
) -> CustomResult<PaymentProcessorTokenResponse, errors::ProcessTrackerError> {
|
||||||
tracing::debug!("Filtered payment attempts based on payment tokens",);
|
|
||||||
let mut tokens_with_schedule_time: Vec<ScheduledToken> = Vec::new();
|
let mut tokens_with_schedule_time: Vec<ScheduledToken> = Vec::new();
|
||||||
|
|
||||||
for token_with_retry_info in processor_tokens.values() {
|
// Check for successful token
|
||||||
|
let mut token_with_none_error_code = processor_tokens.values().find(|token| {
|
||||||
|
token.token_status.error_code.is_none()
|
||||||
|
&& !token.token_status.is_hard_decline.unwrap_or(false)
|
||||||
|
});
|
||||||
|
|
||||||
|
match token_with_none_error_code {
|
||||||
|
Some(token_with_retry_info) => {
|
||||||
let token_details = &token_with_retry_info
|
let token_details = &token_with_retry_info
|
||||||
.token_status
|
.token_status
|
||||||
.payment_processor_token_details;
|
.payment_processor_token_details;
|
||||||
let error_code = token_with_retry_info.token_status.error_code.clone();
|
|
||||||
|
|
||||||
match error_code {
|
let utc_schedule_time = time::OffsetDateTime::now_utc() + time::Duration::minutes(1);
|
||||||
None => {
|
let schedule_time =
|
||||||
let utc_schedule_time =
|
time::PrimitiveDateTime::new(utc_schedule_time.date(), utc_schedule_time.time());
|
||||||
time::OffsetDateTime::now_utc() + time::Duration::minutes(1);
|
|
||||||
|
|
||||||
let schedule_time = time::PrimitiveDateTime::new(
|
|
||||||
utc_schedule_time.date(),
|
|
||||||
utc_schedule_time.time(),
|
|
||||||
);
|
|
||||||
tokens_with_schedule_time = vec![ScheduledToken {
|
tokens_with_schedule_time = vec![ScheduledToken {
|
||||||
token_details: token_details.clone(),
|
token_details: token_details.clone(),
|
||||||
schedule_time,
|
schedule_time,
|
||||||
}];
|
}];
|
||||||
|
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
"Found payment processor token with no error code scheduling it for {schedule_time}",
|
"Found payment processor token with no error code, scheduling it for {schedule_time}",
|
||||||
);
|
);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
Some(_) => {
|
|
||||||
|
None => {
|
||||||
|
for token_with_retry_info in processor_tokens.values() {
|
||||||
process_token_for_retry(state, token_with_retry_info, payment_intent)
|
process_token_for_retry(state, token_with_retry_info, payment_intent)
|
||||||
.await?
|
.await?
|
||||||
.map(|token_with_schedule_time| {
|
.map(|token_with_schedule_time| {
|
||||||
@ -757,13 +889,27 @@ pub async fn call_decider_for_payment_processor_tokens_select_closet_time(
|
|||||||
.min_by_key(|token| token.schedule_time)
|
.min_by_key(|token| token.schedule_time)
|
||||||
.cloned();
|
.cloned();
|
||||||
|
|
||||||
|
let mut payment_processor_token_response;
|
||||||
match best_token {
|
match best_token {
|
||||||
None => {
|
None => {
|
||||||
|
// No tokens available for scheduling, unlock the connector customer status
|
||||||
|
|
||||||
|
// Check if all tokens are hard declined
|
||||||
|
let hard_decline_status = processor_tokens
|
||||||
|
.values()
|
||||||
|
.all(|token| token.token_status.is_hard_decline.unwrap_or(false));
|
||||||
|
|
||||||
RedisTokenManager::unlock_connector_customer_status(state, connector_customer_id)
|
RedisTokenManager::unlock_connector_customer_status(state, connector_customer_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||||
|
|
||||||
tracing::debug!("No payment processor tokens available for scheduling");
|
tracing::debug!("No payment processor tokens available for scheduling");
|
||||||
Ok(None)
|
|
||||||
|
if hard_decline_status {
|
||||||
|
payment_processor_token_response = PaymentProcessorTokenResponse::HardDecline;
|
||||||
|
} else {
|
||||||
|
payment_processor_token_response = PaymentProcessorTokenResponse::None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(token) => {
|
Some(token) => {
|
||||||
@ -778,9 +924,12 @@ pub async fn call_decider_for_payment_processor_tokens_select_closet_time(
|
|||||||
.await
|
.await
|
||||||
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
.change_context(errors::ProcessTrackerError::EApiErrorResponse)?;
|
||||||
|
|
||||||
Ok(Some(token.schedule_time))
|
payment_processor_token_response = PaymentProcessorTokenResponse::ScheduledTime {
|
||||||
|
scheduled_time: token.schedule_time,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(payment_processor_token_response)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "v2")]
|
#[cfg(feature = "v2")]
|
||||||
|
|||||||
Reference in New Issue
Block a user