mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
feat: Time based deletion of temp card (#729)
This commit is contained in:
@ -549,7 +549,7 @@ pub struct GetTokenizePayloadRequest {
|
||||
pub get_value2: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct DeleteTokenizeByTokenRequest {
|
||||
pub lookup_key: String,
|
||||
pub service_name: String,
|
||||
|
||||
@ -18,6 +18,8 @@ use common_utils::{
|
||||
use error_stack::{report, ResultExt};
|
||||
use router_env::{instrument, tracing};
|
||||
|
||||
#[cfg(feature = "basilisk")]
|
||||
use crate::scheduler::metrics;
|
||||
use crate::{
|
||||
configs::settings,
|
||||
core::{
|
||||
@ -1623,7 +1625,15 @@ impl BasiliskCardSupport {
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Wrapped value2 construction failed when saving card to locker")?;
|
||||
|
||||
vault::create_tokenize(state, value1, Some(value2), payment_token.to_string()).await?;
|
||||
let lookup_key =
|
||||
vault::create_tokenize(state, value1, Some(value2), payment_token.to_string()).await?;
|
||||
vault::add_delete_tokenized_data_task(
|
||||
&*state.store,
|
||||
&lookup_key,
|
||||
enums::PaymentMethod::Card,
|
||||
)
|
||||
.await?;
|
||||
metrics::TOKENIZED_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]);
|
||||
Ok(card)
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,13 +5,14 @@ use josekit::jwe;
|
||||
use masking::PeekInterface;
|
||||
use router_env::{instrument, tracing};
|
||||
|
||||
#[cfg(not(feature = "basilisk"))]
|
||||
use crate::types::storage;
|
||||
use crate::{
|
||||
configs::settings::Jwekey,
|
||||
core::errors::{self, CustomResult, RouterResult},
|
||||
logger, routes,
|
||||
types::api,
|
||||
types::{
|
||||
api,
|
||||
storage::{self, enums},
|
||||
},
|
||||
utils::{self, StringExt},
|
||||
};
|
||||
#[cfg(feature = "basilisk")]
|
||||
@ -21,6 +22,12 @@ use crate::{
|
||||
utils::BytesExt,
|
||||
};
|
||||
#[cfg(feature = "basilisk")]
|
||||
use crate::{
|
||||
db,
|
||||
scheduler::{metrics, process_data, utils as process_tracker_utils},
|
||||
types::storage::ProcessTrackerExt,
|
||||
};
|
||||
#[cfg(feature = "basilisk")]
|
||||
const VAULT_SERVICE_NAME: &str = "CARD";
|
||||
#[cfg(feature = "basilisk")]
|
||||
const VAULT_VERSION: &str = "0";
|
||||
@ -257,6 +264,7 @@ impl Vault {
|
||||
token_id: Option<String>,
|
||||
payment_method: &api::PaymentMethodData,
|
||||
customer_id: Option<String>,
|
||||
_pm: enums::PaymentMethod,
|
||||
) -> RouterResult<String> {
|
||||
let value1 = payment_method
|
||||
.get_value1(customer_id.clone())
|
||||
@ -343,6 +351,7 @@ impl Vault {
|
||||
token_id: Option<String>,
|
||||
payment_method: &api::PaymentMethodData,
|
||||
customer_id: Option<String>,
|
||||
pm: enums::PaymentMethod,
|
||||
) -> RouterResult<String> {
|
||||
let value1 = payment_method
|
||||
.get_value1(customer_id.clone())
|
||||
@ -356,7 +365,10 @@ impl Vault {
|
||||
|
||||
let lookup_key = token_id.unwrap_or_else(|| generate_id_with_default_len("token"));
|
||||
|
||||
create_tokenize(state, value1, Some(value2), lookup_key).await
|
||||
let lookup_key = create_tokenize(state, value1, Some(value2), lookup_key).await?;
|
||||
add_delete_tokenized_data_task(&*state.store, &lookup_key, pm).await?;
|
||||
metrics::TOKENIZED_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]);
|
||||
Ok(lookup_key)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
@ -644,3 +656,137 @@ pub async fn delete_tokenized_data(
|
||||
.attach_printable(format!("Got 4xx from the basilisk locker: {err:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
// ********************************************** PROCESS TRACKER **********************************************
|
||||
#[cfg(feature = "basilisk")]
|
||||
pub async fn add_delete_tokenized_data_task(
|
||||
db: &dyn db::StorageInterface,
|
||||
lookup_key: &str,
|
||||
pm: enums::PaymentMethod,
|
||||
) -> RouterResult<storage::ProcessTracker> {
|
||||
let runner = "DELETE_TOKENIZE_DATA_WORKFLOW";
|
||||
let current_time = common_utils::date_time::now();
|
||||
let tracking_data = serde_json::to_value(storage::TokenizeCoreWorkflow {
|
||||
lookup_key: lookup_key.to_owned(),
|
||||
pm,
|
||||
})
|
||||
.into_report()
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable_lazy(|| format!("unable to convert into value {:?}", lookup_key))?;
|
||||
|
||||
let schedule_time = get_delete_tokenize_schedule_time(db, &pm, 0).await;
|
||||
|
||||
let process_tracker_entry = storage::ProcessTrackerNew {
|
||||
id: format!("{}_{}", runner, lookup_key),
|
||||
name: Some(String::from(runner)),
|
||||
tag: vec![String::from("BASILISK-V3")],
|
||||
runner: Some(String::from(runner)),
|
||||
retry_count: 0,
|
||||
schedule_time,
|
||||
rule: String::new(),
|
||||
tracking_data,
|
||||
business_status: String::from("Pending"),
|
||||
status: enums::ProcessTrackerStatus::New,
|
||||
event: vec![],
|
||||
created_at: current_time,
|
||||
updated_at: current_time,
|
||||
};
|
||||
let response = db
|
||||
.insert_process(process_tracker_entry)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable_lazy(|| {
|
||||
format!(
|
||||
"Failed while inserting task in process_tracker: lookup_key: {}",
|
||||
lookup_key
|
||||
)
|
||||
})?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
#[cfg(feature = "basilisk")]
|
||||
pub async fn start_tokenize_data_workflow(
|
||||
state: &routes::AppState,
|
||||
tokenize_tracker: &storage::ProcessTracker,
|
||||
) -> Result<(), errors::ProcessTrackerError> {
|
||||
let db = &*state.store;
|
||||
let delete_tokenize_data = serde_json::from_value::<storage::TokenizeCoreWorkflow>(
|
||||
tokenize_tracker.tracking_data.clone(),
|
||||
)
|
||||
.into_report()
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable_lazy(|| {
|
||||
format!(
|
||||
"unable to convert into DeleteTokenizeByTokenRequest {:?}",
|
||||
tokenize_tracker.tracking_data
|
||||
)
|
||||
})?;
|
||||
|
||||
let delete_resp = delete_tokenized_data(state, &delete_tokenize_data.lookup_key).await;
|
||||
match delete_resp {
|
||||
Ok(resp) => {
|
||||
if resp == "Ok" {
|
||||
logger::info!("Card From locker deleted Successfully");
|
||||
//mark task as finished
|
||||
let id = tokenize_tracker.id.clone();
|
||||
tokenize_tracker
|
||||
.clone()
|
||||
.finish_with_status(db, format!("COMPLETED_BY_PT_{id}"))
|
||||
.await?;
|
||||
} else {
|
||||
logger::error!("Error: Deleting Card From Locker : {}", resp);
|
||||
retry_delete_tokenize(db, &delete_tokenize_data.pm, tokenize_tracker.to_owned())
|
||||
.await?;
|
||||
metrics::RETRIED_DELETE_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
logger::error!("Err: Deleting Card From Locker : {}", err);
|
||||
retry_delete_tokenize(db, &delete_tokenize_data.pm, tokenize_tracker.to_owned())
|
||||
.await?;
|
||||
metrics::RETRIED_DELETE_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "basilisk")]
|
||||
pub async fn get_delete_tokenize_schedule_time(
|
||||
db: &dyn db::StorageInterface,
|
||||
pm: &enums::PaymentMethod,
|
||||
retry_count: i32,
|
||||
) -> Option<time::PrimitiveDateTime> {
|
||||
let redis_mapping = db::get_and_deserialize_key(
|
||||
db,
|
||||
&format!("pt_mapping_delete_{pm}_tokenize_data"),
|
||||
"PaymentMethodsPTMapping",
|
||||
)
|
||||
.await;
|
||||
let mapping = match redis_mapping {
|
||||
Ok(x) => x,
|
||||
Err(err) => {
|
||||
logger::info!("Redis Mapping Error: {}", err);
|
||||
process_data::PaymentMethodsPTMapping::default()
|
||||
}
|
||||
};
|
||||
let time_delta = process_tracker_utils::get_pm_schedule_time(mapping, pm, retry_count + 1);
|
||||
|
||||
process_tracker_utils::get_time_from_delta(time_delta)
|
||||
}
|
||||
|
||||
#[cfg(feature = "basilisk")]
|
||||
pub async fn retry_delete_tokenize(
|
||||
db: &dyn db::StorageInterface,
|
||||
pm: &enums::PaymentMethod,
|
||||
pt: storage::ProcessTracker,
|
||||
) -> Result<(), errors::ProcessTrackerError> {
|
||||
let schedule_time = get_delete_tokenize_schedule_time(db, pm, pt.retry_count).await;
|
||||
|
||||
match schedule_time {
|
||||
Some(s_time) => pt.retry(db, s_time).await,
|
||||
None => {
|
||||
pt.finish_with_status(db, "RETRIES_EXCEEDED".to_string())
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ use common_utils::{ext_traits::AsyncExt, fp_utils};
|
||||
use error_stack::{report, IntoReport, ResultExt};
|
||||
use masking::ExposeOptionInterface;
|
||||
use router_env::{instrument, tracing};
|
||||
use storage_models::enums;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{
|
||||
@ -736,6 +737,7 @@ pub async fn make_pm_data<'a, F: Clone, R>(
|
||||
Some(token),
|
||||
&updated_pm,
|
||||
payment_data.payment_intent.customer_id.to_owned(),
|
||||
enums::PaymentMethod::Card,
|
||||
)
|
||||
.await?;
|
||||
Some(updated_pm)
|
||||
@ -757,6 +759,7 @@ pub async fn make_pm_data<'a, F: Clone, R>(
|
||||
Some(token),
|
||||
&updated_pm,
|
||||
payment_data.payment_intent.customer_id.to_owned(),
|
||||
enums::PaymentMethod::Wallet,
|
||||
)
|
||||
.await?;
|
||||
Some(updated_pm)
|
||||
@ -779,6 +782,7 @@ pub async fn make_pm_data<'a, F: Clone, R>(
|
||||
None,
|
||||
pm,
|
||||
payment_data.payment_intent.customer_id.to_owned(),
|
||||
enums::PaymentMethod::Card,
|
||||
)
|
||||
.await?;
|
||||
payment_data.token = Some(token);
|
||||
@ -792,6 +796,7 @@ pub async fn make_pm_data<'a, F: Clone, R>(
|
||||
None,
|
||||
pm,
|
||||
payment_data.payment_intent.customer_id.to_owned(),
|
||||
enums::PaymentMethod::Wallet,
|
||||
)
|
||||
.await?;
|
||||
payment_data.token = Some(token);
|
||||
|
||||
@ -28,3 +28,5 @@ create_counter!(TASK_CONSUMED, PT_METER); // Tasks consumed by consumer
|
||||
create_counter!(TASK_PROCESSED, PT_METER); // Tasks completed processing
|
||||
create_counter!(TASK_FINISHED, PT_METER); // Tasks finished
|
||||
create_counter!(TASK_RETRIED, PT_METER); // Tasks added for retries
|
||||
create_counter!(TOKENIZED_DATA_COUNT, PT_METER); // Tokenized data added
|
||||
create_counter!(RETRIED_DELETE_DATA_COUNT, PT_METER); // Tokenized data retried
|
||||
|
||||
@ -2,8 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::types::storage::process_tracker::ProcessTracker;
|
||||
|
||||
use crate::types::storage::{enums, process_tracker::ProcessTracker};
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProcessData {
|
||||
db_name: String,
|
||||
@ -39,3 +38,25 @@ impl Default for ConnectorPTMapping {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PaymentMethodsPTMapping {
|
||||
pub default_mapping: RetryMapping,
|
||||
pub custom_pm_mapping: HashMap<enums::PaymentMethod, RetryMapping>,
|
||||
pub max_retries_count: i32,
|
||||
}
|
||||
|
||||
impl Default for PaymentMethodsPTMapping {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
custom_pm_mapping: HashMap::new(),
|
||||
default_mapping: RetryMapping {
|
||||
start_after: 900,
|
||||
frequency: vec![300],
|
||||
count: vec![5],
|
||||
},
|
||||
max_retries_count: 5,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,10 @@ use crate::{
|
||||
logger,
|
||||
routes::AppState,
|
||||
scheduler::{ProcessTrackerBatch, SchedulerFlow},
|
||||
types::storage::{self, enums::ProcessTrackerStatus},
|
||||
types::storage::{
|
||||
self,
|
||||
enums::{self, ProcessTrackerStatus},
|
||||
},
|
||||
utils::{OptionExt, StringExt},
|
||||
};
|
||||
|
||||
@ -314,6 +317,26 @@ pub fn get_schedule_time(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_pm_schedule_time(
|
||||
mapping: process_data::PaymentMethodsPTMapping,
|
||||
pm: &enums::PaymentMethod,
|
||||
retry_count: i32,
|
||||
) -> Option<i32> {
|
||||
let mapping = match mapping.custom_pm_mapping.get(pm) {
|
||||
Some(map) => map.clone(),
|
||||
None => mapping.default_mapping,
|
||||
};
|
||||
|
||||
if retry_count == 0 {
|
||||
Some(mapping.start_after)
|
||||
} else {
|
||||
get_delay(
|
||||
retry_count,
|
||||
mapping.count.iter().zip(mapping.frequency.iter()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_delay<'a>(
|
||||
retry_count: i32,
|
||||
mut array: impl Iterator<Item = (&'a i32, &'a i32)>,
|
||||
|
||||
@ -6,6 +6,7 @@ use strum::EnumString;
|
||||
use crate::{core::errors, routes::AppState, scheduler::consumer, types::storage};
|
||||
pub mod payment_sync;
|
||||
pub mod refund_router;
|
||||
pub mod tokenized_data;
|
||||
|
||||
macro_rules! runners {
|
||||
($($body:tt),*) => {
|
||||
@ -44,7 +45,8 @@ macro_rules! as_item {
|
||||
|
||||
runners! {
|
||||
PaymentsSyncWorkflow,
|
||||
RefundWorkflowRouter
|
||||
RefundWorkflowRouter,
|
||||
DeleteTokenizeDataWorkflow
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
35
crates/router/src/scheduler/workflows/tokenized_data.rs
Normal file
35
crates/router/src/scheduler/workflows/tokenized_data.rs
Normal file
@ -0,0 +1,35 @@
|
||||
use super::{DeleteTokenizeDataWorkflow, ProcessTrackerWorkflow};
|
||||
#[cfg(feature = "basilisk")]
|
||||
use crate::core::payment_methods::vault;
|
||||
use crate::{errors, logger::error, routes::AppState, types::storage};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ProcessTrackerWorkflow for DeleteTokenizeDataWorkflow {
|
||||
#[cfg(feature = "basilisk")]
|
||||
async fn execute_workflow<'a>(
|
||||
&'a self,
|
||||
state: &'a AppState,
|
||||
process: storage::ProcessTracker,
|
||||
) -> Result<(), errors::ProcessTrackerError> {
|
||||
Ok(vault::start_tokenize_data_workflow(state, &process).await?)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "basilisk"))]
|
||||
async fn execute_workflow<'a>(
|
||||
&'a self,
|
||||
_state: &'a AppState,
|
||||
_process: storage::ProcessTracker,
|
||||
) -> Result<(), errors::ProcessTrackerError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn error_handler<'a>(
|
||||
&'a self,
|
||||
_state: &'a AppState,
|
||||
process: storage::ProcessTracker,
|
||||
_error: errors::ProcessTrackerError,
|
||||
) -> errors::CustomResult<(), errors::ProcessTrackerError> {
|
||||
error!(%process.id, "Failed while executing workflow");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -1 +1 @@
|
||||
pub use storage_models::payment_method::{PaymentMethod, PaymentMethodNew};
|
||||
pub use storage_models::payment_method::{PaymentMethod, PaymentMethodNew, TokenizeCoreWorkflow};
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
use common_utils::pii;
|
||||
use diesel::{Identifiable, Insertable, Queryable};
|
||||
use masking::Secret;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::PrimitiveDateTime;
|
||||
|
||||
use crate::{enums as storage_enums, schema::payment_methods};
|
||||
@ -86,3 +87,9 @@ impl Default for PaymentMethodNew {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub struct TokenizeCoreWorkflow {
|
||||
pub lookup_key: String,
|
||||
pub pm: storage_enums::PaymentMethod,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user