diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index c94193831f..59ea6e97fb 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -325,7 +325,7 @@ impl PaymentsCreateIntentRequest { } // This struct is only used internally, not visible in API Reference -#[derive(Debug, Clone, serde::Serialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[cfg(feature = "v2")] pub struct PaymentsGetIntentRequest { pub id: id_type::GlobalPaymentId, diff --git a/crates/diesel_models/src/enums.rs b/crates/diesel_models/src/enums.rs index 1ee1a090ff..7fda63fb6a 100644 --- a/crates/diesel_models/src/enums.rs +++ b/crates/diesel_models/src/enums.rs @@ -103,6 +103,8 @@ pub enum ProcessTrackerStatus { ProcessStarted, // Finished by consumer Finish, + // Review the task + Review, } // Refund diff --git a/crates/diesel_models/src/process_tracker.rs b/crates/diesel_models/src/process_tracker.rs index cd2c429c55..f998ca5d23 100644 --- a/crates/diesel_models/src/process_tracker.rs +++ b/crates/diesel_models/src/process_tracker.rs @@ -210,6 +210,7 @@ pub enum ProcessTrackerRunner { OutgoingWebhookRetryWorkflow, AttachPayoutAccountWorkflow, PaymentMethodStatusUpdateWorkflow, + PassiveRecoveryWorkflow, } #[cfg(test)] @@ -265,4 +266,19 @@ pub mod business_status { /// Business status set for newly created tasks. pub const PENDING: &str = "Pending"; + + /// For the PCR Workflow + /// + /// This status indicates the completion of a execute task + pub const EXECUTE_WORKFLOW_COMPLETE: &str = "COMPLETED_EXECUTE_TASK"; + + /// This status indicates that the execute task was completed to trigger the psync task + pub const EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC: &str = "COMPLETED_EXECUTE_TASK_TO_TRIGGER_PSYNC"; + + /// This status indicates that the execute task was completed to trigger the review task + pub const EXECUTE_WORKFLOW_COMPLETE_FOR_REVIEW: &str = + "COMPLETED_EXECUTE_TASK_TO_TRIGGER_REVIEW"; + + /// This status indicates the completion of a psync task + pub const PSYNC_WORKFLOW_COMPLETE: &str = "COMPLETED_PSYNC_TASK"; } diff --git a/crates/hyperswitch_domain_models/src/payments/payment_intent.rs b/crates/hyperswitch_domain_models/src/payments/payment_intent.rs index cc1a7054de..fb08d7a719 100644 --- a/crates/hyperswitch_domain_models/src/payments/payment_intent.rs +++ b/crates/hyperswitch_domain_models/src/payments/payment_intent.rs @@ -318,7 +318,7 @@ pub enum PaymentIntentUpdate { /// PreUpdate tracker of ConfirmIntent ConfirmIntent { status: common_enums::IntentStatus, - active_attempt_id: id_type::GlobalAttemptId, + active_attempt_id: Option, updated_by: String, }, /// PostUpdate tracker of ConfirmIntent @@ -397,7 +397,7 @@ impl From for diesel_models::PaymentIntentUpdateInternal { updated_by, } => Self { status: Some(status), - active_attempt_id: Some(active_attempt_id), + active_attempt_id, modified_at: common_utils::date_time::now(), amount: None, amount_captured: None, diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 63dc9a5863..a997d062bc 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -252,7 +252,6 @@ pub async fn deep_health_check_func( #[derive(Debug, Copy, Clone)] pub struct WorkflowRunner; -#[cfg(feature = "v1")] #[async_trait::async_trait] impl ProcessTrackerWorkflows for WorkflowRunner { async fn trigger_workflow<'a>( @@ -322,6 +321,9 @@ impl ProcessTrackerWorkflows for WorkflowRunner { storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new( workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow, )), + storage::ProcessTrackerRunner::PassiveRecoveryWorkflow => Ok(Box::new( + workflows::passive_churn_recovery_workflow::ExecutePcrWorkflow, + )), } }; @@ -360,18 +362,6 @@ impl ProcessTrackerWorkflows for WorkflowRunner { } } -#[cfg(feature = "v2")] -#[async_trait::async_trait] -impl ProcessTrackerWorkflows for WorkflowRunner { - async fn trigger_workflow<'a>( - &'a self, - _state: &'a routes::SessionState, - _process: storage::ProcessTracker, - ) -> CustomResult<(), ProcessTrackerError> { - todo!() - } -} - async fn start_scheduler( state: &routes::AppState, scheduler_flow: scheduler::SchedulerFlow, diff --git a/crates/router/src/core.rs b/crates/router/src/core.rs index 365cdaf3a1..fa243d0269 100644 --- a/crates/router/src/core.rs +++ b/crates/router/src/core.rs @@ -57,4 +57,6 @@ pub mod webhooks; pub mod unified_authentication_service; +#[cfg(feature = "v2")] +pub mod passive_churn_recovery; pub mod relay; diff --git a/crates/router/src/core/passive_churn_recovery.rs b/crates/router/src/core/passive_churn_recovery.rs new file mode 100644 index 0000000000..3c77de98e7 --- /dev/null +++ b/crates/router/src/core/passive_churn_recovery.rs @@ -0,0 +1,207 @@ +pub mod transformers; +pub mod types; +use api_models::payments::PaymentsRetrieveRequest; +use common_utils::{self, id_type, types::keymanager::KeyManagerState}; +use diesel_models::process_tracker::business_status; +use error_stack::{self, ResultExt}; +use hyperswitch_domain_models::{ + errors::api_error_response, + payments::{PaymentIntent, PaymentStatusData}, +}; +use scheduler::errors; + +use crate::{ + core::{ + errors::RouterResult, + passive_churn_recovery::types as pcr_types, + payments::{self, operations::Operation}, + }, + db::StorageInterface, + logger, + routes::{metrics, SessionState}, + types::{ + api, + storage::{self, passive_churn_recovery as pcr}, + transformers::ForeignInto, + }, +}; + +pub async fn perform_execute_payment( + state: &SessionState, + execute_task_process: &storage::ProcessTracker, + tracking_data: &pcr::PcrWorkflowTrackingData, + pcr_data: &pcr::PcrPaymentData, + _key_manager_state: &KeyManagerState, + payment_intent: &PaymentIntent, +) -> Result<(), errors::ProcessTrackerError> { + let db = &*state.store; + let decision = pcr_types::Decision::get_decision_based_on_params( + state, + payment_intent.status, + false, + payment_intent.active_attempt_id.clone(), + pcr_data, + &tracking_data.global_payment_id, + ) + .await?; + // TODO decide if its a global failure or is it requeueable error + match decision { + pcr_types::Decision::Execute => { + let action = pcr_types::Action::execute_payment( + db, + pcr_data.merchant_account.get_id(), + payment_intent, + execute_task_process, + ) + .await?; + action + .execute_payment_task_response_handler( + db, + &pcr_data.merchant_account, + payment_intent, + execute_task_process, + &pcr_data.profile, + ) + .await?; + } + + pcr_types::Decision::Psync(attempt_status, attempt_id) => { + // find if a psync task is already present + let task = "PSYNC_WORKFLOW"; + let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; + let process_tracker_id = format!("{runner}_{task}_{}", attempt_id.get_string_repr()); + let psync_process = db.find_process_by_id(&process_tracker_id).await?; + + match psync_process { + Some(_) => { + let pcr_status: pcr_types::PcrAttemptStatus = attempt_status.foreign_into(); + + pcr_status + .update_pt_status_based_on_attempt_status_for_execute_payment( + db, + execute_task_process, + ) + .await?; + } + + None => { + // insert new psync task + insert_psync_pcr_task( + db, + pcr_data.merchant_account.get_id().clone(), + payment_intent.get_id().clone(), + pcr_data.profile.get_id().clone(), + attempt_id.clone(), + storage::ProcessTrackerRunner::PassiveRecoveryWorkflow, + ) + .await?; + + // finish the current task + db.finish_process_with_business_status( + execute_task_process.clone(), + business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC, + ) + .await?; + } + }; + } + pcr_types::Decision::InvalidDecision => { + db.finish_process_with_business_status( + execute_task_process.clone(), + business_status::EXECUTE_WORKFLOW_COMPLETE, + ) + .await?; + logger::warn!("Abnormal State Identified") + } + } + + Ok(()) +} + +async fn insert_psync_pcr_task( + db: &dyn StorageInterface, + merchant_id: id_type::MerchantId, + payment_id: id_type::GlobalPaymentId, + profile_id: id_type::ProfileId, + payment_attempt_id: id_type::GlobalAttemptId, + runner: storage::ProcessTrackerRunner, +) -> RouterResult { + let task = "PSYNC_WORKFLOW"; + let process_tracker_id = format!("{runner}_{task}_{}", payment_attempt_id.get_string_repr()); + let schedule_time = common_utils::date_time::now(); + let psync_workflow_tracking_data = pcr::PcrWorkflowTrackingData { + global_payment_id: payment_id, + merchant_id, + profile_id, + payment_attempt_id, + }; + let tag = ["PCR"]; + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + psync_workflow_tracking_data, + schedule_time, + ) + .change_context(api_error_response::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct delete tokenized data process tracker task")?; + + let response = db + .insert_process(process_tracker_entry) + .await + .change_context(api_error_response::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct delete tokenized data process tracker task")?; + metrics::TASKS_ADDED_COUNT.add(1, router_env::metric_attributes!(("flow", "PsyncPcr"))); + + Ok(response) +} + +pub async fn call_psync_api( + state: &SessionState, + global_payment_id: &id_type::GlobalPaymentId, + pcr_data: &pcr::PcrPaymentData, +) -> RouterResult> { + let operation = payments::operations::PaymentGet; + let req = PaymentsRetrieveRequest { + force_sync: false, + param: None, + expand_attempts: true, + }; + // TODO : Use api handler instead of calling get_tracker and payments_operation_core + // Get the tracker related information. This includes payment intent and payment attempt + let get_tracker_response = operation + .to_get_tracker()? + .get_trackers( + state, + global_payment_id, + &req, + &pcr_data.merchant_account, + &pcr_data.profile, + &pcr_data.key_store, + &hyperswitch_domain_models::payments::HeaderPayload::default(), + None, + ) + .await?; + + let (payment_data, _req, _, _, _) = Box::pin(payments::payments_operation_core::< + api::PSync, + _, + _, + _, + PaymentStatusData, + >( + state, + state.get_req_state(), + pcr_data.merchant_account.clone(), + pcr_data.key_store.clone(), + &pcr_data.profile, + operation, + req, + get_tracker_response, + payments::CallConnectorAction::Trigger, + hyperswitch_domain_models::payments::HeaderPayload::default(), + )) + .await?; + Ok(payment_data) +} diff --git a/crates/router/src/core/passive_churn_recovery/transformers.rs b/crates/router/src/core/passive_churn_recovery/transformers.rs new file mode 100644 index 0000000000..ce47714353 --- /dev/null +++ b/crates/router/src/core/passive_churn_recovery/transformers.rs @@ -0,0 +1,39 @@ +use common_enums::AttemptStatus; + +use crate::{ + core::passive_churn_recovery::types::PcrAttemptStatus, types::transformers::ForeignFrom, +}; + +impl ForeignFrom for PcrAttemptStatus { + fn foreign_from(s: AttemptStatus) -> Self { + match s { + AttemptStatus::Authorized | AttemptStatus::Charged | AttemptStatus::AutoRefunded => { + Self::Succeeded + } + + AttemptStatus::Started + | AttemptStatus::AuthenticationSuccessful + | AttemptStatus::Authorizing + | AttemptStatus::CodInitiated + | AttemptStatus::VoidInitiated + | AttemptStatus::CaptureInitiated + | AttemptStatus::Pending => Self::Processing, + + AttemptStatus::AuthenticationFailed + | AttemptStatus::AuthorizationFailed + | AttemptStatus::VoidFailed + | AttemptStatus::RouterDeclined + | AttemptStatus::CaptureFailed + | AttemptStatus::Failure => Self::Failed, + + AttemptStatus::Voided + | AttemptStatus::ConfirmationAwaited + | AttemptStatus::PartialCharged + | AttemptStatus::PartialChargedAndChargeable + | AttemptStatus::PaymentMethodAwaited + | AttemptStatus::AuthenticationPending + | AttemptStatus::DeviceDataCollectionPending + | AttemptStatus::Unresolved => Self::InvalidStatus(s.to_string()), + } + } +} diff --git a/crates/router/src/core/passive_churn_recovery/types.rs b/crates/router/src/core/passive_churn_recovery/types.rs new file mode 100644 index 0000000000..c47248a882 --- /dev/null +++ b/crates/router/src/core/passive_churn_recovery/types.rs @@ -0,0 +1,259 @@ +use common_enums::{self, AttemptStatus, IntentStatus}; +use common_utils::{self, ext_traits::OptionExt, id_type}; +use diesel_models::{enums, process_tracker::business_status}; +use error_stack::{self, ResultExt}; +use hyperswitch_domain_models::{ + business_profile, merchant_account, + payments::{PaymentConfirmData, PaymentIntent}, +}; +use time::PrimitiveDateTime; + +use crate::{ + core::{ + errors::{self, RouterResult}, + passive_churn_recovery::{self as core_pcr}, + }, + db::StorageInterface, + logger, + routes::SessionState, + types::{api::payments as api_types, storage, transformers::ForeignInto}, + workflows::passive_churn_recovery_workflow::get_schedule_time_to_retry_mit_payments, +}; + +type RecoveryResult = error_stack::Result; + +/// The status of Passive Churn Payments +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub enum PcrAttemptStatus { + Succeeded, + Failed, + Processing, + InvalidStatus(String), + // Cancelled, +} + +impl PcrAttemptStatus { + pub(crate) async fn update_pt_status_based_on_attempt_status_for_execute_payment( + &self, + db: &dyn StorageInterface, + execute_task_process: &storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + match &self { + Self::Succeeded | Self::Failed | Self::Processing => { + // finish the current execute task + db.finish_process_with_business_status( + execute_task_process.clone(), + business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC, + ) + .await?; + } + + Self::InvalidStatus(action) => { + logger::debug!( + "Invalid Attempt Status for the Recovery Payment : {}", + action + ); + let pt_update = storage::ProcessTrackerUpdate::StatusUpdate { + status: enums::ProcessTrackerStatus::Review, + business_status: Some(String::from(business_status::EXECUTE_WORKFLOW_COMPLETE)), + }; + // update the process tracker status as Review + db.update_process(execute_task_process.clone(), pt_update) + .await?; + } + }; + Ok(()) + } +} +#[derive(Debug, Clone)] +pub enum Decision { + Execute, + Psync(AttemptStatus, id_type::GlobalAttemptId), + InvalidDecision, +} + +impl Decision { + pub async fn get_decision_based_on_params( + state: &SessionState, + intent_status: IntentStatus, + called_connector: bool, + active_attempt_id: Option, + pcr_data: &storage::passive_churn_recovery::PcrPaymentData, + payment_id: &id_type::GlobalPaymentId, + ) -> RecoveryResult { + Ok(match (intent_status, called_connector, active_attempt_id) { + (IntentStatus::Failed, false, None) => Self::Execute, + (IntentStatus::Processing, true, Some(_)) => { + let psync_data = core_pcr::call_psync_api(state, payment_id, pcr_data) + .await + .change_context(errors::RecoveryError::PaymentCallFailed) + .attach_printable("Error while executing the Psync call")?; + let payment_attempt = psync_data + .payment_attempt + .get_required_value("Payment Attempt") + .change_context(errors::RecoveryError::ValueNotFound) + .attach_printable("Error while executing the Psync call")?; + Self::Psync(payment_attempt.status, payment_attempt.get_id().clone()) + } + _ => Self::InvalidDecision, + }) + } +} + +#[derive(Debug, Clone)] +pub enum Action { + SyncPayment(id_type::GlobalAttemptId), + RetryPayment(PrimitiveDateTime), + TerminalFailure, + SuccessfulPayment, + ReviewPayment, + ManualReviewAction, +} +impl Action { + pub async fn execute_payment( + db: &dyn StorageInterface, + merchant_id: &id_type::MerchantId, + payment_intent: &PaymentIntent, + process: &storage::ProcessTracker, + ) -> RecoveryResult { + // call the proxy api + let response = call_proxy_api::(payment_intent); + // handle proxy api's response + match response { + Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() { + PcrAttemptStatus::Succeeded => Ok(Self::SuccessfulPayment), + PcrAttemptStatus::Failed => { + Self::decide_retry_failure_action(db, merchant_id, process.clone()).await + } + + PcrAttemptStatus::Processing => { + Ok(Self::SyncPayment(payment_data.payment_attempt.id)) + } + PcrAttemptStatus::InvalidStatus(action) => { + logger::info!(?action, "Invalid Payment Status For PCR Payment"); + Ok(Self::ManualReviewAction) + } + }, + Err(err) => + // check for an active attempt being constructed or not + { + logger::error!(execute_payment_res=?err); + match payment_intent.active_attempt_id.clone() { + Some(attempt_id) => Ok(Self::SyncPayment(attempt_id)), + None => Ok(Self::ReviewPayment), + } + } + } + } + + pub async fn execute_payment_task_response_handler( + &self, + db: &dyn StorageInterface, + merchant_account: &merchant_account::MerchantAccount, + payment_intent: &PaymentIntent, + execute_task_process: &storage::ProcessTracker, + profile: &business_profile::Profile, + ) -> Result<(), errors::ProcessTrackerError> { + match self { + Self::SyncPayment(attempt_id) => { + core_pcr::insert_psync_pcr_task( + db, + merchant_account.get_id().to_owned(), + payment_intent.id.clone(), + profile.get_id().to_owned(), + attempt_id.clone(), + storage::ProcessTrackerRunner::PassiveRecoveryWorkflow, + ) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable("Failed to create a psync workflow in the process tracker")?; + + db.as_scheduler() + .finish_process_with_business_status( + execute_task_process.clone(), + business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC, + ) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable("Failed to update the process tracker")?; + Ok(()) + } + + Self::RetryPayment(schedule_time) => { + let mut pt = execute_task_process.clone(); + // update the schedule time + pt.schedule_time = Some(*schedule_time); + + let pt_task_update = diesel_models::ProcessTrackerUpdate::StatusUpdate { + status: storage::enums::ProcessTrackerStatus::Pending, + business_status: Some(business_status::PENDING.to_owned()), + }; + db.as_scheduler() + .update_process(pt.clone(), pt_task_update) + .await?; + // TODO: update the connector called field and make the active attempt None + + Ok(()) + } + Self::TerminalFailure => { + // TODO: Record a failure transaction back to Billing Connector + Ok(()) + } + Self::SuccessfulPayment => Ok(()), + Self::ReviewPayment => Ok(()), + Self::ManualReviewAction => { + logger::debug!("Invalid Payment Status For PCR Payment"); + let pt_update = storage::ProcessTrackerUpdate::StatusUpdate { + status: enums::ProcessTrackerStatus::Review, + business_status: Some(String::from(business_status::EXECUTE_WORKFLOW_COMPLETE)), + }; + // update the process tracker status as Review + db.as_scheduler() + .update_process(execute_task_process.clone(), pt_update) + .await?; + Ok(()) + } + } + } + + pub(crate) async fn decide_retry_failure_action( + db: &dyn StorageInterface, + merchant_id: &id_type::MerchantId, + pt: storage::ProcessTracker, + ) -> RecoveryResult { + let schedule_time = + get_schedule_time_to_retry_mit_payments(db, merchant_id, pt.retry_count + 1).await; + match schedule_time { + Some(schedule_time) => Ok(Self::RetryPayment(schedule_time)), + + None => Ok(Self::TerminalFailure), + } + } +} + +// This function would be converted to proxy_payments_core +fn call_proxy_api(payment_intent: &PaymentIntent) -> RouterResult> +where + F: Send + Clone + Sync, +{ + let payment_address = hyperswitch_domain_models::payment_address::PaymentAddress::new( + payment_intent + .shipping_address + .clone() + .map(|address| address.into_inner()), + payment_intent + .billing_address + .clone() + .map(|address| address.into_inner()), + None, + Some(true), + ); + let response = PaymentConfirmData { + flow: std::marker::PhantomData, + payment_intent: payment_intent.clone(), + payment_attempt: todo!(), + payment_method_data: None, + payment_address, + }; + Ok(response) +} diff --git a/crates/router/src/core/payments/operations/payment_confirm_intent.rs b/crates/router/src/core/payments/operations/payment_confirm_intent.rs index 0f42948e3f..40b1918323 100644 --- a/crates/router/src/core/payments/operations/payment_confirm_intent.rs +++ b/crates/router/src/core/payments/operations/payment_confirm_intent.rs @@ -210,8 +210,8 @@ impl GetTracker, PaymentsConfir ) .await?; - let payment_attempt = db - .insert_payment_attempt( + let payment_attempt: hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt = + db.insert_payment_attempt( key_manager_state, key_store, payment_attempt_domain_model, @@ -416,7 +416,7 @@ impl UpdateTracker, PaymentsConfirmInt hyperswitch_domain_models::payments::payment_intent::PaymentIntentUpdate::ConfirmIntent { status: intent_status, updated_by: storage_scheme.to_string(), - active_attempt_id: payment_data.payment_attempt.id.clone(), + active_attempt_id: Some(payment_data.payment_attempt.id.clone()), }; let authentication_type = payment_data.payment_attempt.authentication_type; diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index 96a0d58005..bd033ff4c2 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -28,6 +28,8 @@ pub mod mandate; pub mod merchant_account; pub mod merchant_connector_account; pub mod merchant_key_store; +#[cfg(feature = "v2")] +pub mod passive_churn_recovery; pub mod payment_attempt; pub mod payment_link; pub mod payment_method; diff --git a/crates/router/src/types/storage/passive_churn_recovery.rs b/crates/router/src/types/storage/passive_churn_recovery.rs new file mode 100644 index 0000000000..3cf3316a39 --- /dev/null +++ b/crates/router/src/types/storage/passive_churn_recovery.rs @@ -0,0 +1,18 @@ +use std::fmt::Debug; + +use common_utils::id_type; +use hyperswitch_domain_models::{business_profile, merchant_account, merchant_key_store}; +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct PcrWorkflowTrackingData { + pub merchant_id: id_type::MerchantId, + pub profile_id: id_type::ProfileId, + pub global_payment_id: id_type::GlobalPaymentId, + pub payment_attempt_id: id_type::GlobalAttemptId, +} + +#[derive(Debug, Clone)] +pub struct PcrPaymentData { + pub merchant_account: merchant_account::MerchantAccount, + pub profile: business_profile::Profile, + pub key_store: merchant_key_store::MerchantKeyStore, +} diff --git a/crates/router/src/workflows.rs b/crates/router/src/workflows.rs index e86d49faf6..4961932e06 100644 --- a/crates/router/src/workflows.rs +++ b/crates/router/src/workflows.rs @@ -2,12 +2,12 @@ pub mod api_key_expiry; #[cfg(feature = "payouts")] pub mod attach_payout_account_workflow; -#[cfg(feature = "v1")] pub mod outgoing_webhook_retry; -#[cfg(feature = "v1")] pub mod payment_method_status_update; pub mod payment_sync; -#[cfg(feature = "v1")] + pub mod refund_router; -#[cfg(feature = "v1")] + pub mod tokenized_data; + +pub mod passive_churn_recovery_workflow; diff --git a/crates/router/src/workflows/outgoing_webhook_retry.rs b/crates/router/src/workflows/outgoing_webhook_retry.rs index c7df4aeff0..9be893c7ac 100644 --- a/crates/router/src/workflows/outgoing_webhook_retry.rs +++ b/crates/router/src/workflows/outgoing_webhook_retry.rs @@ -36,6 +36,7 @@ pub struct OutgoingWebhookRetryWorkflow; #[async_trait::async_trait] impl ProcessTrackerWorkflow for OutgoingWebhookRetryWorkflow { + #[cfg(feature = "v1")] #[instrument(skip_all)] async fn execute_workflow<'a>( &'a self, @@ -226,6 +227,14 @@ impl ProcessTrackerWorkflow for OutgoingWebhookRetryWorkflow { Ok(()) } + #[cfg(feature = "v2")] + async fn execute_workflow<'a>( + &'a self, + _state: &'a SessionState, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + todo!() + } #[instrument(skip_all)] async fn error_handler<'a>( @@ -266,6 +275,7 @@ impl ProcessTrackerWorkflow for OutgoingWebhookRetryWorkflow { /// seconds between them by default. /// - `custom_merchant_mapping.merchant_id1`: Merchant-specific retry configuration for merchant /// with merchant ID `merchant_id1`. +#[cfg(feature = "v1")] #[instrument(skip_all)] pub(crate) async fn get_webhook_delivery_retry_schedule_time( db: &dyn StorageInterface, @@ -311,6 +321,7 @@ pub(crate) async fn get_webhook_delivery_retry_schedule_time( } /// Schedule the webhook delivery task for retry +#[cfg(feature = "v1")] #[instrument(skip_all)] pub(crate) async fn retry_webhook_delivery_task( db: &dyn StorageInterface, @@ -334,6 +345,7 @@ pub(crate) async fn retry_webhook_delivery_task( } } +#[cfg(feature = "v1")] #[instrument(skip_all)] async fn get_outgoing_webhook_content_and_event_type( state: SessionState, diff --git a/crates/router/src/workflows/passive_churn_recovery_workflow.rs b/crates/router/src/workflows/passive_churn_recovery_workflow.rs new file mode 100644 index 0000000000..d67f91f116 --- /dev/null +++ b/crates/router/src/workflows/passive_churn_recovery_workflow.rs @@ -0,0 +1,175 @@ +#[cfg(feature = "v2")] +use api_models::payments::PaymentsGetIntentRequest; +#[cfg(feature = "v2")] +use common_utils::ext_traits::{StringExt, ValueExt}; +#[cfg(feature = "v2")] +use error_stack::ResultExt; +#[cfg(feature = "v2")] +use hyperswitch_domain_models::payments::PaymentIntentData; +#[cfg(feature = "v2")] +use router_env::logger; +use scheduler::{consumer::workflows::ProcessTrackerWorkflow, errors}; +#[cfg(feature = "v2")] +use scheduler::{types::process_data, utils as scheduler_utils}; + +#[cfg(feature = "v2")] +use crate::{ + core::{ + passive_churn_recovery::{self as pcr}, + payments, + }, + db::StorageInterface, + errors::StorageError, + types::{ + api::{self as api_types}, + storage::passive_churn_recovery as pcr_storage_types, + }, +}; +use crate::{routes::SessionState, types::storage}; +pub struct ExecutePcrWorkflow; + +#[async_trait::async_trait] +impl ProcessTrackerWorkflow for ExecutePcrWorkflow { + #[cfg(feature = "v1")] + async fn execute_workflow<'a>( + &'a self, + _state: &'a SessionState, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + Ok(()) + } + #[cfg(feature = "v2")] + async fn execute_workflow<'a>( + &'a self, + state: &'a SessionState, + process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + let tracking_data = process + .tracking_data + .clone() + .parse_value::( + "PCRWorkflowTrackingData", + )?; + let request = PaymentsGetIntentRequest { + id: tracking_data.global_payment_id.clone(), + }; + let key_manager_state = &state.into(); + let pcr_data = extract_data_and_perform_action(state, &tracking_data).await?; + let (payment_data, _, _) = payments::payments_intent_operation_core::< + api_types::PaymentGetIntent, + _, + _, + PaymentIntentData, + >( + state, + state.get_req_state(), + pcr_data.merchant_account.clone(), + pcr_data.profile.clone(), + pcr_data.key_store.clone(), + payments::operations::PaymentGetIntent, + request, + tracking_data.global_payment_id.clone(), + hyperswitch_domain_models::payments::HeaderPayload::default(), + None, + ) + .await?; + + match process.name.as_deref() { + Some("EXECUTE_WORKFLOW") => { + pcr::perform_execute_payment( + state, + &process, + &tracking_data, + &pcr_data, + key_manager_state, + &payment_data.payment_intent, + ) + .await + } + Some("PSYNC_WORKFLOW") => todo!(), + + Some("REVIEW_WORKFLOW") => todo!(), + _ => Err(errors::ProcessTrackerError::JobNotFound), + } + } +} +#[cfg(feature = "v2")] +pub(crate) async fn extract_data_and_perform_action( + state: &SessionState, + tracking_data: &pcr_storage_types::PcrWorkflowTrackingData, +) -> Result { + let db = &state.store; + + let key_manager_state = &state.into(); + let key_store = db + .get_merchant_key_store_by_merchant_id( + key_manager_state, + &tracking_data.merchant_id, + &db.get_master_key().to_vec().into(), + ) + .await?; + + let merchant_account = db + .find_merchant_account_by_merchant_id( + key_manager_state, + &tracking_data.merchant_id, + &key_store, + ) + .await?; + + let profile = db + .find_business_profile_by_profile_id( + key_manager_state, + &key_store, + &tracking_data.profile_id, + ) + .await?; + + let pcr_payment_data = pcr_storage_types::PcrPaymentData { + merchant_account, + profile, + key_store, + }; + Ok(pcr_payment_data) +} + +#[cfg(feature = "v2")] +pub(crate) async fn get_schedule_time_to_retry_mit_payments( + db: &dyn StorageInterface, + merchant_id: &common_utils::id_type::MerchantId, + retry_count: i32, +) -> Option { + let key = "pt_mapping_pcr_retries"; + let result = db + .find_config_by_key(key) + .await + .map(|value| value.config) + .and_then(|config| { + config + .parse_struct("RevenueRecoveryPaymentProcessTrackerMapping") + .change_context(StorageError::DeserializationFailed) + }); + + let mapping = result.map_or_else( + |error| { + if error.current_context().is_db_not_found() { + logger::debug!("Revenue Recovery retry config `{key}` not found, ignoring"); + } else { + logger::error!( + ?error, + "Failed to read Revenue Recovery retry config `{key}`" + ); + } + process_data::RevenueRecoveryPaymentProcessTrackerMapping::default() + }, + |mapping| { + logger::debug!(?mapping, "Using custom pcr payments retry config"); + mapping + }, + ); + + let time_delta = + scheduler_utils::get_pcr_payments_retry_schedule_time(mapping, merchant_id, retry_count); + + scheduler_utils::get_time_from_delta(time_delta) +} diff --git a/crates/router/src/workflows/payment_method_status_update.rs b/crates/router/src/workflows/payment_method_status_update.rs index 124417e335..dba3bace25 100644 --- a/crates/router/src/workflows/payment_method_status_update.rs +++ b/crates/router/src/workflows/payment_method_status_update.rs @@ -111,6 +111,14 @@ impl ProcessTrackerWorkflow for PaymentMethodStatusUpdateWorkflow Ok(()) } + #[cfg(feature = "v2")] + async fn execute_workflow<'a>( + &'a self, + _state: &'a SessionState, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + todo!() + } async fn error_handler<'a>( &'a self, _state: &'a SessionState, diff --git a/crates/router/src/workflows/payment_sync.rs b/crates/router/src/workflows/payment_sync.rs index fb83922935..64c4853d14 100644 --- a/crates/router/src/workflows/payment_sync.rs +++ b/crates/router/src/workflows/payment_sync.rs @@ -31,8 +31,8 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { #[cfg(feature = "v2")] async fn execute_workflow<'a>( &'a self, - state: &'a SessionState, - process: storage::ProcessTracker, + _state: &'a SessionState, + _process: storage::ProcessTracker, ) -> Result<(), sch_errors::ProcessTrackerError> { todo!() } diff --git a/crates/router/src/workflows/refund_router.rs b/crates/router/src/workflows/refund_router.rs index 515e34c068..82e7d8a45d 100644 --- a/crates/router/src/workflows/refund_router.rs +++ b/crates/router/src/workflows/refund_router.rs @@ -1,13 +1,14 @@ use scheduler::consumer::workflows::ProcessTrackerWorkflow; -use crate::{ - core::refunds as refund_flow, errors, logger::error, routes::SessionState, types::storage, -}; +#[cfg(feature = "v1")] +use crate::core::refunds as refund_flow; +use crate::{errors, logger::error, routes::SessionState, types::storage}; pub struct RefundWorkflowRouter; #[async_trait::async_trait] impl ProcessTrackerWorkflow for RefundWorkflowRouter { + #[cfg(feature = "v1")] async fn execute_workflow<'a>( &'a self, state: &'a SessionState, @@ -15,6 +16,14 @@ impl ProcessTrackerWorkflow for RefundWorkflowRouter { ) -> Result<(), errors::ProcessTrackerError> { Ok(Box::pin(refund_flow::start_refund_workflow(state, &process)).await?) } + #[cfg(feature = "v2")] + async fn execute_workflow<'a>( + &'a self, + _state: &'a SessionState, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + todo!() + } async fn error_handler<'a>( &'a self, diff --git a/crates/router/src/workflows/tokenized_data.rs b/crates/router/src/workflows/tokenized_data.rs index bc1842205a..2f2474df66 100644 --- a/crates/router/src/workflows/tokenized_data.rs +++ b/crates/router/src/workflows/tokenized_data.rs @@ -1,13 +1,14 @@ use scheduler::consumer::workflows::ProcessTrackerWorkflow; -use crate::{ - core::payment_methods::vault, errors, logger::error, routes::SessionState, types::storage, -}; +#[cfg(feature = "v1")] +use crate::core::payment_methods::vault; +use crate::{errors, logger::error, routes::SessionState, types::storage}; pub struct DeleteTokenizeDataWorkflow; #[async_trait::async_trait] impl ProcessTrackerWorkflow for DeleteTokenizeDataWorkflow { + #[cfg(feature = "v1")] async fn execute_workflow<'a>( &'a self, state: &'a SessionState, @@ -16,6 +17,15 @@ impl ProcessTrackerWorkflow for DeleteTokenizeDataWorkflow { Ok(vault::start_tokenize_data_workflow(state, &process).await?) } + #[cfg(feature = "v2")] + async fn execute_workflow<'a>( + &'a self, + _state: &'a SessionState, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + todo!() + } + async fn error_handler<'a>( &'a self, _state: &'a SessionState, diff --git a/crates/scheduler/src/consumer/types/process_data.rs b/crates/scheduler/src/consumer/types/process_data.rs index f68ad4795d..26d0fdf702 100644 --- a/crates/scheduler/src/consumer/types/process_data.rs +++ b/crates/scheduler/src/consumer/types/process_data.rs @@ -82,3 +82,39 @@ impl Default for OutgoingWebhookRetryProcessTrackerMapping { } } } + +/// Configuration for outgoing webhook retries. +#[derive(Debug, Serialize, Deserialize)] +pub struct RevenueRecoveryPaymentProcessTrackerMapping { + /// Default (fallback) retry configuration used when no merchant-specific retry configuration + /// exists. + pub default_mapping: RetryMapping, + + /// Merchant-specific retry configuration. + pub custom_merchant_mapping: HashMap, +} + +impl Default for RevenueRecoveryPaymentProcessTrackerMapping { + fn default() -> Self { + Self { + default_mapping: RetryMapping { + // 1st attempt happens after 1 minute of it being + start_after: 60, + + frequencies: vec![ + // 2nd and 3rd attempts happen at intervals of 3 hours each + (60 * 60 * 3, 2), + // 4th, 5th, 6th attempts happen at intervals of 6 hours each + (60 * 60 * 6, 3), + // 7th, 8th, 9th attempts happen at intervals of 9 hour each + (60 * 60 * 9, 3), + // 10th, 11th and 12th attempts happen at intervals of 12 hours each + (60 * 60 * 12, 3), + // 13th, 14th and 15th attempts happen at intervals of 18 hours each + (60 * 60 * 18, 3), + ], + }, + custom_merchant_mapping: HashMap::new(), + } + } +} diff --git a/crates/scheduler/src/errors.rs b/crates/scheduler/src/errors.rs index 1fb7599aed..254c497952 100644 --- a/crates/scheduler/src/errors.rs +++ b/crates/scheduler/src/errors.rs @@ -4,7 +4,7 @@ use external_services::email::EmailError; use hyperswitch_domain_models::errors::api_error_response::ApiErrorResponse; pub use redis_interface::errors::RedisError; pub use storage_impl::errors::ApplicationError; -use storage_impl::errors::StorageError; +use storage_impl::errors::{RecoveryError, StorageError}; use crate::env::logger::{self, error}; @@ -46,6 +46,8 @@ pub enum ProcessTrackerError { EApiErrorResponse, #[error("Received Error ClientError")] EClientError, + #[error("Received RecoveryError: {0:?}")] + ERecoveryError(error_stack::Report), #[error("Received Error StorageError: {0:?}")] EStorageError(error_stack::Report), #[error("Received Error RedisError: {0:?}")] @@ -131,3 +133,8 @@ error_to_process_tracker_error!( error_stack::Report, ProcessTrackerError::EEmailError(error_stack::Report) ); + +error_to_process_tracker_error!( + error_stack::Report, + ProcessTrackerError::ERecoveryError(error_stack::Report) +); diff --git a/crates/scheduler/src/utils.rs b/crates/scheduler/src/utils.rs index 0dbea173ea..3d7f637de9 100644 --- a/crates/scheduler/src/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -350,6 +350,25 @@ pub fn get_outgoing_webhook_retry_schedule_time( } } +pub fn get_pcr_payments_retry_schedule_time( + mapping: process_data::RevenueRecoveryPaymentProcessTrackerMapping, + merchant_id: &common_utils::id_type::MerchantId, + retry_count: i32, +) -> Option { + let mapping = match mapping.custom_merchant_mapping.get(merchant_id) { + Some(map) => map.clone(), + None => mapping.default_mapping, + }; + // TODO: check if the current scheduled time is not more than the configured timerange + + // For first try, get the `start_after` time + if retry_count == 0 { + Some(mapping.start_after) + } else { + get_delay(retry_count, &mapping.frequencies) + } +} + /// Get the delay based on the retry count pub fn get_delay<'a>( retry_count: i32, diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index d75911d593..657e9b000a 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -299,3 +299,15 @@ pub enum HealthCheckGRPCServiceError { #[error("Failed to establish connection with gRPC service")] FailedToCallService, } + +#[derive(thiserror::Error, Debug, Clone)] +pub enum RecoveryError { + #[error("Failed to make a recovery payment")] + PaymentCallFailed, + #[error("Encountered a Process Tracker Task Failure")] + ProcessTrackerFailure, + #[error("The encountered task is invalid")] + InvalidTask, + #[error("The Intended data was not found")] + ValueNotFound, +}