feat(revenue_recovery): add support for updating additional card info data from csv to redis (#9233)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
AdityaWNL
2025-09-18 16:33:32 +05:30
committed by GitHub
parent 85bc733d5b
commit d98ffdfb37
14 changed files with 702 additions and 94 deletions

2
Cargo.lock generated
View File

@ -459,6 +459,7 @@ dependencies = [
"common_enums", "common_enums",
"common_types", "common_types",
"common_utils", "common_utils",
"csv",
"deserialize_form_style_query_parameter", "deserialize_form_style_query_parameter",
"error-stack 0.4.1", "error-stack 0.4.1",
"euclid", "euclid",
@ -470,6 +471,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"strum 0.26.3", "strum 0.26.3",
"tempfile",
"time", "time",
"url", "url",
"utoipa", "utoipa",

View File

@ -17,21 +17,23 @@ olap = []
openapi = ["common_enums/openapi", "olap", "recon", "dummy_connector", "olap"] openapi = ["common_enums/openapi", "olap", "recon", "dummy_connector", "olap"]
recon = [] recon = []
v1 = ["common_utils/v1"] v1 = ["common_utils/v1"]
v2 = ["common_types/v2", "common_utils/v2", "tokenization_v2", "dep:reqwest"] v2 = ["common_types/v2", "common_utils/v2", "tokenization_v2", "dep:reqwest", "revenue_recovery"]
dynamic_routing = [] dynamic_routing = []
control_center_theme = ["dep:actix-web", "dep:actix-multipart"] control_center_theme = ["dep:actix-web", "dep:actix-multipart"]
revenue_recovery = [] revenue_recovery = ["dep:actix-multipart"]
tokenization_v2 = ["common_utils/tokenization_v2"] tokenization_v2 = ["common_utils/tokenization_v2"]
[dependencies] [dependencies]
actix-multipart = { version = "0.6.2", optional = true } actix-multipart = { version = "0.6.2", optional = true }
actix-web = { version = "4.11.0", optional = true } actix-web = { version = "4.11.0", optional = true }
csv = "1.3"
error-stack = "0.4.1" error-stack = "0.4.1"
mime = "0.3.17" mime = "0.3.17"
reqwest = { version = "0.11.27", optional = true } reqwest = { version = "0.11.27", optional = true }
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140" serde_json = "1.0.140"
strum = { version = "0.26", features = ["derive"] } strum = { version = "0.26", features = ["derive"] }
tempfile = "3.8"
time = { version = "0.3.41", features = ["serde", "serde-well-known", "std"] } time = { version = "0.3.41", features = ["serde", "serde-well-known", "std"] }
url = { version = "2.5.4", features = ["serde"] } url = { version = "2.5.4", features = ["serde"] }
utoipa = { version = "4.2.3", features = ["preserve_order", "preserve_path_order"] } utoipa = { version = "4.2.3", features = ["preserve_order", "preserve_path_order"] }

View File

@ -41,6 +41,8 @@ pub mod proxy;
pub mod recon; pub mod recon;
pub mod refunds; pub mod refunds;
pub mod relay; pub mod relay;
#[cfg(feature = "v2")]
pub mod revenue_recovery_data_backfill;
pub mod routing; pub mod routing;
pub mod surcharge_decision_configs; pub mod surcharge_decision_configs;
pub mod three_ds_decision_rule; pub mod three_ds_decision_rule;

View File

@ -0,0 +1,150 @@
use std::{collections::HashMap, fs::File, io::BufReader};
use actix_multipart::form::{tempfile::TempFile, MultipartForm};
use actix_web::{HttpResponse, ResponseError};
use common_enums::{CardNetwork, PaymentMethodType};
use common_utils::events::ApiEventMetric;
use csv::Reader;
use masking::Secret;
use serde::{Deserialize, Serialize};
use time::Date;
/// One row of the uploaded backfill CSV, deserialized by header name.
///
/// NOTE(review): optional columns may arrive as empty strings in the CSV;
/// downstream code (`build_comprehensive_card_data`) treats empty as absent.
#[derive(Debug, Deserialize, Serialize)]
pub struct RevenueRecoveryBackfillRequest {
/// Card BIN; wrapped in `Secret` to keep it out of logs.
pub bin_number: Option<Secret<String>>,
/// Connector-side customer identifier — the only mandatory column.
pub customer_id_resp: String,
/// Payment id at the connector, if known.
pub connector_payment_id: Option<String>,
/// Payment processor token to update in Redis; rows without it skip the Redis update.
pub token: Option<Secret<String>>,
/// Card expiration in `MM/YY` (or `MM/YYYY`) form.
pub exp_date: Option<Secret<String>>,
pub card_network: Option<CardNetwork>,
pub payment_method_sub_type: Option<PaymentMethodType>,
/// Issuer (bank) name, already cleaned upstream.
pub clean_bank_name: Option<String>,
pub country_name: Option<String>,
/// JSON object string of `"YYYY-MM-DD" -> retry count`.
pub daily_retry_history: Option<String>,
}
/// Summary returned after a backfill run: counts only, no per-record detail.
#[derive(Debug, Serialize)]
pub struct RevenueRecoveryDataBackfillResponse {
pub processed_records: usize,
pub failed_records: usize,
}
/// Outcome of parsing the uploaded CSV: well-formed rows plus per-row errors.
#[derive(Debug, Serialize)]
pub struct CsvParsingResult {
pub records: Vec<RevenueRecoveryBackfillRequest>,
pub failed_records: Vec<CsvParsingError>,
}
/// A single unparseable CSV row (1-based row number including the header offset).
#[derive(Debug, Serialize)]
pub struct CsvParsingError {
pub row_number: usize,
pub error: String,
}
/// Comprehensive card data assembled from one CSV row, used to update the
/// stored payment-processor token in Redis. All fields are optional:
/// `None` means "leave the existing Redis value untouched".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComprehensiveCardData {
/// "credit" / "debit" / "card" (fallback).
pub card_type: Option<String>,
pub card_exp_month: Option<Secret<String>>,
/// Two-digit year (4-digit input is truncated by the parser).
pub card_exp_year: Option<Secret<String>>,
pub card_network: Option<CardNetwork>,
pub card_issuer: Option<String>,
pub card_issuing_country: Option<String>,
/// Per-day retry counts keyed by calendar date.
pub daily_retry_history: Option<HashMap<Date, i32>>,
}
// All backfill payloads are logged under the generic `Miscellaneous` API
// event type — none of them carries an entity id worth indexing on.
impl ApiEventMetric for RevenueRecoveryDataBackfillResponse {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Miscellaneous)
}
}
impl ApiEventMetric for CsvParsingResult {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Miscellaneous)
}
}
impl ApiEventMetric for CsvParsingError {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Miscellaneous)
}
}
/// Failure categories for the revenue-recovery backfill flow.
/// Each variant carries a human-readable detail string; `Display` renders
/// them as "<category>: <detail>".
#[derive(Debug, Clone, Serialize)]
pub enum BackfillError {
InvalidCardType(String),
DatabaseError(String),
RedisError(String),
CsvParsingError(String),
FileProcessingError(String),
}
/// Query parameters for the backfill endpoint.
#[derive(serde::Deserialize)]
pub struct BackfillQuery {
/// ISO8601 timestamp; `scheduled_at` values older than this are cleared
/// during the Redis update.
pub cutoff_time: Option<String>,
}
impl std::fmt::Display for BackfillError {
    /// Render the error as "<category>: <detail>".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::InvalidCardType(msg) => ("Invalid card type", msg),
            Self::DatabaseError(msg) => ("Database error", msg),
            Self::RedisError(msg) => ("Redis error", msg),
            Self::CsvParsingError(msg) => ("CSV parsing error", msg),
            Self::FileProcessingError(msg) => ("File processing error", msg),
        };
        write!(f, "{label}: {detail}")
    }
}
// Marker impl so `BackfillError` can flow through `std::error::Error` APIs.
impl std::error::Error for BackfillError {}
// NOTE(review): every variant — including DatabaseError/RedisError — maps to
// HTTP 400; confirm that server-side failures should not be 500 instead.
impl ResponseError for BackfillError {
fn error_response(&self) -> HttpResponse {
HttpResponse::BadRequest().json(serde_json::json!({
"error": self.to_string()
}))
}
}
/// Multipart upload wrapper: the backfill CSV arrives as form field `file`
/// and is spooled to a temp file by actix-multipart.
#[derive(Debug, MultipartForm)]
pub struct RevenueRecoveryDataBackfillForm {
#[multipart(rename = "file")]
pub file: TempFile,
}
impl RevenueRecoveryDataBackfillForm {
    /// Parse the uploaded CSV into typed records, collecting per-row
    /// failures instead of aborting on the first malformed row.
    ///
    /// # Errors
    /// Returns `FileProcessingError` only when the temp file itself cannot
    /// be opened; malformed rows are reported inside `failed_records`.
    pub fn validate_and_get_records_with_errors(&self) -> Result<CsvParsingResult, BackfillError> {
        let file = File::open(self.file.file.path())
            .map_err(|error| BackfillError::FileProcessingError(error.to_string()))?;
        let mut reader = Reader::from_reader(BufReader::new(file));

        let mut result = CsvParsingResult {
            records: Vec::new(),
            failed_records: Vec::new(),
        };

        for (index, row) in reader
            .deserialize::<RevenueRecoveryBackfillRequest>()
            .enumerate()
        {
            match row {
                Ok(record) => result.records.push(record),
                Err(err) => result.failed_records.push(CsvParsingError {
                    // Report 1-based data-row numbers, accounting for the
                    // header line consumed by the csv reader.
                    row_number: index + 2,
                    error: err.to_string(),
                }),
            }
        }

        Ok(result)
    }
}

