feat(revenue_recovery): add support to fetch data and update additional token data in redis (#9611)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
AdityaWNL
2025-10-01 20:29:52 +05:30
committed by GitHub
parent a517c21c76
commit af159867ae
10 changed files with 398 additions and 9 deletions

View File

@ -3,11 +3,11 @@ use std::{collections::HashMap, fs::File, io::BufReader};
use actix_multipart::form::{tempfile::TempFile, MultipartForm};
use actix_web::{HttpResponse, ResponseError};
use common_enums::{CardNetwork, PaymentMethodType};
use common_utils::events::ApiEventMetric;
use common_utils::{events::ApiEventMetric, pii::PhoneNumberStrategy};
use csv::Reader;
use masking::Secret;
use serde::{Deserialize, Serialize};
use time::Date;
use time::{Date, PrimitiveDateTime};
#[derive(Debug, Deserialize, Serialize)]
pub struct RevenueRecoveryBackfillRequest {
@ -82,6 +82,24 @@ impl ApiEventMetric for CsvParsingError {
}
}
impl ApiEventMetric for RedisDataResponse {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Miscellaneous)
}
}
impl ApiEventMetric for UpdateTokenStatusRequest {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Miscellaneous)
}
}
impl ApiEventMetric for UpdateTokenStatusResponse {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Miscellaneous)
}
}
#[derive(Debug, Clone, Serialize)]
pub enum BackfillError {
InvalidCardType(String),
@ -96,6 +114,72 @@ pub struct BackfillQuery {
pub cutoff_time: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum RedisKeyType {
Status, // for customer:{id}:status
Tokens, // for customer:{id}:tokens
}
#[derive(Debug, Deserialize)]
pub struct GetRedisDataQuery {
pub key_type: RedisKeyType,
}
#[derive(Debug, Serialize)]
pub struct RedisDataResponse {
pub exists: bool,
pub ttl_seconds: i64,
pub data: Option<serde_json::Value>,
}
#[derive(Debug, Serialize)]
pub enum ScheduledAtUpdate {
SetToNull,
SetToDateTime(PrimitiveDateTime),
}
impl<'de> Deserialize<'de> for ScheduledAtUpdate {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
match value {
serde_json::Value::String(s) => {
if s.to_lowercase() == "null" {
Ok(Self::SetToNull)
} else {
// Parse as datetime using iso8601 deserializer
common_utils::custom_serde::iso8601::deserialize(
&mut serde_json::Deserializer::from_str(&format!("\"{}\"", s)),
)
.map(Self::SetToDateTime)
.map_err(serde::de::Error::custom)
}
}
_ => Err(serde::de::Error::custom(
"Expected null variable or datetime iso8601 ",
)),
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct UpdateTokenStatusRequest {
pub connector_customer_id: String,
pub payment_processor_token: Secret<String, PhoneNumberStrategy>,
pub scheduled_at: Option<ScheduledAtUpdate>,
pub is_hard_decline: Option<bool>,
pub error_code: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct UpdateTokenStatusResponse {
pub updated: bool,
pub message: String,
}
impl std::fmt::Display for BackfillError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

View File

@ -485,6 +485,14 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::SetExpiryFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_ttl(&self, key: &RedisKey) -> CustomResult<i64, errors::RedisError> {
self.pool
.ttl(key.tenant_aware_key(self))
.await
.change_context(errors::RedisError::GetFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn set_hash_fields<V>(
&self,

View File

@ -1,10 +1,12 @@
use std::collections::HashMap;
use api_models::revenue_recovery_data_backfill::{
BackfillError, ComprehensiveCardData, RevenueRecoveryBackfillRequest,
RevenueRecoveryDataBackfillResponse, UnlockStatusResponse,
BackfillError, ComprehensiveCardData, GetRedisDataQuery, RedisDataResponse, RedisKeyType,
RevenueRecoveryBackfillRequest, RevenueRecoveryDataBackfillResponse, ScheduledAtUpdate,
UnlockStatusResponse, UpdateTokenStatusRequest, UpdateTokenStatusResponse,
};
use common_enums::{CardNetwork, PaymentMethodType};
use error_stack::ResultExt;
use hyperswitch_domain_models::api::ApplicationResponse;
use masking::ExposeInterface;
use router_env::{instrument, logger};
@ -86,6 +88,150 @@ pub async fn unlock_connector_customer_status(
Ok(ApplicationResponse::Json(response))
}
pub async fn get_redis_data(
state: SessionState,
connector_customer_id: &str,
key_type: &RedisKeyType,
) -> RouterResult<ApplicationResponse<RedisDataResponse>> {
match storage::revenue_recovery_redis_operation::RedisTokenManager::get_redis_key_data_raw(
&state,
connector_customer_id,
key_type,
)
.await
{
Ok((exists, ttl_seconds, data)) => {
let response = RedisDataResponse {
exists,
ttl_seconds,
data,
};
logger::info!(
"Retrieved Redis data for connector customer {}, exists={}, ttl={}",
connector_customer_id,
exists,
ttl_seconds
);
Ok(ApplicationResponse::Json(response))
}
Err(error) => Err(
error.change_context(errors::ApiErrorResponse::GenericNotFoundError {
message: format!(
"Redis data not found for connector customer id:- '{}'",
connector_customer_id
),
}),
),
}
}
pub async fn redis_update_additional_details_for_revenue_recovery(
state: SessionState,
request: UpdateTokenStatusRequest,
) -> RouterResult<ApplicationResponse<UpdateTokenStatusResponse>> {
// Get existing token
let existing_token = storage::revenue_recovery_redis_operation::
RedisTokenManager::get_payment_processor_token_using_token_id(
&state,
&request.connector_customer_id,
&request.payment_processor_token.clone().expose(),
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to retrieve existing token data")?;
// Check if token exists
let mut token_status = existing_token.ok_or_else(|| {
error_stack::Report::new(errors::ApiErrorResponse::GenericNotFoundError {
message: format!(
"Token '{:?}' not found for connector customer id:- '{}'",
request.payment_processor_token, request.connector_customer_id
),
})
})?;
let mut updated_fields = Vec::new();
// Handle scheduled_at update
match request.scheduled_at {
Some(ScheduledAtUpdate::SetToDateTime(dt)) => {
// Field provided with datetime - update schedule_at field with datetime
token_status.scheduled_at = Some(dt);
updated_fields.push(format!("scheduled_at: {}", dt));
logger::info!(
"Set scheduled_at to '{}' for token '{:?}'",
dt,
request.payment_processor_token
);
}
Some(ScheduledAtUpdate::SetToNull) => {
// Field provided with "null" variable - set schedule_at field to null
token_status.scheduled_at = None;
updated_fields.push("scheduled_at: set to null".to_string());
logger::info!(
"Set scheduled_at to null for token '{:?}'",
request.payment_processor_token
);
}
None => {
// Field not provided - we don't update schedule_at field
logger::debug!("scheduled_at not provided in request - leaving unchanged");
}
}
// Update is_hard_decline field
request.is_hard_decline.map(|is_hard_decline| {
token_status.is_hard_decline = Some(is_hard_decline);
updated_fields.push(format!("is_hard_decline: {}", is_hard_decline));
});
// Update error_code field
request.error_code.as_ref().map(|error_code| {
token_status.error_code = Some(error_code.clone());
updated_fields.push(format!("error_code: {}", error_code));
});
// Update Redis with modified token
let mut tokens_map = HashMap::new();
tokens_map.insert(
request.payment_processor_token.clone().expose(),
token_status,
);
storage::revenue_recovery_redis_operation::
RedisTokenManager::update_or_add_connector_customer_payment_processor_tokens(
&state,
&request.connector_customer_id,
tokens_map,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update token status in Redis")?;
let updated_fields_str = if updated_fields.is_empty() {
"no fields were updated".to_string()
} else {
updated_fields.join(", ")
};
let response = UpdateTokenStatusResponse {
updated: true,
message: format!(
"Successfully updated token '{:?}' for connector customer '{}'. Updated fields: {}",
request.payment_processor_token, request.connector_customer_id, updated_fields_str
),
};
logger::info!(
"Updated token status for connector customer {}, token: {:?}",
request.connector_customer_id,
request.payment_processor_token
);
Ok(ApplicationResponse::Json(response))
}
async fn process_payment_method_record(
state: &SessionState,

View File

@ -50,6 +50,8 @@ pub mod recon;
pub mod refunds;
#[cfg(feature = "v2")]
pub mod revenue_recovery_data_backfill;
#[cfg(feature = "v2")]
pub mod revenue_recovery_redis;
#[cfg(feature = "olap")]
pub mod routing;
#[cfg(feature = "v1")]

View File

@ -3029,5 +3029,15 @@ impl RecoveryDataBackfill {
super::revenue_recovery_data_backfill::revenue_recovery_data_backfill_status,
),
))
.service(web::resource("/redis-data/{token_id}").route(
web::get().to(
super::revenue_recovery_redis::get_revenue_recovery_redis_data,
),
))
.service(web::resource("/update-token").route(
web::put().to(
super::revenue_recovery_data_backfill::update_revenue_recovery_additional_redis_data,
),
))
}
}

View File

@ -50,7 +50,7 @@ pub enum ApiIdentifier {
ProfileAcquirer,
ThreeDsDecisionRule,
GenericTokenization,
RecoveryDataBackfill,
RecoveryRecovery,
}
impl From<Flow> for ApiIdentifier {
@ -350,7 +350,7 @@ impl From<Flow> for ApiIdentifier {
Self::GenericTokenization
}
Flow::RecoveryDataBackfill => Self::RecoveryDataBackfill,
Flow::RecoveryDataBackfill | Flow::RevenueRecoveryRedis => Self::RecoveryRecovery,
}
}
}

View File

@ -1,6 +1,8 @@
use actix_multipart::form::MultipartForm;
use actix_web::{web, HttpRequest, HttpResponse};
use api_models::revenue_recovery_data_backfill::{BackfillQuery, RevenueRecoveryDataBackfillForm};
use api_models::revenue_recovery_data_backfill::{
BackfillQuery, GetRedisDataQuery, RevenueRecoveryDataBackfillForm, UpdateTokenStatusRequest,
};
use router_env::{instrument, tracing, Flow};
use crate::{
@ -66,6 +68,30 @@ pub async fn revenue_recovery_data_backfill(
.await
}
#[instrument(skip_all, fields(flow = ?Flow::RecoveryDataBackfill))]
pub async fn update_revenue_recovery_additional_redis_data(
state: web::Data<AppState>,
req: HttpRequest,
json_payload: web::Json<UpdateTokenStatusRequest>,
) -> HttpResponse {
let flow = Flow::RecoveryDataBackfill;
Box::pin(api::server_wrap(
flow,
state,
&req,
json_payload.into_inner(),
|state, _: (), request, _| {
revenue_recovery_data_backfill::redis_update_additional_details_for_revenue_recovery(
state, request,
)
},
&auth::V2AdminApiAuth,
api_locking::LockAction::NotApplicable,
))
.await
}
#[instrument(skip_all, fields(flow = ?Flow::RecoveryDataBackfill))]
pub async fn revenue_recovery_data_backfill_status(
state: web::Data<AppState>,

View File

@ -0,0 +1,34 @@
use actix_web::{web, HttpRequest, HttpResponse};
use api_models::revenue_recovery_data_backfill::GetRedisDataQuery;
use router_env::{instrument, tracing, Flow};
use crate::{
core::{api_locking, revenue_recovery_data_backfill},
routes::AppState,
services::{api, authentication as auth},
};
#[instrument(skip_all, fields(flow = ?Flow::RevenueRecoveryRedis))]
pub async fn get_revenue_recovery_redis_data(
state: web::Data<AppState>,
req: HttpRequest,
path: web::Path<String>,
query: web::Query<GetRedisDataQuery>,
) -> HttpResponse {
let flow = Flow::RevenueRecoveryRedis;
let connector_customer_id = path.into_inner();
let key_type = &query.key_type;
Box::pin(api::server_wrap(
flow,
state,
&req,
(),
|state, _: (), _, _| {
revenue_recovery_data_backfill::get_redis_data(state, &connector_customer_id, key_type)
},
&auth::V2AdminApiAuth,
api_locking::LockAction::NotApplicable,
))
.await
}

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
use api_models;
use api_models::revenue_recovery_data_backfill::{self, RedisKeyType};
use common_enums::enums::CardNetwork;
use common_utils::{date_time, errors::CustomResult, id_type};
use error_stack::ResultExt;
@ -755,13 +755,90 @@ impl RedisTokenManager {
Ok(token)
}
/// Get Redis key data for revenue recovery
#[instrument(skip_all)]
pub async fn get_redis_key_data_raw(
state: &SessionState,
connector_customer_id: &str,
key_type: &RedisKeyType,
) -> CustomResult<(bool, i64, Option<serde_json::Value>), errors::StorageError> {
let redis_conn =
state
.store
.get_redis_conn()
.change_context(errors::StorageError::RedisError(
errors::RedisError::RedisConnectionError.into(),
))?;
let redis_key = match key_type {
RedisKeyType::Status => Self::get_connector_customer_lock_key(connector_customer_id),
RedisKeyType::Tokens => Self::get_connector_customer_tokens_key(connector_customer_id),
};
// Get TTL
let ttl = redis_conn
.get_ttl(&redis_key.clone().into())
.await
.map_err(|error| {
tracing::error!(operation = "get_ttl", err = ?error);
errors::StorageError::RedisError(errors::RedisError::GetHashFieldFailed.into())
})?;
// Get data based on key type and determine existence
let (key_exists, data) = match key_type {
RedisKeyType::Status => match redis_conn.get_key::<String>(&redis_key.into()).await {
Ok(status_value) => (true, serde_json::Value::String(status_value)),
Err(error) => {
tracing::error!(operation = "get_status_key", err = ?error);
(
false,
serde_json::Value::String(format!(
"Error retrieving status key: {}",
error
)),
)
}
},
RedisKeyType::Tokens => {
match redis_conn
.get_hash_fields::<HashMap<String, String>>(&redis_key.into())
.await
{
Ok(hash_fields) => {
let exists = !hash_fields.is_empty();
let data = if exists {
serde_json::to_value(hash_fields).unwrap_or(serde_json::Value::Null)
} else {
serde_json::Value::Object(serde_json::Map::new())
};
(exists, data)
}
Err(error) => {
tracing::error!(operation = "get_tokens_hash", err = ?error);
(false, serde_json::Value::Null)
}
}
}
};
tracing::debug!(
connector_customer_id = connector_customer_id,
key_type = ?key_type,
exists = key_exists,
ttl = ttl,
"Retrieved Redis key data"
);
Ok((key_exists, ttl, Some(data)))
}
/// Update Redis token with comprehensive card data
#[instrument(skip_all)]
pub async fn update_redis_token_with_comprehensive_card_data(
state: &SessionState,
customer_id: &str,
token: &str,
card_data: &api_models::revenue_recovery_data_backfill::ComprehensiveCardData,
card_data: &revenue_recovery_data_backfill::ComprehensiveCardData,
cutoff_datetime: Option<PrimitiveDateTime>,
) -> CustomResult<(), errors::StorageError> {
// Get existing token data

View File

@ -660,6 +660,8 @@ pub enum Flow {
TokenizationDelete,
/// Payment method data backfill flow
RecoveryDataBackfill,
/// Revenue recovery Redis operations flow
RevenueRecoveryRedis,
/// Gift card balance check flow
GiftCardBalanceCheck,
}