diff --git a/crates/diesel_models/src/process_tracker.rs b/crates/diesel_models/src/process_tracker.rs index 3aefb48681..962f917788 100644 --- a/crates/diesel_models/src/process_tracker.rs +++ b/crates/diesel_models/src/process_tracker.rs @@ -255,6 +255,9 @@ pub mod business_status { /// This status indicates the completion of a execute task pub const EXECUTE_WORKFLOW_COMPLETE: &str = "COMPLETED_EXECUTE_TASK"; + /// This status indicates the failure of a execute task + pub const EXECUTE_WORKFLOW_FAILURE: &str = "FAILED_EXECUTE_TASK"; + /// This status indicates that the execute task was completed to trigger the psync task pub const EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC: &str = "COMPLETED_EXECUTE_TASK_TO_TRIGGER_PSYNC"; @@ -276,4 +279,21 @@ pub mod business_status { /// This status indicates the completion of a review task pub const REVIEW_WORKFLOW_COMPLETE: &str = "COMPLETED_REVIEW_TASK"; + + /// For the CALCULATE_WORKFLOW + /// + /// This status indicates an invoice is queued + pub const CALCULATE_WORKFLOW_QUEUED: &str = "CALCULATE_WORKFLOW_QUEUED"; + + /// This status indicates an invoice has been declined due to hard decline + pub const CALCULATE_WORKFLOW_FINISH: &str = "FAILED_DUE_TO_HARD_DECLINE_ERROR"; + + /// This status indicates that the invoice is scheduled with the best available token + pub const CALCULATE_WORKFLOW_SCHEDULED: &str = "CALCULATE_WORKFLOW_SCHEDULED"; + + /// This status indicates the invoice is in payment sync state + pub const CALCULATE_WORKFLOW_PROCESSING: &str = "CALCULATE_WORKFLOW_PROCESSING"; + + /// This status indicates the workflow has completed successfully when the invoice is paid + pub const CALCULATE_WORKFLOW_COMPLETE: &str = "CALCULATE_WORKFLOW_COMPLETE"; } diff --git a/crates/hyperswitch_domain_models/src/payments.rs b/crates/hyperswitch_domain_models/src/payments.rs index 0a63c03678..1a8224b814 100644 --- a/crates/hyperswitch_domain_models/src/payments.rs +++ b/crates/hyperswitch_domain_models/src/payments.rs @@ -518,6 +518,26 @@ pub struct PaymentIntent { #[cfg(feature = "v2")] impl PaymentIntent { + /// Extract customer_id from payment intent feature metadata + pub fn extract_connector_customer_id_from_payment_intent( + &self, + ) -> Result { + self.feature_metadata + .as_ref() + .and_then(|metadata| metadata.payment_revenue_recovery_metadata.as_ref()) + .map(|recovery| { + recovery + .billing_connector_payment_details + .connector_customer_id + .clone() + }) + .ok_or( + common_utils::errors::ValidationError::MissingRequiredField { + field_name: "connector_customer_id".to_string(), + }, + ) + } + fn get_payment_method_sub_type(&self) -> Option { self.feature_metadata .as_ref() diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index bc8397b416..bf5330a932 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -486,6 +486,8 @@ pub enum RevenueRecoveryError { BillingConnectorPaymentsSyncFailed, #[error("Billing connector invoice sync call failed")] BillingConnectorInvoiceSyncFailed, + #[error("Failed to fetch connector customer ID")] + CustomerIdNotFound, #[error("Failed to get the retry count for payment intent")] RetryCountFetchFailed, #[error("Failed to get the billing threshold retry count")] diff --git a/crates/router/src/core/revenue_recovery.rs b/crates/router/src/core/revenue_recovery.rs index 235d57f96d..a2a90f1e9e 100644 --- a/crates/router/src/core/revenue_recovery.rs +++ b/crates/router/src/core/revenue_recovery.rs @@ -1,35 +1,154 @@ pub mod api; pub mod transformers; pub mod types; -use api_models::{enums, process_tracker::revenue_recovery}; +use api_models::{enums, process_tracker::revenue_recovery, webhooks}; use common_utils::{ self, + errors::CustomResult, ext_traits::{OptionExt, ValueExt}, id_type, }; use diesel_models::{enums as diesel_enum, process_tracker::business_status}; use error_stack::{self, ResultExt}; -use hyperswitch_domain_models::{payments::PaymentIntent, ApiModelToDieselModelConvertor}; +use hyperswitch_domain_models::{ + payments::PaymentIntent, revenue_recovery as domain_revenue_recovery, + ApiModelToDieselModelConvertor, +}; use scheduler::errors as sch_errors; use crate::{ core::errors::{self, RouterResponse, RouterResult, StorageErrorExt}, db::StorageInterface, logger, - routes::{metrics, SessionState}, + routes::{app::ReqState, metrics, SessionState}, services::ApplicationResponse, types::{ + domain, storage::{self, revenue_recovery as pcr}, transformers::ForeignInto, }, + workflows, }; pub const EXECUTE_WORKFLOW: &str = "EXECUTE_WORKFLOW"; pub const PSYNC_WORKFLOW: &str = "PSYNC_WORKFLOW"; +pub const CALCULATE_WORKFLOW: &str = "CALCULATE_WORKFLOW"; + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_calculate_pcr_task( + billing_connector_account: &domain::MerchantConnectorAccount, + state: &SessionState, + merchant_context: &domain::MerchantContext, + recovery_intent_from_payment_intent: &domain_revenue_recovery::RecoveryPaymentIntent, + business_profile: &domain::Profile, + intent_retry_count: u16, + payment_attempt_id: Option, + runner: storage::ProcessTrackerRunner, + revenue_recovery_retry: diesel_enum::RevenueRecoveryAlgorithmType, +) -> CustomResult { + router_env::logger::info!("Starting calculate_job..."); + + let task = "CALCULATE_WORKFLOW"; + + let db = &*state.store; + let payment_id = &recovery_intent_from_payment_intent.payment_id; + + // Create process tracker ID in the format: CALCULATE_WORKFLOW_{payment_intent_id} + let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr()); + + // Set scheduled time to 1 hour from now + let schedule_time = common_utils::date_time::now() + time::Duration::hours(1); + + let payment_attempt_id = payment_attempt_id + .ok_or(error_stack::report!( + errors::RevenueRecoveryError::PaymentAttemptIdNotFound + )) + .attach_printable("payment attempt id is required for calculate workflow tracking")?; + + // Check if a process tracker entry already exists for this payment intent + let existing_entry = db + .as_scheduler() + .find_process_by_id(&process_tracker_id) + .await + .change_context(errors::RevenueRecoveryError::ProcessTrackerResponseError) + .attach_printable( + "Failed to check for existing calculate workflow process tracker entry", + )?; + + match existing_entry { + Some(existing_process) => { + router_env::logger::error!( + "Found existing CALCULATE_WORKFLOW task with id: {}", + existing_process.id + ); + } + None => { + // No entry exists - create a new one + router_env::logger::info!( + "No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry scheduled for 1 hour from now", + payment_id.get_string_repr() + ); + + // Create tracking data + let calculate_workflow_tracking_data = pcr::RevenueRecoveryWorkflowTrackingData { + billing_mca_id: billing_connector_account.get_id(), + global_payment_id: payment_id.clone(), + merchant_id: merchant_context.get_merchant_account().get_id().to_owned(), + profile_id: business_profile.get_id().to_owned(), + payment_attempt_id, + revenue_recovery_retry, + invoice_scheduled_time: None, + }; + + let tag = ["PCR"]; + let task = "CALCULATE_WORKFLOW"; + let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; + + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + calculate_workflow_tracking_data, + Some(1), + schedule_time, + common_types::consts::API_VERSION, + ) + .change_context(errors::RevenueRecoveryError::ProcessTrackerCreationError) + .attach_printable("Failed to construct calculate workflow process tracker entry")?; + + // Insert into process tracker with status New + db.as_scheduler() + .insert_process(process_tracker_entry) + .await + .change_context(errors::RevenueRecoveryError::ProcessTrackerResponseError) + .attach_printable( + "Failed to enter calculate workflow process_tracker_entry in DB", + )?; + + router_env::logger::info!( + "Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}", + payment_id.get_string_repr() + ); + + metrics::TASKS_ADDED_COUNT.add( + 1, + router_env::metric_attributes!(("flow", "CalculateWorkflow")), + ); + } + } + + Ok(webhooks::WebhookResponseTracker::Payment { + payment_id: payment_id.clone(), + status: recovery_intent_from_payment_intent.status, + }) +} pub async fn perform_execute_payment( state: &SessionState, execute_task_process: &storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, tracking_data: &pcr::RevenueRecoveryWorkflowTrackingData, revenue_recovery_payment_data: &pcr::RevenueRecoveryPaymentData, payment_intent: &PaymentIntent, @@ -74,6 +193,8 @@ pub async fn perform_execute_payment( revenue_recovery_payment_data.merchant_account.get_id(), payment_intent, execute_task_process, + profile, + merchant_context, revenue_recovery_payment_data, &revenue_recovery_metadata, )) @@ -218,6 +339,7 @@ async fn insert_psync_pcr_task_to_pt( profile_id, payment_attempt_id, revenue_recovery_retry, + invoice_scheduled_time: Some(schedule_time), }; let tag = ["REVENUE_RECOVERY"]; let process_tracker_entry = storage::ProcessTrackerNew::new( @@ -249,6 +371,8 @@ async fn insert_psync_pcr_task_to_pt( pub async fn perform_payments_sync( state: &SessionState, process: &storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, tracking_data: &pcr::RevenueRecoveryWorkflowTrackingData, revenue_recovery_payment_data: &pcr::RevenueRecoveryPaymentData, payment_intent: &PaymentIntent, @@ -274,6 +398,8 @@ pub async fn perform_payments_sync( state, payment_intent, process.clone(), + profile, + merchant_context, revenue_recovery_payment_data, payment_attempt, &mut revenue_recovery_metadata, @@ -284,6 +410,416 @@ pub async fn perform_payments_sync( Ok(()) } +pub async fn perform_calculate_workflow( + state: &SessionState, + process: &storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, + tracking_data: &pcr::RevenueRecoveryWorkflowTrackingData, + revenue_recovery_payment_data: &pcr::RevenueRecoveryPaymentData, + payment_intent: &PaymentIntent, +) -> Result<(), sch_errors::ProcessTrackerError> { + let db = &*state.store; + let merchant_id = revenue_recovery_payment_data.merchant_account.get_id(); + let profile_id = revenue_recovery_payment_data.profile.get_id(); + let billing_mca_id = revenue_recovery_payment_data.billing_mca.get_id(); + + logger::info!( + process_id = %process.id, + payment_id = %tracking_data.global_payment_id.get_string_repr(), + "Starting CALCULATE_WORKFLOW..." + ); + + // 1. Extract connector_customer_id and token_list from tracking_data + let connector_customer_id = payment_intent + .extract_connector_customer_id_from_payment_intent() + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to extract customer ID from payment intent")?; + + let merchant_context_from_revenue_recovery_payment_data = + domain::MerchantContext::NormalMerchant(Box::new(domain::Context( + revenue_recovery_payment_data.merchant_account.clone(), + revenue_recovery_payment_data.key_store.clone(), + ))); + + let retry_algorithm_type = match profile + .revenue_recovery_retry_algorithm_type + .filter(|retry_type| + *retry_type != common_enums::RevenueRecoveryAlgorithmType::Monitoring) // ignore Monitoring in profile + .unwrap_or(tracking_data.revenue_recovery_retry) // fallback to tracking_data + { + common_enums::RevenueRecoveryAlgorithmType::Smart => common_enums::RevenueRecoveryAlgorithmType::Smart, + common_enums::RevenueRecoveryAlgorithmType::Cascading => common_enums::RevenueRecoveryAlgorithmType::Cascading, + common_enums::RevenueRecoveryAlgorithmType::Monitoring => { + return Err(sch_errors::ProcessTrackerError::ProcessUpdateFailed); + } + }; + + // 2. Get best available token + let best_time_to_schedule = match workflows::revenue_recovery::get_token_with_schedule_time_based_on_retry_alogrithm_type( + state, + &connector_customer_id, + payment_intent, + retry_algorithm_type, + process.retry_count, + ) + .await + { + Ok(token_opt) => token_opt, + Err(e) => { + logger::error!( + error = ?e, + connector_customer_id = %connector_customer_id, + "Failed to get best PSP token" + ); + None + } + }; + + match best_time_to_schedule { + Some(scheduled_time) => { + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "Found best available token, creating EXECUTE_WORKFLOW task" + ); + + // 3. If token found: create EXECUTE_WORKFLOW task and finish CALCULATE_WORKFLOW + insert_execute_pcr_task_to_pt( + &tracking_data.billing_mca_id, + state, + &tracking_data.merchant_id, + payment_intent, + &tracking_data.profile_id, + &tracking_data.payment_attempt_id, + storage::ProcessTrackerRunner::PassiveRecoveryWorkflow, + tracking_data.revenue_recovery_retry, + scheduled_time, + ) + .await?; + + db.as_scheduler() + .finish_process_with_business_status( + process.clone(), + business_status::CALCULATE_WORKFLOW_SCHEDULED, + ) + .await + .map_err(|e| { + logger::error!( + process_id = %process.id, + error = ?e, + "Failed to update CALCULATE_WORKFLOW status to complete" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "CALCULATE_WORKFLOW completed successfully" + ); + } + + None => { + let scheduled_token = match storage::revenue_recovery_redis_operation:: + RedisTokenManager::get_payment_processor_token_with_schedule_time(state, &connector_customer_id) + .await { + Ok(scheduled_token_opt) => scheduled_token_opt, + Err(e) => { + logger::error!( + error = ?e, + connector_customer_id = %connector_customer_id, + "Failed to get PSP token status" + ); + None + } + }; + + match scheduled_token { + Some(scheduled_token) => { + // Update scheduled time to scheduled time + 15 minutes + // here scheduled_time is the wait time 15 minutes is a buffer time that we are adding + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "No token but time available, rescheduling for scheduled time + 15 mins" + ); + + update_calculate_job_schedule_time( + db, + process, + time::Duration::minutes(15), + scheduled_token.scheduled_at, + &connector_customer_id, + ) + .await?; + } + None => { + let hard_decline_flag = storage::revenue_recovery_redis_operation:: + RedisTokenManager::are_all_tokens_hard_declined( + state, + &connector_customer_id + ) + .await + .ok() + .unwrap_or(false); + + match hard_decline_flag { + false => { + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "Hard decline flag is false, rescheduling for scheduled time + 15 mins" + ); + + update_calculate_job_schedule_time( + db, + process, + time::Duration::minutes(15), + Some(common_utils::date_time::now()), + &connector_customer_id, + ) + .await?; + } + true => { + // Finish calculate workflow with CALCULATE_WORKFLOW_FINISH + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "No token available, finishing CALCULATE_WORKFLOW" + ); + + db.as_scheduler() + .finish_process_with_business_status( + process.clone(), + business_status::CALCULATE_WORKFLOW_FINISH, + ) + .await + .map_err(|e| { + logger::error!( + process_id = %process.id, + error = ?e, + "Failed to finish CALCULATE_WORKFLOW" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "CALCULATE_WORKFLOW finished successfully" + ); + } + } + } + } + } + } + Ok(()) +} + +/// Update the schedule time for a CALCULATE_WORKFLOW process tracker +async fn update_calculate_job_schedule_time( + db: &dyn StorageInterface, + process: &storage::ProcessTracker, + additional_time: time::Duration, + base_time: Option, + connector_customer_id: &str, +) -> Result<(), sch_errors::ProcessTrackerError> { + let new_schedule_time = + base_time.unwrap_or_else(common_utils::date_time::now) + additional_time; + + let pt_update = storage::ProcessTrackerUpdate::Update { + name: Some("CALCULATE_WORKFLOW".to_string()), + retry_count: Some(process.clone().retry_count), + schedule_time: Some(new_schedule_time), + tracking_data: Some(process.clone().tracking_data), + business_status: Some(String::from(business_status::PENDING)), + status: Some(common_enums::ProcessTrackerStatus::Pending), + updated_at: Some(common_utils::date_time::now()), + }; + + db.as_scheduler() + .update_process(process.clone(), pt_update) + .await + .map_err(|e| { + logger::error!( + process_id = %process.id, + error = ?e, + "Failed to reschedule CALCULATE_WORKFLOW" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + new_schedule_time = %new_schedule_time, + additional_time = ?additional_time, + "CALCULATE_WORKFLOW rescheduled successfully" + ); + + Ok(()) +} + +/// Insert Execute PCR Task to Process Tracker +#[allow(clippy::too_many_arguments)] +async fn insert_execute_pcr_task_to_pt( + billing_mca_id: &id_type::MerchantConnectorAccountId, + state: &SessionState, + merchant_id: &id_type::MerchantId, + payment_intent: &PaymentIntent, + profile_id: &id_type::ProfileId, + payment_attempt_id: &id_type::GlobalAttemptId, + runner: storage::ProcessTrackerRunner, + revenue_recovery_retry: diesel_enum::RevenueRecoveryAlgorithmType, + schedule_time: time::PrimitiveDateTime, +) -> Result { + let task = "EXECUTE_WORKFLOW"; + + let payment_id = payment_intent.id.clone(); + + let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr()); + + // Check if a process tracker entry already exists for this payment intent + let existing_entry = state + .store + .find_process_by_id(&process_tracker_id) + .await + .map_err(|e| { + logger::error!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %process_tracker_id, + error = ?e, + "Failed to check for existing execute workflow process tracker entry" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + match existing_entry { + Some(existing_process) + if existing_process.business_status == business_status::EXECUTE_WORKFLOW_FAILURE + || existing_process.business_status + == business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC => + { + // Entry exists with EXECUTE_WORKFLOW_COMPLETE status - update it + logger::info!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %process_tracker_id, + current_retry_count = %existing_process.retry_count, + "Found existing EXECUTE_WORKFLOW task with COMPLETE status, updating to PENDING with incremented retry count" + ); + + let pt_update = storage::ProcessTrackerUpdate::Update { + name: Some(task.to_string()), + retry_count: Some(existing_process.clone().retry_count + 1), + schedule_time: Some(schedule_time), + tracking_data: Some(existing_process.clone().tracking_data), + business_status: Some(String::from(business_status::PENDING)), + status: Some(enums::ProcessTrackerStatus::Pending), + updated_at: Some(common_utils::date_time::now()), + }; + + let updated_process = state + .store + .update_process(existing_process, pt_update) + .await + .map_err(|e| { + logger::error!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %process_tracker_id, + error = ?e, + "Failed to update existing execute workflow process tracker entry" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + logger::info!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %process_tracker_id, + new_retry_count = %updated_process.retry_count, + "Successfully updated existing EXECUTE_WORKFLOW task" + ); + + Ok(updated_process) + } + Some(existing_process) => { + // Entry exists but business status is not EXECUTE_WORKFLOW_COMPLETE + logger::info!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %process_tracker_id, + current_business_status = %existing_process.business_status, + ); + + Ok(existing_process) + } + None => { + // No entry exists - create a new one + logger::info!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %process_tracker_id, + "No existing EXECUTE_WORKFLOW task found, creating new entry" + ); + + let execute_workflow_tracking_data = pcr::RevenueRecoveryWorkflowTrackingData { + billing_mca_id: billing_mca_id.clone(), + global_payment_id: payment_id.clone(), + merchant_id: merchant_id.clone(), + profile_id: profile_id.clone(), + payment_attempt_id: payment_attempt_id.clone(), + revenue_recovery_retry, + invoice_scheduled_time: Some(schedule_time), + }; + + let tag = ["PCR"]; + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id.clone(), + task, + runner, + tag, + execute_workflow_tracking_data, + Some(1), + schedule_time, + common_types::consts::API_VERSION, + ) + .map_err(|e| { + logger::error!( + payment_id = %payment_id.get_string_repr(), + error = ?e, + "Failed to construct execute workflow process tracker entry" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + let response = state + .store + .insert_process(process_tracker_entry) + .await + .map_err(|e| { + logger::error!( + payment_id = %payment_id.get_string_repr(), + error = ?e, + "Failed to insert execute workflow process tracker entry" + ); + sch_errors::ProcessTrackerError::ProcessUpdateFailed + })?; + + metrics::TASKS_ADDED_COUNT.add( + 1, + router_env::metric_attributes!(("flow", "RevenueRecoveryExecute")), + ); + + logger::info!( + payment_id = %payment_id.get_string_repr(), + process_tracker_id = %response.id, + "Successfully created new EXECUTE_WORKFLOW task" + ); + + Ok(response) + } + } +} + pub async fn retrieve_revenue_recovery_process_tracker( state: SessionState, id: id_type::GlobalPaymentId, diff --git a/crates/router/src/core/revenue_recovery/types.rs b/crates/router/src/core/revenue_recovery/types.rs index 1bccb5134b..e3401baa40 100644 --- a/crates/router/src/core/revenue_recovery/types.rs +++ b/crates/router/src/core/revenue_recovery/types.rs @@ -31,11 +31,12 @@ use hyperswitch_domain_models::{ }; use time::PrimitiveDateTime; +use super::errors::StorageErrorExt; use crate::{ core::{ errors::{self, RouterResult}, payments::{self, helpers, operations::Operation}, - revenue_recovery::{self as revenue_recovery_core}, + revenue_recovery::{self as revenue_recovery_core, perform_calculate_workflow}, webhooks::recovery_incoming as recovery_incoming_flow, }, db::StorageInterface, @@ -43,9 +44,13 @@ use crate::{ routes::SessionState, services::{self, connector_integration_interface::RouterDataConversion}, types::{ - self, api as api_types, api::payments as payments_types, storage, transformers::ForeignInto, + self, api as api_types, api::payments as payments_types, domain, storage, + transformers::ForeignInto, + }, + workflows::{ + payment_sync, + revenue_recovery::{self, get_schedule_time_to_retry_mit_payments}, }, - workflows::{payment_sync, revenue_recovery::get_schedule_time_to_retry_mit_payments}, }; type RecoveryResult = error_stack::Result; @@ -93,15 +98,23 @@ impl RevenueRecoveryPaymentsAttemptStatus { Ok(()) } + #[allow(clippy::too_many_arguments)] pub(crate) async fn update_pt_status_based_on_attempt_status_for_payments_sync( &self, state: &SessionState, payment_intent: &PaymentIntent, process_tracker: storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, payment_attempt: PaymentAttempt, revenue_recovery_metadata: &mut PaymentRevenueRecoveryMetadata, ) -> Result<(), errors::ProcessTrackerError> { + let connector_customer_id = payment_intent + .extract_connector_customer_id_from_payment_intent() + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to extract customer ID from payment intent")?; + let db = &*state.store; let recovery_payment_intent = @@ -143,6 +156,26 @@ impl RevenueRecoveryPaymentsAttemptStatus { ); }; + let is_hard_decline = revenue_recovery::check_hard_decline(state, &payment_attempt) + .await + .ok(); + + // update the status of token in redis + let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( + state, + &connector_customer_id, + &None, + &is_hard_decline + ) + .await; + + // unlocking the token + let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status( + state, + &connector_customer_id, + ) + .await; + // Record a successful transaction back to Billing Connector // TODO: Add support for retrying failed outgoing recordback webhooks record_back_to_billing_connector( @@ -175,26 +208,38 @@ impl RevenueRecoveryPaymentsAttemptStatus { ); }; - // get a reschedule time - let schedule_time = get_schedule_time_to_retry_mit_payments( - db, - &revenue_recovery_payment_data - .merchant_account - .get_id() - .clone(), - process_tracker.retry_count + 1, + let error_code = recovery_payment_attempt.error_code; + + let is_hard_decline = revenue_recovery::check_hard_decline(state, &payment_attempt) + .await + .ok(); + + // update the status of token in redis + let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( + state, + &connector_customer_id, + &error_code, + &is_hard_decline ) .await; - // check if retry is possible - if let Some(schedule_time) = schedule_time { - // schedule a retry - // TODO: Update connecter called field and active attempt + // unlocking the token + let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status( + state, + &connector_customer_id, + ) + .await; - db.as_scheduler() - .retry_process(process_tracker.clone(), schedule_time) - .await?; - } + // Reopen calculate workflow on payment failure + reopen_calculate_workflow_on_payment_failure( + state, + &process_tracker, + profile, + merchant_context, + payment_intent, + revenue_recovery_payment_data, + ) + .await?; } Self::Processing => { // do a psync payment @@ -203,6 +248,8 @@ impl RevenueRecoveryPaymentsAttemptStatus { revenue_recovery_payment_data, payment_intent, &process_tracker, + profile, + merchant_context, payment_attempt, )) .await?; @@ -307,105 +354,227 @@ pub enum Action { ManualReviewAction, } impl Action { + #[allow(clippy::too_many_arguments)] pub async fn execute_payment( state: &SessionState, merchant_id: &id_type::MerchantId, payment_intent: &PaymentIntent, process: &storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, revenue_recovery_metadata: &PaymentRevenueRecoveryMetadata, ) -> RecoveryResult { - let response = revenue_recovery_core::api::call_proxy_api( - state, - payment_intent, - revenue_recovery_payment_data, - revenue_recovery_metadata, - ) - .await; - let recovery_payment_intent = - hyperswitch_domain_models::revenue_recovery::RecoveryPaymentIntent::from( - payment_intent, - ); + let connector_customer_id = payment_intent + .extract_connector_customer_id_from_payment_intent() + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to extract customer ID from payment intent")?; - // handle proxy api's response - match response { - Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() { - RevenueRecoveryPaymentsAttemptStatus::Succeeded => { - let recovery_payment_attempt = - hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( - &payment_data.payment_attempt, - ); + let scheduled_token = match storage::revenue_recovery_redis_operation:: + RedisTokenManager::get_payment_processor_token_with_schedule_time(state, &connector_customer_id) + .await { + Ok(scheduled_token_opt) => scheduled_token_opt, + Err(e) => { + logger::error!( + error = ?e, + connector_customer_id = %connector_customer_id, + "Failed to get PSP token status" + ); + None + } + }; - let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new( - &recovery_payment_intent, - &recovery_payment_attempt, - ); - - // publish events to kafka - if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + match scheduled_token { + Some(scheduled_token) => { + let response = revenue_recovery_core::api::call_proxy_api( state, - &recovery_payment_tuple, - Some(process.retry_count+1) + payment_intent, + revenue_recovery_payment_data, + revenue_recovery_metadata, ) - .await{ - router_env::logger::error!( - "Failed to publish revenue recovery event to kafka: {:?}", - e - ); - }; + .await; - Ok(Self::SuccessfulPayment( - payment_data.payment_attempt.clone(), - )) - } - RevenueRecoveryPaymentsAttemptStatus::Failed => { - let recovery_payment_attempt = - hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( - &payment_data.payment_attempt, - ); - - let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new( - &recovery_payment_intent, - &recovery_payment_attempt, - ); - - // publish events to kafka - if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( - state, - &recovery_payment_tuple, - Some(process.retry_count+1) - ) - .await{ - router_env::logger::error!( - "Failed to publish revenue recovery event to kafka: {:?}", - e - ); - }; - - Self::decide_retry_failure_action( - state, - merchant_id, - process.clone(), - revenue_recovery_payment_data, - &payment_data.payment_attempt, + let recovery_payment_intent = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentIntent::from( payment_intent, + ); + + // handle proxy api's response + match response { + Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() { + RevenueRecoveryPaymentsAttemptStatus::Succeeded => { + let recovery_payment_attempt = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( + &payment_data.payment_attempt, + ); + + let recovery_payment_tuple = + recovery_incoming_flow::RecoveryPaymentTuple::new( + &recovery_payment_intent, + &recovery_payment_attempt, + ); + + // publish events to kafka + if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + state, + &recovery_payment_tuple, + Some(process.retry_count+1) + ) + .await{ + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka: {:?}", + e + ); + }; + + let is_hard_decline = revenue_recovery::check_hard_decline( + state, + &payment_data.payment_attempt, + ) + .await + .ok(); + + // update the status of token in redis + let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( + state, + &connector_customer_id, + &None, + &is_hard_decline + ) + .await; + + // unlocking the token + let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status( + state, + &connector_customer_id, + ) + .await; + + Ok(Self::SuccessfulPayment( + payment_data.payment_attempt.clone(), + )) + } + RevenueRecoveryPaymentsAttemptStatus::Failed => { + let recovery_payment_attempt = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( + &payment_data.payment_attempt, + ); + + let recovery_payment_tuple = + recovery_incoming_flow::RecoveryPaymentTuple::new( + &recovery_payment_intent, + &recovery_payment_attempt, + ); + + // publish events to kafka + if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + state, + &recovery_payment_tuple, + Some(process.retry_count+1) + ) + .await{ + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka: {:?}", + e + ); + }; + + let error_code = payment_data + .payment_attempt + .clone() + .error + .map(|error| error.code); + + let is_hard_decline = revenue_recovery::check_hard_decline( + state, + &payment_data.payment_attempt, + ) + .await + .ok(); + + let _update_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( + state, + &connector_customer_id, + &error_code, + &is_hard_decline + ) + .await; + + // unlocking the token + let _unlock_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status( + state, + &connector_customer_id, + ) + .await; + + // Reopen calculate workflow on payment failure + reopen_calculate_workflow_on_payment_failure( + state, + process, + profile, + merchant_context, + payment_intent, + revenue_recovery_payment_data, + ) + .await?; + + // Return terminal failure to finish the current execute workflow + Ok(Self::TerminalFailure(payment_data.payment_attempt.clone())) + } + + RevenueRecoveryPaymentsAttemptStatus::Processing => { + Ok(Self::SyncPayment(payment_data.payment_attempt.clone())) + } + RevenueRecoveryPaymentsAttemptStatus::InvalidStatus(action) => { + logger::info!(?action, "Invalid Payment Status For PCR Payment"); + Ok(Self::ManualReviewAction) + } + }, + Err(err) => + // check for an active attempt being constructed or not + { + logger::error!(execute_payment_res=?err); + Ok(Self::ReviewPayment) + } + } + } + None => { + let response = revenue_recovery_core::api::call_psync_api( + state, + payment_intent.get_id(), + revenue_recovery_payment_data, + ) + .await; + + let payment_status_data = response + .change_context(errors::RecoveryError::PaymentCallFailed) + .attach_printable("Error while executing the Psync call")?; + + let payment_attempt = payment_status_data.payment_attempt; + + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "No token available, finishing CALCULATE_WORKFLOW" + ); + + state + .store + .as_scheduler() + .finish_process_with_business_status( + process.clone(), + business_status::CALCULATE_WORKFLOW_FINISH, ) .await - } + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable("Failed to finish CALCULATE_WORKFLOW")?; - RevenueRecoveryPaymentsAttemptStatus::Processing => { - Ok(Self::SyncPayment(payment_data.payment_attempt.clone())) - } - RevenueRecoveryPaymentsAttemptStatus::InvalidStatus(action) => { - logger::info!(?action, "Invalid Payment Status For PCR Payment"); - Ok(Self::ManualReviewAction) - } - }, - Err(err) => - // check for an active attempt being constructed or not - { - logger::error!(execute_payment_res=?err); - Ok(Self::ReviewPayment) + logger::info!( + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "CALCULATE_WORKFLOW finished successfully" + ); + Ok(Self::TerminalFailure(payment_attempt.clone())) } } } @@ -486,16 +655,45 @@ impl Action { Ok(()) } Self::TerminalFailure(payment_attempt) => { + // update the connector payment transmission field to Unsuccessful and unset active attempt id + revenue_recovery_metadata.set_payment_transmission_field_for_api_request( + enums::PaymentConnectorTransmission::ConnectorCallUnsuccessful, + ); + + let payment_update_req = + PaymentsUpdateIntentRequest::update_feature_metadata_and_active_attempt_with_api( + payment_intent + .feature_metadata + .clone() + .unwrap_or_default() + .convert_back() + .set_payment_revenue_recovery_metadata_using_api( + revenue_recovery_metadata.clone(), + ), + api_enums::UpdateActiveAttempt::Unset, + ); + logger::info!( + "Call made to payments update intent api , with the request body {:?}", + payment_update_req + ); + revenue_recovery_core::api::update_payment_intent_api( + state, + payment_intent.id.clone(), + revenue_recovery_payment_data, + payment_update_req, + ) + .await + .change_context(errors::RecoveryError::PaymentCallFailed)?; + db.as_scheduler() .finish_process_with_business_status( execute_task_process.clone(), - business_status::EXECUTE_WORKFLOW_COMPLETE, + business_status::EXECUTE_WORKFLOW_FAILURE, ) .await .change_context(errors::RecoveryError::ProcessTrackerFailure) .attach_printable("Failed to update the process tracker")?; // TODO: Add support for retrying failed outgoing recordback webhooks - Ok(()) } Self::SuccessfulPayment(payment_attempt) => { @@ -551,6 +749,8 @@ impl Action { revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, payment_intent: &PaymentIntent, process: &storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, payment_attempt: PaymentAttempt, ) -> RecoveryResult { let response = revenue_recovery_core::api::call_psync_api( @@ -563,18 +763,74 @@ impl Action { match response { Ok(_payment_data) => match payment_attempt.status.foreign_into() { RevenueRecoveryPaymentsAttemptStatus::Succeeded => { + let connector_customer_id = payment_intent + .extract_connector_customer_id_from_payment_intent() + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to extract customer ID from payment intent")?; + + let is_hard_decline = + revenue_recovery::check_hard_decline(state, &payment_attempt) + .await + .ok(); + + // update the status of token in redis + let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( + state, + &connector_customer_id, + &None, + &is_hard_decline + ) + .await; + + // unlocking the token + let _unlock_the_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status( + state, + &connector_customer_id, + ) + .await; + Ok(Self::SuccessfulPayment(payment_attempt)) } RevenueRecoveryPaymentsAttemptStatus::Failed => { - Self::decide_retry_failure_action( + let connector_customer_id = payment_intent + .extract_connector_customer_id_from_payment_intent() + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to extract customer ID from payment intent")?; + + let error_code = payment_attempt.clone().error.map(|error| error.code); + + let is_hard_decline = + revenue_recovery::check_hard_decline(state, &payment_attempt) + .await + .ok(); + + let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( + state, + &connector_customer_id, + &error_code, + &is_hard_decline + ) + .await; + + // unlocking the token + let _unlock_connector_customer_id = storage::revenue_recovery_redis_operation::RedisTokenManager::unlock_connector_customer_status( state, - revenue_recovery_payment_data.merchant_account.get_id(), - process.clone(), - revenue_recovery_payment_data, - &payment_attempt, - payment_intent, + &connector_customer_id, ) - .await + .await; + + // Reopen calculate workflow on payment failure + reopen_calculate_workflow_on_payment_failure( + state, + process, + profile, + merchant_context, + payment_intent, + revenue_recovery_payment_data, + ) + .await?; + + Ok(Self::TerminalFailure(payment_attempt.clone())) } RevenueRecoveryPaymentsAttemptStatus::Processing => { @@ -684,6 +940,37 @@ impl Action { } Self::TerminalFailure(payment_attempt) => { + // update the connector payment transmission field to Unsuccessful and unset active attempt id + revenue_recovery_metadata.set_payment_transmission_field_for_api_request( + enums::PaymentConnectorTransmission::ConnectorCallUnsuccessful, + ); + + let payment_update_req = + PaymentsUpdateIntentRequest::update_feature_metadata_and_active_attempt_with_api( + payment_intent + .feature_metadata + .clone() + .unwrap_or_default() + .convert_back() + .set_payment_revenue_recovery_metadata_using_api( + revenue_recovery_metadata.clone(), + ), + api_enums::UpdateActiveAttempt::Unset, + ); + logger::info!( + "Call made to payments update intent api , with the request body {:?}", + payment_update_req + ); + + revenue_recovery_core::api::update_payment_intent_api( + state, + payment_intent.id.clone(), + revenue_recovery_payment_data, + payment_update_req, + ) + .await + .change_context(errors::RecoveryError::PaymentCallFailed)?; + // TODO: Add support for retrying failed outgoing recordback webhooks // finish the current psync task db.as_scheduler() @@ -804,6 +1091,140 @@ impl Action { } } } + +/// Reopen calculate workflow when payment fails +pub async fn reopen_calculate_workflow_on_payment_failure( + state: &SessionState, + process: &storage::ProcessTracker, + profile: &domain::Profile, + merchant_context: domain::MerchantContext, + payment_intent: &PaymentIntent, + revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, +) -> RecoveryResult<()> { + let db = &*state.store; + let id = payment_intent.id.clone(); + let task = revenue_recovery_core::CALCULATE_WORKFLOW; + let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; + + // Construct the process tracker ID for CALCULATE_WORKFLOW + let process_tracker_id = format!("{}_{}_{}", runner, task, id.get_string_repr()); + + logger::info!( + payment_id = %id.get_string_repr(), + process_tracker_id = %process_tracker_id, + "Attempting to reopen CALCULATE_WORKFLOW on payment failure" + ); + + // Find the existing CALCULATE_WORKFLOW process tracker + let calculate_process = db + .find_process_by_id(&process_tracker_id) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable("Failed to find CALCULATE_WORKFLOW process tracker")?; + + match calculate_process { + Some(process) => { + logger::info!( + payment_id = %id.get_string_repr(), + process_tracker_id = %process_tracker_id, + current_status = %process.business_status, + current_retry_count = process.retry_count, + "Found existing CALCULATE_WORKFLOW, updating status and retry count" + ); + + // Update the process tracker to reopen the calculate workflow + // 1. Change status from "finish" to "pending" + // 2. Increase retry count by 1 + // 3. Set business status to QUEUED + // 4. Schedule for immediate execution + let new_retry_count = process.retry_count + 1; + let new_schedule_time = common_utils::date_time::now() + time::Duration::hours(1); + + let pt_update = storage::ProcessTrackerUpdate::Update { + name: Some(task.to_string()), + retry_count: Some(new_retry_count), + schedule_time: Some(new_schedule_time), + tracking_data: Some(process.clone().tracking_data), + business_status: Some(String::from(business_status::PENDING)), + status: Some(common_enums::ProcessTrackerStatus::Pending), + updated_at: Some(common_utils::date_time::now()), + }; + + db.update_process(process.clone(), pt_update) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable("Failed to update CALCULATE_WORKFLOW process tracker")?; + + logger::info!( + payment_id = %id.get_string_repr(), + process_tracker_id = %process_tracker_id, + new_retry_count = new_retry_count, + new_schedule_time = %new_schedule_time, + "Successfully reopened CALCULATE_WORKFLOW with increased retry count" + ); + } + None => { + logger::info!( + payment_id = %id.get_string_repr(), + process_tracker_id = %process_tracker_id, + "CALCULATE_WORKFLOW process tracker not found, creating new entry" + ); + + // Create tracking data for the new CALCULATE_WORKFLOW + let tracking_data = create_calculate_workflow_tracking_data( + payment_intent, + revenue_recovery_payment_data, + )?; + + // Call the existing perform_calculate_workflow function + perform_calculate_workflow( + state, + process, + profile, + merchant_context, + &tracking_data, + revenue_recovery_payment_data, + payment_intent, + ) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable("Failed to perform calculate workflow")?; + + logger::info!( + payment_id = %id.get_string_repr(), + process_tracker_id = %process_tracker_id, + "Successfully created new CALCULATE_WORKFLOW entry using perform_calculate_workflow" + ); + } + } + + Ok(()) +} + +/// Create tracking data for the CALCULATE_WORKFLOW +fn create_calculate_workflow_tracking_data( + payment_intent: &PaymentIntent, + revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, +) -> RecoveryResult { + let tracking_data = storage::revenue_recovery::RevenueRecoveryWorkflowTrackingData { + merchant_id: revenue_recovery_payment_data + .merchant_account + .get_id() + .clone(), + profile_id: revenue_recovery_payment_data.profile.get_id().clone(), + global_payment_id: payment_intent.id.clone(), + payment_attempt_id: payment_intent + .active_attempt_id + .clone() + .ok_or(storage_impl::errors::RecoveryError::ValueNotFound)?, + billing_mca_id: revenue_recovery_payment_data.billing_mca.get_id().clone(), + revenue_recovery_retry: revenue_recovery_payment_data.retry_algorithm, + invoice_scheduled_time: None, // Will be set by perform_calculate_workflow + }; + + Ok(tracking_data) +} + // TODO: Move these to impl based functions async fn record_back_to_billing_connector( state: &SessionState, diff --git a/crates/router/src/core/webhooks/recovery_incoming.rs b/crates/router/src/core/webhooks/recovery_incoming.rs index 5f00dd0c5f..9238b30723 100644 --- a/crates/router/src/core/webhooks/recovery_incoming.rs +++ b/crates/router/src/core/webhooks/recovery_incoming.rs @@ -7,19 +7,25 @@ use common_utils::{ }; use diesel_models::process_tracker as storage; use error_stack::{report, ResultExt}; +use futures::stream::SelectNextSome; use hyperswitch_domain_models::{ - payments as domain_payments, revenue_recovery, router_data_v2::flow_common_types, - router_flow_types, router_request_types::revenue_recovery as revenue_recovery_request, - router_response_types::revenue_recovery as revenue_recovery_response, types as router_types, + payments as domain_payments, + revenue_recovery::{self, RecoveryPaymentIntent}, + router_data_v2::flow_common_types, + router_flow_types, + router_request_types::revenue_recovery as revenue_recovery_request, + router_response_types::revenue_recovery as revenue_recovery_response, + types as router_types, }; use hyperswitch_interfaces::webhooks as interface_webhooks; use masking::{PeekInterface, Secret}; use router_env::{instrument, logger, tracing}; use services::kafka; +use storage::business_status; use crate::{ core::{ - admin, + self, admin, errors::{self, CustomResult}, payments::{self, helpers}, }, @@ -232,6 +238,7 @@ async fn handle_monitoring_threshold( } Ok(webhooks::WebhookResponseTracker::NoEffect) } + #[allow(clippy::too_many_arguments)] async fn handle_schedule_failed_payment( billing_connector_account: &domain::MerchantConnectorAccount, @@ -248,6 +255,8 @@ async fn handle_schedule_failed_payment( ) -> CustomResult { let (recovery_attempt_from_payment_attempt, recovery_intent_from_payment_attempt) = payment_attempt_with_recovery_intent; + + // When intent_retry_count is less than or equal to threshold (intent_retry_count <= mca_retry_threshold) .then(|| { logger::error!( @@ -258,12 +267,13 @@ async fn handle_schedule_failed_payment( Ok(webhooks::WebhookResponseTracker::NoEffect) }) .async_unwrap_or_else(|| async { - RevenueRecoveryAttempt::insert_execute_pcr_task( - &billing_connector_account.get_id(), - &*state.store, - merchant_context.get_merchant_account().get_id().to_owned(), - recovery_intent_from_payment_attempt.clone(), - business_profile.get_id().to_owned(), + // Call calculate_job + core::revenue_recovery::upsert_calculate_pcr_task( + billing_connector_account, + state, + merchant_context, + recovery_intent_from_payment_attempt, + business_profile, intent_retry_count, recovery_attempt_from_payment_attempt .as_ref() @@ -926,6 +936,7 @@ impl RevenueRecoveryAttempt { profile_id, payment_attempt_id, revenue_recovery_retry, + invoice_scheduled_time: Some(schedule_time), }; let tag = ["PCR"]; diff --git a/crates/router/src/types/storage/revenue_recovery.rs b/crates/router/src/types/storage/revenue_recovery.rs index 81a012ba41..af84ed2170 100644 --- a/crates/router/src/types/storage/revenue_recovery.rs +++ b/crates/router/src/types/storage/revenue_recovery.rs @@ -13,7 +13,7 @@ use masking::PeekInterface; use router_env::logger; use serde::{Deserialize, Serialize}; -use crate::{db::StorageInterface, routes::SessionState, workflows::revenue_recovery}; +use crate::{db::StorageInterface, routes::SessionState, types, workflows::revenue_recovery}; #[derive(serde::Serialize, serde::Deserialize, Debug)] pub struct RevenueRecoveryWorkflowTrackingData { pub merchant_id: id_type::MerchantId, @@ -22,6 +22,7 @@ pub struct RevenueRecoveryWorkflowTrackingData { pub payment_attempt_id: id_type::GlobalAttemptId, pub billing_mca_id: id_type::MerchantConnectorAccountId, pub revenue_recovery_retry: enums::RevenueRecoveryAlgorithmType, + pub invoice_scheduled_time: Option, } #[derive(Debug, Clone)] diff --git a/crates/router/src/workflows/revenue_recovery.rs b/crates/router/src/workflows/revenue_recovery.rs index 8f3b3f0e00..dc1d97bc42 100644 --- a/crates/router/src/workflows/revenue_recovery.rs +++ b/crates/router/src/workflows/revenue_recovery.rs @@ -114,7 +114,7 @@ impl ProcessTrackerWorkflow for ExecutePcrWorkflow { >( state, state.get_req_state(), - merchant_context_from_revenue_recovery_payment_data, + merchant_context_from_revenue_recovery_payment_data.clone(), revenue_recovery_payment_data.profile.clone(), payments::operations::PaymentGetIntent, request, @@ -128,6 +128,8 @@ impl ProcessTrackerWorkflow for ExecutePcrWorkflow { Box::pin(pcr::perform_execute_payment( state, &process, + &revenue_recovery_payment_data.profile.clone(), + merchant_context_from_revenue_recovery_payment_data.clone(), &tracking_data, &revenue_recovery_payment_data, &payment_data.payment_intent, @@ -138,6 +140,8 @@ impl ProcessTrackerWorkflow for ExecutePcrWorkflow { Box::pin(pcr::perform_payments_sync( state, &process, + &revenue_recovery_payment_data.profile.clone(), + merchant_context_from_revenue_recovery_payment_data.clone(), &tracking_data, &revenue_recovery_payment_data, &payment_data.payment_intent, @@ -145,6 +149,18 @@ impl ProcessTrackerWorkflow for ExecutePcrWorkflow { .await?; Ok(()) } + Some("CALCULATE_WORKFLOW") => { + Box::pin(pcr::perform_calculate_workflow( + state, + &process, + &revenue_recovery_payment_data.profile.clone(), + merchant_context_from_revenue_recovery_payment_data, + &tracking_data, + &revenue_recovery_payment_data, + &payment_data.payment_intent, + )) + .await + } _ => Err(errors::ProcessTrackerError::JobNotFound), }