View File

@ -75,4 +75,6 @@ pub mod relay;
pub mod revenue_recovery; pub mod revenue_recovery;
pub mod chat; pub mod chat;
#[cfg(feature = "v2")]
pub mod revenue_recovery_data_backfill;
pub mod tokenization; pub mod tokenization;

View File

@ -1231,8 +1231,17 @@ pub async fn reopen_calculate_workflow_on_payment_failure(
// Create process tracker ID in the format: CALCULATE_WORKFLOW_{payment_intent_id} // Create process tracker ID in the format: CALCULATE_WORKFLOW_{payment_intent_id}
let process_tracker_id = format!("{runner}_{task}_{}", id.get_string_repr()); let process_tracker_id = format!("{runner}_{task}_{}", id.get_string_repr());
// Set scheduled time to 1 hour from now // Set scheduled time to current time + buffer time set in configuration
let schedule_time = common_utils::date_time::now() + time::Duration::hours(1); let schedule_time = common_utils::date_time::now()
+ time::Duration::seconds(
state
.conf
.revenue_recovery
.recovery_timestamp
.reopen_workflow_buffer_time_in_seconds,
);
let new_retry_count = process.retry_count + 1;
// Check if a process tracker entry already exists for this payment intent // Check if a process tracker entry already exists for this payment intent
let existing_entry = db let existing_entry = db
@ -1244,22 +1253,13 @@ pub async fn reopen_calculate_workflow_on_payment_failure(
"Failed to check for existing calculate workflow process tracker entry", "Failed to check for existing calculate workflow process tracker entry",
)?; )?;
match existing_entry {
Some(existing_process) => {
router_env::logger::error!(
"Found existing CALCULATE_WORKFLOW task with id: {}",
existing_process.id
);
}
None => {
// No entry exists - create a new one // No entry exists - create a new one
router_env::logger::info!( router_env::logger::info!(
"No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry scheduled for 1 hour from now", "No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry... ",
id.get_string_repr() id.get_string_repr()
); );
let tag = ["PCR"]; let tag = ["PCR"];
let task = "CALCULATE_WORKFLOW";
let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow;
let process_tracker_entry = storage::ProcessTrackerNew::new( let process_tracker_entry = storage::ProcessTrackerNew::new(
@ -1268,14 +1268,12 @@ pub async fn reopen_calculate_workflow_on_payment_failure(
runner, runner,
tag, tag,
process.tracking_data.clone(), process.tracking_data.clone(),
Some(process.retry_count), Some(new_retry_count),
schedule_time, schedule_time,
common_types::consts::API_VERSION, common_types::consts::API_VERSION,
) )
.change_context(errors::RecoveryError::ProcessTrackerFailure) .change_context(errors::RecoveryError::ProcessTrackerFailure)
.attach_printable( .attach_printable("Failed to construct calculate workflow process tracker entry")?;
"Failed to construct calculate workflow process tracker entry",
)?;
// Insert into process tracker with status New // Insert into process tracker with status New
db.as_scheduler() db.as_scheduler()
@ -1290,26 +1288,6 @@ pub async fn reopen_calculate_workflow_on_payment_failure(
"Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}", "Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}",
id.get_string_repr() id.get_string_repr()
); );
}
}
let tracking_data = serde_json::from_value(process.tracking_data.clone())
.change_context(errors::RecoveryError::ValueNotFound)
.attach_printable("Failed to deserialize the tracking data from process tracker")?;
// Call the existing perform_calculate_workflow function
Box::pin(perform_calculate_workflow(
state,
process,
profile,
merchant_context,
&tracking_data,
revenue_recovery_payment_data,
payment_intent,
))
.await
.change_context(errors::RecoveryError::ProcessTrackerFailure)
.attach_printable("Failed to perform calculate workflow")?;
logger::info!( logger::info!(
payment_id = %id.get_string_repr(), payment_id = %id.get_string_repr(),
@ -1322,30 +1300,6 @@ pub async fn reopen_calculate_workflow_on_payment_failure(
Ok(()) Ok(())
} }
/// Create tracking data for the CALCULATE_WORKFLOW
fn create_calculate_workflow_tracking_data(
payment_intent: &PaymentIntent,
revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData,
) -> RecoveryResult<storage::revenue_recovery::RevenueRecoveryWorkflowTrackingData> {
let tracking_data = storage::revenue_recovery::RevenueRecoveryWorkflowTrackingData {
merchant_id: revenue_recovery_payment_data
.merchant_account
.get_id()
.clone(),
profile_id: revenue_recovery_payment_data.profile.get_id().clone(),
global_payment_id: payment_intent.id.clone(),
payment_attempt_id: payment_intent
.active_attempt_id
.clone()
.ok_or(storage_impl::errors::RecoveryError::ValueNotFound)?,
billing_mca_id: revenue_recovery_payment_data.billing_mca.get_id().clone(),
revenue_recovery_retry: revenue_recovery_payment_data.retry_algorithm,
invoice_scheduled_time: None, // Will be set by perform_calculate_workflow
};
Ok(tracking_data)
}
// TODO: Move these to impl based functions // TODO: Move these to impl based functions
async fn record_back_to_billing_connector( async fn record_back_to_billing_connector(
state: &SessionState, state: &SessionState,

View File

@ -0,0 +1,316 @@
use std::collections::HashMap;
use api_models::revenue_recovery_data_backfill::{
BackfillError, ComprehensiveCardData, RevenueRecoveryBackfillRequest,
RevenueRecoveryDataBackfillResponse,
};
use common_enums::{CardNetwork, PaymentMethodType};
use hyperswitch_domain_models::api::ApplicationResponse;
use masking::ExposeInterface;
use router_env::{instrument, logger};
use time::{format_description, Date};
use crate::{
connection,
core::errors::{self, RouterResult},
routes::SessionState,
types::{domain, storage},
};
/// Run the backfill over all parsed CSV records, updating Redis per record.
///
/// Records are processed independently: a failing record is counted and
/// logged but does not stop the remaining records.
pub async fn revenue_recovery_data_backfill(
    state: SessionState,
    records: Vec<RevenueRecoveryBackfillRequest>,
    cutoff_datetime: Option<time::PrimitiveDateTime>,
) -> RouterResult<ApplicationResponse<RevenueRecoveryDataBackfillResponse>> {
    let total_records = records.len();
    let mut processed_records = 0_usize;

    for record in records {
        match process_payment_method_record(&state, &record, cutoff_datetime).await {
            Ok(()) => {
                processed_records += 1;
                logger::info!(
                    "Successfully processed record with connector customer id: {}",
                    record.customer_id_resp
                );
            }
            Err(e) => {
                logger::error!(
                    "Payment method backfill failed: customer_id={}, error={}",
                    record.customer_id_resp,
                    e
                );
            }
        }
    }

    // Everything that did not succeed counts as failed.
    let failed_records = total_records - processed_records;
    logger::info!(
        "Revenue recovery data backfill completed - Processed: {}, Failed: {}",
        processed_records,
        failed_records
    );

    Ok(ApplicationResponse::Json(
        RevenueRecoveryDataBackfillResponse {
            processed_records,
            failed_records,
        },
    ))
}
/// Process a single CSV record: build card data and, when a non-empty token
/// is present, push the update into the customer's Redis token entry.
///
/// Card-data construction failures are downgraded to a warning and replaced
/// with a minimal `card`-typed payload so the record still proceeds; only a
/// failed Redis update makes the record count as failed.
async fn process_payment_method_record(
state: &SessionState,
record: &RevenueRecoveryBackfillRequest,
cutoff_datetime: Option<time::PrimitiveDateTime>,
) -> Result<(), BackfillError> {
// Build comprehensive card data from CSV record
let card_data = match build_comprehensive_card_data(record) {
Ok(data) => data,
Err(e) => {
logger::warn!(
"Failed to build card data for connector customer id: {}, error: {}.",
record.customer_id_resp,
e
);
// Fallback payload: only the card type is set; every other field is
// None so existing Redis values are preserved.
ComprehensiveCardData {
card_type: Some("card".to_string()),
card_exp_month: None,
card_exp_year: None,
card_network: None,
card_issuer: None,
card_issuing_country: None,
daily_retry_history: None,
}
}
};
// Expiry values are masked in this log on purpose — do not log secrets.
logger::info!(
"Built comprehensive card data - card_type: {:?}, exp_month: {}, exp_year: {}, network: {:?}, issuer: {:?}, country: {:?}, daily_retry_history: {:?}",
card_data.card_type,
card_data.card_exp_month.as_ref().map(|_| "**").unwrap_or("None"),
card_data.card_exp_year.as_ref().map(|_| "**").unwrap_or("None"),
card_data.card_network,
card_data.card_issuer,
card_data.card_issuing_country,
card_data.daily_retry_history
);
// Update Redis if token exists and is valid
match record.token.as_ref().map(|token| token.clone().expose()) {
Some(token) if !token.is_empty() => {
logger::info!("Updating Redis for customer: {}", record.customer_id_resp,);
storage::revenue_recovery_redis_operation::
RedisTokenManager::update_redis_token_with_comprehensive_card_data(
state,
&record.customer_id_resp,
&token,
&card_data,
cutoff_datetime,
)
.await
.map_err(|e| {
logger::error!("Redis update failed: {}", e);
BackfillError::RedisError(format!("Token not found in Redis: {}", e))
})?;
}
_ => {
// NOTE(review): the log mentions 'nan' but the code only checks for a
// missing or empty token — confirm whether literal "nan" values from
// the CSV should also be filtered here.
logger::info!(
"Skipping Redis update - token is missing, empty or 'nan': {:?}",
record.token
);
}
}
logger::info!(
"Successfully completed processing for connector customer id: {}",
record.customer_id_resp
);
Ok(())
}
/// Parse the `daily_retry_history` CSV column — a JSON object string of
/// `"YYYY-MM-DD" -> count` — into a typed `HashMap<Date, i32>`.
///
/// Returns `None` when the column is absent/empty or the JSON is invalid so
/// that existing Redis data is preserved. Individual unparseable dates are
/// skipped with a warning rather than failing the whole record.
fn parse_daily_retry_history(json_str: Option<&str>) -> Option<HashMap<Date, i32>> {
    let Some(json) = json_str.filter(|json| !json.is_empty()) else {
        logger::debug!("Daily retry history not present or invalid, preserving existing data");
        return None;
    };

    let string_retry_history = match serde_json::from_str::<HashMap<String, i32>>(json) {
        Ok(map) => map,
        Err(e) => {
            logger::warn!("Failed to parse daily_retry_history JSON '{}': {}", json, e);
            return None;
        }
    };

    // The format string is static, so a failure here is a programming error.
    // Previously this path built a `BackfillError` only to discard it via
    // `.ok()?` with no log at all; now the failure is at least visible.
    let format = match format_description::parse("[year]-[month]-[day]") {
        Ok(format) => format,
        Err(e) => {
            logger::warn!("Invalid date format configuration: {}", e);
            return None;
        }
    };

    let mut date_retry_history = HashMap::new();
    for (date_str, count) in string_retry_history {
        match Date::parse(&date_str, &format) {
            Ok(date) => {
                date_retry_history.insert(date, count);
            }
            Err(e) => {
                logger::warn!(
                    "Failed to parse date '{}' in daily_retry_history: {}",
                    date_str,
                    e
                );
            }
        }
    }

    logger::debug!(
        "Successfully parsed daily_retry_history with {} entries",
        date_retry_history.len()
    );
    Some(date_retry_history)
}
/// Assemble `ComprehensiveCardData` from a single CSV record.
///
/// Empty strings from the CSV are treated as absent values; the card type
/// always falls back to "card" when the sub-type column is missing.
fn build_comprehensive_card_data(
    record: &RevenueRecoveryBackfillRequest,
) -> Result<ComprehensiveCardData, BackfillError> {
    // Expiration date (MM/YY) is the only fallible field in this record.
    let (exp_month, exp_year) = parse_expiration_date(
        record
            .exp_date
            .as_ref()
            .map(|date| date.clone().expose())
            .as_deref(),
    )?;

    // Shared "empty string means absent" predicate for issuer/country.
    let non_empty = |value: &&String| !value.is_empty();

    Ok(ComprehensiveCardData {
        card_type: Some(determine_card_type(record.payment_method_sub_type)),
        card_exp_month: exp_month.map(masking::Secret::new),
        card_exp_year: exp_year.map(masking::Secret::new),
        card_network: record.card_network.clone(),
        card_issuer: record.clean_bank_name.as_ref().filter(non_empty).cloned(),
        card_issuing_country: record.country_name.as_ref().filter(non_empty).cloned(),
        daily_retry_history: parse_daily_retry_history(record.daily_retry_history.as_deref()),
    })
}
/// Map `payment_method_sub_type` to the card-type string stored in Redis.
/// Falls back to "card" when the column is absent or names a non-card type.
fn determine_card_type(payment_method_sub_type: Option<PaymentMethodType>) -> String {
    let Some(card_type_enum) = payment_method_sub_type else {
        logger::info!("In CSV payment_method_sub_type not present, defaulting to 'card'");
        return "card".to_string();
    };

    let mapped_type = match card_type_enum {
        PaymentMethodType::Credit => "credit",
        PaymentMethodType::Debit => "debit",
        // `Card` and every other payment method type map to plain "card".
        _ => "card",
    };
    logger::debug!(
        "Using payment_method_sub_type enum '{:?}' -> '{}'",
        card_type_enum,
        mapped_type
    );
    mapped_type.to_string()
}
/// Parse an expiration date in `MM/YY` or `MM/YYYY` form into
/// `(month, two_digit_year)` strings.
///
/// Absent or empty input yields `Ok((None, None))`. Four-digit years are
/// truncated to their last two digits.
///
/// # Errors
/// Returns `CsvParsingError` when the separator is missing, the month is
/// non-numeric or outside 1-12, or the year is not 2/4 ASCII digits.
fn parse_expiration_date(
    exp_date: Option<&str>,
) -> Result<(Option<String>, Option<String>), BackfillError> {
    let Some(date) = exp_date.filter(|date| !date.is_empty()) else {
        logger::debug!("Empty expiration date, returning None");
        return Ok((None, None));
    };

    let (month_part, year_part) = date.split_once('/').ok_or_else(|| {
        logger::warn!("Unrecognized expiration date format (MM/YY expected)");
        BackfillError::CsvParsingError("Invalid expiration date format: expected MM/YY".to_string())
    })?;

    let month = month_part.trim();
    let year = year_part.trim();
    logger::debug!("Split expiration date - parsing month and year");

    // Validate and parse month.
    let month_num = month.parse::<u8>().map_err(|_| {
        logger::warn!("Failed to parse month component in expiration date");
        BackfillError::CsvParsingError("Invalid month format in expiration date".to_string())
    })?;
    if !(1..=12).contains(&month_num) {
        logger::warn!("Invalid month value in expiration date (not in range 1-12)");
        return Err(BackfillError::CsvParsingError(
            "Invalid month value in expiration date".to_string(),
        ));
    }

    // Reject non-numeric years before slicing. The previous `&year[2..4]`
    // byte-slice could panic on non-ASCII input (char-boundary violation),
    // and non-digit years such as "AB" were silently accepted.
    if !year.chars().all(|c| c.is_ascii_digit()) {
        logger::warn!("Non-numeric year component in expiration date");
        return Err(BackfillError::CsvParsingError(
            "Invalid year format in expiration date".to_string(),
        ));
    }

    // Normalize to a two-digit year.
    let final_year = match year.len() {
        4 => &year[2..4], // Safe: all-ASCII-digit check above guarantees byte boundaries.
        2 => year,
        _ => {
            logger::warn!("Invalid year length in expiration date (expected 2 or 4 digits)");
            return Err(BackfillError::CsvParsingError(
                "Invalid year format in expiration date".to_string(),
            ));
        }
    };

    logger::debug!("Successfully parsed expiration date... ",);
    Ok((Some(month.to_string()), Some(final_year.to_string())))
}

View File

@ -226,7 +226,8 @@ pub fn mk_app(
.service(routes::UserDeprecated::server(state.clone())) .service(routes::UserDeprecated::server(state.clone()))
.service(routes::ProcessTrackerDeprecated::server(state.clone())) .service(routes::ProcessTrackerDeprecated::server(state.clone()))
.service(routes::ProcessTracker::server(state.clone())) .service(routes::ProcessTracker::server(state.clone()))
.service(routes::Gsm::server(state.clone())); .service(routes::Gsm::server(state.clone()))
.service(routes::RecoveryDataBackfill::server(state.clone()));
} }
} }

