diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index f9540fd08f..c55ef48328 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -7473,6 +7473,15 @@ pub struct FeatureMetadata { pub payment_revenue_recovery_metadata: Option, } +#[cfg(feature = "v2")] +impl FeatureMetadata { + pub fn get_retry_count(&self) -> Option { + self.payment_revenue_recovery_metadata + .as_ref() + .map(|metadata| metadata.total_retry_count) + } +} + /// additional data that might be required by hyperswitch #[cfg(feature = "v1")] #[derive(Debug, Clone, serde::Deserialize, serde::Serialize, ToSchema)] @@ -8378,6 +8387,9 @@ pub struct PaymentRevenueRecoveryMetadata { /// PaymentMethod Subtype #[schema(example = "klarna", value_type = PaymentMethodType)] pub payment_method_subtype: common_enums::PaymentMethodType, + /// The name of the payment connector through which the payment attempt was made. + #[schema(value_type = Connector, example = "stripe")] + pub connector: common_enums::connector_enums::Connector, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[cfg(feature = "v2")] diff --git a/crates/diesel_models/src/process_tracker.rs b/crates/diesel_models/src/process_tracker.rs index 64b9a66afb..294fb33c66 100644 --- a/crates/diesel_models/src/process_tracker.rs +++ b/crates/diesel_models/src/process_tracker.rs @@ -69,12 +69,14 @@ pub struct ProcessTrackerNew { } impl ProcessTrackerNew { + #[allow(clippy::too_many_arguments)] pub fn new( process_tracker_id: impl Into, task: impl Into, runner: ProcessTrackerRunner, tag: impl IntoIterator>, tracking_data: T, + retry_count: Option, schedule_time: PrimitiveDateTime, api_version: ApiVersion, ) -> StorageResult @@ -87,7 +89,7 @@ impl ProcessTrackerNew { name: Some(task.into()), tag: tag.into_iter().map(Into::into).collect(), runner: Some(runner.to_string()), - retry_count: 0, + retry_count: retry_count.unwrap_or(0), schedule_time: Some(schedule_time), rule: String::new(), tracking_data: tracking_data diff --git a/crates/diesel_models/src/types.rs b/crates/diesel_models/src/types.rs index b79974c33f..b1fa219703 100644 --- a/crates/diesel_models/src/types.rs +++ b/crates/diesel_models/src/types.rs @@ -159,6 +159,8 @@ pub struct PaymentRevenueRecoveryMetadata { pub payment_method_type: common_enums::enums::PaymentMethod, /// PaymentMethod Subtype pub payment_method_subtype: common_enums::enums::PaymentMethodType, + /// The name of the payment connector through which the payment attempt was made. + pub connector: common_enums::connector_enums::Connector, } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] diff --git a/crates/hyperswitch_domain_models/src/lib.rs b/crates/hyperswitch_domain_models/src/lib.rs index fac251d8b6..76e06ab9b4 100644 --- a/crates/hyperswitch_domain_models/src/lib.rs +++ b/crates/hyperswitch_domain_models/src/lib.rs @@ -274,6 +274,7 @@ impl ApiModelToDieselModelConvertor for PaymentReven ), payment_method_type: from.payment_method_type, payment_method_subtype: from.payment_method_subtype, + connector: from.connector, } } @@ -288,6 +289,7 @@ impl ApiModelToDieselModelConvertor for PaymentReven .convert_back(), payment_method_type: self.payment_method_type, payment_method_subtype: self.payment_method_subtype, + connector: self.connector, } } } diff --git a/crates/hyperswitch_domain_models/src/payments.rs b/crates/hyperswitch_domain_models/src/payments.rs index 36cc1c5f80..9921cee8a6 100644 --- a/crates/hyperswitch_domain_models/src/payments.rs +++ b/crates/hyperswitch_domain_models/src/payments.rs @@ -424,24 +424,6 @@ impl PaymentIntent { .and_then(|metadata| metadata.get_payment_method_type()) } - pub fn set_payment_connector_transmission( - &self, - feature_metadata: Option, - status: bool, - ) -> Option> { - feature_metadata.map(|fm| { - let mut updated_metadata = fm; - if let Some(ref mut rrm) = updated_metadata.payment_revenue_recovery_metadata { - rrm.payment_connector_transmission = if status { - common_enums::PaymentConnectorTransmission::ConnectorCallFailed - } else { - common_enums::PaymentConnectorTransmission::ConnectorCallSucceeded - }; - } - Box::new(updated_metadata) - }) - } - pub fn get_connector_customer_id_from_feature_metadata(&self) -> Option { self.feature_metadata .as_ref() @@ -774,11 +756,13 @@ where let payment_intent_feature_metadata = self.payment_intent.get_feature_metadata(); let revenue_recovery = self.payment_intent.get_revenue_recovery_metadata(); - - let payment_revenue_recovery_metadata = - Some(diesel_models::types::PaymentRevenueRecoveryMetadata { + let payment_attempt_connector = self.payment_attempt.connector.clone(); + let payment_revenue_recovery_metadata = match payment_attempt_connector { + Some(connector) => Some(diesel_models::types::PaymentRevenueRecoveryMetadata { // Update retry count by one. - total_retry_count: revenue_recovery.map_or(1, |data| (data.total_retry_count + 1)), + total_retry_count: revenue_recovery + .as_ref() + .map_or(1, |data| (data.total_retry_count + 1)), // Since this is an external system call, marking this payment_connector_transmission to ConnectorCallSucceeded. payment_connector_transmission: common_enums::PaymentConnectorTransmission::ConnectorCallSucceeded, @@ -799,7 +783,14 @@ where }, payment_method_type: self.payment_attempt.payment_method_type, payment_method_subtype: self.payment_attempt.payment_method_subtype, - }); + connector: connector.parse().map_err(|err| { + router_env::logger::error!(?err, "Failed to parse connector string to enum"); + errors::api_error_response::ApiErrorResponse::InternalServerError + })?, + }), + None => Err(errors::api_error_response::ApiErrorResponse::InternalServerError) + .attach_printable("Connector not found in payment attempt")?, + }; Ok(Some(FeatureMetadata { redirect_response: payment_intent_feature_metadata .as_ref() diff --git a/crates/hyperswitch_domain_models/src/revenue_recovery.rs b/crates/hyperswitch_domain_models/src/revenue_recovery.rs index 8adf5676f9..3e1bc93d91 100644 --- a/crates/hyperswitch_domain_models/src/revenue_recovery.rs +++ b/crates/hyperswitch_domain_models/src/revenue_recovery.rs @@ -61,7 +61,6 @@ pub enum RecoveryAction { /// Invalid event has been received. InvalidAction, } - pub struct RecoveryPaymentIntent { pub payment_id: id_type::GlobalPaymentId, pub status: common_enums::IntentStatus, @@ -75,10 +74,11 @@ pub struct RecoveryPaymentAttempt { } impl RecoveryPaymentAttempt { - pub fn get_attempt_triggered_by(self) -> Option { - self.feature_metadata.and_then(|metadata| { + pub fn get_attempt_triggered_by(&self) -> Option { + self.feature_metadata.as_ref().and_then(|metadata| { metadata .revenue_recovery + .as_ref() .map(|recovery| recovery.attempt_triggered_by) }) } diff --git a/crates/hyperswitch_domain_models/src/router_data.rs b/crates/hyperswitch_domain_models/src/router_data.rs index ae17e69710..7eb8a8091a 100644 --- a/crates/hyperswitch_domain_models/src/router_data.rs +++ b/crates/hyperswitch_domain_models/src/router_data.rs @@ -475,12 +475,24 @@ impl ) -> PaymentIntentUpdate { let amount_captured = self.get_captured_amount(payment_data); let status = payment_data.payment_attempt.status.is_terminal_status(); - let updated_feature_metadata = payment_data - .payment_intent - .set_payment_connector_transmission( - payment_data.payment_intent.feature_metadata.clone(), - status, - ); + let updated_feature_metadata = + payment_data + .payment_intent + .feature_metadata + .clone() + .map(|mut feature_metadata| { + if let Some(ref mut payment_revenue_recovery_metadata) = + feature_metadata.payment_revenue_recovery_metadata + { + payment_revenue_recovery_metadata.payment_connector_transmission = + if self.response.is_ok() { + common_enums::PaymentConnectorTransmission::ConnectorCallSucceeded + } else { + common_enums::PaymentConnectorTransmission::ConnectorCallFailed + }; + } + Box::new(feature_metadata) + }); match self.response { Ok(ref _response) => PaymentIntentUpdate::ConfirmIntentPostUpdate { diff --git a/crates/router/src/core/api_keys.rs b/crates/router/src/core/api_keys.rs index fc87a490b6..bb970a4956 100644 --- a/crates/router/src/core/api_keys.rs +++ b/crates/router/src/core/api_keys.rs @@ -228,6 +228,7 @@ pub async fn add_api_key_expiry_task( API_KEY_EXPIRY_RUNNER, [API_KEY_EXPIRY_TAG], api_key_expiry_tracker, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 64b2b81725..5e1ef76056 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -481,6 +481,8 @@ pub enum RevenueRecoveryError { PaymentIntentFetchFailed, #[error("Failed to fetch payment attempt")] PaymentAttemptFetchFailed, + #[error("Failed to fetch payment attempt")] + PaymentAttemptIdNotFound, #[error("Failed to get revenue recovery invoice webhook")] InvoiceWebhookProcessingFailed, #[error("Failed to get revenue recovery invoice transaction")] @@ -491,4 +493,10 @@ pub enum RevenueRecoveryError { WebhookAuthenticationFailed, #[error("Payment merchant connector account not found using account reference id")] PaymentMerchantConnectorAccountNotFound, + #[error("Failed to fetch primitive date_time")] + ScheduleTimeFetchFailed, + #[error("Failed to create process tracker")] + ProcessTrackerCreationError, + #[error("Failed to get the response from process tracker")] + ProcessTrackerResponseError, } diff --git a/crates/router/src/core/passive_churn_recovery.rs b/crates/router/src/core/passive_churn_recovery.rs index 958baeee3f..3b4ca0d7af 100644 --- a/crates/router/src/core/passive_churn_recovery.rs +++ b/crates/router/src/core/passive_churn_recovery.rs @@ -142,6 +142,7 @@ async fn insert_psync_pcr_task( runner, tag, psync_workflow_tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/payment_methods.rs b/crates/router/src/core/payment_methods.rs index 5893fbc685..eec47ca625 100644 --- a/crates/router/src/core/payment_methods.rs +++ b/crates/router/src/core/payment_methods.rs @@ -493,6 +493,7 @@ pub async fn add_payment_method_status_update_task( runner, tag, tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/payment_methods/vault.rs b/crates/router/src/core/payment_methods/vault.rs index 0325acd599..8181df8c05 100644 --- a/crates/router/src/core/payment_methods/vault.rs +++ b/crates/router/src/core/payment_methods/vault.rs @@ -1399,6 +1399,7 @@ pub async fn add_delete_tokenized_data_task( runner, tag, tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index d072463020..0ab33dedf6 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -5941,6 +5941,7 @@ pub async fn add_process_sync_task( runner, tag, tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/payments/operations/proxy_payments_intent.rs b/crates/router/src/core/payments/operations/proxy_payments_intent.rs index c029f1439e..bdf20635df 100644 --- a/crates/router/src/core/payments/operations/proxy_payments_intent.rs +++ b/crates/router/src/core/payments/operations/proxy_payments_intent.rs @@ -160,12 +160,6 @@ impl GetTracker, ProxyPaymentsR self.validate_status_for_operation(payment_intent.status)?; - let client_secret = header_payload - .client_secret - .as_ref() - .get_required_value("client_secret header")?; - payment_intent.validate_client_secret(client_secret)?; - let cell_id = state.conf.cell_information.id.clone(); let batch_encrypted_data = domain_types::crypto_operation( diff --git a/crates/router/src/core/payouts.rs b/crates/router/src/core/payouts.rs index f1c8d29ca6..21a560e781 100644 --- a/crates/router/src/core/payouts.rs +++ b/crates/router/src/core/payouts.rs @@ -3092,6 +3092,7 @@ pub async fn add_external_account_addition_task( runner, tag, tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index 1057f47b4c..b850420dfd 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -1605,6 +1605,7 @@ pub async fn add_refund_sync_task( runner, tag, refund_workflow_tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) @@ -1643,6 +1644,7 @@ pub async fn add_refund_execute_task( runner, tag, refund_workflow_tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/webhooks/outgoing.rs b/crates/router/src/core/webhooks/outgoing.rs index 595171114f..92c29282b2 100644 --- a/crates/router/src/core/webhooks/outgoing.rs +++ b/crates/router/src/core/webhooks/outgoing.rs @@ -553,6 +553,7 @@ pub(crate) async fn add_outgoing_webhook_retry_task_to_process_tracker( runner, tag, tracking_data, + None, schedule_time, hyperswitch_domain_models::consts::API_VERSION, ) diff --git a/crates/router/src/core/webhooks/recovery_incoming.rs b/crates/router/src/core/webhooks/recovery_incoming.rs index f27ee47d6a..60e06ab078 100644 --- a/crates/router/src/core/webhooks/recovery_incoming.rs +++ b/crates/router/src/core/webhooks/recovery_incoming.rs @@ -1,18 +1,22 @@ use api_models::{payments as api_payments, webhooks}; -use common_utils::ext_traits::AsyncExt; +use common_utils::{ext_traits::AsyncExt, id_type}; +use diesel_models::{process_tracker as storage, schema::process_tracker::retry_count}; use error_stack::{report, ResultExt}; -use hyperswitch_domain_models::revenue_recovery; +use hyperswitch_domain_models::{errors::api_error_response, revenue_recovery}; use hyperswitch_interfaces::webhooks as interface_webhooks; use router_env::{instrument, tracing}; +use serde_with::rust::unwrap_or_skip; use crate::{ core::{ errors::{self, CustomResult}, payments, }, - routes::{app::ReqState, SessionState}, + db::StorageInterface, + routes::{app::ReqState, metrics, SessionState}, services::{self, connector_integration_interface}, - types::{api, domain}, + types::{api, domain, storage::passive_churn_recovery as storage_churn_recovery}, + workflows::passive_churn_recovery_workflow, }; #[allow(clippy::too_many_arguments)] @@ -122,6 +126,7 @@ pub async fn recovery_incoming_webhook_flow( }; let attempt_triggered_by = payment_attempt + .as_ref() .and_then(revenue_recovery::RecoveryPaymentAttempt::get_attempt_triggered_by); let action = revenue_recovery::RecoveryAction::get_action(event_type, attempt_triggered_by); @@ -129,10 +134,21 @@ pub async fn recovery_incoming_webhook_flow( match action { revenue_recovery::RecoveryAction::CancelInvoice => todo!(), revenue_recovery::RecoveryAction::ScheduleFailedPayment => { - todo!() + Ok(RevenueRecoveryAttempt::insert_execute_pcr_task( + &*state.store, + merchant_account.get_id().to_owned(), + payment_intent, + business_profile.get_id().to_owned(), + payment_attempt.map(|attempt| attempt.attempt_id.clone()), + storage::ProcessTrackerRunner::PassiveRecoveryWorkflow, + ) + .await + .change_context(errors::RevenueRecoveryError::InvoiceWebhookProcessingFailed)?) } revenue_recovery::RecoveryAction::SuccessPaymentExternal => { - todo!() + // Need to add recovery stop flow for this scenario + router_env::logger::info!("Payment has been succeeded via external system"); + Ok(webhooks::WebhookResponseTracker::NoEffect) } revenue_recovery::RecoveryAction::PendingPayment => { router_env::logger::info!( @@ -217,8 +233,7 @@ impl RevenueRecoveryInvoice { key_store: &domain::MerchantKeyStore, ) -> CustomResult { let payload = api_payments::PaymentsCreateIntentRequest::from(&self.0); - let global_payment_id = - common_utils::id_type::GlobalPaymentId::generate(&state.conf.cell_information.id); + let global_payment_id = id_type::GlobalPaymentId::generate(&state.conf.cell_information.id); let create_intent_response = Box::pin(payments::payments_intent_core::< hyperswitch_domain_models::router_flow_types::payments::PaymentCreateIntent, @@ -264,7 +279,7 @@ impl RevenueRecoveryAttempt { merchant_account: &domain::MerchantAccount, profile: &domain::Profile, key_store: &domain::MerchantKeyStore, - payment_id: common_utils::id_type::GlobalPaymentId, + payment_id: id_type::GlobalPaymentId, ) -> CustomResult, errors::RevenueRecoveryError> { let attempt_response = Box::pin(payments::payments_core::< @@ -332,8 +347,8 @@ impl RevenueRecoveryAttempt { merchant_account: &domain::MerchantAccount, profile: &domain::Profile, key_store: &domain::MerchantKeyStore, - payment_id: common_utils::id_type::GlobalPaymentId, - billing_connector_account_id: &common_utils::id_type::MerchantConnectorAccountId, + payment_id: id_type::GlobalPaymentId, + billing_connector_account_id: &id_type::MerchantConnectorAccountId, payment_connector_account: Option, ) -> CustomResult { let request_payload = self @@ -372,7 +387,7 @@ impl RevenueRecoveryAttempt { pub fn create_payment_record_request( &self, - billing_merchant_connector_account_id: &common_utils::id_type::MerchantConnectorAccountId, + billing_merchant_connector_account_id: &id_type::MerchantConnectorAccountId, payment_merchant_connector_account: Option, ) -> api_payments::PaymentsAttemptRecordRequest { let amount_details = api_payments::PaymentAttemptAmountDetails::from(&self.0); @@ -431,4 +446,80 @@ impl RevenueRecoveryAttempt { )?; Ok(payment_merchant_connector_account) } + + async fn insert_execute_pcr_task( + db: &dyn StorageInterface, + merchant_id: id_type::MerchantId, + payment_intent: revenue_recovery::RecoveryPaymentIntent, + profile_id: id_type::ProfileId, + payment_attempt_id: Option, + runner: storage::ProcessTrackerRunner, + ) -> CustomResult { + let task = "EXECUTE_WORKFLOW"; + + let payment_id = payment_intent.payment_id.clone(); + + let process_tracker_id = format!("{runner}_{task}_{}", payment_id.get_string_repr()); + + let total_retry_count = payment_intent + .feature_metadata + .and_then(|feature_metadata| feature_metadata.get_retry_count()) + .unwrap_or(0); + + let schedule_time = + passive_churn_recovery_workflow::get_schedule_time_to_retry_mit_payments( + db, + &merchant_id, + (total_retry_count + 1).into(), + ) + .await + .map_or_else( + || { + Err( + report!(errors::RevenueRecoveryError::ScheduleTimeFetchFailed) + .attach_printable("Failed to get schedule time for pcr workflow"), + ) + }, + Ok, // Simply returns `time` wrapped in `Ok` + )?; + + let payment_attempt_id = payment_attempt_id + .ok_or(report!( + errors::RevenueRecoveryError::PaymentAttemptIdNotFound + )) + .attach_printable("payment attempt id is required for pcr workflow tracking")?; + + let execute_workflow_tracking_data = storage_churn_recovery::PcrWorkflowTrackingData { + global_payment_id: payment_id.clone(), + merchant_id, + profile_id, + payment_attempt_id, + }; + + let tag = ["PCR"]; + + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + execute_workflow_tracking_data, + Some(total_retry_count.into()), + schedule_time, + common_enums::ApiVersion::V2, + ) + .change_context(errors::RevenueRecoveryError::ProcessTrackerCreationError) + .attach_printable("Failed to construct process tracker entry")?; + + db.insert_process(process_tracker_entry) + .await + .change_context(errors::RevenueRecoveryError::ProcessTrackerResponseError) + .attach_printable("Failed to enter process_tracker_entry in DB")?; + metrics::TASKS_ADDED_COUNT.add(1, router_env::metric_attributes!(("flow", "ExecutePCR"))); + + Ok(webhooks::WebhookResponseTracker::Payment { + payment_id, + status: payment_intent.status, + }) + } }