feat(core): create a process_tracker workflow for PCR (#7124)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Amisha Prabhat
2025-02-27 18:29:06 +05:30
committed by GitHub
parent c3b7197304
commit 44dc45b8bd
23 changed files with 855 additions and 32 deletions

View File

@ -325,7 +325,7 @@ impl PaymentsCreateIntentRequest {
} }
// This struct is only used internally, not visible in API Reference // This struct is only used internally, not visible in API Reference
#[derive(Debug, Clone, serde::Serialize)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[cfg(feature = "v2")] #[cfg(feature = "v2")]
pub struct PaymentsGetIntentRequest { pub struct PaymentsGetIntentRequest {
pub id: id_type::GlobalPaymentId, pub id: id_type::GlobalPaymentId,

View File

@ -103,6 +103,8 @@ pub enum ProcessTrackerStatus {
ProcessStarted, ProcessStarted,
// Finished by consumer // Finished by consumer
Finish, Finish,
// Review the task
Review,
} }
// Refund // Refund

View File

@ -210,6 +210,7 @@ pub enum ProcessTrackerRunner {
OutgoingWebhookRetryWorkflow, OutgoingWebhookRetryWorkflow,
AttachPayoutAccountWorkflow, AttachPayoutAccountWorkflow,
PaymentMethodStatusUpdateWorkflow, PaymentMethodStatusUpdateWorkflow,
PassiveRecoveryWorkflow,
} }
#[cfg(test)] #[cfg(test)]
@ -265,4 +266,19 @@ pub mod business_status {
/// Business status set for newly created tasks. /// Business status set for newly created tasks.
pub const PENDING: &str = "Pending"; pub const PENDING: &str = "Pending";
/// For the PCR Workflow
///
/// This status indicates the completion of a execute task
pub const EXECUTE_WORKFLOW_COMPLETE: &str = "COMPLETED_EXECUTE_TASK";
/// This status indicates that the execute task was completed to trigger the psync task
pub const EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC: &str = "COMPLETED_EXECUTE_TASK_TO_TRIGGER_PSYNC";
/// This status indicates that the execute task was completed to trigger the review task
pub const EXECUTE_WORKFLOW_COMPLETE_FOR_REVIEW: &str =
"COMPLETED_EXECUTE_TASK_TO_TRIGGER_REVIEW";
/// This status indicates the completion of a psync task
pub const PSYNC_WORKFLOW_COMPLETE: &str = "COMPLETED_PSYNC_TASK";
} }

View File

