diff --git a/Cargo.lock b/Cargo.lock index 491f77bf39..2304b8ab83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -459,6 +459,7 @@ dependencies = [ "common_enums", "common_types", "common_utils", + "csv", "deserialize_form_style_query_parameter", "error-stack 0.4.1", "euclid", @@ -470,6 +471,7 @@ dependencies = [ "serde", "serde_json", "strum 0.26.3", + "tempfile", "time", "url", "utoipa", diff --git a/crates/api_models/Cargo.toml b/crates/api_models/Cargo.toml index cac70d2664..16403f0e25 100644 --- a/crates/api_models/Cargo.toml +++ b/crates/api_models/Cargo.toml @@ -17,21 +17,23 @@ olap = [] openapi = ["common_enums/openapi", "olap", "recon", "dummy_connector", "olap"] recon = [] v1 = ["common_utils/v1"] -v2 = ["common_types/v2", "common_utils/v2", "tokenization_v2", "dep:reqwest"] +v2 = ["common_types/v2", "common_utils/v2", "tokenization_v2", "dep:reqwest", "revenue_recovery"] dynamic_routing = [] control_center_theme = ["dep:actix-web", "dep:actix-multipart"] -revenue_recovery = [] +revenue_recovery = ["dep:actix-multipart"] tokenization_v2 = ["common_utils/tokenization_v2"] [dependencies] actix-multipart = { version = "0.6.2", optional = true } actix-web = { version = "4.11.0", optional = true } +csv = "1.3" error-stack = "0.4.1" mime = "0.3.17" reqwest = { version = "0.11.27", optional = true } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" strum = { version = "0.26", features = ["derive"] } +tempfile = "3.8" time = { version = "0.3.41", features = ["serde", "serde-well-known", "std"] } url = { version = "2.5.4", features = ["serde"] } utoipa = { version = "4.2.3", features = ["preserve_order", "preserve_path_order"] } diff --git a/crates/api_models/src/lib.rs b/crates/api_models/src/lib.rs index 3b343459e9..16ad859f69 100644 --- a/crates/api_models/src/lib.rs +++ b/crates/api_models/src/lib.rs @@ -41,6 +41,8 @@ pub mod proxy; pub mod recon; pub mod refunds; pub mod relay; +#[cfg(feature = "v2")] +pub mod revenue_recovery_data_backfill; pub mod routing; pub mod surcharge_decision_configs; pub mod three_ds_decision_rule; diff --git a/crates/api_models/src/revenue_recovery_data_backfill.rs b/crates/api_models/src/revenue_recovery_data_backfill.rs new file mode 100644 index 0000000000..d73ae8a33c --- /dev/null +++ b/crates/api_models/src/revenue_recovery_data_backfill.rs @@ -0,0 +1,150 @@ +use std::{collections::HashMap, fs::File, io::BufReader}; + +use actix_multipart::form::{tempfile::TempFile, MultipartForm}; +use actix_web::{HttpResponse, ResponseError}; +use common_enums::{CardNetwork, PaymentMethodType}; +use common_utils::events::ApiEventMetric; +use csv::Reader; +use masking::Secret; +use serde::{Deserialize, Serialize}; +use time::Date; + +#[derive(Debug, Deserialize, Serialize)] +pub struct RevenueRecoveryBackfillRequest { + pub bin_number: Option>, + pub customer_id_resp: String, + pub connector_payment_id: Option, + pub token: Option>, + pub exp_date: Option>, + pub card_network: Option, + pub payment_method_sub_type: Option, + pub clean_bank_name: Option, + pub country_name: Option, + pub daily_retry_history: Option, +} + +#[derive(Debug, Serialize)] +pub struct RevenueRecoveryDataBackfillResponse { + pub processed_records: usize, + pub failed_records: usize, +} + +#[derive(Debug, Serialize)] +pub struct CsvParsingResult { + pub records: Vec, + pub failed_records: Vec, +} + +#[derive(Debug, Serialize)] +pub struct CsvParsingError { + pub row_number: usize, + pub error: String, +} + +/// Comprehensive card +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ComprehensiveCardData { + pub card_type: Option, + pub card_exp_month: Option>, + pub card_exp_year: Option>, + pub card_network: Option, + pub card_issuer: Option, + pub card_issuing_country: Option, + pub daily_retry_history: Option>, +} + +impl ApiEventMetric for RevenueRecoveryDataBackfillResponse { + fn get_api_event_type(&self) -> Option { + Some(common_utils::events::ApiEventsType::Miscellaneous) + } +} + +impl ApiEventMetric for CsvParsingResult { + fn get_api_event_type(&self) -> Option { + Some(common_utils::events::ApiEventsType::Miscellaneous) + } +} + +impl ApiEventMetric for CsvParsingError { + fn get_api_event_type(&self) -> Option { + Some(common_utils::events::ApiEventsType::Miscellaneous) + } +} + +#[derive(Debug, Clone, Serialize)] +pub enum BackfillError { + InvalidCardType(String), + DatabaseError(String), + RedisError(String), + CsvParsingError(String), + FileProcessingError(String), +} + +#[derive(serde::Deserialize)] +pub struct BackfillQuery { + pub cutoff_time: Option, +} + +impl std::fmt::Display for BackfillError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidCardType(msg) => write!(f, "Invalid card type: {}", msg), + Self::DatabaseError(msg) => write!(f, "Database error: {}", msg), + Self::RedisError(msg) => write!(f, "Redis error: {}", msg), + Self::CsvParsingError(msg) => write!(f, "CSV parsing error: {}", msg), + Self::FileProcessingError(msg) => write!(f, "File processing error: {}", msg), + } + } +} + +impl std::error::Error for BackfillError {} + +impl ResponseError for BackfillError { + fn error_response(&self) -> HttpResponse { + HttpResponse::BadRequest().json(serde_json::json!({ + "error": self.to_string() + })) + } +} + +#[derive(Debug, MultipartForm)] +pub struct RevenueRecoveryDataBackfillForm { + #[multipart(rename = "file")] + pub file: TempFile, +} + +impl RevenueRecoveryDataBackfillForm { + pub fn validate_and_get_records_with_errors(&self) -> Result { + // Step 1: Open the file + let file = File::open(self.file.file.path()) + .map_err(|error| BackfillError::FileProcessingError(error.to_string()))?; + + let mut csv_reader = Reader::from_reader(BufReader::new(file)); + + // Step 2: Parse CSV into typed records + let mut records = Vec::new(); + let mut failed_records = Vec::new(); + + for (row_index, record_result) in csv_reader + .deserialize::() + .enumerate() + { + match record_result { + Ok(record) => { + records.push(record); + } + Err(err) => { + failed_records.push(CsvParsingError { + row_number: row_index + 2, // +2 because enumerate starts at 0 and CSV has header row + error: err.to_string(), + }); + } + } + } + + Ok(CsvParsingResult { + records, + failed_records, + }) + } +} diff --git a/crates/router/src/core.rs b/crates/router/src/core.rs index f7cb256f58..6a7e77f6be 100644 --- a/crates/router/src/core.rs +++ b/crates/router/src/core.rs @@ -75,4 +75,6 @@ pub mod relay; pub mod revenue_recovery; pub mod chat; +#[cfg(feature = "v2")] +pub mod revenue_recovery_data_backfill; pub mod tokenization; diff --git a/crates/router/src/core/revenue_recovery/types.rs b/crates/router/src/core/revenue_recovery/types.rs index 0b7fb4ad06..3bdc822859 100644 --- a/crates/router/src/core/revenue_recovery/types.rs +++ b/crates/router/src/core/revenue_recovery/types.rs @@ -1231,8 +1231,17 @@ pub async fn reopen_calculate_workflow_on_payment_failure( // Create process tracker ID in the format: CALCULATE_WORKFLOW_{payment_intent_id} let process_tracker_id = format!("{runner}_{task}_{}", id.get_string_repr()); - // Set scheduled time to 1 hour from now - let schedule_time = common_utils::date_time::now() + time::Duration::hours(1); + // Set scheduled time to current time + buffer time set in configuration + let schedule_time = common_utils::date_time::now() + + time::Duration::seconds( + state + .conf + .revenue_recovery + .recovery_timestamp + .reopen_workflow_buffer_time_in_seconds, + ); + + let new_retry_count = process.retry_count + 1; // Check if a process tracker entry already exists for this payment intent let existing_entry = db @@ -1244,72 +1253,41 @@ pub async fn reopen_calculate_workflow_on_payment_failure( "Failed to check for existing calculate workflow process tracker entry", )?; - match existing_entry { - Some(existing_process) => { - router_env::logger::error!( - "Found existing CALCULATE_WORKFLOW task with id: {}", - existing_process.id - ); - } - None => { - // No entry exists - create a new one - router_env::logger::info!( - "No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry scheduled for 1 hour from now", + // No entry exists - create a new one + router_env::logger::info!( + "No existing CALCULATE_WORKFLOW task found for payment_intent_id: {}, creating new entry... ", id.get_string_repr() ); - let tag = ["PCR"]; - let task = "CALCULATE_WORKFLOW"; - let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; + let tag = ["PCR"]; + let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow; - let process_tracker_entry = storage::ProcessTrackerNew::new( - &process_tracker_id, - task, - runner, - tag, - process.tracking_data.clone(), - Some(process.retry_count), - schedule_time, - common_types::consts::API_VERSION, - ) - .change_context(errors::RecoveryError::ProcessTrackerFailure) - .attach_printable( - "Failed to construct calculate workflow process tracker entry", - )?; - - // Insert into process tracker with status New - db.as_scheduler() - .insert_process(process_tracker_entry) - .await - .change_context(errors::RecoveryError::ProcessTrackerFailure) - .attach_printable( - "Failed to enter calculate workflow process_tracker_entry in DB", - )?; - - router_env::logger::info!( - "Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}", - id.get_string_repr() - ); - } - } - - let tracking_data = serde_json::from_value(process.tracking_data.clone()) - .change_context(errors::RecoveryError::ValueNotFound) - .attach_printable("Failed to deserialize the tracking data from process tracker")?; - - // Call the existing perform_calculate_workflow function - Box::pin(perform_calculate_workflow( - state, - process, - profile, - merchant_context, - &tracking_data, - revenue_recovery_payment_data, - payment_intent, - )) - .await + let process_tracker_entry = storage::ProcessTrackerNew::new( + &process_tracker_id, + task, + runner, + tag, + process.tracking_data.clone(), + Some(new_retry_count), + schedule_time, + common_types::consts::API_VERSION, + ) .change_context(errors::RecoveryError::ProcessTrackerFailure) - .attach_printable("Failed to perform calculate workflow")?; + .attach_printable("Failed to construct calculate workflow process tracker entry")?; + + // Insert into process tracker with status New + db.as_scheduler() + .insert_process(process_tracker_entry) + .await + .change_context(errors::RecoveryError::ProcessTrackerFailure) + .attach_printable( + "Failed to enter calculate workflow process_tracker_entry in DB", + )?; + + router_env::logger::info!( + "Successfully created new CALCULATE_WORKFLOW task for payment_intent_id: {}", + id.get_string_repr() + ); logger::info!( payment_id = %id.get_string_repr(), @@ -1322,30 +1300,6 @@ pub async fn reopen_calculate_workflow_on_payment_failure( Ok(()) } -/// Create tracking data for the CALCULATE_WORKFLOW -fn create_calculate_workflow_tracking_data( - payment_intent: &PaymentIntent, - revenue_recovery_payment_data: &storage::revenue_recovery::RevenueRecoveryPaymentData, -) -> RecoveryResult { - let tracking_data = storage::revenue_recovery::RevenueRecoveryWorkflowTrackingData { - merchant_id: revenue_recovery_payment_data - .merchant_account - .get_id() - .clone(), - profile_id: revenue_recovery_payment_data.profile.get_id().clone(), - global_payment_id: payment_intent.id.clone(), - payment_attempt_id: payment_intent - .active_attempt_id - .clone() - .ok_or(storage_impl::errors::RecoveryError::ValueNotFound)?, - billing_mca_id: revenue_recovery_payment_data.billing_mca.get_id().clone(), - revenue_recovery_retry: revenue_recovery_payment_data.retry_algorithm, - invoice_scheduled_time: None, // Will be set by perform_calculate_workflow - }; - - Ok(tracking_data) -} - // TODO: Move these to impl based functions async fn record_back_to_billing_connector( state: &SessionState, diff --git a/crates/router/src/core/revenue_recovery_data_backfill.rs b/crates/router/src/core/revenue_recovery_data_backfill.rs new file mode 100644 index 0000000000..4f32a91484 --- /dev/null +++ b/crates/router/src/core/revenue_recovery_data_backfill.rs @@ -0,0 +1,316 @@ +use std::collections::HashMap; + +use api_models::revenue_recovery_data_backfill::{ + BackfillError, ComprehensiveCardData, RevenueRecoveryBackfillRequest, + RevenueRecoveryDataBackfillResponse, +}; +use common_enums::{CardNetwork, PaymentMethodType}; +use hyperswitch_domain_models::api::ApplicationResponse; +use masking::ExposeInterface; +use router_env::{instrument, logger}; +use time::{format_description, Date}; + +use crate::{ + connection, + core::errors::{self, RouterResult}, + routes::SessionState, + types::{domain, storage}, +}; + +pub async fn revenue_recovery_data_backfill( + state: SessionState, + records: Vec, + cutoff_datetime: Option, +) -> RouterResult> { + let mut processed_records = 0; + let mut failed_records = 0; + + // Process each record + for record in records { + match process_payment_method_record(&state, &record, cutoff_datetime).await { + Ok(_) => { + processed_records += 1; + logger::info!( + "Successfully processed record with connector customer id: {}", + record.customer_id_resp + ); + } + Err(e) => { + failed_records += 1; + logger::error!( + "Payment method backfill failed: customer_id={}, error={}", + record.customer_id_resp, + e + ); + } + } + } + + let response = RevenueRecoveryDataBackfillResponse { + processed_records, + failed_records, + }; + + logger::info!( + "Revenue recovery data backfill completed - Processed: {}, Failed: {}", + processed_records, + failed_records + ); + + Ok(ApplicationResponse::Json(response)) +} + +async fn process_payment_method_record( + state: &SessionState, + record: &RevenueRecoveryBackfillRequest, + cutoff_datetime: Option, +) -> Result<(), BackfillError> { + // Build comprehensive card data from CSV record + let card_data = match build_comprehensive_card_data(record) { + Ok(data) => data, + Err(e) => { + logger::warn!( + "Failed to build card data for connector customer id: {}, error: {}.", + record.customer_id_resp, + e + ); + ComprehensiveCardData { + card_type: Some("card".to_string()), + card_exp_month: None, + card_exp_year: None, + card_network: None, + card_issuer: None, + card_issuing_country: None, + daily_retry_history: None, + } + } + }; + logger::info!( + "Built comprehensive card data - card_type: {:?}, exp_month: {}, exp_year: {}, network: {:?}, issuer: {:?}, country: {:?}, daily_retry_history: {:?}", + card_data.card_type, + card_data.card_exp_month.as_ref().map(|_| "**").unwrap_or("None"), + card_data.card_exp_year.as_ref().map(|_| "**").unwrap_or("None"), + card_data.card_network, + card_data.card_issuer, + card_data.card_issuing_country, + card_data.daily_retry_history + ); + + // Update Redis if token exists and is valid + match record.token.as_ref().map(|token| token.clone().expose()) { + Some(token) if !token.is_empty() => { + logger::info!("Updating Redis for customer: {}", record.customer_id_resp,); + + storage::revenue_recovery_redis_operation:: + RedisTokenManager::update_redis_token_with_comprehensive_card_data( + state, + &record.customer_id_resp, + &token, + &card_data, + cutoff_datetime, + ) + .await + .map_err(|e| { + logger::error!("Redis update failed: {}", e); + BackfillError::RedisError(format!("Token not found in Redis: {}", e)) + })?; + } + _ => { + logger::info!( + "Skipping Redis update - token is missing, empty or 'nan': {:?}", + record.token + ); + } + } + + logger::info!( + "Successfully completed processing for connector customer id: {}", + record.customer_id_resp + ); + Ok(()) +} + +/// Parse daily retry history from CSV +fn parse_daily_retry_history(json_str: Option<&str>) -> Option> { + match json_str { + Some(json) if !json.is_empty() => { + match serde_json::from_str::>(json) { + Ok(string_retry_history) => { + // Convert string dates to Date objects + let format = format_description::parse("[year]-[month]-[day]") + .map_err(|e| { + BackfillError::CsvParsingError(format!( + "Invalid date format configuration: {}", + e + )) + }) + .ok()?; + + let mut date_retry_history = HashMap::new(); + + for (date_str, count) in string_retry_history { + match Date::parse(&date_str, &format) { + Ok(date) => { + date_retry_history.insert(date, count); + } + Err(e) => { + logger::warn!( + "Failed to parse date '{}' in daily_retry_history: {}", + date_str, + e + ); + } + } + } + + logger::debug!( + "Successfully parsed daily_retry_history with {} entries", + date_retry_history.len() + ); + Some(date_retry_history) + } + Err(e) => { + logger::warn!("Failed to parse daily_retry_history JSON '{}': {}", json, e); + None + } + } + } + _ => { + logger::debug!("Daily retry history not present or invalid, preserving existing data"); + None + } + } +} + +/// Build comprehensive card data from CSV record +fn build_comprehensive_card_data( + record: &RevenueRecoveryBackfillRequest, +) -> Result { + // Extract card type from request, if not present then update it with 'card' + let card_type = Some(determine_card_type(record.payment_method_sub_type)); + + // Parse expiration date + let (exp_month, exp_year) = parse_expiration_date( + record + .exp_date + .as_ref() + .map(|date| date.clone().expose()) + .as_deref(), + )?; + + let card_exp_month = exp_month.map(masking::Secret::new); + let card_exp_year = exp_year.map(masking::Secret::new); + + // Extract card network + let card_network = record.card_network.clone(); + + // Extract card issuer and issuing country + let card_issuer = record + .clean_bank_name + .as_ref() + .filter(|value| !value.is_empty()) + .cloned(); + + let card_issuing_country = record + .country_name + .as_ref() + .filter(|value| !value.is_empty()) + .cloned(); + + // Parse daily retry history + let daily_retry_history = parse_daily_retry_history(record.daily_retry_history.as_deref()); + + Ok(ComprehensiveCardData { + card_type, + card_exp_month, + card_exp_year, + card_network, + card_issuer, + card_issuing_country, + daily_retry_history, + }) +} + +/// Determine card type with fallback logic: payment_method_sub_type if not present -> "Card" +fn determine_card_type(payment_method_sub_type: Option) -> String { + match payment_method_sub_type { + Some(card_type_enum) => { + let mapped_type = match card_type_enum { + PaymentMethodType::Credit => "credit".to_string(), + PaymentMethodType::Debit => "debit".to_string(), + PaymentMethodType::Card => "card".to_string(), + // For all other payment method types, default to "card" + _ => "card".to_string(), + }; + logger::debug!( + "Using payment_method_sub_type enum '{:?}' -> '{}'", + card_type_enum, + mapped_type + ); + mapped_type + } + None => { + logger::info!("In CSV payment_method_sub_type not present, defaulting to 'card'"); + "card".to_string() + } + } +} + +/// Parse expiration date +fn parse_expiration_date( + exp_date: Option<&str>, +) -> Result<(Option, Option), BackfillError> { + exp_date + .filter(|date| !date.is_empty()) + .map(|date| { + date.split_once('/') + .ok_or_else(|| { + logger::warn!("Unrecognized expiration date format (MM/YY expected)"); + BackfillError::CsvParsingError( + "Invalid expiration date format: expected MM/YY".to_string(), + ) + }) + .and_then(|(month_part, year_part)| { + let month = month_part.trim(); + let year = year_part.trim(); + + logger::debug!("Split expiration date - parsing month and year"); + + // Validate and parse month + let month_num = month.parse::().map_err(|_| { + logger::warn!("Failed to parse month component in expiration date"); + BackfillError::CsvParsingError( + "Invalid month format in expiration date".to_string(), + ) + })?; + + if !(1..=12).contains(&month_num) { + logger::warn!("Invalid month value in expiration date (not in range 1-12)"); + return Err(BackfillError::CsvParsingError( + "Invalid month value in expiration date".to_string(), + )); + } + + // Handle year conversion + let final_year = match year.len() { + 4 => &year[2..4], // Convert 4-digit to 2-digit + 2 => year, // Already 2-digit + _ => { + logger::warn!( + "Invalid year length in expiration date (expected 2 or 4 digits)" + ); + return Err(BackfillError::CsvParsingError( + "Invalid year format in expiration date".to_string(), + )); + } + }; + + logger::debug!("Successfully parsed expiration date... ",); + Ok((Some(month.to_string()), Some(final_year.to_string()))) + }) + }) + .unwrap_or_else(|| { + logger::debug!("Empty expiration date, returning None"); + Ok((None, None)) + }) +} diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index d92dc311e0..a1ec5bd508 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -226,7 +226,8 @@ pub fn mk_app( .service(routes::UserDeprecated::server(state.clone())) .service(routes::ProcessTrackerDeprecated::server(state.clone())) .service(routes::ProcessTracker::server(state.clone())) - .service(routes::Gsm::server(state.clone())); + .service(routes::Gsm::server(state.clone())) + .service(routes::RecoveryDataBackfill::server(state.clone())); } } diff --git a/crates/router/src/routes.rs b/crates/router/src/routes.rs index b8fde79e2a..d49f81d9d9 100644 --- a/crates/router/src/routes.rs +++ b/crates/router/src/routes.rs @@ -48,6 +48,8 @@ pub mod profiles; #[cfg(feature = "recon")] pub mod recon; pub mod refunds; +#[cfg(feature = "v2")] +pub mod revenue_recovery_data_backfill; #[cfg(feature = "olap")] pub mod routing; pub mod three_ds_decision_rule; @@ -85,8 +87,6 @@ pub use self::app::PaymentMethodSession; pub use self::app::Proxy; #[cfg(all(feature = "olap", feature = "recon", feature = "v1"))] pub use self::app::Recon; -#[cfg(feature = "v2")] -pub use self::app::Tokenization; pub use self::app::{ ApiKeys, AppState, ApplePayCertificatesMigration, Authentication, Cache, Cards, Chat, Configs, ConnectorOnboarding, Customers, Disputes, EphemeralKey, FeatureMatrix, Files, Forex, Gsm, @@ -99,6 +99,8 @@ pub use self::app::{ pub use self::app::{Blocklist, Organization, Routing, Verify, WebhookEvents}; #[cfg(feature = "payouts")] pub use self::app::{PayoutLink, Payouts}; +#[cfg(feature = "v2")] +pub use self::app::{RecoveryDataBackfill, Tokenization}; #[cfg(all(feature = "stripe", feature = "v1"))] pub use super::compatibility::stripe::StripeApis; #[cfg(feature = "olap")] diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index d52dc1570c..9f27455c75 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -2970,3 +2970,19 @@ impl ProfileAcquirer { ) } } + +#[cfg(feature = "v2")] +pub struct RecoveryDataBackfill; +#[cfg(feature = "v2")] +impl RecoveryDataBackfill { + pub fn server(state: AppState) -> Scope { + web::scope("/v2/recovery/data-backfill") + .app_data(web::Data::new(state)) + .service( + web::resource("").route( + web::post() + .to(super::revenue_recovery_data_backfill::revenue_recovery_data_backfill), + ), + ) + } +} diff --git a/crates/router/src/routes/lock_utils.rs b/crates/router/src/routes/lock_utils.rs index 28581198d8..76a8839a91 100644 --- a/crates/router/src/routes/lock_utils.rs +++ b/crates/router/src/routes/lock_utils.rs @@ -49,6 +49,7 @@ pub enum ApiIdentifier { ProfileAcquirer, ThreeDsDecisionRule, GenericTokenization, + RecoveryDataBackfill, } impl From for ApiIdentifier { @@ -380,6 +381,8 @@ impl From for ApiIdentifier { Flow::TokenizationCreate | Flow::TokenizationRetrieve | Flow::TokenizationDelete => { Self::GenericTokenization } + + Flow::RecoveryDataBackfill => Self::RecoveryDataBackfill, } } } diff --git a/crates/router/src/routes/revenue_recovery_data_backfill.rs b/crates/router/src/routes/revenue_recovery_data_backfill.rs new file mode 100644 index 0000000000..340d9a084b --- /dev/null +++ b/crates/router/src/routes/revenue_recovery_data_backfill.rs @@ -0,0 +1,67 @@ +use actix_multipart::form::MultipartForm; +use actix_web::{web, HttpRequest, HttpResponse}; +use api_models::revenue_recovery_data_backfill::{BackfillQuery, RevenueRecoveryDataBackfillForm}; +use router_env::{instrument, tracing, Flow}; + +use crate::{ + core::{api_locking, revenue_recovery_data_backfill}, + routes::AppState, + services::{api, authentication as auth}, + types::domain, +}; + +#[instrument(skip_all, fields(flow = ?Flow::RecoveryDataBackfill))] +pub async fn revenue_recovery_data_backfill( + state: web::Data, + req: HttpRequest, + query: web::Query, + MultipartForm(form): MultipartForm, +) -> HttpResponse { + let flow = Flow::RecoveryDataBackfill; + + // Parse cutoff_time from query parameter + let cutoff_datetime = match query + .cutoff_time + .as_ref() + .map(|time_str| { + time::PrimitiveDateTime::parse( + time_str, + &time::format_description::well_known::Iso8601::DEFAULT, + ) + }) + .transpose() + { + Ok(datetime) => datetime, + Err(err) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": format!("Invalid datetime format: {}. Use ISO8601: 2024-01-15T10:30:00", err) + })); + } + }; + + let records = match form.validate_and_get_records_with_errors() { + Ok(records) => records, + Err(e) => { + return HttpResponse::BadRequest().json(serde_json::json!({ + "error": e.to_string() + })); + } + }; + + Box::pin(api::server_wrap( + flow, + state, + &req, + records, + |state, _, records, _req| { + revenue_recovery_data_backfill::revenue_recovery_data_backfill( + state, + records.records, + cutoff_datetime, + ) + }, + &auth::V2AdminApiAuth, + api_locking::LockAction::NotApplicable, + )) + .await +} diff --git a/crates/router/src/types/storage/revenue_recovery_redis_operation.rs b/crates/router/src/types/storage/revenue_recovery_redis_operation.rs index f083e8bbbe..e89db0d1aa 100644 --- a/crates/router/src/types/storage/revenue_recovery_redis_operation.rs +++ b/crates/router/src/types/storage/revenue_recovery_redis_operation.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use api_models; use common_enums::enums::CardNetwork; use common_utils::{date_time, errors::CustomResult, id_type}; use error_stack::ResultExt; -use masking::Secret; +use masking::{ExposeInterface, Secret}; use redis_interface::{DelReply, SetnxReply}; use router_env::{instrument, logger, tracing}; use serde::{Deserialize, Serialize}; @@ -214,7 +215,6 @@ impl RedisTokenManager { .await .change_context(get_hash_err)?; - // build the result map using iterator adapters (explicit match preserved for logging) let payment_processor_token_info_map: HashMap = payment_processor_tokens .into_iter() @@ -754,4 +754,93 @@ impl RedisTokenManager { Ok(token) } + + /// Update Redis token with comprehensive card data + #[instrument(skip_all)] + pub async fn update_redis_token_with_comprehensive_card_data( + state: &SessionState, + customer_id: &str, + token: &str, + card_data: &api_models::revenue_recovery_data_backfill::ComprehensiveCardData, + cutoff_datetime: Option, + ) -> CustomResult<(), errors::StorageError> { + // Get existing token data + let mut token_map = + Self::get_connector_customer_payment_processor_tokens(state, customer_id).await?; + + // Find the token to update + let existing_token = token_map.get_mut(token).ok_or_else(|| { + tracing::warn!( + customer_id = customer_id, + "Token not found in parsed Redis data - may be corrupted or missing for " + ); + error_stack::Report::new(errors::StorageError::ValueNotFound( + "Token not found in Redis".to_string(), + )) + })?; + + // Update the token details with new card data + card_data.card_type.as_ref().map(|card_type| { + existing_token.payment_processor_token_details.card_type = Some(card_type.clone()) + }); + + card_data.card_exp_month.as_ref().map(|exp_month| { + existing_token.payment_processor_token_details.expiry_month = Some(exp_month.clone()) + }); + + card_data.card_exp_year.as_ref().map(|exp_year| { + existing_token.payment_processor_token_details.expiry_year = Some(exp_year.clone()) + }); + + card_data.card_network.as_ref().map(|card_network| { + existing_token.payment_processor_token_details.card_network = Some(card_network.clone()) + }); + + card_data.card_issuer.as_ref().map(|card_issuer| { + existing_token.payment_processor_token_details.card_issuer = Some(card_issuer.clone()) + }); + + // Update daily retry history if provided + card_data + .daily_retry_history + .as_ref() + .map(|retry_history| existing_token.daily_retry_history = retry_history.clone()); + + // If cutoff_datetime is provided and existing scheduled_at < cutoff_datetime, set to None + // If no scheduled_at value exists, leave it as None + existing_token.scheduled_at = existing_token + .scheduled_at + .and_then(|existing_scheduled_at| { + cutoff_datetime + .map(|cutoff| { + if existing_scheduled_at < cutoff { + tracing::info!( + customer_id = customer_id, + existing_scheduled_at = %existing_scheduled_at, + cutoff_datetime = %cutoff, + "Set scheduled_at to None because existing time is before cutoff time" + ); + None + } else { + Some(existing_scheduled_at) + } + }) + .unwrap_or(Some(existing_scheduled_at)) // No cutoff provided, keep existing value + }); + + // Save the updated token map back to Redis + Self::update_or_add_connector_customer_payment_processor_tokens( + state, + customer_id, + token_map, + ) + .await?; + + tracing::info!( + customer_id = customer_id, + "Updated Redis token data with comprehensive card data using struct" + ); + + Ok(()) + } } diff --git a/crates/router_env/src/logger/types.rs b/crates/router_env/src/logger/types.rs index f5359d8401..0f03e26a5c 100644 --- a/crates/router_env/src/logger/types.rs +++ b/crates/router_env/src/logger/types.rs @@ -652,6 +652,8 @@ pub enum Flow { RecoveryPaymentsCreate, /// Tokenization delete flow TokenizationDelete, + /// Payment method data backfill flow + RecoveryDataBackfill, /// Gift card balance check flow GiftCardBalanceCheck, }