View File

@ -48,6 +48,8 @@ pub mod profiles;
#[cfg(feature = "recon")] #[cfg(feature = "recon")]
pub mod recon; pub mod recon;
pub mod refunds; pub mod refunds;
#[cfg(feature = "v2")]
pub mod revenue_recovery_data_backfill;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
pub mod routing; pub mod routing;
pub mod three_ds_decision_rule; pub mod three_ds_decision_rule;
@ -85,8 +87,6 @@ pub use self::app::PaymentMethodSession;
pub use self::app::Proxy; pub use self::app::Proxy;
#[cfg(all(feature = "olap", feature = "recon", feature = "v1"))] #[cfg(all(feature = "olap", feature = "recon", feature = "v1"))]
pub use self::app::Recon; pub use self::app::Recon;
#[cfg(feature = "v2")]
pub use self::app::Tokenization;
pub use self::app::{ pub use self::app::{
ApiKeys, AppState, ApplePayCertificatesMigration, Authentication, Cache, Cards, Chat, Configs, ApiKeys, AppState, ApplePayCertificatesMigration, Authentication, Cache, Cards, Chat, Configs,
ConnectorOnboarding, Customers, Disputes, EphemeralKey, FeatureMatrix, Files, Forex, Gsm, ConnectorOnboarding, Customers, Disputes, EphemeralKey, FeatureMatrix, Files, Forex, Gsm,
@ -99,6 +99,8 @@ pub use self::app::{
pub use self::app::{Blocklist, Organization, Routing, Verify, WebhookEvents}; pub use self::app::{Blocklist, Organization, Routing, Verify, WebhookEvents};
#[cfg(feature = "payouts")] #[cfg(feature = "payouts")]
pub use self::app::{PayoutLink, Payouts}; pub use self::app::{PayoutLink, Payouts};
#[cfg(feature = "v2")]
pub use self::app::{RecoveryDataBackfill, Tokenization};
#[cfg(all(feature = "stripe", feature = "v1"))] #[cfg(all(feature = "stripe", feature = "v1"))]
pub use super::compatibility::stripe::StripeApis; pub use super::compatibility::stripe::StripeApis;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]

View File

@ -2970,3 +2970,19 @@ impl ProfileAcquirer {
) )
} }
} }
/// Route registration for the v2 revenue-recovery data-backfill endpoint.
#[cfg(feature = "v2")]
pub struct RecoveryDataBackfill;
#[cfg(feature = "v2")]
impl RecoveryDataBackfill {
/// Mounts `POST /v2/recovery/data-backfill`, handled by
/// `revenue_recovery_data_backfill` (multipart CSV upload).
pub fn server(state: AppState) -> Scope {
web::scope("/v2/recovery/data-backfill")
.app_data(web::Data::new(state))
.service(
web::resource("").route(
web::post()
.to(super::revenue_recovery_data_backfill::revenue_recovery_data_backfill),
),
)
}
}

