diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index fefbbed0f0..7ab18f3274 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -2628,6 +2628,16 @@ pub struct PaymentMethodDataRequest { pub billing: Option
, } +/// The payment method information provided for making a payment +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize, ToSchema, Eq, PartialEq)] +pub struct RecordAttemptPaymentMethodDataRequest { + /// Additional details for the payment method (e.g., card expiry date, card network). + #[serde(flatten)] + pub payment_method_data: AdditionalPaymentData, + /// billing details for the payment method. + pub billing: Option
, +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize, ToSchema, Eq, PartialEq)] pub struct ProxyPaymentMethodDataRequest { /// This field is optional because, in case of saved cards we pass the payment_token @@ -3116,7 +3126,7 @@ pub struct AdditionalCardInfo { pub signature_network: Option, } -#[derive(Debug, Clone, Eq, PartialEq, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, Eq, PartialEq, serde::Deserialize, serde::Serialize, ToSchema)] #[serde(rename_all = "snake_case")] pub enum AdditionalPaymentData { Card(Box), @@ -4482,8 +4492,12 @@ pub struct PaymentMethodDataResponseWithBilling { #[derive(Debug, Clone, Eq, PartialEq, serde::Deserialize, ToSchema, serde::Serialize)] pub struct CustomRecoveryPaymentMethodData { - #[serde(flatten)] - pub units: HashMap, + /// Primary payment method token at payment processor end. + #[schema(value_type = String, example = "token_1234")] + pub primary_processor_payment_method_token: Secret, + + /// AdditionalCardInfo for the primary token. + pub additional_payment_method_info: AdditionalCardInfo, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, ToSchema)] @@ -9214,6 +9228,12 @@ impl PaymentRevenueRecoveryMetadata { pub fn get_merchant_connector_id_for_api_request(&self) -> id_type::MerchantConnectorAccountId { self.active_attempt_payment_connector_id.clone() } + + pub fn get_connector_customer_id(&self) -> String { + self.billing_connector_payment_details + .connector_customer_id + .to_owned() + } } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] @@ -9272,8 +9292,8 @@ pub struct PaymentsAttemptRecordRequest { #[schema(value_type = PaymentMethodType, example = "apple_pay")] pub payment_method_subtype: api_enums::PaymentMethodType, - /// The payment instrument data to be used for the payment attempt. - pub payment_method_data: Option, + /// The additional payment data to be used for the payment attempt. + pub payment_method_data: Option, /// Metadata is useful for storing additional, unstructured information on an object. #[schema(value_type = Option, example = r#"{ "udf1": "some-value", "udf2": "some-value" }"#)] @@ -9361,10 +9381,6 @@ pub struct RecoveryPaymentsCreate { #[schema(value_type = PaymentMethodType, example = "apple_pay")] pub payment_method_sub_type: api_enums::PaymentMethodType, - /// primary payment method token at payment processor end. - #[schema(value_type = String, example = "token_1234")] - pub primary_processor_payment_method_token: Secret, - /// The time at which payment attempt was created. #[schema(example = "2022-09-10T10:11:12Z")] #[serde(default, with = "common_utils::custom_serde::iso8601::option")] @@ -9388,7 +9404,7 @@ pub struct RecoveryPaymentsCreate { pub connector_transaction_id: Option>, /// payment method token units at payment processor end. - pub payment_method_units: CustomRecoveryPaymentMethodData, + pub payment_method_data: CustomRecoveryPaymentMethodData, /// Type of action that needs to be taken after consuming the recovery payload. For example: scheduling a failed payment or stopping the invoice. pub action: common_payments_types::RecoveryAction, diff --git a/crates/hyperswitch_connectors/src/connectors/worldpayvantiv.rs b/crates/hyperswitch_connectors/src/connectors/worldpayvantiv.rs index 2447e6a203..75e396b755 100644 --- a/crates/hyperswitch_connectors/src/connectors/worldpayvantiv.rs +++ b/crates/hyperswitch_connectors/src/connectors/worldpayvantiv.rs @@ -1648,6 +1648,26 @@ static WORLDPAYVANTIV_SUPPORTED_PAYMENT_METHODS: LazyLock TryFrom TryFrom> for RefundsRo .as_ref() .and_then(|detail| detail.response_reason_message.clone()) .unwrap_or(consts::NO_ERROR_MESSAGE.to_string()); + let connector_transaction_id = item + .response + .payment_detail + .as_ref() + .and_then(|detail| detail.payment_id.map(|id| id.to_string())); Ok(Self { response: Err(ErrorResponse { @@ -2152,7 +2163,7 @@ impl TryFrom> for RefundsRo reason: Some(error_message.clone()), status_code: item.http_code, attempt_status: None, - connector_transaction_id: None, + connector_transaction_id, network_advice_code: None, network_decline_code: None, network_error_message: None, diff --git a/crates/hyperswitch_domain_models/src/payments.rs b/crates/hyperswitch_domain_models/src/payments.rs index 1a8224b814..4f8e12fa7c 100644 --- a/crates/hyperswitch_domain_models/src/payments.rs +++ b/crates/hyperswitch_domain_models/src/payments.rs @@ -720,6 +720,8 @@ impl PaymentIntent { &self, revenue_recovery_metadata: api_models::payments::PaymentRevenueRecoveryMetadata, billing_connector_account: &merchant_connector_account::MerchantConnectorAccount, + card_info: api_models::payments::AdditionalCardInfo, + payment_processor_token: &str, ) -> CustomResult< revenue_recovery::RevenueRecoveryAttemptData, errors::api_error_response::ApiErrorResponse, @@ -751,9 +753,7 @@ impl PaymentIntent { connector_transaction_id: None, // No connector id error_code: None, error_message: None, - processor_payment_method_token: revenue_recovery_metadata - .billing_connector_payment_details - .payment_processor_token, + processor_payment_method_token: payment_processor_token.to_string(), connector_customer_id: revenue_recovery_metadata .billing_connector_payment_details .connector_customer_id, @@ -774,23 +774,7 @@ impl PaymentIntent { invoice_billing_started_at_time: None, // No charge id is present here since it is an internal payment and we didn't call connector yet. charge_id: None, - card_info: api_models::payments::AdditionalCardInfo { - card_issuer: None, - card_network: None, - card_type: None, - card_issuing_country: None, - bank_code: None, - last4: None, - card_isin: None, - card_extended_bin: None, - card_exp_month: None, - card_exp_year: None, - card_holder_name: None, - payment_checks: None, - authentication_data: None, - is_regulated: None, - signature_network: None, - }, + card_info: card_info.clone(), }) } diff --git a/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs b/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs index 4d1ae8bb92..327a0137a6 100644 --- a/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs +++ b/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs @@ -847,6 +847,15 @@ impl PaymentAttempt { }), }; + let payment_method_data = request + .payment_method_data + .as_ref() + .map(|data| data.payment_method_data.clone().encode_to_value()) + .transpose() + .change_context(errors::api_error_response::ApiErrorResponse::InternalServerError) + .attach_printable("Unable to decode additional payment method data")? + .map(pii::SecretSerdeValue::new); + let payment_method_billing_address = encrypted_data .payment_method_billing_address .as_ref() @@ -882,7 +891,7 @@ impl PaymentAttempt { payment_token: None, connector_metadata: None, payment_experience: None, - payment_method_data: None, + payment_method_data, routing_result: None, preprocessing_step_id: None, multiple_capture_count: None, diff --git a/crates/router/src/core/payments/transformers.rs b/crates/router/src/core/payments/transformers.rs index 574a4a7702..50be78e992 100644 --- a/crates/router/src/core/payments/transformers.rs +++ b/crates/router/src/core/payments/transformers.rs @@ -358,6 +358,13 @@ pub async fn construct_payment_router_data_for_authorize<'a>( .browser_info .clone() .map(types::BrowserInformation::from); + let additional_payment_method_data: Option = + payment_data.payment_attempt + .payment_method_data + .as_ref().map(|data| data.clone().parse_value("AdditionalPaymentData")) + .transpose() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to parse AdditionalPaymentData from payment_data.payment_attempt.payment_method_data")?; // TODO: few fields are repeated in both routerdata and request let request = types::PaymentsAuthorizeData { @@ -412,7 +419,7 @@ pub async fn construct_payment_router_data_for_authorize<'a>( .map(|reference_id| reference_id.get_string_repr().to_owned()), integrity_object: None, shipping_cost: payment_data.payment_intent.amount_details.shipping_cost, - additional_payment_method_data: None, + additional_payment_method_data, merchant_account_id: None, merchant_config_currency: None, connector_testing_data: None, diff --git a/crates/router/src/core/revenue_recovery.rs b/crates/router/src/core/revenue_recovery.rs index a2a90f1e9e..206c5026a6 100644 --- a/crates/router/src/core/revenue_recovery.rs +++ b/crates/router/src/core/revenue_recovery.rs @@ -25,7 +25,7 @@ use crate::{ types::{ domain, storage::{self, revenue_recovery as pcr}, - transformers::ForeignInto, + transformers::{ForeignFrom, ForeignInto}, }, workflows, }; @@ -177,12 +177,44 @@ pub async fn perform_execute_payment( // TODO decide if its a global failure or is it requeueable error match decision { types::Decision::Execute => { + let connector_customer_id = revenue_recovery_metadata.get_connector_customer_id(); + + let last_token_used = payment_intent + .feature_metadata + .as_ref() + .and_then(|fm| fm.payment_revenue_recovery_metadata.as_ref()) + .map(|rr| { + rr.billing_connector_payment_details + .payment_processor_token + .clone() + }); + + let processor_token = storage::revenue_recovery_redis_operation::RedisTokenManager::get_token_based_on_retry_type( + state, + &connector_customer_id, + tracking_data.revenue_recovery_retry, + last_token_used.as_deref(), + ) + .await + .change_context(errors::ApiErrorResponse::GenericNotFoundError { + message: "Failed to fetch token details from redis".to_string(), + })? + .ok_or(errors::ApiErrorResponse::GenericNotFoundError { + message: "Failed to fetch token details from redis".to_string(), + })?; + logger::info!("Token fetched from redis success"); + let card_info = + api_models::payments::AdditionalCardInfo::foreign_from(&processor_token); // record attempt call let record_attempt = api::record_internal_attempt_api( state, payment_intent, revenue_recovery_payment_data, &revenue_recovery_metadata, + card_info, + &processor_token + .payment_processor_token_details + .payment_processor_token, ) .await; @@ -456,7 +488,7 @@ pub async fn perform_calculate_workflow( }; // 2. Get best available token - let best_time_to_schedule = match workflows::revenue_recovery::get_token_with_schedule_time_based_on_retry_alogrithm_type( + let best_time_to_schedule = match workflows::revenue_recovery::get_token_with_schedule_time_based_on_retry_algorithm_type( state, &connector_customer_id, payment_intent, @@ -493,7 +525,7 @@ pub async fn perform_calculate_workflow( &tracking_data.profile_id, &tracking_data.payment_attempt_id, storage::ProcessTrackerRunner::PassiveRecoveryWorkflow, - tracking_data.revenue_recovery_retry, + retry_algorithm_type, scheduled_time, ) .await?; @@ -628,7 +660,12 @@ async fn update_calculate_job_schedule_time( ) -> Result<(), sch_errors::ProcessTrackerError> { let new_schedule_time = base_time.unwrap_or_else(common_utils::date_time::now) + additional_time; - + logger::info!( + new_schedule_time = %new_schedule_time, + process_id = %process.id, + connector_customer_id = %connector_customer_id, + "Rescheduling Calculate Job at " + ); let pt_update = storage::ProcessTrackerUpdate::Update { name: Some("CALCULATE_WORKFLOW".to_string()), retry_count: Some(process.clone().retry_count), @@ -710,11 +747,24 @@ async fn insert_execute_pcr_task_to_pt( "Found existing EXECUTE_WORKFLOW task with COMPLETE status, updating to PENDING with incremented retry count" ); + let mut tracking_data: pcr::RevenueRecoveryWorkflowTrackingData = + serde_json::from_value(existing_process.tracking_data.clone()) + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable( + "Failed to deserialize the tracking data from process tracker", + )?; + + tracking_data.revenue_recovery_retry = revenue_recovery_retry; + + let tracking_data_json = serde_json::to_value(&tracking_data) + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to serialize the tracking data to json")?; + let pt_update = storage::ProcessTrackerUpdate::Update { name: Some(task.to_string()), retry_count: Some(existing_process.clone().retry_count + 1), schedule_time: Some(schedule_time), - tracking_data: Some(existing_process.clone().tracking_data), + tracking_data: Some(tracking_data_json), business_status: Some(String::from(business_status::PENDING)), status: Some(enums::ProcessTrackerStatus::Pending), updated_at: Some(common_utils::date_time::now()), diff --git a/crates/router/src/core/revenue_recovery/api.rs b/crates/router/src/core/revenue_recovery/api.rs index d942db48b4..f7172e2369 100644 --- a/crates/router/src/core/revenue_recovery/api.rs +++ b/crates/router/src/core/revenue_recovery/api.rs @@ -13,7 +13,10 @@ use crate::{ payments::{self, operations::Operation}, webhooks::recovery_incoming, }, - db::errors::{RouterResponse, StorageErrorExt}, + db::{ + errors::{RouterResponse, StorageErrorExt}, + storage::revenue_recovery_redis_operation::RedisTokenManager, + }, logger, routes::{app::ReqState, SessionState}, services, @@ -21,6 +24,7 @@ use crate::{ api::payments as api_types, domain, storage::{self, revenue_recovery as revenue_recovery_types}, + transformers::ForeignFrom, }, }; @@ -31,7 +35,7 @@ pub async fn call_psync_api( ) -> RouterResult> { let operation = payments::operations::PaymentGet; let req = payments_api::PaymentsRetrieveRequest { - force_sync: false, + force_sync: true, param: None, expand_attempts: true, return_raw_connector_response: None, @@ -82,12 +86,17 @@ pub async fn call_proxy_api( payment_intent: &payments_domain::PaymentIntent, revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, revenue_recovery: &payments_api::PaymentRevenueRecoveryMetadata, + payment_processor_token: &str, ) -> RouterResult> { let operation = payments::operations::proxy_payments_intent::PaymentProxyIntent; + let recurring_details = api_models::mandates::ProcessorPaymentToken { + processor_payment_token: payment_processor_token.to_string(), + merchant_connector_id: Some(revenue_recovery.get_merchant_connector_id_for_api_request()), + }; let req = payments_api::ProxyPaymentsRequest { return_url: None, amount: payments_api::AmountDetails::new(payment_intent.amount_details.clone().into()), - recurring_details: revenue_recovery.get_payment_token_for_api_request(), + recurring_details, shipping: None, browser_info: None, connector: revenue_recovery.connector.to_string(), @@ -176,12 +185,16 @@ pub async fn record_internal_attempt_api( payment_intent: &payments_domain::PaymentIntent, revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, revenue_recovery_metadata: &payments_api::PaymentRevenueRecoveryMetadata, + card_info: payments_api::AdditionalCardInfo, + payment_processor_token: &str, ) -> RouterResult { let revenue_recovery_attempt_data = recovery_incoming::RevenueRecoveryAttempt::get_revenue_recovery_attempt( payment_intent, revenue_recovery_metadata, &revenue_recovery_payment_data.billing_mca, + card_info, + payment_processor_token, ) .change_context(errors::ApiErrorResponse::GenericNotFoundError { message: "get_revenue_recovery_attempt was not constructed".to_string(), diff --git a/crates/router/src/core/revenue_recovery/transformers.rs b/crates/router/src/core/revenue_recovery/transformers.rs index f78d496a6b..9835999627 100644 --- a/crates/router/src/core/revenue_recovery/transformers.rs +++ b/crates/router/src/core/revenue_recovery/transformers.rs @@ -64,11 +64,6 @@ impl ForeignFrom<&api_models::payments::RecoveryPaymentsCreate> for hyperswitch_domain_models::revenue_recovery::RevenueRecoveryAttemptData { fn foreign_from(data: &api_models::payments::RecoveryPaymentsCreate) -> Self { - let primary_token = &data - .primary_processor_payment_method_token - .peek() - .to_string(); - let card_info = data.payment_method_units.units.get(primary_token); Self { amount: data.amount_details.order_amount().into(), currency: data.amount_details.currency(), @@ -79,6 +74,7 @@ impl ForeignFrom<&api_models::payments::RecoveryPaymentsCreate> error_code: data.error.as_ref().map(|error| error.code.clone()), error_message: data.error.as_ref().map(|error| error.message.clone()), processor_payment_method_token: data + .payment_method_data .primary_processor_payment_method_token .peek() .to_string(), @@ -103,29 +99,14 @@ impl ForeignFrom<&api_models::payments::RecoveryPaymentsCreate> .error .as_ref() .and_then(|error| error.network_error_message.clone()), - /// retry count will be updated whenever there is new attempt is created. + // retry count will be updated whenever there is new attempt is created. retry_count: None, invoice_next_billing_time: None, invoice_billing_started_at_time: data.billing_started_at, - card_info: card_info - .cloned() - .unwrap_or(api_models::payments::AdditionalCardInfo { - card_issuer: None, - card_network: None, - card_type: None, - card_issuing_country: None, - bank_code: None, - last4: None, - card_isin: None, - card_extended_bin: None, - card_exp_month: None, - card_exp_year: None, - card_holder_name: None, - payment_checks: None, - authentication_data: None, - is_regulated: None, - signature_network: None, - }), + card_info: data + .payment_method_data + .additional_payment_method_info + .clone(), charge_id: None, } } diff --git a/crates/router/src/core/revenue_recovery/types.rs b/crates/router/src/core/revenue_recovery/types.rs index e3401baa40..1777f4cb22 100644 --- a/crates/router/src/core/revenue_recovery/types.rs +++ b/crates/router/src/core/revenue_recovery/types.rs @@ -36,7 +36,7 @@ use crate::{ core::{ errors::{self, RouterResult}, payments::{self, helpers, operations::Operation}, - revenue_recovery::{self as revenue_recovery_core, perform_calculate_workflow}, + revenue_recovery::{self as revenue_recovery_core, pcr, perform_calculate_workflow}, webhooks::recovery_incoming as recovery_incoming_flow, }, db::StorageInterface, @@ -71,6 +71,7 @@ impl RevenueRecoveryPaymentsAttemptStatus { db: &dyn StorageInterface, execute_task_process: &storage::ProcessTracker, ) -> Result<(), errors::ProcessTrackerError> { + logger::info!("Entering update_pt_status_based_on_attempt_status_for_execute_payment"); match &self { Self::Succeeded | Self::Failed | Self::Processing => { // finish the current execute task @@ -132,6 +133,8 @@ impl RevenueRecoveryPaymentsAttemptStatus { &recovery_payment_attempt, ); + let used_token = get_payment_processor_token_id_from_payment_attempt(&payment_attempt); + let retry_count = process_tracker.retry_count; match self { @@ -156,16 +159,14 @@ impl RevenueRecoveryPaymentsAttemptStatus { ); }; - let is_hard_decline = revenue_recovery::check_hard_decline(state, &payment_attempt) - .await - .ok(); - // update the status of token in redis let _update_error_code = storage::revenue_recovery_redis_operation::RedisTokenManager::update_payment_processor_token_error_code_from_process_tracker( state, &connector_customer_id, &None, - &is_hard_decline + // Since this is succeeded payment attempt, 'is_hard_decine' will be false. + &Some(false), + used_token.as_deref(), ) .await; @@ -219,7 +220,8 @@ impl RevenueRecoveryPaymentsAttemptStatus { state, &connector_customer_id, &error_code, - &is_hard_decline + &is_hard_decline, + used_token.as_deref(), ) .await; @@ -289,6 +291,8 @@ impl Decision { revenue_recovery_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, payment_id: &id_type::GlobalPaymentId, ) -> RecoveryResult { + logger::info!("Entering get_decision_based_on_params"); + Ok(match (intent_status, called_connector, active_attempt_id) { ( enums::IntentStatus::Failed, @@ -357,7 +361,7 @@ impl Action { #[allow(clippy::too_many_arguments)] pub async fn execute_payment( state: &SessionState, - merchant_id: &id_type::MerchantId, + _merchant_id: &id_type::MerchantId, payment_intent: &PaymentIntent, process: &storage::ProcessTracker, profile: &domain::Profile, @@ -370,8 +374,29 @@ impl Action { .change_context(errors::RecoveryError::ValueNotFound) .attach_printable("Failed to extract customer ID from payment intent")?; - let scheduled_token = match storage::revenue_recovery_redis_operation:: - RedisTokenManager::get_payment_processor_token_with_schedule_time(state, &connector_customer_id) + let tracking_data: pcr::RevenueRecoveryWorkflowTrackingData = + serde_json::from_value(process.tracking_data.clone()) + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to deserialize the tracking data from process tracker")?; + + let last_token_used = payment_intent + .feature_metadata + .as_ref() + .and_then(|fm| fm.payment_revenue_recovery_metadata.as_ref()) + .map(|rr| { + rr.billing_connector_payment_details + .payment_processor_token + .clone() + }); + + let recovery_algorithm = tracking_data.revenue_recovery_retry; + + let scheduled_token = match storage::revenue_recovery_redis_operation::RedisTokenManager::get_token_based_on_retry_type( + state, + &connector_customer_id, + recovery_algorithm, + last_token_used.as_deref(), + ) .await { Ok(scheduled_token_opt) => scheduled_token_opt, Err(e) => { @@ -391,6 +416,9 @@ impl Action { payment_intent, revenue_recovery_payment_data, revenue_recovery_metadata, + &scheduled_token + .payment_processor_token_details + .payment_processor_token, ) .await; @@ -439,7 +467,8 @@ impl Action { state, &connector_customer_id, &None, - &is_hard_decline + &is_hard_decline, + Some(&scheduled_token.payment_processor_token_details.payment_processor_token), ) .await; @@ -496,7 +525,11 @@ impl Action { state, &connector_customer_id, &error_code, - &is_hard_decline + &is_hard_decline, + Some(&scheduled_token + .payment_processor_token_details + .payment_processor_token) + , ) .await; @@ -587,6 +620,8 @@ impl Action { revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, revenue_recovery_metadata: &mut PaymentRevenueRecoveryMetadata, ) -> Result<(), errors::ProcessTrackerError> { + logger::info!("Entering execute_payment_task_response_handler"); + let db = &*state.store; match self { Self::SyncPayment(payment_attempt) => { @@ -753,12 +788,15 @@ impl Action { merchant_context: domain::MerchantContext, payment_attempt: PaymentAttempt, ) -> RecoveryResult { + logger::info!("Entering payment_sync_call"); + let response = revenue_recovery_core::api::call_psync_api( state, payment_intent.get_id(), revenue_recovery_payment_data, ) .await; + let used_token = get_payment_processor_token_id_from_payment_attempt(&payment_attempt); match response { Ok(_payment_data) => match payment_attempt.status.foreign_into() { @@ -778,7 +816,8 @@ impl Action { state, &connector_customer_id, &None, - &is_hard_decline + &is_hard_decline, + used_token.as_deref(), ) .await; @@ -808,7 +847,8 @@ impl Action { state, &connector_customer_id, &error_code, - &is_hard_decline + &is_hard_decline, + used_token.as_deref(), ) .await; @@ -857,6 +897,8 @@ impl Action { revenue_recovery_metadata: &mut PaymentRevenueRecoveryMetadata, revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, ) -> Result<(), errors::ProcessTrackerError> { + logger::info!("Entering psync_response_handler"); + let db = &*state.store; match self { Self::SyncPayment(payment_attempt) => { @@ -1170,11 +1212,78 @@ pub async fn reopen_calculate_workflow_on_payment_failure( "CALCULATE_WORKFLOW process tracker not found, creating new entry" ); - // Create tracking data for the new CALCULATE_WORKFLOW - let tracking_data = create_calculate_workflow_tracking_data( - payment_intent, - revenue_recovery_payment_data, - )?; + let task = "CALCULATE_WORKFLOW"; + + let db = &*state.store; + + // Create process tracker ID in the format: CALCULATE_WORKFLOW_{payment_intent_id} + let process_tracker_id = format!("{runner}_{task}_{}", id.get_string_repr()); + + // Set scheduled time to 1 hour from now + let schedule_time = common_utils::date_time::now() + time::Duration::hours(1); + + // Check if a process tracker entry already exists for this payment intent + let existing_entry = db + .as_scheduler() + .find_process_by_id(&process_tracker_id) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable( + "Failed to check for existing calculate workflow process tracker entry", + )?; + + match existing_entry { + Some(existing_process) => { + router_env::logger::error!( + "Found existing CALCULATE_WORKFLOW task with id: {}", + existing_process.id + ); + } + None => { + // No entry exists - create a new one + router_env::logger::info!( + "No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry scheduled for 1 hour from now", + id.get_string_repr() + ); + + let tag = ["PCR"]; + let task = "CALCULATE_WORKFLOW"; + let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; + + let process_tracker_entry = storage::ProcessTrackerNew::new( + &process_tracker_id, + task, + runner, + tag, + process.tracking_data.clone(), + Some(process.retry_count), + schedule_time, + common_types::consts::API_VERSION, + ) + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable( + "Failed to construct calculate workflow process tracker entry", + )?; + + // Insert into process tracker with status New + db.as_scheduler() + .insert_process(process_tracker_entry) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable( + "Failed to enter calculate workflow process_tracker_entry in DB", + )?; + + router_env::logger::info!( + "Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}", + id.get_string_repr() + ); + } + } + + let tracking_data = serde_json::from_value(process.tracking_data.clone()) + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Failed to deserialize the tracking data from process tracker")?; // Call the existing perform_calculate_workflow function perform_calculate_workflow( @@ -1232,6 +1341,8 @@ async fn record_back_to_billing_connector( payment_intent: &PaymentIntent, billing_mca: &merchant_connector_account::MerchantConnectorAccount, ) -> RecoveryResult<()> { + logger::info!("Entering record_back_to_billing_connector"); + let connector_name = billing_mca.connector_name.to_string(); let connector_data = api_types::ConnectorData::get_connector_by_name( &state.conf.connectors, @@ -1284,6 +1395,8 @@ pub fn construct_recovery_record_back_router_data( payment_attempt: &PaymentAttempt, payment_intent: &PaymentIntent, ) -> RecoveryResult { + logger::info!("Entering construct_recovery_record_back_router_data"); + let auth_type: types::ConnectorAuthType = helpers::MerchantConnectorAccountType::DbVal(Box::new(billing_mca.clone())) .get_connector_account_details() @@ -1337,3 +1450,15 @@ pub fn construct_recovery_record_back_router_data( .attach_printable("Cannot construct record back router data")?; Ok(old_router_data) } + +pub fn get_payment_processor_token_id_from_payment_attempt( + payment_attempt: &PaymentAttempt, +) -> Option { + let used_token = payment_attempt + .connector_token_details + .as_ref() + .and_then(|t| t.connector_mandate_id.clone()); + logger::info!("Used token in the payment attempt : {:?}", used_token); + + used_token +} diff --git a/crates/router/src/core/utils.rs b/crates/router/src/core/utils.rs index 1cf4a2a50d..4a56078858 100644 --- a/crates/router/src/core/utils.rs +++ b/crates/router/src/core/utils.rs @@ -386,7 +386,11 @@ pub async fn construct_refund_router_data<'a, F>( connector_customer: None, recurring_mandate_payment_data: None, preprocessing_id: None, - connector_request_reference_id: refund.id.get_string_repr().to_string().clone(), + connector_request_reference_id: refund + .merchant_reference_id + .get_string_repr() + .to_string() + .clone(), #[cfg(feature = "payouts")] payout_method_data: None, #[cfg(feature = "payouts")] diff --git a/crates/router/src/core/webhooks/recovery_incoming.rs b/crates/router/src/core/webhooks/recovery_incoming.rs index 9238b30723..b56477cdac 100644 --- a/crates/router/src/core/webhooks/recovery_incoming.rs +++ b/crates/router/src/core/webhooks/recovery_incoming.rs @@ -520,11 +520,15 @@ impl RevenueRecoveryAttempt { payment_intent: &domain_payments::PaymentIntent, revenue_recovery_metadata: &api_payments::PaymentRevenueRecoveryMetadata, billing_connector_account: &domain::MerchantConnectorAccount, + card_info: api_payments::AdditionalCardInfo, + payment_processor_token: &str, ) -> CustomResult { let revenue_recovery_data = payment_intent .create_revenue_recovery_attempt_data( revenue_recovery_metadata.clone(), billing_connector_account, + card_info, + payment_processor_token, ) .change_context(errors::RevenueRecoveryError::RevenueRecoveryAttemptDataCreateFailed) .attach_printable("Failed to build recovery attempt data")?; @@ -752,8 +756,14 @@ impl RevenueRecoveryAttempt { }) .await .flatten(); + let payment_method_data = api_models::payments::RecordAttemptPaymentMethodDataRequest { + payment_method_data: api_models::payments::AdditionalPaymentData::Card(Box::new( + revenue_recovery_attempt_data.card_info.clone(), + )), + billing: None, + }; - let card_issuer = card_info.and_then(|info| info.card_issuer); + let card_issuer = revenue_recovery_attempt_data.card_info.card_issuer.clone(); let error = Option::::from(revenue_recovery_attempt_data); @@ -772,7 +782,7 @@ impl RevenueRecoveryAttempt { payment_method_type: revenue_recovery_attempt_data.payment_method_type, billing_connector_id: billing_merchant_connector_account_id.clone(), payment_method_subtype: revenue_recovery_attempt_data.payment_method_sub_type, - payment_method_data: None, + payment_method_data: Some(payment_method_data), metadata: None, feature_metadata: Some(feature_metadata), transaction_created_at: revenue_recovery_attempt_data.transaction_created_at, @@ -890,82 +900,6 @@ impl RevenueRecoveryAttempt { Ok(payment_attempt_with_recovery_intent) } - #[allow(clippy::too_many_arguments)] - async fn insert_execute_pcr_task( - billing_mca_id: &id_type::MerchantConnectorAccountId, - db: &dyn StorageInterface, - merchant_id: id_type::MerchantId, - payment_intent: revenue_recovery::RecoveryPaymentIntent, - profile_id: id_type::ProfileId, - intent_retry_count: u16, - payment_attempt_id: Option, - runner: storage::ProcessTrackerRunner, - revenue_recovery_retry: api_enums::RevenueRecoveryAlgorithmType, - ) -> CustomResult { - let task = "EXECUTE_WORKFLOW"; - - let payment_id = payment_intent.payment_id.clone(); - - let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr()); - - let schedule_time = revenue_recovery_flow::get_schedule_time_to_retry_mit_payments( - db, - &merchant_id, - (intent_retry_count + 1).into(), - ) - .await - .map_or_else( - || { - Err(errors::RevenueRecoveryError::ScheduleTimeFetchFailed) - .attach_printable("Failed to get schedule time for pcr workflow") - }, - Ok, // Simply returns `time` wrapped in `Ok` - )?; - - let payment_attempt_id = payment_attempt_id - .ok_or(report!( - errors::RevenueRecoveryError::PaymentAttemptIdNotFound - )) - .attach_printable("payment attempt id is required for pcr workflow tracking")?; - - let execute_workflow_tracking_data = - storage_revenue_recovery::RevenueRecoveryWorkflowTrackingData { - billing_mca_id: billing_mca_id.clone(), - global_payment_id: payment_id.clone(), - merchant_id, - profile_id, - payment_attempt_id, - revenue_recovery_retry, - invoice_scheduled_time: Some(schedule_time), - }; - - let tag = ["PCR"]; - - let process_tracker_entry = storage::ProcessTrackerNew::new( - process_tracker_id, - task, - runner, - tag, - execute_workflow_tracking_data, - Some(intent_retry_count.into()), - schedule_time, - common_enums::ApiVersion::V2, - ) - .change_context(errors::RevenueRecoveryError::ProcessTrackerCreationError) - .attach_printable("Failed to construct process tracker entry")?; - - db.insert_process(process_tracker_entry) - .await - .change_context(errors::RevenueRecoveryError::ProcessTrackerResponseError) - .attach_printable("Failed to enter process_tracker_entry in DB")?; - metrics::TASKS_ADDED_COUNT.add(1, router_env::metric_attributes!(("flow", "ExecutePCR"))); - - Ok(webhooks::WebhookResponseTracker::Payment { - payment_id, - status: payment_intent.status, - }) - } - /// Store payment processor tokens in Redis for retry management async fn store_payment_processor_tokens_in_redis( &self, diff --git a/crates/router/src/types/storage/revenue_recovery.rs b/crates/router/src/types/storage/revenue_recovery.rs index af84ed2170..3c3df991d4 100644 --- a/crates/router/src/types/storage/revenue_recovery.rs +++ b/crates/router/src/types/storage/revenue_recovery.rs @@ -43,12 +43,17 @@ impl RevenueRecoveryPaymentData { payment_intent: &PaymentIntent, is_hard_decline: bool, ) -> Option { + if is_hard_decline { + logger::info!("Hard Decline encountered"); + return None; + } match self.retry_algorithm { enums::RevenueRecoveryAlgorithmType::Monitoring => { logger::error!("Monitoring type found for Revenue Recovery retry payment"); None } enums::RevenueRecoveryAlgorithmType::Cascading => { + logger::info!("Cascading type found for Revenue Recovery retry payment"); revenue_recovery::get_schedule_time_to_retry_mit_payments( state.store.as_ref(), merchant_id, diff --git a/crates/router/src/types/storage/revenue_recovery_redis_operation.rs b/crates/router/src/types/storage/revenue_recovery_redis_operation.rs index 433de9c2b3..e4eec924a9 100644 --- a/crates/router/src/types/storage/revenue_recovery_redis_operation.rs +++ b/crates/router/src/types/storage/revenue_recovery_redis_operation.rs @@ -5,11 +5,11 @@ use common_utils::{date_time, errors::CustomResult, id_type}; use error_stack::ResultExt; use masking::Secret; use redis_interface::{DelReply, SetnxReply}; -use router_env::{instrument, tracing}; +use router_env::{instrument, logger, tracing}; use serde::{Deserialize, Serialize}; use time::{Date, Duration, OffsetDateTime, PrimitiveDateTime}; -use crate::{db::errors, SessionState}; +use crate::{db::errors, types::storage::enums::RevenueRecoveryAlgorithmType, SessionState}; // Constants for retry window management const RETRY_WINDOW_DAYS: i32 = 30; @@ -456,21 +456,33 @@ impl RedisTokenManager { connector_customer_id: &str, error_code: &Option, is_hard_decline: &Option, + payment_processor_token_id: Option<&str>, ) -> CustomResult { let today = OffsetDateTime::now_utc().date(); - let updated_token = - Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id) - .await? - .values() - .find(|status| status.scheduled_at.is_some()) - .map(|status| PaymentProcessorTokenStatus { - payment_processor_token_details: status.payment_processor_token_details.clone(), - inserted_by_attempt_id: status.inserted_by_attempt_id.clone(), - error_code: error_code.clone(), - daily_retry_history: status.daily_retry_history.clone(), - scheduled_at: None, - is_hard_decline: *is_hard_decline, - }); + let updated_token = match payment_processor_token_id { + Some(token_id) => { + Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id) + .await? + .values() + .find(|status| { + status + .payment_processor_token_details + .payment_processor_token + == token_id + }) + .map(|status| PaymentProcessorTokenStatus { + payment_processor_token_details: status + .payment_processor_token_details + .clone(), + inserted_by_attempt_id: status.inserted_by_attempt_id.clone(), + error_code: error_code.clone(), + daily_retry_history: status.daily_retry_history.clone(), + scheduled_at: None, + is_hard_decline: *is_hard_decline, + }) + } + None => None, + }; match updated_token { Some(mut token) => { @@ -512,7 +524,7 @@ impl RedisTokenManager { None => { tracing::debug!( connector_customer_id = connector_customer_id, - "No Token found with scheduled time to update error code", + "No Token found with token id to update error code", ); Ok(false) } @@ -601,33 +613,26 @@ impl RedisTokenManager { Ok(scheduled_token) } - // Get payment processor token with max retry remaining for cascading retry algorithm + // Get payment processor token using token id #[instrument(skip_all)] - pub async fn get_token_with_max_retry_remaining( + pub async fn get_payment_processor_token_using_token_id( state: &SessionState, connector_customer_id: &str, - ) -> CustomResult, errors::StorageError> { + payment_processor_token: &str, + ) -> CustomResult, errors::StorageError> { // Get all tokens for the customer let tokens_map = Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id) .await?; - - // Tokens with retry metadata - let tokens_with_retry = Self::get_tokens_with_retry_metadata(state, &tokens_map); - - // Find the token with max retry remaining - let max_retry_token = tokens_with_retry - .into_iter() - .filter(|(_, token_info)| !token_info.token_status.is_hard_decline.unwrap_or(false)) - .max_by_key(|(_, token_info)| token_info.monthly_retry_remaining) - .map(|(_, token_info)| token_info); + let token_details = tokens_map.get(payment_processor_token).cloned(); tracing::debug!( - connector_customer_id = connector_customer_id, - "Fetched payment processor token with max retry remaining", + token_found = token_details.is_some(), + customer_id = connector_customer_id, + "Fetched payment processor token & Checked existence ", ); - Ok(max_retry_token) + Ok(token_details) } // Check if all tokens are hard declined or no token found for the customer @@ -639,10 +644,9 @@ impl RedisTokenManager { let tokens_map = Self::get_connector_customer_payment_processor_tokens(state, connector_customer_id) .await?; - let all_hard_declined = tokens_map.is_empty() - && tokens_map - .values() - .all(|token| token.is_hard_decline.unwrap_or(false)); + let all_hard_declined = tokens_map + .values() + .all(|token| token.is_hard_decline.unwrap_or(false)); tracing::debug!( connector_customer_id = connector_customer_id, @@ -652,4 +656,52 @@ impl RedisTokenManager { Ok(all_hard_declined) } + + // Get token based on retry type + pub async fn get_token_based_on_retry_type( + state: &SessionState, + connector_customer_id: &str, + retry_algorithm_type: RevenueRecoveryAlgorithmType, + last_token_used: Option<&str>, + ) -> CustomResult, errors::StorageError> { + let mut token = None; + match retry_algorithm_type { + RevenueRecoveryAlgorithmType::Monitoring => { + logger::error!("Monitoring type found for Revenue Recovery retry payment"); + } + + RevenueRecoveryAlgorithmType::Cascading => { + token = match last_token_used { + Some(token_id) => { + Self::get_payment_processor_token_using_token_id( + state, + connector_customer_id, + token_id, + ) + .await? + } + None => None, + }; + } + + RevenueRecoveryAlgorithmType::Smart => { + token = Self::get_payment_processor_token_with_schedule_time( + state, + connector_customer_id, + ) + .await?; + } + } + + token = token.and_then(|t| { + t.is_hard_decline + .unwrap_or(false) + .then(|| { + logger::error!("Token is hard declined"); + }) + .map_or(Some(t), |_| None) + }); + + Ok(token) + } } diff --git a/crates/router/src/types/transformers.rs b/crates/router/src/types/transformers.rs index 692bcf6919..c04eb94582 100644 --- a/crates/router/src/types/transformers.rs +++ b/crates/router/src/types/transformers.rs @@ -17,6 +17,8 @@ use hyperswitch_domain_models::payments::payment_intent::CustomerData; use masking::{ExposeInterface, PeekInterface, Secret}; use super::domain; +#[cfg(feature = "v2")] +use crate::db::storage::revenue_recovery_redis_operation; use crate::{ core::errors, headers::{ @@ -2231,3 +2233,30 @@ impl ForeignFrom for storage::CardInfo { } } } + +#[cfg(feature = "v2")] +impl ForeignFrom<&revenue_recovery_redis_operation::PaymentProcessorTokenStatus> + for payments::AdditionalCardInfo +{ + fn foreign_from(value: &revenue_recovery_redis_operation::PaymentProcessorTokenStatus) -> Self { + let card_info = &value.payment_processor_token_details; + // TODO! All other card info fields needs to be populated in redis. + Self { + card_issuer: card_info.card_issuer.to_owned(), + card_network: card_info.card_network.to_owned(), + card_type: card_info.card_type.to_owned(), + card_issuing_country: None, + bank_code: None, + last4: card_info.last_four_digits.to_owned(), + card_isin: None, + card_extended_bin: None, + card_exp_month: card_info.expiry_month.to_owned(), + card_exp_year: card_info.expiry_year.to_owned(), + card_holder_name: None, + payment_checks: None, + authentication_data: None, + is_regulated: None, + signature_network: None, + } + } +} diff --git a/crates/router/src/workflows/revenue_recovery.rs b/crates/router/src/workflows/revenue_recovery.rs index dc1d97bc42..4d42be64f9 100644 --- a/crates/router/src/workflows/revenue_recovery.rs +++ b/crates/router/src/workflows/revenue_recovery.rs @@ -3,9 +3,9 @@ use std::collections::HashMap; #[cfg(feature = "v2")] use api_models::{enums::RevenueRecoveryAlgorithmType, payments::PaymentsGetIntentRequest}; +use common_utils::errors::CustomResult; #[cfg(feature = "v2")] use common_utils::{ - errors::CustomResult, ext_traits::AsyncExt, ext_traits::{StringExt, ValueExt}, id_type, @@ -28,8 +28,15 @@ use hyperswitch_domain_models::{ #[cfg(feature = "v2")] use masking::{ExposeInterface, PeekInterface, Secret}; #[cfg(feature = "v2")] -use router_env::{logger, tracing}; -use scheduler::{consumer::workflows::ProcessTrackerWorkflow, errors}; +use rand::Rng; +use router_env::{ + logger, + tracing::{self, instrument}, +}; +use scheduler::{ + consumer::{self, workflows::ProcessTrackerWorkflow}, + errors, +}; #[cfg(feature = "v2")] use scheduler::{types::process_data, utils as scheduler_utils}; #[cfg(feature = "v2")] @@ -165,7 +172,18 @@ impl ProcessTrackerWorkflow for ExecutePcrWorkflow { _ => Err(errors::ProcessTrackerError::JobNotFound), } } + #[instrument(skip_all)] + async fn error_handler<'a>( + &'a self, + state: &'a SessionState, + process: storage::ProcessTracker, + error: errors::ProcessTrackerError, + ) -> CustomResult<(), errors::ProcessTrackerError> { + logger::error!("Encountered error"); + consumer::consumer_error_handler(state.store.as_scheduler(), process, error).await + } } + #[cfg(feature = "v2")] pub(crate) async fn extract_data_and_perform_action( state: &SessionState, @@ -208,10 +226,12 @@ pub(crate) async fn extract_data_and_perform_action( let pcr_payment_data = pcr_storage_types::RevenueRecoveryPaymentData { merchant_account, - profile, + profile: profile.clone(), key_store, billing_mca, - retry_algorithm: tracking_data.revenue_recovery_retry, + retry_algorithm: profile + .revenue_recovery_retry_algorithm_type + .unwrap_or(tracking_data.revenue_recovery_retry), }; Ok(pcr_payment_data) } @@ -295,7 +315,11 @@ pub(crate) async fn get_schedule_time_for_smart_retry( let card_issuer_str = card_info.card_issuer.clone(); - let card_funding_str = card_info.card_type.clone(); + let card_funding_str = match card_info.card_type.as_deref() { + Some("card") => None, + Some(s) => Some(s.to_string()), + None => None, + }; let start_time_primitive = payment_intent.created_at; let recovery_timestamp_config = &state.conf.revenue_recovery.recovery_timestamp; @@ -490,7 +514,7 @@ pub struct ScheduledToken { } #[cfg(feature = "v2")] -pub async fn get_token_with_schedule_time_based_on_retry_alogrithm_type( +pub async fn get_token_with_schedule_time_based_on_retry_algorithm_type( state: &SessionState, connector_customer_id: &str, payment_intent: &PaymentIntent, @@ -514,33 +538,6 @@ pub async fn get_token_with_schedule_time_based_on_retry_alogrithm_type( .ok_or(errors::ProcessTrackerError::EApiErrorResponse)?; scheduled_time = Some(time); - - let token = - RedisTokenManager::get_token_with_max_retry_remaining(state, connector_customer_id) - .await - .change_context(errors::ProcessTrackerError::EApiErrorResponse)?; - - match token { - Some(token) => { - RedisTokenManager::update_payment_processor_token_schedule_time( - state, - connector_customer_id, - &token - .token_status - .payment_processor_token_details - .payment_processor_token, - scheduled_time, - ) - .await - .change_context(errors::ProcessTrackerError::EApiErrorResponse)?; - - logger::debug!("PSP token available for cascading retry"); - } - None => { - logger::debug!("No PSP token available for cascading retry"); - scheduled_time = None; - } - } } RevenueRecoveryAlgorithmType::Smart => { @@ -553,8 +550,9 @@ pub async fn get_token_with_schedule_time_based_on_retry_alogrithm_type( .change_context(errors::ProcessTrackerError::EApiErrorResponse)?; } } + let delayed_schedule_time = scheduled_time.map(add_random_delay_to_schedule_time); - Ok(scheduled_time) + Ok(delayed_schedule_time) } #[cfg(feature = "v2")] @@ -646,7 +644,7 @@ async fn process_token_for_retry( match skip { true => { logger::info!( - "Skipping decider call due to hard decline for attempt_id: {}", + "Skipping decider call due to hard decline token inserted by attempt_id: {}", inserted_by_attempt_id.get_string_repr() ); Ok(None) @@ -683,6 +681,7 @@ pub async fn call_decider_for_payment_processor_tokens_select_closet_time( None => { let utc_schedule_time = time::OffsetDateTime::now_utc() + time::Duration::minutes(1); + let schedule_time = time::PrimitiveDateTime::new( utc_schedule_time.date(), utc_schedule_time.time(), @@ -774,3 +773,13 @@ pub async fn check_hard_decline( Ok(is_hard_decline) } + +#[cfg(feature = "v2")] +pub fn add_random_delay_to_schedule_time( + schedule_time: time::PrimitiveDateTime, +) -> time::PrimitiveDateTime { + let mut rng = rand::thread_rng(); + let random_secs = rng.gen_range(1..=3600); + logger::info!("Adding random delay of {random_secs} seconds to schedule time"); + schedule_time + time::Duration::seconds(random_secs) +}