@ -318,7 +318,7 @@ pub enum PaymentIntentUpdate {
/// PreUpdate tracker of ConfirmIntent /// PreUpdate tracker of ConfirmIntent
ConfirmIntent { ConfirmIntent {
status: common_enums::IntentStatus, status: common_enums::IntentStatus,
active_attempt_id: id_type::GlobalAttemptId, active_attempt_id: Option<id_type::GlobalAttemptId>,
updated_by: String, updated_by: String,
}, },
/// PostUpdate tracker of ConfirmIntent /// PostUpdate tracker of ConfirmIntent
@ -397,7 +397,7 @@ impl From<PaymentIntentUpdate> for diesel_models::PaymentIntentUpdateInternal {
updated_by, updated_by,
} => Self { } => Self {
status: Some(status), status: Some(status),
active_attempt_id: Some(active_attempt_id), active_attempt_id,
modified_at: common_utils::date_time::now(), modified_at: common_utils::date_time::now(),
amount: None, amount: None,
amount_captured: None, amount_captured: None,

View File

@ -252,7 +252,6 @@ pub async fn deep_health_check_func(
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub struct WorkflowRunner; pub struct WorkflowRunner;
#[cfg(feature = "v1")]
#[async_trait::async_trait] #[async_trait::async_trait]
impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner { impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
async fn trigger_workflow<'a>( async fn trigger_workflow<'a>(
@ -322,6 +321,9 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new( storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new(
workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow, workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow,
)), )),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow => Ok(Box::new(
workflows::passive_churn_recovery_workflow::ExecutePcrWorkflow,
)),
} }
}; };
@ -360,18 +362,6 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
} }
} }
#[cfg(feature = "v2")]
#[async_trait::async_trait]
impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
async fn trigger_workflow<'a>(
&'a self,
_state: &'a routes::SessionState,
_process: storage::ProcessTracker,
) -> CustomResult<(), ProcessTrackerError> {
todo!()
}
}
async fn start_scheduler( async fn start_scheduler(
state: &routes::AppState, state: &routes::AppState,
scheduler_flow: scheduler::SchedulerFlow, scheduler_flow: scheduler::SchedulerFlow,

View File

@ -57,4 +57,6 @@ pub mod webhooks;
pub mod unified_authentication_service; pub mod unified_authentication_service;
#[cfg(feature = "v2")]
pub mod passive_churn_recovery;
pub mod relay; pub mod relay;

View File

@ -0,0 +1,207 @@
pub mod transformers;
pub mod types;
use api_models::payments::PaymentsRetrieveRequest;
use common_utils::{self, id_type, types::keymanager::KeyManagerState};
use diesel_models::process_tracker::business_status;
use error_stack::{self, ResultExt};
use hyperswitch_domain_models::{
errors::api_error_response,
payments::{PaymentIntent, PaymentStatusData},
};
use scheduler::errors;
use crate::{
core::{
errors::RouterResult,
passive_churn_recovery::types as pcr_types,
payments::{self, operations::Operation},
},
db::StorageInterface,
logger,
routes::{metrics, SessionState},
types::{
api,
storage::{self, passive_churn_recovery as pcr},
transformers::ForeignInto,
},
};
pub async fn perform_execute_payment(
state: &SessionState,
execute_task_process: &storage::ProcessTracker,
tracking_data: &pcr::PcrWorkflowTrackingData,
pcr_data: &pcr::PcrPaymentData,
_key_manager_state: &KeyManagerState,
payment_intent: &PaymentIntent,
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
let decision = pcr_types::Decision::get_decision_based_on_params(
state,
payment_intent.status,
false,
payment_intent.active_attempt_id.clone(),
pcr_data,
&tracking_data.global_payment_id,
)
.await?;
// TODO decide if its a global failure or is it requeueable error
match decision {
pcr_types::Decision::Execute => {
let action = pcr_types::Action::execute_payment(
db,
pcr_data.merchant_account.get_id(),
payment_intent,
execute_task_process,
)
.await?;
action
.execute_payment_task_response_handler(
db,
&pcr_data.merchant_account,
payment_intent,
execute_task_process,
&pcr_data.profile,
)
.await?;
}
pcr_types::Decision::Psync(attempt_status, attempt_id) => {
// find if a psync task is already present
let task = "PSYNC_WORKFLOW";
let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow;
let process_tracker_id = format!("{runner}_{task}_{}", attempt_id.get_string_repr());
let psync_process = db.find_process_by_id(&process_tracker_id).await?;
match psync_process {
Some(_) => {
let pcr_status: pcr_types::PcrAttemptStatus = attempt_status.foreign_into();
pcr_status
.update_pt_status_based_on_attempt_status_for_execute_payment(
db,
execute_task_process,
)
.await?;
}
None => {
// insert new psync task
insert_psync_pcr_task(
db,
pcr_data.merchant_account.get_id().clone(),
payment_intent.get_id().clone(),
pcr_data.profile.get_id().clone(),
attempt_id.clone(),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
)
.await?;
// finish the current task
db.finish_process_with_business_status(
execute_task_process.clone(),
business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC,
)
.await?;
}
};
}
pcr_types::Decision::InvalidDecision => {
db.finish_process_with_business_status(
execute_task_process.clone(),
business_status::EXECUTE_WORKFLOW_COMPLETE,
)
.await?;
logger::warn!("Abnormal State Identified")
}
}
Ok(())
}
async fn insert_psync_pcr_task(
db: &dyn StorageInterface,
merchant_id: id_type::MerchantId,
payment_id: id_type::GlobalPaymentId,
profile_id: id_type::ProfileId,
payment_attempt_id: id_type::GlobalAttemptId,
runner: storage::ProcessTrackerRunner,
) -> RouterResult<storage::ProcessTracker> {
let task = "PSYNC_WORKFLOW";
let process_tracker_id = format!("{runner}_{task}_{}", payment_attempt_id.get_string_repr());
let schedule_time = common_utils::date_time::now();
let psync_workflow_tracking_data = pcr::PcrWorkflowTrackingData {
global_payment_id: payment_id,
merchant_id,
profile_id,
payment_attempt_id,
};
let tag = ["PCR"];
let process_tracker_entry = storage::ProcessTrackerNew::new(
process_tracker_id,
task,
runner,
tag,
psync_workflow_tracking_data,
schedule_time,
)
.change_context(api_error_response::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct delete tokenized data process tracker task")?;
let response = db
.insert_process(process_tracker_entry)
.await
.change_context(api_error_response::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct delete tokenized data process tracker task")?;
metrics::TASKS_ADDED_COUNT.add(1, router_env::metric_attributes!(("flow", "PsyncPcr")));
Ok(response)
}
pub async fn call_psync_api(
state: &SessionState,
global_payment_id: &id_type::GlobalPaymentId,
pcr_data: &pcr::PcrPaymentData,
) -> RouterResult<PaymentStatusData<api::PSync>> {
let operation = payments::operations::PaymentGet;
let req = PaymentsRetrieveRequest {
force_sync: false,
param: None,
expand_attempts: true,
};
// TODO : Use api handler instead of calling get_tracker and payments_operation_core
// Get the tracker related information. This includes payment intent and payment attempt
let get_tracker_response = operation
.to_get_tracker()?
.get_trackers(
state,
global_payment_id,
&req,
&pcr_data.merchant_account,
&pcr_data.profile,
&pcr_data.key_store,
&hyperswitch_domain_models::payments::HeaderPayload::default(),
None,
)
.await?;
let (payment_data, _req, _, _, _) = Box::pin(payments::payments_operation_core::<
api::PSync,
_,
_,
_,
PaymentStatusData<api::PSync>,
>(
state,
state.get_req_state(),
pcr_data.merchant_account.clone(),
pcr_data.key_store.clone(),
&pcr_data.profile,
operation,
req,
get_tracker_response,
payments::CallConnectorAction::Trigger,
hyperswitch_domain_models::payments::HeaderPayload::default(),
))
.await?;
Ok(payment_data)
}

View File

@ -0,0 +1,39 @@
use common_enums::AttemptStatus;
use crate::{
core::passive_churn_recovery::types::PcrAttemptStatus, types::transformers::ForeignFrom,
};
impl ForeignFrom<AttemptStatus> for PcrAttemptStatus {
fn foreign_from(s: AttemptStatus) -> Self {
match s {
AttemptStatus::Authorized | AttemptStatus::Charged | AttemptStatus::AutoRefunded => {
Self::Succeeded
}
AttemptStatus::Started
| AttemptStatus::AuthenticationSuccessful
| AttemptStatus::Authorizing
| AttemptStatus::CodInitiated
| AttemptStatus::VoidInitiated
| AttemptStatus::CaptureInitiated
| AttemptStatus::Pending => Self::Processing,
AttemptStatus::AuthenticationFailed
| AttemptStatus::AuthorizationFailed
| AttemptStatus::VoidFailed
| AttemptStatus::RouterDeclined
| AttemptStatus::CaptureFailed
| AttemptStatus::Failure => Self::Failed,
AttemptStatus::Voided
| AttemptStatus::ConfirmationAwaited
| AttemptStatus::PartialCharged
| AttemptStatus::PartialChargedAndChargeable
| AttemptStatus::PaymentMethodAwaited
| AttemptStatus::AuthenticationPending
| AttemptStatus::DeviceDataCollectionPending
| AttemptStatus::Unresolved => Self::InvalidStatus(s.to_string()),
}
}
}

View File

@ -0,0 +1,259 @@
use common_enums::{self, AttemptStatus, IntentStatus};
use common_utils::{self, ext_traits::OptionExt, id_type};
use diesel_models::{enums, process_tracker::business_status};
use error_stack::{self, ResultExt};
use hyperswitch_domain_models::{
business_profile, merchant_account,
payments::{PaymentConfirmData, PaymentIntent},
};
use time::PrimitiveDateTime;
use crate::{
core::{
errors::{self, RouterResult},
passive_churn_recovery::{self as core_pcr},
},
db::StorageInterface,
logger,
routes::SessionState,
types::{api::payments as api_types, storage, transformers::ForeignInto},
workflows::passive_churn_recovery_workflow::get_schedule_time_to_retry_mit_payments,
};
type RecoveryResult<T> = error_stack::Result<T, errors::RecoveryError>;
/// The status of Passive Churn Payments
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub enum PcrAttemptStatus {
Succeeded,
Failed,
Processing,
InvalidStatus(String),
// Cancelled,
}
impl PcrAttemptStatus {
pub(crate) async fn update_pt_status_based_on_attempt_status_for_execute_payment(
&self,
db: &dyn StorageInterface,
execute_task_process: &storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
match &self {
Self::Succeeded | Self::Failed | Self::Processing => {
// finish the current execute task
db.finish_process_with_business_status(
execute_task_process.clone(),
business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC,
)
.await?;
}
Self::InvalidStatus(action) => {
logger::debug!(
"Invalid Attempt Status for the Recovery Payment : {}",
action
);
let pt_update = storage::ProcessTrackerUpdate::StatusUpdate {
status: enums::ProcessTrackerStatus::Review,
business_status: Some(String::from(business_status::EXECUTE_WORKFLOW_COMPLETE)),
};
// update the process tracker status as Review
db.update_process(execute_task_process.clone(), pt_update)
.await?;
}
};
Ok(())
}
}
#[derive(Debug, Clone)]
pub enum Decision {
Execute,
Psync(AttemptStatus, id_type::GlobalAttemptId),
InvalidDecision,
}
impl Decision {
pub async fn get_decision_based_on_params(
state: &SessionState,
intent_status: IntentStatus,
called_connector: bool,
active_attempt_id: Option<id_type::GlobalAttemptId>,
pcr_data: &storage::passive_churn_recovery::PcrPaymentData,
payment_id: &id_type::GlobalPaymentId,
) -> RecoveryResult<Self> {
Ok(match (intent_status, called_connector, active_attempt_id) {
(IntentStatus::Failed, false, None) => Self::Execute,
(IntentStatus::Processing, true, Some(_)) => {
let psync_data = core_pcr::call_psync_api(state, payment_id, pcr_data)
.await
.change_context(errors::RecoveryError::PaymentCallFailed)
.attach_printable("Error while executing the Psync call")?;
let payment_attempt = psync_data
.payment_attempt
.get_required_value("Payment Attempt")
.change_context(errors::RecoveryError::ValueNotFound)
.attach_printable("Error while executing the Psync call")?;
Self::Psync(payment_attempt.status, payment_attempt.get_id().clone())
}
_ => Self::InvalidDecision,
})
}
}
#[derive(Debug, Clone)]
pub enum Action {
SyncPayment(id_type::GlobalAttemptId),
RetryPayment(PrimitiveDateTime),
TerminalFailure,
SuccessfulPayment,
ReviewPayment,
ManualReviewAction,
}
impl Action {
pub async fn execute_payment(
db: &dyn StorageInterface,
merchant_id: &id_type::MerchantId,
payment_intent: &PaymentIntent,
process: &storage::ProcessTracker,
) -> RecoveryResult<Self> {
// call the proxy api
let response = call_proxy_api::<api_types::Authorize>(payment_intent);
// handle proxy api's response
match response {
Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() {
PcrAttemptStatus::Succeeded => Ok(Self::SuccessfulPayment),
PcrAttemptStatus::Failed => {
Self::decide_retry_failure_action(db, merchant_id, process.clone()).await
}
PcrAttemptStatus::Processing => {
Ok(Self::SyncPayment(payment_data.payment_attempt.id))
}
PcrAttemptStatus::InvalidStatus(action) => {
logger::info!(?action, "Invalid Payment Status For PCR Payment");
Ok(Self::ManualReviewAction)
}
},
Err(err) =>
// check for an active attempt being constructed or not
{
logger::error!(execute_payment_res=?err);
match payment_intent.active_attempt_id.clone() {
Some(attempt_id) => Ok(Self::SyncPayment(attempt_id)),
None => Ok(Self::ReviewPayment),
}
}
}
}
pub async fn execute_payment_task_response_handler(
&self,
db: &dyn StorageInterface,
merchant_account: &merchant_account::MerchantAccount,
payment_intent: &PaymentIntent,
execute_task_process: &storage::ProcessTracker,
profile: &business_profile::Profile,
) -> Result<(), errors::ProcessTrackerError> {
match self {
Self::SyncPayment(attempt_id) => {
core_pcr::insert_psync_pcr_task(
db,
merchant_account.get_id().to_owned(),
payment_intent.id.clone(),
profile.get_id().to_owned(),
attempt_id.clone(),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow,
)
.await
.change_context(errors::RecoveryError::ProcessTrackerFailure)
.attach_printable("Failed to create a psync workflow in the process tracker")?;
db.as_scheduler()
.finish_process_with_business_status(
execute_task_process.clone(),
business_status::EXECUTE_WORKFLOW_COMPLETE_FOR_PSYNC,
)
.await
.change_context(errors::RecoveryError::ProcessTrackerFailure)
.attach_printable("Failed to update the process tracker")?;
Ok(())
}
Self::RetryPayment(schedule_time) => {
let mut pt = execute_task_process.clone();
// update the schedule time
pt.schedule_time = Some(*schedule_time);
let pt_task_update = diesel_models::ProcessTrackerUpdate::StatusUpdate {
status: storage::enums::ProcessTrackerStatus::Pending,
business_status: Some(business_status::PENDING.to_owned()),
};
db.as_scheduler()
.update_process(pt.clone(), pt_task_update)
.await?;
// TODO: update the connector called field and make the active attempt None
Ok(())
}
Self::TerminalFailure => {
// TODO: Record a failure transaction back to Billing Connector
Ok(())
}
Self::SuccessfulPayment => Ok(()),
Self::ReviewPayment => Ok(()),
Self::ManualReviewAction => {
logger::debug!("Invalid Payment Status For PCR Payment");
let pt_update = storage::ProcessTrackerUpdate::StatusUpdate {
status: enums::ProcessTrackerStatus::Review,
business_status: Some(String::from(business_status::EXECUTE_WORKFLOW_COMPLETE)),
};
// update the process tracker status as Review
db.as_scheduler()
.update_process(execute_task_process.clone(), pt_update)
.await?;
Ok(())
}
}
}
pub(crate) async fn decide_retry_failure_action(
db: &dyn StorageInterface,
merchant_id: &id_type::MerchantId,
pt: storage::ProcessTracker,
) -> RecoveryResult<Self> {
let schedule_time =
get_schedule_time_to_retry_mit_payments(db, merchant_id, pt.retry_count + 1).await;
match schedule_time {
Some(schedule_time) => Ok(Self::RetryPayment(schedule_time)),
None => Ok(Self::TerminalFailure),
}
}
}
// This function would be converted to proxy_payments_core
fn call_proxy_api<F>(payment_intent: &PaymentIntent) -> RouterResult<PaymentConfirmData<F>>
where
F: Send + Clone + Sync,
{
let payment_address = hyperswitch_domain_models::payment_address::PaymentAddress::new(
payment_intent
.shipping_address
.clone()
.map(|address| address.into_inner()),
payment_intent
.billing_address
.clone()
.map(|address| address.into_inner()),
None,
Some(true),
);
let response = PaymentConfirmData {
flow: std::marker::PhantomData,
payment_intent: payment_intent.clone(),
payment_attempt: todo!(),
payment_method_data: None,
payment_address,
};
Ok(response)
}

View File

@ -210,8 +210,8 @@ impl<F: Send + Clone + Sync> GetTracker<F, PaymentConfirmData<F>, PaymentsConfir
) )
.await?; .await?;
let payment_attempt = db let payment_attempt: hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt =
.insert_payment_attempt( db.insert_payment_attempt(
key_manager_state, key_manager_state,
key_store, key_store,
payment_attempt_domain_model, payment_attempt_domain_model,
@ -416,7 +416,7 @@ impl<F: Clone + Sync> UpdateTracker<F, PaymentConfirmData<F>, PaymentsConfirmInt
hyperswitch_domain_models::payments::payment_intent::PaymentIntentUpdate::ConfirmIntent { hyperswitch_domain_models::payments::payment_intent::PaymentIntentUpdate::ConfirmIntent {
status: intent_status, status: intent_status,
updated_by: storage_scheme.to_string(), updated_by: storage_scheme.to_string(),
active_attempt_id: payment_data.payment_attempt.id.clone(), active_attempt_id: Some(payment_data.payment_attempt.id.clone()),
}; };
let authentication_type = payment_data.payment_attempt.authentication_type; let authentication_type = payment_data.payment_attempt.authentication_type;

View File

@ -28,6 +28,8 @@ pub mod mandate;
pub mod merchant_account; pub mod merchant_account;
pub mod merchant_connector_account; pub mod merchant_connector_account;
pub mod merchant_key_store; pub mod merchant_key_store;
#[cfg(feature = "v2")]
pub mod passive_churn_recovery;
pub mod payment_attempt; pub mod payment_attempt;
pub mod payment_link; pub mod payment_link;
pub mod payment_method; pub mod payment_method;

View File

@ -0,0 +1,18 @@
use std::fmt::Debug;
use common_utils::id_type;
use hyperswitch_domain_models::{business_profile, merchant_account, merchant_key_store};
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct PcrWorkflowTrackingData {
pub merchant_id: id_type::MerchantId,
pub profile_id: id_type::ProfileId,
pub global_payment_id: id_type::GlobalPaymentId,
pub payment_attempt_id: id_type::GlobalAttemptId,
}
#[derive(Debug, Clone)]
pub struct PcrPaymentData {
pub merchant_account: merchant_account::MerchantAccount,
pub profile: business_profile::Profile,
pub key_store: merchant_key_store::MerchantKeyStore,
}

View File

@ -2,12 +2,12 @@
pub mod api_key_expiry; pub mod api_key_expiry;
#[cfg(feature = "payouts")] #[cfg(feature = "payouts")]
pub mod attach_payout_account_workflow; pub mod attach_payout_account_workflow;
#[cfg(feature = "v1")]
pub mod outgoing_webhook_retry; pub mod outgoing_webhook_retry;
#[cfg(feature = "v1")]
pub mod payment_method_status_update; pub mod payment_method_status_update;
pub mod payment_sync; pub mod payment_sync;
#[cfg(feature = "v1")]
pub mod refund_router; pub mod refund_router;
#[cfg(feature = "v1")]
pub mod tokenized_data; pub mod tokenized_data;
pub mod passive_churn_recovery_workflow;

View File

@ -36,6 +36,7 @@ pub struct OutgoingWebhookRetryWorkflow;
#[async_trait::async_trait] #[async_trait::async_trait]
impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow { impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow {
#[cfg(feature = "v1")]
#[instrument(skip_all)] #[instrument(skip_all)]
async fn execute_workflow<'a>( async fn execute_workflow<'a>(
&'a self, &'a self,
@ -226,6 +227,14 @@ impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow {
Ok(()) Ok(())
} }
#[cfg(feature = "v2")]
async fn execute_workflow<'a>(
&'a self,
_state: &'a SessionState,
_process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
todo!()
}
#[instrument(skip_all)] #[instrument(skip_all)]
async fn error_handler<'a>( async fn error_handler<'a>(
@ -266,6 +275,7 @@ impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow {
/// seconds between them by default. /// seconds between them by default.
/// - `custom_merchant_mapping.merchant_id1`: Merchant-specific retry configuration for merchant /// - `custom_merchant_mapping.merchant_id1`: Merchant-specific retry configuration for merchant
/// with merchant ID `merchant_id1`. /// with merchant ID `merchant_id1`.
#[cfg(feature = "v1")]
#[instrument(skip_all)] #[instrument(skip_all)]
pub(crate) async fn get_webhook_delivery_retry_schedule_time( pub(crate) async fn get_webhook_delivery_retry_schedule_time(
db: &dyn StorageInterface, db: &dyn StorageInterface,
@ -311,6 +321,7 @@ pub(crate) async fn get_webhook_delivery_retry_schedule_time(
} }
/// Schedule the webhook delivery task for retry /// Schedule the webhook delivery task for retry
#[cfg(feature = "v1")]
#[instrument(skip_all)] #[instrument(skip_all)]
pub(crate) async fn retry_webhook_delivery_task( pub(crate) async fn retry_webhook_delivery_task(
db: &dyn StorageInterface, db: &dyn StorageInterface,
@ -334,6 +345,7 @@ pub(crate) async fn retry_webhook_delivery_task(
} }
} }
#[cfg(feature = "v1")]
#[instrument(skip_all)] #[instrument(skip_all)]
async fn get_outgoing_webhook_content_and_event_type( async fn get_outgoing_webhook_content_and_event_type(
state: SessionState, state: SessionState,

View File

@ -0,0 +1,175 @@
#[cfg(feature = "v2")]
use api_models::payments::PaymentsGetIntentRequest;
#[cfg(feature = "v2")]
use common_utils::ext_traits::{StringExt, ValueExt};
#[cfg(feature = "v2")]
use error_stack::ResultExt;
#[cfg(feature = "v2")]
use hyperswitch_domain_models::payments::PaymentIntentData;
#[cfg(feature = "v2")]
use router_env::logger;
use scheduler::{consumer::workflows::ProcessTrackerWorkflow, errors};
#[cfg(feature = "v2")]
use scheduler::{types::process_data, utils as scheduler_utils};
#[cfg(feature = "v2")]
use crate::{
core::{
passive_churn_recovery::{self as pcr},
payments,
},
db::StorageInterface,
errors::StorageError,
types::{
api::{self as api_types},
storage::passive_churn_recovery as pcr_storage_types,
},
};
use crate::{routes::SessionState, types::storage};
pub struct ExecutePcrWorkflow;
#[async_trait::async_trait]
impl ProcessTrackerWorkflow<SessionState> for ExecutePcrWorkflow {
#[cfg(feature = "v1")]
async fn execute_workflow<'a>(
&'a self,
_state: &'a SessionState,
_process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
Ok(())
}
#[cfg(feature = "v2")]
async fn execute_workflow<'a>(
&'a self,
state: &'a SessionState,
process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let tracking_data = process
.tracking_data
.clone()
.parse_value::<pcr_storage_types::PcrWorkflowTrackingData>(
"PCRWorkflowTrackingData",
)?;
let request = PaymentsGetIntentRequest {
id: tracking_data.global_payment_id.clone(),
};
let key_manager_state = &state.into();
let pcr_data = extract_data_and_perform_action(state, &tracking_data).await?;
let (payment_data, _, _) = payments::payments_intent_operation_core::<
api_types::PaymentGetIntent,
_,
_,
PaymentIntentData<api_types::PaymentGetIntent>,
>(
state,
state.get_req_state(),
pcr_data.merchant_account.clone(),
pcr_data.profile.clone(),
pcr_data.key_store.clone(),
payments::operations::PaymentGetIntent,
request,
tracking_data.global_payment_id.clone(),
hyperswitch_domain_models::payments::HeaderPayload::default(),
None,
)
.await?;
match process.name.as_deref() {
Some("EXECUTE_WORKFLOW") => {
pcr::perform_execute_payment(
state,
&process,
&tracking_data,
&pcr_data,
key_manager_state,
&payment_data.payment_intent,
)
.await
}
Some("PSYNC_WORKFLOW") => todo!(),
Some("REVIEW_WORKFLOW") => todo!(),
_ => Err(errors::ProcessTrackerError::JobNotFound),
}
}
}
#[cfg(feature = "v2")]
pub(crate) async fn extract_data_and_perform_action(
state: &SessionState,
tracking_data: &pcr_storage_types::PcrWorkflowTrackingData,
) -> Result<pcr_storage_types::PcrPaymentData, errors::ProcessTrackerError> {
let db = &state.store;
let key_manager_state = &state.into();
let key_store = db
.get_merchant_key_store_by_merchant_id(
key_manager_state,
&tracking_data.merchant_id,
&db.get_master_key().to_vec().into(),
)
.await?;
let merchant_account = db
.find_merchant_account_by_merchant_id(
key_manager_state,
&tracking_data.merchant_id,
&key_store,
)
.await?;
let profile = db
.find_business_profile_by_profile_id(
key_manager_state,
&key_store,
&tracking_data.profile_id,
)
.await?;
let pcr_payment_data = pcr_storage_types::PcrPaymentData {
merchant_account,
profile,
key_store,
};
Ok(pcr_payment_data)
}
#[cfg(feature = "v2")]
pub(crate) async fn get_schedule_time_to_retry_mit_payments(
db: &dyn StorageInterface,
merchant_id: &common_utils::id_type::MerchantId,
retry_count: i32,
) -> Option<time::PrimitiveDateTime> {
let key = "pt_mapping_pcr_retries";
let result = db
.find_config_by_key(key)
.await
.map(|value| value.config)
.and_then(|config| {
config
.parse_struct("RevenueRecoveryPaymentProcessTrackerMapping")
.change_context(StorageError::DeserializationFailed)
});
let mapping = result.map_or_else(
|error| {
if error.current_context().is_db_not_found() {
logger::debug!("Revenue Recovery retry config `{key}` not found, ignoring");
} else {
logger::error!(
?error,
"Failed to read Revenue Recovery retry config `{key}`"
);
}
process_data::RevenueRecoveryPaymentProcessTrackerMapping::default()
},
|mapping| {
logger::debug!(?mapping, "Using custom pcr payments retry config");
mapping
},
);
let time_delta =
scheduler_utils::get_pcr_payments_retry_schedule_time(mapping, merchant_id, retry_count);
scheduler_utils::get_time_from_delta(time_delta)
}

View File

@ -111,6 +111,14 @@ impl ProcessTrackerWorkflow<SessionState> for PaymentMethodStatusUpdateWorkflow
Ok(()) Ok(())
} }
#[cfg(feature = "v2")]
async fn execute_workflow<'a>(
&'a self,
_state: &'a SessionState,
_process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
todo!()
}
async fn error_handler<'a>( async fn error_handler<'a>(
&'a self, &'a self,
_state: &'a SessionState, _state: &'a SessionState,

View File

@ -31,8 +31,8 @@ impl ProcessTrackerWorkflow<SessionState> for PaymentsSyncWorkflow {
#[cfg(feature = "v2")] #[cfg(feature = "v2")]
async fn execute_workflow<'a>( async fn execute_workflow<'a>(
&'a self, &'a self,
state: &'a SessionState, _state: &'a SessionState,
process: storage::ProcessTracker, _process: storage::ProcessTracker,
) -> Result<(), sch_errors::ProcessTrackerError> { ) -> Result<(), sch_errors::ProcessTrackerError> {
todo!() todo!()
} }

View File

@ -1,13 +1,14 @@
use scheduler::consumer::workflows::ProcessTrackerWorkflow; use scheduler::consumer::workflows::ProcessTrackerWorkflow;
use crate::{ #[cfg(feature = "v1")]
core::refunds as refund_flow, errors, logger::error, routes::SessionState, types::storage, use crate::core::refunds as refund_flow;
}; use crate::{errors, logger::error, routes::SessionState, types::storage};
pub struct RefundWorkflowRouter; pub struct RefundWorkflowRouter;
#[async_trait::async_trait] #[async_trait::async_trait]
impl ProcessTrackerWorkflow<SessionState> for RefundWorkflowRouter { impl ProcessTrackerWorkflow<SessionState> for RefundWorkflowRouter {
#[cfg(feature = "v1")]
async fn execute_workflow<'a>( async fn execute_workflow<'a>(
&'a self, &'a self,
state: &'a SessionState, state: &'a SessionState,
@ -15,6 +16,14 @@ impl ProcessTrackerWorkflow<SessionState> for RefundWorkflowRouter {
) -> Result<(), errors::ProcessTrackerError> { ) -> Result<(), errors::ProcessTrackerError> {
Ok(Box::pin(refund_flow::start_refund_workflow(state, &process)).await?) Ok(Box::pin(refund_flow::start_refund_workflow(state, &process)).await?)
} }
#[cfg(feature = "v2")]
async fn execute_workflow<'a>(
&'a self,
_state: &'a SessionState,
_process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
todo!()
}
async fn error_handler<'a>( async fn error_handler<'a>(
&'a self, &'a self,

View File

@ -1,13 +1,14 @@
use scheduler::consumer::workflows::ProcessTrackerWorkflow; use scheduler::consumer::workflows::ProcessTrackerWorkflow;
use crate::{ #[cfg(feature = "v1")]
core::payment_methods::vault, errors, logger::error, routes::SessionState, types::storage, use crate::core::payment_methods::vault;
}; use crate::{errors, logger::error, routes::SessionState, types::storage};
pub struct DeleteTokenizeDataWorkflow; pub struct DeleteTokenizeDataWorkflow;
#[async_trait::async_trait] #[async_trait::async_trait]
impl ProcessTrackerWorkflow<SessionState> for DeleteTokenizeDataWorkflow { impl ProcessTrackerWorkflow<SessionState> for DeleteTokenizeDataWorkflow {
#[cfg(feature = "v1")]
async fn execute_workflow<'a>( async fn execute_workflow<'a>(
&'a self, &'a self,
state: &'a SessionState, state: &'a SessionState,
@ -16,6 +17,15 @@ impl ProcessTrackerWorkflow<SessionState> for DeleteTokenizeDataWorkflow {
Ok(vault::start_tokenize_data_workflow(state, &process).await?) Ok(vault::start_tokenize_data_workflow(state, &process).await?)
} }
#[cfg(feature = "v2")]
async fn execute_workflow<'a>(
&'a self,
_state: &'a SessionState,
_process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
todo!()
}
async fn error_handler<'a>( async fn error_handler<'a>(
&'a self, &'a self,
_state: &'a SessionState, _state: &'a SessionState,

View File

@ -82,3 +82,39 @@ impl Default for OutgoingWebhookRetryProcessTrackerMapping {
} }
} }
} }
/// Configuration for outgoing webhook retries.
#[derive(Debug, Serialize, Deserialize)]
pub struct RevenueRecoveryPaymentProcessTrackerMapping {
/// Default (fallback) retry configuration used when no merchant-specific retry configuration
/// exists.
pub default_mapping: RetryMapping,
/// Merchant-specific retry configuration.
pub custom_merchant_mapping: HashMap<common_utils::id_type::MerchantId, RetryMapping>,
}
impl Default for RevenueRecoveryPaymentProcessTrackerMapping {
fn default() -> Self {
Self {
default_mapping: RetryMapping {
// 1st attempt happens after 1 minute of it being
start_after: 60,
frequencies: vec![
// 2nd and 3rd attempts happen at intervals of 3 hours each
(60 * 60 * 3, 2),
// 4th, 5th, 6th attempts happen at intervals of 6 hours each
(60 * 60 * 6, 3),
// 7th, 8th, 9th attempts happen at intervals of 9 hour each
(60 * 60 * 9, 3),
// 10th, 11th and 12th attempts happen at intervals of 12 hours each
(60 * 60 * 12, 3),
// 13th, 14th and 15th attempts happen at intervals of 18 hours each
(60 * 60 * 18, 3),
],
},
custom_merchant_mapping: HashMap::new(),
}
}
}

View File

@ -4,7 +4,7 @@ use external_services::email::EmailError;
use hyperswitch_domain_models::errors::api_error_response::ApiErrorResponse; use hyperswitch_domain_models::errors::api_error_response::ApiErrorResponse;
pub use redis_interface::errors::RedisError; pub use redis_interface::errors::RedisError;
pub use storage_impl::errors::ApplicationError; pub use storage_impl::errors::ApplicationError;
use storage_impl::errors::StorageError; use storage_impl::errors::{RecoveryError, StorageError};
use crate::env::logger::{self, error}; use crate::env::logger::{self, error};
@ -46,6 +46,8 @@ pub enum ProcessTrackerError {
EApiErrorResponse, EApiErrorResponse,
#[error("Received Error ClientError")] #[error("Received Error ClientError")]
EClientError, EClientError,
#[error("Received RecoveryError: {0:?}")]
ERecoveryError(error_stack::Report<RecoveryError>),
#[error("Received Error StorageError: {0:?}")] #[error("Received Error StorageError: {0:?}")]
EStorageError(error_stack::Report<StorageError>), EStorageError(error_stack::Report<StorageError>),
#[error("Received Error RedisError: {0:?}")] #[error("Received Error RedisError: {0:?}")]
@ -131,3 +133,8 @@ error_to_process_tracker_error!(
error_stack::Report<EmailError>, error_stack::Report<EmailError>,
ProcessTrackerError::EEmailError(error_stack::Report<EmailError>) ProcessTrackerError::EEmailError(error_stack::Report<EmailError>)
); );
error_to_process_tracker_error!(
error_stack::Report<RecoveryError>,
ProcessTrackerError::ERecoveryError(error_stack::Report<RecoveryError>)
);

View File

@ -350,6 +350,25 @@ pub fn get_outgoing_webhook_retry_schedule_time(
} }
} }
pub fn get_pcr_payments_retry_schedule_time(
mapping: process_data::RevenueRecoveryPaymentProcessTrackerMapping,
merchant_id: &common_utils::id_type::MerchantId,
retry_count: i32,
) -> Option<i32> {
let mapping = match mapping.custom_merchant_mapping.get(merchant_id) {
Some(map) => map.clone(),
None => mapping.default_mapping,
};
// TODO: check if the current scheduled time is not more than the configured timerange
// For first try, get the `start_after` time
if retry_count == 0 {
Some(mapping.start_after)
} else {
get_delay(retry_count, &mapping.frequencies)
}
}
/// Get the delay based on the retry count /// Get the delay based on the retry count
pub fn get_delay<'a>( pub fn get_delay<'a>(
retry_count: i32, retry_count: i32,

View File

@ -299,3 +299,15 @@ pub enum HealthCheckGRPCServiceError {
#[error("Failed to establish connection with gRPC service")] #[error("Failed to establish connection with gRPC service")]
FailedToCallService, FailedToCallService,
} }
#[derive(thiserror::Error, Debug, Clone)]
pub enum RecoveryError {
#[error("Failed to make a recovery payment")]
PaymentCallFailed,
#[error("Encountered a Process Tracker Task Failure")]
ProcessTrackerFailure,
#[error("The encountered task is invalid")]
InvalidTask,
#[error("The Intended data was not found")]
ValueNotFound,
}