View File

@ -49,6 +49,7 @@ pub enum ApiIdentifier {
ProfileAcquirer, ProfileAcquirer,
ThreeDsDecisionRule, ThreeDsDecisionRule,
GenericTokenization, GenericTokenization,
RecoveryDataBackfill,
} }
impl From<Flow> for ApiIdentifier { impl From<Flow> for ApiIdentifier {
@ -380,6 +381,8 @@ impl From<Flow> for ApiIdentifier {
Flow::TokenizationCreate | Flow::TokenizationRetrieve | Flow::TokenizationDelete => { Flow::TokenizationCreate | Flow::TokenizationRetrieve | Flow::TokenizationDelete => {
Self::GenericTokenization Self::GenericTokenization
} }
Flow::RecoveryDataBackfill => Self::RecoveryDataBackfill,
} }
} }
} }

View File

@ -0,0 +1,67 @@
use actix_multipart::form::MultipartForm;
use actix_web::{web, HttpRequest, HttpResponse};
use api_models::revenue_recovery_data_backfill::{BackfillQuery, RevenueRecoveryDataBackfillForm};
use router_env::{instrument, tracing, Flow};
use crate::{
core::{api_locking, revenue_recovery_data_backfill},
routes::AppState,
services::{api, authentication as auth},
types::domain,
};
/// HTTP handler for `POST /v2/recovery/data-backfill`.
///
/// Accepts a multipart CSV upload plus an optional `cutoff_time` query
/// parameter (ISO8601). Malformed query values or an unreadable CSV return
/// 400 directly; otherwise the parsed records are handed to the core
/// backfill flow under admin API auth.
///
/// NOTE(review): the CSV is parsed *before* `server_wrap` runs
/// authentication, so unauthenticated callers can exercise the CSV parser —
/// confirm this ordering is intended.
#[instrument(skip_all, fields(flow = ?Flow::RecoveryDataBackfill))]
pub async fn revenue_recovery_data_backfill(
state: web::Data<AppState>,
req: HttpRequest,
query: web::Query<BackfillQuery>,
MultipartForm(form): MultipartForm<RevenueRecoveryDataBackfillForm>,
) -> HttpResponse {
let flow = Flow::RecoveryDataBackfill;
// Parse cutoff_time from query parameter
let cutoff_datetime = match query
.cutoff_time
.as_ref()
.map(|time_str| {
time::PrimitiveDateTime::parse(
time_str,
&time::format_description::well_known::Iso8601::DEFAULT,
)
})
.transpose()
{
Ok(datetime) => datetime,
Err(err) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": format!("Invalid datetime format: {}. Use ISO8601: 2024-01-15T10:30:00", err)
}));
}
};
// Rows that fail typed deserialization are collected inside `records`
// (CsvParsingResult); only an unreadable file aborts here.
let records = match form.validate_and_get_records_with_errors() {
Ok(records) => records,
Err(e) => {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": e.to_string()
}));
}
};
Box::pin(api::server_wrap(
flow,
state,
&req,
records,
|state, _, records, _req| {
revenue_recovery_data_backfill::revenue_recovery_data_backfill(
state,
records.records,
cutoff_datetime,
)
},
&auth::V2AdminApiAuth,
api_locking::LockAction::NotApplicable,
))
.await
}

