mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 00:49:42 +08:00
feat(revenue_recovery): Introducing new calculate job for card switching and invoice queueing (#8848)
Co-authored-by: Aniket Burman <aniket.burman@Aniket-Burman-JDXHW2PH34.local> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Aniket Burman <93077964+aniketburman014@users.noreply.github.com> Co-authored-by: chikke srujan <121822803+srujanchikke@users.noreply.github.com>
This commit is contained in:
@ -486,6 +486,8 @@ pub enum RevenueRecoveryError {
|
||||
BillingConnectorPaymentsSyncFailed,
|
||||
#[error("Billing connector invoice sync call failed")]
|
||||
BillingConnectorInvoiceSyncFailed,
|
||||
#[error("Failed to fetch connector customer ID")]
|
||||
CustomerIdNotFound,
|
||||
#[error("Failed to get the retry count for payment intent")]
|
||||
RetryCountFetchFailed,
|
||||
#[error("Failed to get the billing threshold retry count")]
|
||||
|
||||
@ -1,35 +1,154 @@
|
||||
pub mod api;
|
||||
pub mod transformers;
|
||||
pub mod types;
|
||||
use api_models::{enums, process_tracker::revenue_recovery};
|
||||
use api_models::{enums, process_tracker::revenue_recovery, webhooks};
|
||||
use common_utils::{
|
||||
self,
|
||||
errors::CustomResult,
|
||||
ext_traits::{OptionExt, ValueExt},
|
||||
id_type,
|
||||
};
|
||||
use diesel_models::{enums as diesel_enum, process_tracker::business_status};
|
||||
use error_stack::{self, ResultExt};
|
||||
use hyperswitch_domain_models::{payments::PaymentIntent, ApiModelToDieselModelConvertor};
|
||||
use hyperswitch_domain_models::{
|
||||
payments::PaymentIntent, revenue_recovery as domain_revenue_recovery,
|
||||
ApiModelToDieselModelConvertor,
|
||||
};
|
||||
use scheduler::errors as sch_errors;
|
||||
|
||||
use crate::{
|
||||
core::errors::{self, RouterResponse, RouterResult, StorageErrorExt},
|
||||
db::StorageInterface,
|
||||
logger,
|
||||
routes::{metrics, SessionState},
|
||||
routes::{app::ReqState, metrics, SessionState},
|
||||
services::ApplicationResponse,
|
||||
types::{
|
||||
domain,
|
||||
storage::{self, revenue_recovery as pcr},
|
||||
transformers::ForeignInto,
|
||||
},
|
||||
workflows,
|
||||
};
|
||||
|
||||
pub const EXECUTE_WORKFLOW: &str = "EXECUTE_WORKFLOW";
|
||||
pub const PSYNC_WORKFLOW: &str = "PSYNC_WORKFLOW";
|
||||
pub const CALCULATE_WORKFLOW: &str = "CALCULATE_WORKFLOW";
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn upsert_calculate_pcr_task(
|
||||
billing_connector_account: &domain::MerchantConnectorAccount,
|
||||
state: &SessionState,
|
||||
merchant_context: &domain::MerchantContext,
|
||||
recovery_intent_from_payment_intent: &domain_revenue_recovery::RecoveryPaymentIntent,
|
||||
business_profile: &domain::Profile,
|
||||
intent_retry_count: u16,
|
||||
payment_attempt_id: Option<id_type::GlobalAttemptId>,
|
||||
runner: storage::ProcessTrackerRunner,
|
||||
revenue_recovery_retry: diesel_enum::RevenueRecoveryAlgorithmType,
|
||||
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
|
||||
router_env::logger::info!("Starting calculate_job...");
|
||||
|
||||
let task = "CALCULATE_WORKFLOW";
|
||||
|
||||
let db = &*state.store;
|
||||
let payment_id = &recovery_intent_from_payment_intent.payment_id;
|
||||
|
||||
// Create process tracker ID in the format: CALCULATE_WORKFLOW_{payment_intent_id}
|
||||
let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr());
|
||||
|
||||
// Set scheduled time to 1 hour from now
|
||||
let schedule_time = common_utils::date_time::now() + time::Duration::hours(1);
|
||||
|
||||
let payment_attempt_id = payment_attempt_id
|
||||
.ok_or(error_stack::report!(
|
||||
errors::RevenueRecoveryError::PaymentAttemptIdNotFound
|
||||
))
|
||||
.attach_printable("payment attempt id is required for calculate workflow tracking")?;
|
||||
|
||||
// Check if a process tracker entry already exists for this payment intent
|
||||
let existing_entry = db
|
||||
.as_scheduler()
|
||||
.find_process_by_id(&process_tracker_id)
|
||||
.await
|
||||
.change_context(errors::RevenueRecoveryError::ProcessTrackerResponseError)
|
||||
.attach_printable(
|
||||
"Failed to check for existing calculate workflow process tracker entry",
|
||||
)?;
|
||||
|
||||
match existing_entry {
|
||||
Some(existing_process) => {
|
||||
router_env::logger::error!(
|
||||
"Found existing CALCULATE_WORKFLOW task with id: {}",
|
||||
existing_process.id
|
||||
);
|
||||
}
|
||||
None => {
|
||||
// No entry exists - create a new one
|
||||
router_env::logger::info!(
|
||||
"No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry scheduled for 1 hour from now",
|
||||
payment_id.get_string_repr()
|
||||
);
|
||||
|
||||
// Create tracking data
|
||||
let calculate_workflow_tracking_data = pcr::RevenueRecoveryWorkflowTrackingData {
|
||||
billing_mca_id: billing_connector_account.get_id(),
|
||||
global_payment_id: payment_id.clone(),
|
||||
merchant_id: merchant_context.get_merchant_account().get_id().to_owned(),
|
||||
profile_id: business_profile.get_id().to_owned(),
|
||||
payment_attempt_id,
|
||||
revenue_recovery_retry,
|
||||
invoice_scheduled_time: None,
|
||||
};
|
||||
|
||||
let tag = ["PCR"];
|
||||
let task = "CALCULATE_WORKFLOW";
|
||||
let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow;
|
||||
|
||||
let process_tracker_entry = storage::ProcessTrackerNew::new(
|
||||
process_tracker_id,
|
||||
task,
|
||||
runner,
|
||||
tag,
|
||||
calculate_workflow_tracking_data,
|
||||
Some(1),
|
||||
schedule_time,
|
||||
common_types::consts::API_VERSION,
|
||||
)
|
||||
.change_context(errors::RevenueRecoveryError::ProcessTrackerCreationError)
|
||||
.attach_printable("Failed to construct calculate workflow process tracker entry")?;
|
||||
|
||||
// Insert into process tracker with status New
|
||||
db.as_scheduler()
|
||||
.insert_process(process_tracker_entry)
|
||||
.await
|
||||
.change_context(errors::RevenueRecoveryError::ProcessTrackerResponseError)
|
||||
.attach_printable(
|
||||
"Failed to enter calculate workflow process_tracker_entry in DB",
|
||||
)?;
|
||||
|
||||
router_env::logger::info!(
|
||||
"Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}",
|
||||
payment_id.get_string_repr()
|
||||
);
|
||||
|
||||
metrics::TASKS_ADDED_COUNT.add(
|
||||
1,
|
||||
router_env::metric_attributes!(("flow", "CalculateWorkflow")),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(webhooks::WebhookResponseTracker::Payment {
|
||||
payment_id: payment_id.clone(),
|
||||
status: recovery_intent_from_payment_intent.status,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn perform_execute_payment(
|
||||
state: &SessionState,
|
||||
execute_task_process: &storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
tracking_data: &pcr::RevenueRecoveryWorkflowTrackingData,
|
||||
revenue_recovery_payment_data: &pcr::RevenueRecoveryPaymentData,
|
||||
payment_intent: &PaymentIntent,
|
||||
@ -74,6 +193,8 @@ pub async fn perform_execute_payment(
|
||||
revenue_recovery_payment_data.merchant_account.get_id(),
|
||||
payment_intent,
|
||||
execute_task_process,
|
||||
profile,
|
||||
merchant_context,
|
||||
revenue_recovery_payment_data,
|
||||
&revenue_recovery_metadata,
|
||||
))
|
||||
@ -218,6 +339,7 @@ async fn insert_psync_pcr_task_to_pt(
|
||||
profile_id,
|
||||
payment_attempt_id,
|
||||
revenue_recovery_retry,
|
||||
invoice_scheduled_time: Some(schedule_time),
|
||||
};
|
||||
let tag = ["REVENUE_RECOVERY"];
|
||||
let process_tracker_entry = storage::ProcessTrackerNew::new(
|
||||
@ -249,6 +371,8 @@ async fn insert_psync_pcr_task_to_pt(
|
||||
pub async fn perform_payments_sync(
|
||||
state: &SessionState,
|
||||
process: &storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
tracking_data: &pcr::RevenueRecoveryWorkflowTrackingData,
|
||||
revenue_recovery_payment_data: &pcr::RevenueRecoveryPaymentData,
|
||||
payment_intent: &PaymentIntent,
|
||||
@ -274,6 +398,8 @@ pub async fn perform_payments_sync(
|
||||
state,
|
||||
payment_intent,
|
||||
process.clone(),
|
||||
profile,
|
||||
merchant_context,
|
||||
revenue_recovery_payment_data,
|
||||
payment_attempt,
|
||||
&mut revenue_recovery_metadata,
|
||||
@ -284,6 +410,416 @@ pub async fn perform_payments_sync(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn perform_calculate_workflow(
|
||||
state: &SessionState,
|
||||
process: &storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
tracking_data: &pcr::RevenueRecoveryWorkflowTrackingData,
|
||||
revenue_recovery_payment_data: &pcr::RevenueRecoveryPaymentData,
|
||||
payment_intent: &PaymentIntent,
|
||||
) -> Result<(), sch_errors::ProcessTrackerError> {
|
||||
let db = &*state.store;
|
||||
let merchant_id = revenue_recovery_payment_data.merchant_account.get_id();
|
||||
let profile_id = revenue_recovery_payment_data.profile.get_id();
|
||||
let billing_mca_id = revenue_recovery_payment_data.billing_mca.get_id();
|
||||
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
payment_id = %tracking_data.global_payment_id.get_string_repr(),
|
||||
"Starting CALCULATE_WORKFLOW..."
|
||||
);
|
||||
|
||||
// 1. Extract connector_customer_id and token_list from tracking_data
|
||||
let connector_customer_id = payment_intent
|
||||
.extract_connector_customer_id_from_payment_intent()
|
||||
.change_context(errors::RecoveryError::ValueNotFound)
|
||||
.attach_printable("Failed to extract customer ID from payment intent")?;
|
||||
|
||||
let merchant_context_from_revenue_recovery_payment_data =
|
||||
domain::MerchantContext::NormalMerchant(Box::new(domain::Context(
|
||||
revenue_recovery_payment_data.merchant_account.clone(),
|
||||
revenue_recovery_payment_data.key_store.clone(),
|
||||
)));
|
||||
|
||||
let retry_algorithm_type = match profile
|
||||
.revenue_recovery_retry_algorithm_type
|
||||
.filter(|retry_type|
|
||||
*retry_type != common_enums::RevenueRecoveryAlgorithmType::Monitoring) // ignore Monitoring in profile
|
||||
.unwrap_or(tracking_data.revenue_recovery_retry) // fallback to tracking_data
|
||||
{
|
||||
common_enums::RevenueRecoveryAlgorithmType::Smart => common_enums::RevenueRecoveryAlgorithmType::Smart,
|
||||
common_enums::RevenueRecoveryAlgorithmType::Cascading => common_enums::RevenueRecoveryAlgorithmType::Cascading,
|
||||
common_enums::RevenueRecoveryAlgorithmType::Monitoring => {
|
||||
return Err(sch_errors::ProcessTrackerError::ProcessUpdateFailed);
|
||||
}
|
||||
};
|
||||
|
||||
// 2. Get best available token
|
||||
let best_time_to_schedule = match workflows::revenue_recovery::get_token_with_schedule_time_based_on_retry_alogrithm_type(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
payment_intent,
|
||||
retry_algorithm_type,
|
||||
process.retry_count,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(token_opt) => token_opt,
|
||||
Err(e) => {
|
||||
logger::error!(
|
||||
error = ?e,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"Failed to get best PSP token"
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
match best_time_to_schedule {
|
||||
Some(scheduled_time) => {
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"Found best available token, creating EXECUTE_WORKFLOW task"
|
||||
);
|
||||
|
||||
// 3. If token found: create EXECUTE_WORKFLOW task and finish CALCULATE_WORKFLOW
|
||||
insert_execute_pcr_task_to_pt(
|
||||
&tracking_data.billing_mca_id,
|
||||
state,
|
||||
&tracking_data.merchant_id,
|
||||
payment_intent,
|
||||
&tracking_data.profile_id,
|
||||
&tracking_data.payment_attempt_id,
|
||||
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
|
||||
tracking_data.revenue_recovery_retry,
|
||||
scheduled_time,
|
||||
)
|
||||
.await?;
|
||||
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
process.clone(),
|
||||
business_status::CALCULATE_WORKFLOW_SCHEDULED,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
process_id = %process.id,
|
||||
error = ?e,
|
||||
"Failed to update CALCULATE_WORKFLOW status to complete"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"CALCULATE_WORKFLOW completed successfully"
|
||||
);
|
||||
}
|
||||
|
||||
None => {
|
||||
let scheduled_token = match storage::revenue_recovery_redis_operation::
|
||||
RedisTokenManager::get_payment_processor_token_with_schedule_time(state, &connector_customer_id)
|
||||
.await {
|
||||
Ok(scheduled_token_opt) => scheduled_token_opt,
|
||||
Err(e) => {
|
||||
logger::error!(
|
||||
error = ?e,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"Failed to get PSP token status"
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
match scheduled_token {
|
||||
Some(scheduled_token) => {
|
||||
// Update scheduled time to scheduled time + 15 minutes
|
||||
// here scheduled_time is the wait time 15 minutes is a buffer time that we are adding
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"No token but time available, rescheduling for scheduled time + 15 mins"
|
||||
);
|
||||
|
||||
update_calculate_job_schedule_time(
|
||||
db,
|
||||
process,
|
||||
time::Duration::minutes(15),
|
||||
scheduled_token.scheduled_at,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
None => {
|
||||
let hard_decline_flag = storage::revenue_recovery_redis_operation::
|
||||
RedisTokenManager::are_all_tokens_hard_declined(
|
||||
state,
|
||||
&connector_customer_id
|
||||
)
|
||||
.await
|
||||
.ok()
|
||||
.unwrap_or(false);
|
||||
|
||||
match hard_decline_flag {
|
||||
false => {
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"Hard decline flag is false, rescheduling for scheduled time + 15 mins"
|
||||
);
|
||||
|
||||
update_calculate_job_schedule_time(
|
||||
db,
|
||||
process,
|
||||
time::Duration::minutes(15),
|
||||
Some(common_utils::date_time::now()),
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
true => {
|
||||
// Finish calculate workflow with CALCULATE_WORKFLOW_FINISH
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"No token available, finishing CALCULATE_WORKFLOW"
|
||||
);
|
||||
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
process.clone(),
|
||||
business_status::CALCULATE_WORKFLOW_FINISH,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
process_id = %process.id,
|
||||
error = ?e,
|
||||
"Failed to finish CALCULATE_WORKFLOW"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"CALCULATE_WORKFLOW finished successfully"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the schedule time for a CALCULATE_WORKFLOW process tracker
|
||||
async fn update_calculate_job_schedule_time(
|
||||
db: &dyn StorageInterface,
|
||||
process: &storage::ProcessTracker,
|
||||
additional_time: time::Duration,
|
||||
base_time: Option<time::PrimitiveDateTime>,
|
||||
connector_customer_id: &str,
|
||||
) -> Result<(), sch_errors::ProcessTrackerError> {
|
||||
let new_schedule_time =
|
||||
base_time.unwrap_or_else(common_utils::date_time::now) + additional_time;
|
||||
|
||||
let pt_update = storage::ProcessTrackerUpdate::Update {
|
||||
name: Some("CALCULATE_WORKFLOW".to_string()),
|
||||
retry_count: Some(process.clone().retry_count),
|
||||
schedule_time: Some(new_schedule_time),
|
||||
tracking_data: Some(process.clone().tracking_data),
|
||||
business_status: Some(String::from(business_status::PENDING)),
|
||||
status: Some(common_enums::ProcessTrackerStatus::Pending),
|
||||
updated_at: Some(common_utils::date_time::now()),
|
||||
};
|
||||
|
||||
db.as_scheduler()
|
||||
.update_process(process.clone(), pt_update)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
process_id = %process.id,
|
||||
error = ?e,
|
||||
"Failed to reschedule CALCULATE_WORKFLOW"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
new_schedule_time = %new_schedule_time,
|
||||
additional_time = ?additional_time,
|
||||
"CALCULATE_WORKFLOW rescheduled successfully"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert Execute PCR Task to Process Tracker
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn insert_execute_pcr_task_to_pt(
|
||||
billing_mca_id: &id_type::MerchantConnectorAccountId,
|
||||
state: &SessionState,
|
||||
merchant_id: &id_type::MerchantId,
|
||||
payment_intent: &PaymentIntent,
|
||||
profile_id: &id_type::ProfileId,
|
||||
payment_attempt_id: &id_type::GlobalAttemptId,
|
||||
runner: storage::ProcessTrackerRunner,
|
||||
revenue_recovery_retry: diesel_enum::RevenueRecoveryAlgorithmType,
|
||||
schedule_time: time::PrimitiveDateTime,
|
||||
) -> Result<storage::ProcessTracker, sch_errors::ProcessTrackerError> {
|
||||
let task = "EXECUTE_WORKFLOW";
|
||||
|
||||
let payment_id = payment_intent.id.clone();
|
||||
|
||||
let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr());
|
||||
|
||||
// Check if a process tracker entry already exists for this payment intent
|
||||
let existing_entry = state
|
||||
.store
|
||||
.find_process_by_id(&process_tracker_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
error = ?e,
|
||||
"Failed to check for existing execute workflow process tracker entry"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
match existing_entry {
|
||||
Some(existing_process)
|
||||
if existing_process.business_status == business_status::EXECUTE_WORKFLOW_FAILURE
|
||||
|| existing_process.business_status
|
||||
== business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC =>
|
||||
{
|
||||
// Entry exists with EXECUTE_WORKFLOW_COMPLETE status - update it
|
||||
logger::info!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
current_retry_count = %existing_process.retry_count,
|
||||
"Found existing EXECUTE_WORKFLOW task with COMPLETE status, updating to PENDING with incremented retry count"
|
||||
);
|
||||
|
||||
let pt_update = storage::ProcessTrackerUpdate::Update {
|
||||
name: Some(task.to_string()),
|
||||
retry_count: Some(existing_process.clone().retry_count + 1),
|
||||
schedule_time: Some(schedule_time),
|
||||
tracking_data: Some(existing_process.clone().tracking_data),
|
||||
business_status: Some(String::from(business_status::PENDING)),
|
||||
status: Some(enums::ProcessTrackerStatus::Pending),
|
||||
updated_at: Some(common_utils::date_time::now()),
|
||||
};
|
||||
|
||||
let updated_process = state
|
||||
.store
|
||||
.update_process(existing_process, pt_update)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
error = ?e,
|
||||
"Failed to update existing execute workflow process tracker entry"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
logger::info!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
new_retry_count = %updated_process.retry_count,
|
||||
"Successfully updated existing EXECUTE_WORKFLOW task"
|
||||
);
|
||||
|
||||
Ok(updated_process)
|
||||
}
|
||||
Some(existing_process) => {
|
||||
// Entry exists but business status is not EXECUTE_WORKFLOW_COMPLETE
|
||||
logger::info!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
current_business_status = %existing_process.business_status,
|
||||
);
|
||||
|
||||
Ok(existing_process)
|
||||
}
|
||||
None => {
|
||||
// No entry exists - create a new one
|
||||
logger::info!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
"No existing EXECUTE_WORKFLOW task found, creating new entry"
|
||||
);
|
||||
|
||||
let execute_workflow_tracking_data = pcr::RevenueRecoveryWorkflowTrackingData {
|
||||
billing_mca_id: billing_mca_id.clone(),
|
||||
global_payment_id: payment_id.clone(),
|
||||
merchant_id: merchant_id.clone(),
|
||||
profile_id: profile_id.clone(),
|
||||
payment_attempt_id: payment_attempt_id.clone(),
|
||||
revenue_recovery_retry,
|
||||
invoice_scheduled_time: Some(schedule_time),
|
||||
};
|
||||
|
||||
let tag = ["PCR"];
|
||||
let process_tracker_entry = storage::ProcessTrackerNew::new(
|
||||
process_tracker_id.clone(),
|
||||
task,
|
||||
runner,
|
||||
tag,
|
||||
execute_workflow_tracking_data,
|
||||
Some(1),
|
||||
schedule_time,
|
||||
common_types::consts::API_VERSION,
|
||||
)
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
error = ?e,
|
||||
"Failed to construct execute workflow process tracker entry"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
let response = state
|
||||
.store
|
||||
.insert_process(process_tracker_entry)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::error!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
error = ?e,
|
||||
"Failed to insert execute workflow process tracker entry"
|
||||
);
|
||||
sch_errors::ProcessTrackerError::ProcessUpdateFailed
|
||||
})?;
|
||||
|
||||
metrics::TASKS_ADDED_COUNT.add(
|
||||
1,
|
||||
router_env::metric_attributes!(("flow", "RevenueRecoveryExecute")),
|
||||
);
|
||||
|
||||
logger::info!(
|
||||
payment_id = %payment_id.get_string_repr(),
|
||||
process_tracker_id = %response.id,
|
||||
"Successfully created new EXECUTE_WORKFLOW task"
|
||||
);
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn retrieve_revenue_recovery_process_tracker(
|
||||
state: SessionState,
|
||||
id: id_type::GlobalPaymentId,
|
||||
|
||||
@ -31,11 +31,12 @@ use hyperswitch_domain_models::{
|
||||
};
|
||||
use time::PrimitiveDateTime;
|
||||
|
||||
use super::errors::StorageErrorExt;
|
||||
use crate::{
|
||||
core::{
|
||||
errors::{self, RouterResult},
|
||||
payments::{self, helpers, operations::Operation},
|
||||
revenue_recovery::{self as revenue_recovery_core},
|
||||
revenue_recovery::{self as revenue_recovery_core, perform_calculate_workflow},
|
||||
webhooks::recovery_incoming as recovery_incoming_flow,
|
||||
},
|
||||
db::StorageInterface,
|
||||
@ -43,9 +44,13 @@ use crate::{
|
||||
routes::SessionState,
|
||||
services::{self, connector_integration_interface::RouterDataConversion},
|
||||
types::{
|
||||
self, api as api_types, api::payments as payments_types, storage, transformers::ForeignInto,
|
||||
self, api as api_types, api::payments as payments_types, domain, storage,
|
||||
transformers::ForeignInto,
|
||||
},
|
||||
workflows::{
|
||||
payment_sync,
|
||||
revenue_recovery::{self, get_schedule_time_to_retry_mit_payments},
|
||||
},
|
||||
workflows::{payment_sync, revenue_recovery::get_schedule_time_to_retry_mit_payments},
|
||||
};
|
||||
|
||||
type RecoveryResult<T> = error_stack::Result<T, errors::RecoveryError>;
|
||||
@ -93,15 +98,23 @@ impl RevenueRecoveryPaymentsAttemptStatus {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn update_pt_status_based_on_attempt_status_for_payments_sync(
|
||||
&self,
|
||||
state: &SessionState,
|
||||
payment_intent: &PaymentIntent,
|
||||
process_tracker: storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
|
||||
payment_attempt: PaymentAttempt,
|
||||
revenue_recovery_metadata: &mut PaymentRevenueRecoveryMetadata,
|
||||
) -> Result<(), errors::ProcessTrackerError> {
|
||||
let connector_customer_id = payment_intent
|
||||
.extract_connector_customer_id_from_payment_intent()
|
||||
.change_context(errors::RecoveryError::ValueNotFound)
|
||||
.attach_printable("Failed to extract customer ID from payment intent")?;
|
||||
|
||||
let db = &*state.store;
|
||||
|
||||
let recovery_payment_intent =
|
||||
@ -143,6 +156,26 @@ impl RevenueRecoveryPaymentsAttemptStatus {
|
||||
);
|
||||
};
|
||||
|
||||
let is_hard_decline = revenue_recovery::check_hard_decline(state, &payment_attempt)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
// update the status of token in redis
|
||||
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
&None,
|
||||
&is_hard_decline
|
||||
)
|
||||
.await;
|
||||
|
||||
// unlocking the token
|
||||
let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Record a successful transaction back to Billing Connector
|
||||
// TODO: Add support for retrying failed outgoing recordback webhooks
|
||||
record_back_to_billing_connector(
|
||||
@ -175,26 +208,38 @@ impl RevenueRecoveryPaymentsAttemptStatus {
|
||||
);
|
||||
};
|
||||
|
||||
// get a reschedule time
|
||||
let schedule_time = get_schedule_time_to_retry_mit_payments(
|
||||
db,
|
||||
&revenue_recovery_payment_data
|
||||
.merchant_account
|
||||
.get_id()
|
||||
.clone(),
|
||||
process_tracker.retry_count + 1,
|
||||
let error_code = recovery_payment_attempt.error_code;
|
||||
|
||||
let is_hard_decline = revenue_recovery::check_hard_decline(state, &payment_attempt)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
// update the status of token in redis
|
||||
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
&error_code,
|
||||
&is_hard_decline
|
||||
)
|
||||
.await;
|
||||
|
||||
// check if retry is possible
|
||||
if let Some(schedule_time) = schedule_time {
|
||||
// schedule a retry
|
||||
// TODO: Update connecter called field and active attempt
|
||||
// unlocking the token
|
||||
let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
db.as_scheduler()
|
||||
.retry_process(process_tracker.clone(), schedule_time)
|
||||
.await?;
|
||||
}
|
||||
// Reopen calculate workflow on payment failure
|
||||
reopen_calculate_workflow_on_payment_failure(
|
||||
state,
|
||||
&process_tracker,
|
||||
profile,
|
||||
merchant_context,
|
||||
payment_intent,
|
||||
revenue_recovery_payment_data,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Self::Processing => {
|
||||
// do a psync payment
|
||||
@ -203,6 +248,8 @@ impl RevenueRecoveryPaymentsAttemptStatus {
|
||||
revenue_recovery_payment_data,
|
||||
payment_intent,
|
||||
&process_tracker,
|
||||
profile,
|
||||
merchant_context,
|
||||
payment_attempt,
|
||||
))
|
||||
.await?;
|
||||
@ -307,105 +354,227 @@ pub enum Action {
|
||||
ManualReviewAction,
|
||||
}
|
||||
impl Action {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn execute_payment(
|
||||
state: &SessionState,
|
||||
merchant_id: &id_type::MerchantId,
|
||||
payment_intent: &PaymentIntent,
|
||||
process: &storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
|
||||
revenue_recovery_metadata: &PaymentRevenueRecoveryMetadata,
|
||||
) -> RecoveryResult<Self> {
|
||||
let response = revenue_recovery_core::api::call_proxy_api(
|
||||
state,
|
||||
payment_intent,
|
||||
revenue_recovery_payment_data,
|
||||
revenue_recovery_metadata,
|
||||
)
|
||||
.await;
|
||||
let recovery_payment_intent =
|
||||
hyperswitch_domain_models::revenue_recovery::RecoveryPaymentIntent::from(
|
||||
payment_intent,
|
||||
);
|
||||
let connector_customer_id = payment_intent
|
||||
.extract_connector_customer_id_from_payment_intent()
|
||||
.change_context(errors::RecoveryError::ValueNotFound)
|
||||
.attach_printable("Failed to extract customer ID from payment intent")?;
|
||||
|
||||
// handle proxy api's response
|
||||
match response {
|
||||
Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() {
|
||||
RevenueRecoveryPaymentsAttemptStatus::Succeeded => {
|
||||
let recovery_payment_attempt =
|
||||
hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from(
|
||||
&payment_data.payment_attempt,
|
||||
);
|
||||
let scheduled_token = match storage::revenue_recovery_redis_operation::
|
||||
RedisTokenManager::get_payment_processor_token_with_schedule_time(state, &connector_customer_id)
|
||||
.await {
|
||||
Ok(scheduled_token_opt) => scheduled_token_opt,
|
||||
Err(e) => {
|
||||
logger::error!(
|
||||
error = ?e,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"Failed to get PSP token status"
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new(
|
||||
&recovery_payment_intent,
|
||||
&recovery_payment_attempt,
|
||||
);
|
||||
|
||||
// publish events to kafka
|
||||
if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka(
|
||||
match scheduled_token {
|
||||
Some(scheduled_token) => {
|
||||
let response = revenue_recovery_core::api::call_proxy_api(
|
||||
state,
|
||||
&recovery_payment_tuple,
|
||||
Some(process.retry_count+1)
|
||||
payment_intent,
|
||||
revenue_recovery_payment_data,
|
||||
revenue_recovery_metadata,
|
||||
)
|
||||
.await{
|
||||
router_env::logger::error!(
|
||||
"Failed to publish revenue recovery event to kafka: {:?}",
|
||||
e
|
||||
);
|
||||
};
|
||||
.await;
|
||||
|
||||
Ok(Self::SuccessfulPayment(
|
||||
payment_data.payment_attempt.clone(),
|
||||
))
|
||||
}
|
||||
RevenueRecoveryPaymentsAttemptStatus::Failed => {
|
||||
let recovery_payment_attempt =
|
||||
hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from(
|
||||
&payment_data.payment_attempt,
|
||||
);
|
||||
|
||||
let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new(
|
||||
&recovery_payment_intent,
|
||||
&recovery_payment_attempt,
|
||||
);
|
||||
|
||||
// publish events to kafka
|
||||
if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka(
|
||||
state,
|
||||
&recovery_payment_tuple,
|
||||
Some(process.retry_count+1)
|
||||
)
|
||||
.await{
|
||||
router_env::logger::error!(
|
||||
"Failed to publish revenue recovery event to kafka: {:?}",
|
||||
e
|
||||
);
|
||||
};
|
||||
|
||||
Self::decide_retry_failure_action(
|
||||
state,
|
||||
merchant_id,
|
||||
process.clone(),
|
||||
revenue_recovery_payment_data,
|
||||
&payment_data.payment_attempt,
|
||||
let recovery_payment_intent =
|
||||
hyperswitch_domain_models::revenue_recovery::RecoveryPaymentIntent::from(
|
||||
payment_intent,
|
||||
);
|
||||
|
||||
// handle proxy api's response
|
||||
match response {
|
||||
Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() {
|
||||
RevenueRecoveryPaymentsAttemptStatus::Succeeded => {
|
||||
let recovery_payment_attempt =
|
||||
hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from(
|
||||
&payment_data.payment_attempt,
|
||||
);
|
||||
|
||||
let recovery_payment_tuple =
|
||||
recovery_incoming_flow::RecoveryPaymentTuple::new(
|
||||
&recovery_payment_intent,
|
||||
&recovery_payment_attempt,
|
||||
);
|
||||
|
||||
// publish events to kafka
|
||||
if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka(
|
||||
state,
|
||||
&recovery_payment_tuple,
|
||||
Some(process.retry_count+1)
|
||||
)
|
||||
.await{
|
||||
router_env::logger::error!(
|
||||
"Failed to publish revenue recovery event to kafka: {:?}",
|
||||
e
|
||||
);
|
||||
};
|
||||
|
||||
let is_hard_decline = revenue_recovery::check_hard_decline(
|
||||
state,
|
||||
&payment_data.payment_attempt,
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
// update the status of token in redis
|
||||
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
&None,
|
||||
&is_hard_decline
|
||||
)
|
||||
.await;
|
||||
|
||||
// unlocking the token
|
||||
let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Self::SuccessfulPayment(
|
||||
payment_data.payment_attempt.clone(),
|
||||
))
|
||||
}
|
||||
RevenueRecoveryPaymentsAttemptStatus::Failed => {
|
||||
let recovery_payment_attempt =
|
||||
hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from(
|
||||
&payment_data.payment_attempt,
|
||||
);
|
||||
|
||||
let recovery_payment_tuple =
|
||||
recovery_incoming_flow::RecoveryPaymentTuple::new(
|
||||
&recovery_payment_intent,
|
||||
&recovery_payment_attempt,
|
||||
);
|
||||
|
||||
// publish events to kafka
|
||||
if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka(
|
||||
state,
|
||||
&recovery_payment_tuple,
|
||||
Some(process.retry_count+1)
|
||||
)
|
||||
.await{
|
||||
router_env::logger::error!(
|
||||
"Failed to publish revenue recovery event to kafka: {:?}",
|
||||
e
|
||||
);
|
||||
};
|
||||
|
||||
let error_code = payment_data
|
||||
.payment_attempt
|
||||
.clone()
|
||||
.error
|
||||
.map(|error| error.code);
|
||||
|
||||
let is_hard_decline = revenue_recovery::check_hard_decline(
|
||||
state,
|
||||
&payment_data.payment_attempt,
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
let _update_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
&error_code,
|
||||
&is_hard_decline
|
||||
)
|
||||
.await;
|
||||
|
||||
// unlocking the token
|
||||
let _unlock_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Reopen calculate workflow on payment failure
|
||||
reopen_calculate_workflow_on_payment_failure(
|
||||
state,
|
||||
process,
|
||||
profile,
|
||||
merchant_context,
|
||||
payment_intent,
|
||||
revenue_recovery_payment_data,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Return terminal failure to finish the current execute workflow
|
||||
Ok(Self::TerminalFailure(payment_data.payment_attempt.clone()))
|
||||
}
|
||||
|
||||
RevenueRecoveryPaymentsAttemptStatus::Processing => {
|
||||
Ok(Self::SyncPayment(payment_data.payment_attempt.clone()))
|
||||
}
|
||||
RevenueRecoveryPaymentsAttemptStatus::InvalidStatus(action) => {
|
||||
logger::info!(?action, "Invalid Payment Status For PCR Payment");
|
||||
Ok(Self::ManualReviewAction)
|
||||
}
|
||||
},
|
||||
Err(err) =>
|
||||
// check for an active attempt being constructed or not
|
||||
{
|
||||
logger::error!(execute_payment_res=?err);
|
||||
Ok(Self::ReviewPayment)
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let response = revenue_recovery_core::api::call_psync_api(
|
||||
state,
|
||||
payment_intent.get_id(),
|
||||
revenue_recovery_payment_data,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payment_status_data = response
|
||||
.change_context(errors::RecoveryError::PaymentCallFailed)
|
||||
.attach_printable("Error while executing the Psync call")?;
|
||||
|
||||
let payment_attempt = payment_status_data.payment_attempt;
|
||||
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"No token available, finishing CALCULATE_WORKFLOW"
|
||||
);
|
||||
|
||||
state
|
||||
.store
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
process.clone(),
|
||||
business_status::CALCULATE_WORKFLOW_FINISH,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
||||
.attach_printable("Failed to finish CALCULATE_WORKFLOW")?;
|
||||
|
||||
RevenueRecoveryPaymentsAttemptStatus::Processing => {
|
||||
Ok(Self::SyncPayment(payment_data.payment_attempt.clone()))
|
||||
}
|
||||
RevenueRecoveryPaymentsAttemptStatus::InvalidStatus(action) => {
|
||||
logger::info!(?action, "Invalid Payment Status For PCR Payment");
|
||||
Ok(Self::ManualReviewAction)
|
||||
}
|
||||
},
|
||||
Err(err) =>
|
||||
// check for an active attempt being constructed or not
|
||||
{
|
||||
logger::error!(execute_payment_res=?err);
|
||||
Ok(Self::ReviewPayment)
|
||||
logger::info!(
|
||||
process_id = %process.id,
|
||||
connector_customer_id = %connector_customer_id,
|
||||
"CALCULATE_WORKFLOW finished successfully"
|
||||
);
|
||||
Ok(Self::TerminalFailure(payment_attempt.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -486,16 +655,45 @@ impl Action {
|
||||
Ok(())
|
||||
}
|
||||
Self::TerminalFailure(payment_attempt) => {
|
||||
// update the connector payment transmission field to Unsuccessful and unset active attempt id
|
||||
revenue_recovery_metadata.set_payment_transmission_field_for_api_request(
|
||||
enums::PaymentConnectorTransmission::ConnectorCallUnsuccessful,
|
||||
);
|
||||
|
||||
let payment_update_req =
|
||||
PaymentsUpdateIntentRequest::update_feature_metadata_and_active_attempt_with_api(
|
||||
payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.convert_back()
|
||||
.set_payment_revenue_recovery_metadata_using_api(
|
||||
revenue_recovery_metadata.clone(),
|
||||
),
|
||||
api_enums::UpdateActiveAttempt::Unset,
|
||||
);
|
||||
logger::info!(
|
||||
"Call made to payments update intent api , with the request body {:?}",
|
||||
payment_update_req
|
||||
);
|
||||
revenue_recovery_core::api::update_payment_intent_api(
|
||||
state,
|
||||
payment_intent.id.clone(),
|
||||
revenue_recovery_payment_data,
|
||||
payment_update_req,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::RecoveryError::PaymentCallFailed)?;
|
||||
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
execute_task_process.clone(),
|
||||
business_status::EXECUTE_WORKFLOW_COMPLETE,
|
||||
business_status::EXECUTE_WORKFLOW_FAILURE,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
||||
.attach_printable("Failed to update the process tracker")?;
|
||||
// TODO: Add support for retrying failed outgoing recordback webhooks
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Self::SuccessfulPayment(payment_attempt) => {
|
||||
@ -551,6 +749,8 @@ impl Action {
|
||||
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
|
||||
payment_intent: &PaymentIntent,
|
||||
process: &storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
payment_attempt: PaymentAttempt,
|
||||
) -> RecoveryResult<Self> {
|
||||
let response = revenue_recovery_core::api::call_psync_api(
|
||||
@ -563,18 +763,74 @@ impl Action {
|
||||
match response {
|
||||
Ok(_payment_data) => match payment_attempt.status.foreign_into() {
|
||||
RevenueRecoveryPaymentsAttemptStatus::Succeeded => {
|
||||
let connector_customer_id = payment_intent
|
||||
.extract_connector_customer_id_from_payment_intent()
|
||||
.change_context(errors::RecoveryError::ValueNotFound)
|
||||
.attach_printable("Failed to extract customer ID from payment intent")?;
|
||||
|
||||
let is_hard_decline =
|
||||
revenue_recovery::check_hard_decline(state, &payment_attempt)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
// update the status of token in redis
|
||||
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
&None,
|
||||
&is_hard_decline
|
||||
)
|
||||
.await;
|
||||
|
||||
// unlocking the token
|
||||
let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Self::SuccessfulPayment(payment_attempt))
|
||||
}
|
||||
RevenueRecoveryPaymentsAttemptStatus::Failed => {
|
||||
Self::decide_retry_failure_action(
|
||||
let connector_customer_id = payment_intent
|
||||
.extract_connector_customer_id_from_payment_intent()
|
||||
.change_context(errors::RecoveryError::ValueNotFound)
|
||||
.attach_printable("Failed to extract customer ID from payment intent")?;
|
||||
|
||||
let error_code = payment_attempt.clone().error.map(|error| error.code);
|
||||
|
||||
let is_hard_decline =
|
||||
revenue_recovery::check_hard_decline(state, &payment_attempt)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker(
|
||||
state,
|
||||
&connector_customer_id,
|
||||
&error_code,
|
||||
&is_hard_decline
|
||||
)
|
||||
.await;
|
||||
|
||||
// unlocking the token
|
||||
let _unlock_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status(
|
||||
state,
|
||||
revenue_recovery_payment_data.merchant_account.get_id(),
|
||||
process.clone(),
|
||||
revenue_recovery_payment_data,
|
||||
&payment_attempt,
|
||||
payment_intent,
|
||||
&connector_customer_id,
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
|
||||
// Reopen calculate workflow on payment failure
|
||||
reopen_calculate_workflow_on_payment_failure(
|
||||
state,
|
||||
process,
|
||||
profile,
|
||||
merchant_context,
|
||||
payment_intent,
|
||||
revenue_recovery_payment_data,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Self::TerminalFailure(payment_attempt.clone()))
|
||||
}
|
||||
|
||||
RevenueRecoveryPaymentsAttemptStatus::Processing => {
|
||||
@ -684,6 +940,37 @@ impl Action {
|
||||
}
|
||||
|
||||
Self::TerminalFailure(payment_attempt) => {
|
||||
// update the connector payment transmission field to Unsuccessful and unset active attempt id
|
||||
revenue_recovery_metadata.set_payment_transmission_field_for_api_request(
|
||||
enums::PaymentConnectorTransmission::ConnectorCallUnsuccessful,
|
||||
);
|
||||
|
||||
let payment_update_req =
|
||||
PaymentsUpdateIntentRequest::update_feature_metadata_and_active_attempt_with_api(
|
||||
payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.convert_back()
|
||||
.set_payment_revenue_recovery_metadata_using_api(
|
||||
revenue_recovery_metadata.clone(),
|
||||
),
|
||||
api_enums::UpdateActiveAttempt::Unset,
|
||||
);
|
||||
logger::info!(
|
||||
"Call made to payments update intent api , with the request body {:?}",
|
||||
payment_update_req
|
||||
);
|
||||
|
||||
revenue_recovery_core::api::update_payment_intent_api(
|
||||
state,
|
||||
payment_intent.id.clone(),
|
||||
revenue_recovery_payment_data,
|
||||
payment_update_req,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::RecoveryError::PaymentCallFailed)?;
|
||||
|
||||
// TODO: Add support for retrying failed outgoing recordback webhooks
|
||||
// finish the current psync task
|
||||
db.as_scheduler()
|
||||
@ -804,6 +1091,140 @@ impl Action {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reopen calculate workflow when payment fails
|
||||
pub async fn reopen_calculate_workflow_on_payment_failure(
|
||||
state: &SessionState,
|
||||
process: &storage::ProcessTracker,
|
||||
profile: &domain::Profile,
|
||||
merchant_context: domain::MerchantContext,
|
||||
payment_intent: &PaymentIntent,
|
||||
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
|
||||
) -> RecoveryResult<()> {
|
||||
let db = &*state.store;
|
||||
let id = payment_intent.id.clone();
|
||||
let task = revenue_recovery_core::CALCULATE_WORKFLOW;
|
||||
let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow;
|
||||
|
||||
// Construct the process tracker ID for CALCULATE_WORKFLOW
|
||||
let process_tracker_id = format!("{}_{}_{}", runner, task, id.get_string_repr());
|
||||
|
||||
logger::info!(
|
||||
payment_id = %id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
"Attempting to reopen CALCULATE_WORKFLOW on payment failure"
|
||||
);
|
||||
|
||||
// Find the existing CALCULATE_WORKFLOW process tracker
|
||||
let calculate_process = db
|
||||
.find_process_by_id(&process_tracker_id)
|
||||
.await
|
||||
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
||||
.attach_printable("Failed to find CALCULATE_WORKFLOW process tracker")?;
|
||||
|
||||
match calculate_process {
|
||||
Some(process) => {
|
||||
logger::info!(
|
||||
payment_id = %id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
current_status = %process.business_status,
|
||||
current_retry_count = process.retry_count,
|
||||
"Found existing CALCULATE_WORKFLOW, updating status and retry count"
|
||||
);
|
||||
|
||||
// Update the process tracker to reopen the calculate workflow
|
||||
// 1. Change status from "finish" to "pending"
|
||||
// 2. Increase retry count by 1
|
||||
// 3. Set business status to QUEUED
|
||||
// 4. Schedule for immediate execution
|
||||
let new_retry_count = process.retry_count + 1;
|
||||
let new_schedule_time = common_utils::date_time::now() + time::Duration::hours(1);
|
||||
|
||||
let pt_update = storage::ProcessTrackerUpdate::Update {
|
||||
name: Some(task.to_string()),
|
||||
retry_count: Some(new_retry_count),
|
||||
schedule_time: Some(new_schedule_time),
|
||||
tracking_data: Some(process.clone().tracking_data),
|
||||
business_status: Some(String::from(business_status::PENDING)),
|
||||
status: Some(common_enums::ProcessTrackerStatus::Pending),
|
||||
updated_at: Some(common_utils::date_time::now()),
|
||||
};
|
||||
|
||||
db.update_process(process.clone(), pt_update)
|
||||
.await
|
||||
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
||||
.attach_printable("Failed to update CALCULATE_WORKFLOW process tracker")?;
|
||||
|
||||
logger::info!(
|
||||
payment_id = %id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
new_retry_count = new_retry_count,
|
||||
new_schedule_time = %new_schedule_time,
|
||||
"Successfully reopened CALCULATE_WORKFLOW with increased retry count"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
logger::info!(
|
||||
payment_id = %id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
"CALCULATE_WORKFLOW process tracker not found, creating new entry"
|
||||
);
|
||||
|
||||
// Create tracking data for the new CALCULATE_WORKFLOW
|
||||
let tracking_data = create_calculate_workflow_tracking_data(
|
||||
payment_intent,
|
||||
revenue_recovery_payment_data,
|
||||
)?;
|
||||
|
||||
// Call the existing perform_calculate_workflow function
|
||||
perform_calculate_workflow(
|
||||
state,
|
||||
process,
|
||||
profile,
|
||||
merchant_context,
|
||||
&tracking_data,
|
||||
revenue_recovery_payment_data,
|
||||
payment_intent,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::RecoveryError::ProcessTrackerFailure)
|
||||
.attach_printable("Failed to perform calculate workflow")?;
|
||||
|
||||
logger::info!(
|
||||
payment_id = %id.get_string_repr(),
|
||||
process_tracker_id = %process_tracker_id,
|
||||
"Successfully created new CALCULATE_WORKFLOW entry using perform_calculate_workflow"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create tracking data for the CALCULATE_WORKFLOW
|
||||
fn create_calculate_workflow_tracking_data(
|
||||
payment_intent: &PaymentIntent,
|
||||
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
|
||||
) -> RecoveryResult<storage::revenue_recovery::RevenueRecoveryWorkflowTrackingData> {
|
||||
let tracking_data = storage::revenue_recovery::RevenueRecoveryWorkflowTrackingData {
|
||||
merchant_id: revenue_recovery_payment_data
|
||||
.merchant_account
|
||||
.get_id()
|
||||
.clone(),
|
||||
profile_id: revenue_recovery_payment_data.profile.get_id().clone(),
|
||||
global_payment_id: payment_intent.id.clone(),
|
||||
payment_attempt_id: payment_intent
|
||||
.active_attempt_id
|
||||
.clone()
|
||||
.ok_or(storage_impl::errors::RecoveryError::ValueNotFound)?,
|
||||
billing_mca_id: revenue_recovery_payment_data.billing_mca.get_id().clone(),
|
||||
revenue_recovery_retry: revenue_recovery_payment_data.retry_algorithm,
|
||||
invoice_scheduled_time: None, // Will be set by perform_calculate_workflow
|
||||
};
|
||||
|
||||
Ok(tracking_data)
|
||||
}
|
||||
|
||||
// TODO: Move these to impl based functions
|
||||
async fn record_back_to_billing_connector(
|
||||
state: &SessionState,
|
||||
|
||||
@ -7,19 +7,25 @@ use common_utils::{
|
||||
};
|
||||
use diesel_models::process_tracker as storage;
|
||||
use error_stack::{report, ResultExt};
|
||||
use futures::stream::SelectNextSome;
|
||||
use hyperswitch_domain_models::{
|
||||
payments as domain_payments, revenue_recovery, router_data_v2::flow_common_types,
|
||||
router_flow_types, router_request_types::revenue_recovery as revenue_recovery_request,
|
||||
router_response_types::revenue_recovery as revenue_recovery_response, types as router_types,
|
||||
payments as domain_payments,
|
||||
revenue_recovery::{self, RecoveryPaymentIntent},
|
||||
router_data_v2::flow_common_types,
|
||||
router_flow_types,
|
||||
router_request_types::revenue_recovery as revenue_recovery_request,
|
||||
router_response_types::revenue_recovery as revenue_recovery_response,
|
||||
types as router_types,
|
||||
};
|
||||
use hyperswitch_interfaces::webhooks as interface_webhooks;
|
||||
use masking::{PeekInterface, Secret};
|
||||
use router_env::{instrument, logger, tracing};
|
||||
use services::kafka;
|
||||
use storage::business_status;
|
||||
|
||||
use crate::{
|
||||
core::{
|
||||
admin,
|
||||
self, admin,
|
||||
errors::{self, CustomResult},
|
||||
payments::{self, helpers},
|
||||
},
|
||||
@ -232,6 +238,7 @@ async fn handle_monitoring_threshold(
|
||||
}
|
||||
Ok(webhooks::WebhookResponseTracker::NoEffect)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_schedule_failed_payment(
|
||||
billing_connector_account: &domain::MerchantConnectorAccount,
|
||||
@ -248,6 +255,8 @@ async fn handle_schedule_failed_payment(
|
||||
) -> CustomResult<webhooks::WebhookResponseTracker, errors::RevenueRecoveryError> {
|
||||
let (recovery_attempt_from_payment_attempt, recovery_intent_from_payment_attempt) =
|
||||
payment_attempt_with_recovery_intent;
|
||||
|
||||
// When intent_retry_count is less than or equal to threshold
|
||||
(intent_retry_count <= mca_retry_threshold)
|
||||
.then(|| {
|
||||
logger::error!(
|
||||
@ -258,12 +267,13 @@ async fn handle_schedule_failed_payment(
|
||||
Ok(webhooks::WebhookResponseTracker::NoEffect)
|
||||
})
|
||||
.async_unwrap_or_else(|| async {
|
||||
RevenueRecoveryAttempt::insert_execute_pcr_task(
|
||||
&billing_connector_account.get_id(),
|
||||
&*state.store,
|
||||
merchant_context.get_merchant_account().get_id().to_owned(),
|
||||
recovery_intent_from_payment_attempt.clone(),
|
||||
business_profile.get_id().to_owned(),
|
||||
// Call calculate_job
|
||||
core::revenue_recovery::upsert_calculate_pcr_task(
|
||||
billing_connector_account,
|
||||
state,
|
||||
merchant_context,
|
||||
recovery_intent_from_payment_attempt,
|
||||
business_profile,
|
||||
intent_retry_count,
|
||||
recovery_attempt_from_payment_attempt
|
||||
.as_ref()
|
||||
@ -926,6 +936,7 @@ impl RevenueRecoveryAttempt {
|
||||
profile_id,
|
||||
payment_attempt_id,
|
||||
revenue_recovery_retry,
|
||||
invoice_scheduled_time: Some(schedule_time),
|
||||
};
|
||||
|
||||
let tag = ["PCR"];
|
||||
|
||||
@ -13,7 +13,7 @@ use masking::PeekInterface;
|
||||
use router_env::logger;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{db::StorageInterface, routes::SessionState, workflows::revenue_recovery};
|
||||
use crate::{db::StorageInterface, routes::SessionState, types, workflows::revenue_recovery};
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
pub struct RevenueRecoveryWorkflowTrackingData {
|
||||
pub merchant_id: id_type::MerchantId,
|
||||
@ -22,6 +22,7 @@ pub struct RevenueRecoveryWorkflowTrackingData {
|
||||
pub payment_attempt_id: id_type::GlobalAttemptId,
|
||||
pub billing_mca_id: id_type::MerchantConnectorAccountId,
|
||||
pub revenue_recovery_retry: enums::RevenueRecoveryAlgorithmType,
|
||||
pub invoice_scheduled_time: Option<time::PrimitiveDateTime>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@ -114,7 +114,7 @@ impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
|
||||
>(
|
||||
state,
|
||||
state.get_req_state(),
|
||||
merchant_context_from_revenue_recovery_payment_data,
|
||||
merchant_context_from_revenue_recovery_payment_data.clone(),
|
||||
revenue_recovery_payment_data.profile.clone(),
|
||||
payments::operations::PaymentGetIntent,
|
||||
request,
|
||||
@ -128,6 +128,8 @@ impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
|
||||
Box::pin(pcr::perform_execute_payment(
|
||||
state,
|
||||
&process,
|
||||
&revenue_recovery_payment_data.profile.clone(),
|
||||
merchant_context_from_revenue_recovery_payment_data.clone(),
|
||||
&tracking_data,
|
||||
&revenue_recovery_payment_data,
|
||||
&payment_data.payment_intent,
|
||||
@ -138,6 +140,8 @@ impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
|
||||
Box::pin(pcr::perform_payments_sync(
|
||||
state,
|
||||
&process,
|
||||
&revenue_recovery_payment_data.profile.clone(),
|
||||
merchant_context_from_revenue_recovery_payment_data.clone(),
|
||||
&tracking_data,
|
||||
&revenue_recovery_payment_data,
|
||||
&payment_data.payment_intent,
|
||||
@ -145,6 +149,18 @@ impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
Some("CALCULATE_WORKFLOW") => {
|
||||
Box::pin(pcr::perform_calculate_workflow(
|
||||
state,
|
||||
&process,
|
||||
&revenue_recovery_payment_data.profile.clone(),
|
||||
merchant_context_from_revenue_recovery_payment_data,
|
||||
&tracking_data,
|
||||
&revenue_recovery_payment_data,
|
||||
&payment_data.payment_intent,
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
_ => Err(errors::ProcessTrackerError::JobNotFound),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user