mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
feat(payment_methods): Implement Process tracker workflow for Payment method Status update (#4668)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -208,6 +208,7 @@ pub enum ProcessTrackerRunner {
|
|||||||
ApiKeyExpiryWorkflow,
|
ApiKeyExpiryWorkflow,
|
||||||
OutgoingWebhookRetryWorkflow,
|
OutgoingWebhookRetryWorkflow,
|
||||||
AttachPayoutAccountWorkflow,
|
AttachPayoutAccountWorkflow,
|
||||||
|
PaymentMethodStatusUpdateWorkflow,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@ -310,6 +310,9 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new(
|
||||||
|
workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow,
|
||||||
|
)),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -8,11 +8,18 @@ use api_models::payments::CardToken;
|
|||||||
#[cfg(feature = "payouts")]
|
#[cfg(feature = "payouts")]
|
||||||
pub use api_models::{enums::PayoutConnectors, payouts as payout_types};
|
pub use api_models::{enums::PayoutConnectors, payouts as payout_types};
|
||||||
use diesel_models::enums;
|
use diesel_models::enums;
|
||||||
|
use error_stack::ResultExt;
|
||||||
use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
|
use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
|
||||||
use router_env::{instrument, tracing};
|
use router_env::{instrument, tracing};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
core::{errors::RouterResult, payments::helpers, pm_auth as core_pm_auth},
|
consts,
|
||||||
|
core::{
|
||||||
|
errors::{self, RouterResult},
|
||||||
|
payments::helpers,
|
||||||
|
pm_auth as core_pm_auth,
|
||||||
|
},
|
||||||
|
db,
|
||||||
routes::SessionState,
|
routes::SessionState,
|
||||||
types::{
|
types::{
|
||||||
api::{self, payments},
|
api::{self, payments},
|
||||||
@ -20,6 +27,9 @@ use crate::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const PAYMENT_METHOD_STATUS_UPDATE_TASK: &str = "PAYMENT_METHOD_STATUS_UPDATE";
|
||||||
|
const PAYMENT_METHOD_STATUS_TAG: &str = "PAYMENT_METHOD_STATUS";
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn retrieve_payment_method(
|
pub async fn retrieve_payment_method(
|
||||||
pm_data: &Option<payments::PaymentMethodData>,
|
pm_data: &Option<payments::PaymentMethodData>,
|
||||||
@ -94,6 +104,66 @@ pub async fn retrieve_payment_method(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn generate_task_id_for_payment_method_status_update_workflow(
|
||||||
|
key_id: &str,
|
||||||
|
runner: &storage::ProcessTrackerRunner,
|
||||||
|
task: &str,
|
||||||
|
) -> String {
|
||||||
|
format!("{runner}_{task}_{key_id}")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn add_payment_method_status_update_task(
|
||||||
|
db: &dyn db::StorageInterface,
|
||||||
|
payment_method: &diesel_models::PaymentMethod,
|
||||||
|
prev_status: enums::PaymentMethodStatus,
|
||||||
|
curr_status: enums::PaymentMethodStatus,
|
||||||
|
merchant_id: &str,
|
||||||
|
) -> Result<(), errors::ProcessTrackerError> {
|
||||||
|
let created_at = payment_method.created_at;
|
||||||
|
let schedule_time =
|
||||||
|
created_at.saturating_add(time::Duration::seconds(consts::DEFAULT_SESSION_EXPIRY));
|
||||||
|
|
||||||
|
let tracking_data = storage::PaymentMethodStatusTrackingData {
|
||||||
|
payment_method_id: payment_method.payment_method_id.clone(),
|
||||||
|
prev_status,
|
||||||
|
curr_status,
|
||||||
|
merchant_id: merchant_id.to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let runner = storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow;
|
||||||
|
let task = PAYMENT_METHOD_STATUS_UPDATE_TASK;
|
||||||
|
let tag = [PAYMENT_METHOD_STATUS_TAG];
|
||||||
|
|
||||||
|
let process_tracker_id = generate_task_id_for_payment_method_status_update_workflow(
|
||||||
|
payment_method.payment_method_id.as_str(),
|
||||||
|
&runner,
|
||||||
|
task,
|
||||||
|
);
|
||||||
|
let process_tracker_entry = storage::ProcessTrackerNew::new(
|
||||||
|
process_tracker_id,
|
||||||
|
task,
|
||||||
|
runner,
|
||||||
|
tag,
|
||||||
|
tracking_data,
|
||||||
|
schedule_time,
|
||||||
|
)
|
||||||
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to construct PAYMENT_METHOD_STATUS_UPDATE process tracker task")?;
|
||||||
|
|
||||||
|
db
|
||||||
|
.insert_process(process_tracker_entry)
|
||||||
|
.await
|
||||||
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable_lazy(|| {
|
||||||
|
format!(
|
||||||
|
"Failed while inserting PAYMENT_METHOD_STATUS_UPDATE reminder to process_tracker for payment_method_id: {}",
|
||||||
|
payment_method.payment_method_id.clone()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn retrieve_payment_method_with_token(
|
pub async fn retrieve_payment_method_with_token(
|
||||||
state: &SessionState,
|
state: &SessionState,
|
||||||
|
|||||||
@ -45,7 +45,9 @@ use crate::{
|
|||||||
configs::settings,
|
configs::settings,
|
||||||
core::{
|
core::{
|
||||||
errors::{self, StorageErrorExt},
|
errors::{self, StorageErrorExt},
|
||||||
payment_methods::{transformers as payment_methods, vault},
|
payment_methods::{
|
||||||
|
add_payment_method_status_update_task, transformers as payment_methods, vault,
|
||||||
|
},
|
||||||
payments::{
|
payments::{
|
||||||
helpers,
|
helpers,
|
||||||
routing::{self, SessionFlowRoutingInput},
|
routing::{self, SessionFlowRoutingInput},
|
||||||
@ -297,6 +299,21 @@ pub async fn get_client_secret_or_add_payment_method(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
if res.status == enums::PaymentMethodStatus::AwaitingData {
|
||||||
|
add_payment_method_status_update_task(
|
||||||
|
db,
|
||||||
|
&res,
|
||||||
|
enums::PaymentMethodStatus::AwaitingData,
|
||||||
|
enums::PaymentMethodStatus::Inactive,
|
||||||
|
merchant_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable(
|
||||||
|
"Failed to add payment method status update task in process tracker",
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(services::api::ApplicationResponse::Json(
|
Ok(services::api::ApplicationResponse::Json(
|
||||||
api::PaymentMethodResponse::foreign_from(res),
|
api::PaymentMethodResponse::foreign_from(res),
|
||||||
))
|
))
|
||||||
@ -357,7 +374,7 @@ pub async fn add_payment_method_data(
|
|||||||
.attach_printable("Unable to find payment method")?;
|
.attach_printable("Unable to find payment method")?;
|
||||||
|
|
||||||
if payment_method.status != enums::PaymentMethodStatus::AwaitingData {
|
if payment_method.status != enums::PaymentMethodStatus::AwaitingData {
|
||||||
return Err((errors::ApiErrorResponse::DuplicatePaymentMethod).into());
|
return Err((errors::ApiErrorResponse::ClientSecretExpired).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let customer_id = payment_method.customer_id.clone();
|
let customer_id = payment_method.customer_id.clone();
|
||||||
|
|||||||
@ -110,3 +110,11 @@ impl DerefMut for PaymentsMandateReference {
|
|||||||
&mut self.0
|
&mut self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
|
||||||
|
pub struct PaymentMethodStatusTrackingData {
|
||||||
|
pub payment_method_id: String,
|
||||||
|
pub prev_status: enums::PaymentMethodStatus,
|
||||||
|
pub curr_status: enums::PaymentMethodStatus,
|
||||||
|
pub merchant_id: String,
|
||||||
|
}
|
||||||
|
|||||||
@ -3,6 +3,7 @@ pub mod api_key_expiry;
|
|||||||
#[cfg(feature = "payouts")]
|
#[cfg(feature = "payouts")]
|
||||||
pub mod attach_payout_account_workflow;
|
pub mod attach_payout_account_workflow;
|
||||||
pub mod outgoing_webhook_retry;
|
pub mod outgoing_webhook_retry;
|
||||||
|
pub mod payment_method_status_update;
|
||||||
pub mod payment_sync;
|
pub mod payment_sync;
|
||||||
pub mod refund_router;
|
pub mod refund_router;
|
||||||
pub mod tokenized_data;
|
pub mod tokenized_data;
|
||||||
|
|||||||
107
crates/router/src/workflows/payment_method_status_update.rs
Normal file
107
crates/router/src/workflows/payment_method_status_update.rs
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
use common_utils::ext_traits::ValueExt;
|
||||||
|
use scheduler::{
|
||||||
|
consumer::types::process_data, utils as pt_utils, workflows::ProcessTrackerWorkflow,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
errors,
|
||||||
|
logger::error,
|
||||||
|
routes::SessionState,
|
||||||
|
types::storage::{self, PaymentMethodStatusTrackingData},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct PaymentMethodStatusUpdateWorkflow;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ProcessTrackerWorkflow<SessionState> for PaymentMethodStatusUpdateWorkflow {
|
||||||
|
async fn execute_workflow<'a>(
|
||||||
|
&'a self,
|
||||||
|
state: &'a SessionState,
|
||||||
|
process: storage::ProcessTracker,
|
||||||
|
) -> Result<(), errors::ProcessTrackerError> {
|
||||||
|
let db = &*state.store;
|
||||||
|
let tracking_data: PaymentMethodStatusTrackingData = process
|
||||||
|
.tracking_data
|
||||||
|
.clone()
|
||||||
|
.parse_value("PaymentMethodStatusTrackingData")?;
|
||||||
|
|
||||||
|
let retry_count = process.retry_count;
|
||||||
|
let pm_id = tracking_data.payment_method_id;
|
||||||
|
let prev_pm_status = tracking_data.prev_status;
|
||||||
|
let curr_pm_status = tracking_data.curr_status;
|
||||||
|
let merchant_id = tracking_data.merchant_id;
|
||||||
|
|
||||||
|
let key_store = state
|
||||||
|
.store
|
||||||
|
.get_merchant_key_store_by_merchant_id(
|
||||||
|
merchant_id.as_str(),
|
||||||
|
&state.store.get_master_key().to_vec().into(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let merchant_account = db
|
||||||
|
.find_merchant_account_by_merchant_id(&merchant_id, &key_store)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let payment_method = db
|
||||||
|
.find_payment_method(&pm_id, merchant_account.storage_scheme)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if payment_method.status != prev_pm_status {
|
||||||
|
return db
|
||||||
|
.as_scheduler()
|
||||||
|
.finish_process_with_business_status(process, "PROCESS_ALREADY_COMPLETED")
|
||||||
|
.await
|
||||||
|
.map_err(Into::<errors::ProcessTrackerError>::into);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pm_update = storage::PaymentMethodUpdate::StatusUpdate {
|
||||||
|
status: Some(curr_pm_status),
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = db
|
||||||
|
.update_payment_method(payment_method, pm_update, merchant_account.storage_scheme)
|
||||||
|
.await
|
||||||
|
.map_err(errors::ProcessTrackerError::EStorageError);
|
||||||
|
|
||||||
|
if let Ok(_pm) = res {
|
||||||
|
db.as_scheduler()
|
||||||
|
.finish_process_with_business_status(process, "COMPLETED_BY_PT")
|
||||||
|
.await?;
|
||||||
|
} else {
|
||||||
|
let mapping = process_data::PaymentMethodsPTMapping::default();
|
||||||
|
let time_delta = if retry_count == 0 {
|
||||||
|
Some(mapping.default_mapping.start_after)
|
||||||
|
} else {
|
||||||
|
pt_utils::get_delay(retry_count + 1, &mapping.default_mapping.frequencies)
|
||||||
|
};
|
||||||
|
|
||||||
|
let schedule_time = pt_utils::get_time_from_delta(time_delta);
|
||||||
|
|
||||||
|
match schedule_time {
|
||||||
|
Some(s_time) => db
|
||||||
|
.as_scheduler()
|
||||||
|
.retry_process(process, s_time)
|
||||||
|
.await
|
||||||
|
.map_err(Into::<errors::ProcessTrackerError>::into)?,
|
||||||
|
None => db
|
||||||
|
.as_scheduler()
|
||||||
|
.finish_process_with_business_status(process, "RETRIES_EXCEEDED")
|
||||||
|
.await
|
||||||
|
.map_err(Into::<errors::ProcessTrackerError>::into)?,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn error_handler<'a>(
|
||||||
|
&'a self,
|
||||||
|
_state: &'a SessionState,
|
||||||
|
process: storage::ProcessTracker,
|
||||||
|
_error: errors::ProcessTrackerError,
|
||||||
|
) -> errors::CustomResult<(), errors::ProcessTrackerError> {
|
||||||
|
error!(%process.id, "Failed while executing workflow");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -342,7 +342,7 @@ pub fn get_outgoing_webhook_retry_schedule_time(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the delay based on the retry count
|
/// Get the delay based on the retry count
|
||||||
fn get_delay<'a>(
|
pub fn get_delay<'a>(
|
||||||
retry_count: i32,
|
retry_count: i32,
|
||||||
frequencies: impl IntoIterator<Item = &'a (i32, i32)>,
|
frequencies: impl IntoIterator<Item = &'a (i32, i32)>,
|
||||||
) -> Option<i32> {
|
) -> Option<i32> {
|
||||||
|
|||||||
Reference in New Issue
Block a user