View File

@ -1,9 +1,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use api_models;
use common_enums::enums::CardNetwork; use common_enums::enums::CardNetwork;
use common_utils::{date_time, errors::CustomResult, id_type}; use common_utils::{date_time, errors::CustomResult, id_type};
use error_stack::ResultExt; use error_stack::ResultExt;
use masking::Secret; use masking::{ExposeInterface, Secret};
use redis_interface::{DelReply, SetnxReply}; use redis_interface::{DelReply, SetnxReply};
use router_env::{instrument, logger, tracing}; use router_env::{instrument, logger, tracing};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -214,7 +215,6 @@ impl RedisTokenManager {
.await .await
.change_context(get_hash_err)?; .change_context(get_hash_err)?;
// build the result map using iterator adapters (explicit match preserved for logging)
let payment_processor_token_info_map: HashMap<String, PaymentProcessorTokenStatus> = let payment_processor_token_info_map: HashMap<String, PaymentProcessorTokenStatus> =
payment_processor_tokens payment_processor_tokens
.into_iter() .into_iter()
@ -754,4 +754,93 @@ impl RedisTokenManager {
Ok(token) Ok(token)
} }
/// Update a customer's stored payment-processor token in Redis with card
/// details backfilled from CSV, optionally clearing a stale `scheduled_at`.
///
/// Only fields present in `card_data` overwrite the stored token; `None`
/// fields leave existing Redis values untouched. When `cutoff_datetime` is
/// given, a `scheduled_at` strictly earlier than the cutoff is reset to
/// `None`.
///
/// # Errors
/// Returns `StorageError::ValueNotFound` when the token is missing from the
/// customer's Redis hash, or any error from the Redis read/write helpers.
#[instrument(skip_all)]
pub async fn update_redis_token_with_comprehensive_card_data(
    state: &SessionState,
    customer_id: &str,
    token: &str,
    card_data: &api_models::revenue_recovery_data_backfill::ComprehensiveCardData,
    cutoff_datetime: Option<PrimitiveDateTime>,
) -> CustomResult<(), errors::StorageError> {
    // Load the customer's full token map so unchanged tokens round-trip as-is.
    let mut token_map =
        Self::get_connector_customer_payment_processor_tokens(state, customer_id).await?;
    // Find the token to update
    let existing_token = token_map.get_mut(token).ok_or_else(|| {
        tracing::warn!(
            customer_id = customer_id,
            "Token not found in parsed Redis data - may be corrupted or missing for "
        );
        error_stack::Report::new(errors::StorageError::ValueNotFound(
            "Token not found in Redis".to_string(),
        ))
    })?;
    // Overwrite only the fields supplied by the backfill record. `if let`
    // replaces the previous side-effecting `Option::map` calls (clippy
    // `option_map_unit_fn`).
    let details = &mut existing_token.payment_processor_token_details;
    if let Some(card_type) = card_data.card_type.as_ref() {
        details.card_type = Some(card_type.clone());
    }
    if let Some(exp_month) = card_data.card_exp_month.as_ref() {
        details.expiry_month = Some(exp_month.clone());
    }
    if let Some(exp_year) = card_data.card_exp_year.as_ref() {
        details.expiry_year = Some(exp_year.clone());
    }
    if let Some(card_network) = card_data.card_network.as_ref() {
        details.card_network = Some(card_network.clone());
    }
    if let Some(card_issuer) = card_data.card_issuer.as_ref() {
        details.card_issuer = Some(card_issuer.clone());
    }
    // Update daily retry history if provided
    if let Some(retry_history) = card_data.daily_retry_history.as_ref() {
        existing_token.daily_retry_history = retry_history.clone();
    }
    // If cutoff_datetime is provided and the existing scheduled_at is before
    // it, clear it; in every other combination the existing value is kept.
    existing_token.scheduled_at = match (existing_token.scheduled_at, cutoff_datetime) {
        (Some(existing_scheduled_at), Some(cutoff)) if existing_scheduled_at < cutoff => {
            tracing::info!(
                customer_id = customer_id,
                existing_scheduled_at = %existing_scheduled_at,
                cutoff_datetime = %cutoff,
                "Set scheduled_at to None because existing time is before cutoff time"
            );
            None
        }
        (existing, _) => existing,
    };
    // Save the updated token map back to Redis
    Self::update_or_add_connector_customer_payment_processor_tokens(
        state,
        customer_id,
        token_map,
    )
    .await?;
    tracing::info!(
        customer_id = customer_id,
        "Updated Redis token data with comprehensive card data using struct"
    );
    Ok(())
}
} }

View File

@ -652,6 +652,8 @@ pub enum Flow {
RecoveryPaymentsCreate, RecoveryPaymentsCreate,
/// Tokenization delete flow /// Tokenization delete flow
TokenizationDelete, TokenizationDelete,
/// Payment method data backfill flow
RecoveryDataBackfill,
/// Gift card balance check flow /// Gift card balance check flow
GiftCardBalanceCheck, GiftCardBalanceCheck,
} }