mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-27 19:46:48 +08:00
fix: add fallback to reverselookup error (#3025)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2052,6 +2052,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"common_enums",
|
||||
"common_utils",
|
||||
"diesel_models",
|
||||
"error-stack",
|
||||
"masking",
|
||||
"serde",
|
||||
|
||||
@ -10,6 +10,7 @@ pub mod errors;
|
||||
pub mod events;
|
||||
pub mod ext_traits;
|
||||
pub mod fp_utils;
|
||||
pub mod macros;
|
||||
pub mod pii;
|
||||
#[allow(missing_docs)] // Todo: add docs
|
||||
pub mod request;
|
||||
|
||||
92
crates/common_utils/src/macros.rs
Normal file
92
crates/common_utils/src/macros.rs
Normal file
@ -0,0 +1,92 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! newtype_impl {
|
||||
($is_pub:vis, $name:ident, $ty_path:path) => {
|
||||
impl std::ops::Deref for $name {
|
||||
type Target = $ty_path;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for $name {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<$ty_path> for $name {
|
||||
fn from(ty: $ty_path) -> Self {
|
||||
Self(ty)
|
||||
}
|
||||
}
|
||||
|
||||
impl $name {
|
||||
pub fn into_inner(self) -> $ty_path {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! newtype {
|
||||
($is_pub:vis $name:ident = $ty_path:path) => {
|
||||
$is_pub struct $name(pub $ty_path);
|
||||
|
||||
$crate::newtype_impl!($is_pub, $name, $ty_path);
|
||||
};
|
||||
|
||||
($is_pub:vis $name:ident = $ty_path:path, derives = ($($trt:path),*)) => {
|
||||
#[derive($($trt),*)]
|
||||
$is_pub struct $name(pub $ty_path);
|
||||
|
||||
$crate::newtype_impl!($is_pub, $name, $ty_path);
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! async_spawn {
|
||||
($t:block) => {
|
||||
tokio::spawn(async move { $t });
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! fallback_reverse_lookup_not_found {
|
||||
($a:expr,$b:expr) => {
|
||||
match $a {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
router_env::logger::error!(reverse_lookup_fallback = %err);
|
||||
match err.current_context() {
|
||||
errors::StorageError::ValueNotFound(_) => return $b,
|
||||
errors::StorageError::DatabaseError(data_err) => {
|
||||
match data_err.current_context() {
|
||||
diesel_models::errors::DatabaseError::NotFound => return $b,
|
||||
_ => return Err(err)
|
||||
}
|
||||
}
|
||||
_=> return Err(err)
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! collect_missing_value_keys {
|
||||
[$(($key:literal, $option:expr)),+] => {
|
||||
{
|
||||
let mut keys: Vec<&'static str> = Vec::new();
|
||||
$(
|
||||
if $option.is_none() {
|
||||
keys.push($key);
|
||||
}
|
||||
)*
|
||||
keys
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -17,6 +17,7 @@ api_models = { version = "0.1.0", path = "../api_models" }
|
||||
common_enums = { version = "0.1.0", path = "../common_enums" }
|
||||
common_utils = { version = "0.1.0", path = "../common_utils" }
|
||||
masking = { version = "0.1.0", path = "../masking" }
|
||||
diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] }
|
||||
|
||||
# Third party deps
|
||||
async-trait = "0.1.68"
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
use diesel_models::errors::DatabaseError;
|
||||
|
||||
pub type StorageResult<T> = error_stack::Result<T, StorageError>;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@ -6,7 +8,7 @@ pub enum StorageError {
|
||||
InitializationError,
|
||||
// TODO: deprecate this error type to use a domain error instead
|
||||
#[error("DatabaseError: {0:?}")]
|
||||
DatabaseError(String),
|
||||
DatabaseError(error_stack::Report<DatabaseError>),
|
||||
#[error("ValueNotFound: {0}")]
|
||||
ValueNotFound(String),
|
||||
#[error("DuplicateValue: {entity} already exists {key:?}")]
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Copy, Clone, Debug, thiserror::Error)]
|
||||
pub enum DatabaseError {
|
||||
#[error("An error occurred when obtaining database connection")]
|
||||
DatabaseConnectionError,
|
||||
@ -14,3 +14,17 @@ pub enum DatabaseError {
|
||||
#[error("An unknown error occurred")]
|
||||
Others,
|
||||
}
|
||||
|
||||
impl From<diesel::result::Error> for DatabaseError {
|
||||
fn from(error: diesel::result::Error) -> Self {
|
||||
match error {
|
||||
diesel::result::Error::DatabaseError(
|
||||
diesel::result::DatabaseErrorKind::UniqueViolation,
|
||||
_,
|
||||
) => Self::UniqueViolation,
|
||||
diesel::result::Error::NotFound => Self::NotFound,
|
||||
diesel::result::Error::QueryBuilderError(_) => Self::QueryGenerationFailed,
|
||||
_ => Self::Others,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -267,7 +267,7 @@ mod storage {
|
||||
|
||||
#[cfg(feature = "kv_store")]
|
||||
mod storage {
|
||||
use common_utils::date_time;
|
||||
use common_utils::{date_time, fallback_reverse_lookup_not_found};
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use redis_interface::HsetnxReply;
|
||||
use storage_impl::redis::kv_store::{kv_wrapper, KvOperation};
|
||||
@ -277,7 +277,6 @@ mod storage {
|
||||
connection,
|
||||
core::errors::{self, CustomResult},
|
||||
db::reverse_lookup::ReverseLookupInterface,
|
||||
logger,
|
||||
services::Store,
|
||||
types::storage::{self as storage_types, enums, kv},
|
||||
utils::{self, db_utils},
|
||||
@ -304,10 +303,12 @@ mod storage {
|
||||
match storage_scheme {
|
||||
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
|
||||
enums::MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("{merchant_id}_{internal_reference_id}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id = format!("ref_inter_ref_{merchant_id}_{internal_reference_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
database_call().await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
Box::pin(db_utils::try_redis_get_else_try_database_get(
|
||||
@ -382,6 +383,50 @@ mod storage {
|
||||
},
|
||||
};
|
||||
|
||||
let mut reverse_lookups = vec![
|
||||
storage_types::ReverseLookupNew {
|
||||
sk_id: field.clone(),
|
||||
lookup_id: format!(
|
||||
"ref_ref_id_{}_{}",
|
||||
created_refund.merchant_id, created_refund.refund_id
|
||||
),
|
||||
pk_id: key.clone(),
|
||||
source: "refund".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
},
|
||||
// [#492]: A discussion is required on whether this is required?
|
||||
storage_types::ReverseLookupNew {
|
||||
sk_id: field.clone(),
|
||||
lookup_id: format!(
|
||||
"ref_inter_ref_{}_{}",
|
||||
created_refund.merchant_id, created_refund.internal_reference_id
|
||||
),
|
||||
pk_id: key.clone(),
|
||||
source: "refund".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
},
|
||||
];
|
||||
if let Some(connector_refund_id) = created_refund.to_owned().connector_refund_id
|
||||
{
|
||||
reverse_lookups.push(storage_types::ReverseLookupNew {
|
||||
sk_id: field.clone(),
|
||||
lookup_id: format!(
|
||||
"ref_connector_{}_{}_{}",
|
||||
created_refund.merchant_id,
|
||||
connector_refund_id,
|
||||
created_refund.connector
|
||||
),
|
||||
pk_id: key.clone(),
|
||||
source: "refund".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
})
|
||||
};
|
||||
let rev_look = reverse_lookups
|
||||
.into_iter()
|
||||
.map(|rev| self.insert_reverse_lookup(rev, storage_scheme));
|
||||
|
||||
futures::future::try_join_all(rev_look).await?;
|
||||
|
||||
match kv_wrapper::<storage_types::Refund, _, _>(
|
||||
self,
|
||||
KvOperation::<storage_types::Refund>::HSetNx(
|
||||
@ -400,55 +445,7 @@ mod storage {
|
||||
key: Some(created_refund.refund_id),
|
||||
})
|
||||
.into_report(),
|
||||
Ok(HsetnxReply::KeySet) => {
|
||||
let mut reverse_lookups = vec![
|
||||
storage_types::ReverseLookupNew {
|
||||
sk_id: field.clone(),
|
||||
lookup_id: format!(
|
||||
"{}_{}",
|
||||
created_refund.merchant_id, created_refund.refund_id
|
||||
),
|
||||
pk_id: key.clone(),
|
||||
source: "refund".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
},
|
||||
// [#492]: A discussion is required on whether this is required?
|
||||
storage_types::ReverseLookupNew {
|
||||
sk_id: field.clone(),
|
||||
lookup_id: format!(
|
||||
"{}_{}",
|
||||
created_refund.merchant_id,
|
||||
created_refund.internal_reference_id
|
||||
),
|
||||
pk_id: key.clone(),
|
||||
source: "refund".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
},
|
||||
];
|
||||
if let Some(connector_refund_id) =
|
||||
created_refund.to_owned().connector_refund_id
|
||||
{
|
||||
reverse_lookups.push(storage_types::ReverseLookupNew {
|
||||
sk_id: field.clone(),
|
||||
lookup_id: format!(
|
||||
"{}_{}_{}",
|
||||
created_refund.merchant_id,
|
||||
connector_refund_id,
|
||||
created_refund.connector
|
||||
),
|
||||
pk_id: key,
|
||||
source: "refund".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
})
|
||||
};
|
||||
let rev_look = reverse_lookups
|
||||
.into_iter()
|
||||
.map(|rev| self.insert_reverse_lookup(rev, storage_scheme));
|
||||
|
||||
futures::future::try_join_all(rev_look).await?;
|
||||
|
||||
Ok(created_refund)
|
||||
}
|
||||
Ok(HsetnxReply::KeySet) => Ok(created_refund),
|
||||
Err(er) => Err(er).change_context(errors::StorageError::KVError),
|
||||
}
|
||||
}
|
||||
@ -475,17 +472,14 @@ mod storage {
|
||||
match storage_scheme {
|
||||
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
|
||||
enums::MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("{merchant_id}_{connector_transaction_id}");
|
||||
let lookup = match self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await
|
||||
{
|
||||
Ok(l) => l,
|
||||
Err(err) => {
|
||||
logger::error!(?err);
|
||||
return Ok(vec![]);
|
||||
}
|
||||
};
|
||||
let lookup_id =
|
||||
format!("pa_conn_trans_{merchant_id}_{connector_transaction_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
database_call().await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
|
||||
let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id);
|
||||
@ -575,10 +569,12 @@ mod storage {
|
||||
match storage_scheme {
|
||||
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
|
||||
enums::MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("{merchant_id}_{refund_id}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id = format!("ref_ref_id_{merchant_id}_{refund_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
database_call().await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
Box::pin(db_utils::try_redis_get_else_try_database_get(
|
||||
@ -620,10 +616,13 @@ mod storage {
|
||||
match storage_scheme {
|
||||
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
|
||||
enums::MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("{merchant_id}_{connector_refund_id}_{connector}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id =
|
||||
format!("ref_connector_{merchant_id}_{connector_refund_id}_{connector}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
database_call().await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
Box::pin(db_utils::try_redis_get_else_try_database_get(
|
||||
|
||||
@ -193,13 +193,7 @@ fn diesel_error_to_data_error(diesel_error: Report<DatabaseError>) -> Report<Sto
|
||||
entity: "entity ",
|
||||
key: None,
|
||||
},
|
||||
DatabaseError::NoFieldsToUpdate => {
|
||||
StorageError::DatabaseError("No fields to update".to_string())
|
||||
}
|
||||
DatabaseError::QueryGenerationFailed => {
|
||||
StorageError::DatabaseError("Query generation failed".to_string())
|
||||
}
|
||||
DatabaseError::Others => StorageError::DatabaseError("Others".to_string()),
|
||||
err => StorageError::DatabaseError(error_stack::report!(*err)),
|
||||
};
|
||||
diesel_error.change_context(new_err)
|
||||
}
|
||||
|
||||
@ -35,6 +35,7 @@ use storage_impl::errors::ApplicationResult;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
pub use self::env::logger;
|
||||
pub(crate) use self::macros::*;
|
||||
use crate::{configs::settings, core::errors};
|
||||
|
||||
#[cfg(feature = "mimalloc")]
|
||||
|
||||
@ -1,68 +1,4 @@
|
||||
#[macro_export]
|
||||
macro_rules! newtype_impl {
|
||||
($is_pub:vis, $name:ident, $ty_path:path) => {
|
||||
impl std::ops::Deref for $name {
|
||||
type Target = $ty_path;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for $name {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<$ty_path> for $name {
|
||||
fn from(ty: $ty_path) -> Self {
|
||||
Self(ty)
|
||||
}
|
||||
}
|
||||
|
||||
impl $name {
|
||||
pub fn into_inner(self) -> $ty_path {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! newtype {
|
||||
($is_pub:vis $name:ident = $ty_path:path) => {
|
||||
$is_pub struct $name(pub $ty_path);
|
||||
|
||||
$crate::newtype_impl!($is_pub, $name, $ty_path);
|
||||
};
|
||||
|
||||
($is_pub:vis $name:ident = $ty_path:path, derives = ($($trt:path),*)) => {
|
||||
#[derive($($trt),*)]
|
||||
$is_pub struct $name(pub $ty_path);
|
||||
|
||||
$crate::newtype_impl!($is_pub, $name, $ty_path);
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! async_spawn {
|
||||
($t:block) => {
|
||||
tokio::spawn(async move { $t });
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! collect_missing_value_keys {
|
||||
[$(($key:literal, $option:expr)),+] => {
|
||||
{
|
||||
let mut keys: Vec<&'static str> = Vec::new();
|
||||
$(
|
||||
if $option.is_none() {
|
||||
keys.push($key);
|
||||
}
|
||||
)*
|
||||
keys
|
||||
}
|
||||
};
|
||||
}
|
||||
pub use common_utils::{
|
||||
async_spawn, collect_missing_value_keys, fallback_reverse_lookup_not_found, newtype,
|
||||
newtype_impl,
|
||||
};
|
||||
|
||||
@ -92,15 +92,7 @@ impl Into<DataStorageError> for &StorageError {
|
||||
key: None,
|
||||
}
|
||||
}
|
||||
storage_errors::DatabaseError::NoFieldsToUpdate => {
|
||||
DataStorageError::DatabaseError("No fields to update".to_string())
|
||||
}
|
||||
storage_errors::DatabaseError::QueryGenerationFailed => {
|
||||
DataStorageError::DatabaseError("Query generation failed".to_string())
|
||||
}
|
||||
storage_errors::DatabaseError::Others => {
|
||||
DataStorageError::DatabaseError("Unknown database error".to_string())
|
||||
}
|
||||
err => DataStorageError::DatabaseError(error_stack::report!(*err)),
|
||||
},
|
||||
StorageError::ValueNotFound(i) => DataStorageError::ValueNotFound(i.clone()),
|
||||
StorageError::DuplicateValue { entity, key } => DataStorageError::DuplicateValue {
|
||||
|
||||
@ -251,14 +251,6 @@ pub(crate) fn diesel_error_to_data_error(
|
||||
entity: "entity ",
|
||||
key: None,
|
||||
},
|
||||
diesel_models::errors::DatabaseError::NoFieldsToUpdate => {
|
||||
StorageError::DatabaseError("No fields to update".to_string())
|
||||
}
|
||||
diesel_models::errors::DatabaseError::QueryGenerationFailed => {
|
||||
StorageError::DatabaseError("Query generation failed".to_string())
|
||||
}
|
||||
diesel_models::errors::DatabaseError::Others => {
|
||||
StorageError::DatabaseError("Others".to_string())
|
||||
}
|
||||
_ => StorageError::DatabaseError(error_stack::report!(*diesel_error)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use api_models::enums::{AuthenticationType, Connector, PaymentMethod, PaymentMethodType};
|
||||
use common_utils::errors::CustomResult;
|
||||
use common_utils::{errors::CustomResult, fallback_reverse_lookup_not_found};
|
||||
use data_models::{
|
||||
errors,
|
||||
mandates::{MandateAmountData, MandateDataType},
|
||||
@ -399,6 +399,20 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
},
|
||||
};
|
||||
|
||||
//Reverse lookup for attempt_id
|
||||
let reverse_lookup = ReverseLookupNew {
|
||||
lookup_id: format!(
|
||||
"pa_{}_{}",
|
||||
&created_attempt.merchant_id, &created_attempt.attempt_id,
|
||||
),
|
||||
pk_id: key.clone(),
|
||||
sk_id: field.clone(),
|
||||
source: "payment_attempt".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
};
|
||||
self.insert_reverse_lookup(reverse_lookup, storage_scheme)
|
||||
.await?;
|
||||
|
||||
match kv_wrapper::<PaymentAttempt, _, _>(
|
||||
self,
|
||||
KvOperation::HSetNx(
|
||||
@ -417,23 +431,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
key: Some(key),
|
||||
})
|
||||
.into_report(),
|
||||
Ok(HsetnxReply::KeySet) => {
|
||||
//Reverse lookup for attempt_id
|
||||
let reverse_lookup = ReverseLookupNew {
|
||||
lookup_id: format!(
|
||||
"{}_{}",
|
||||
&created_attempt.merchant_id, &created_attempt.attempt_id,
|
||||
),
|
||||
pk_id: key,
|
||||
sk_id: field,
|
||||
source: "payment_attempt".to_string(),
|
||||
updated_by: storage_scheme.to_string(),
|
||||
};
|
||||
self.insert_reverse_lookup(reverse_lookup, storage_scheme)
|
||||
.await?;
|
||||
|
||||
Ok(created_attempt)
|
||||
}
|
||||
Ok(HsetnxReply::KeySet) => Ok(created_attempt),
|
||||
Err(error) => Err(error.change_context(errors::StorageError::KVError)),
|
||||
}
|
||||
}
|
||||
@ -480,16 +478,6 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
},
|
||||
};
|
||||
|
||||
kv_wrapper::<(), _, _>(
|
||||
self,
|
||||
KvOperation::Hset::<DieselPaymentAttempt>((&field, redis_value), redis_entry),
|
||||
&key,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::StorageError::KVError)?
|
||||
.try_into_hset()
|
||||
.change_context(errors::StorageError::KVError)?;
|
||||
|
||||
match (
|
||||
old_connector_transaction_id,
|
||||
&updated_attempt.connector_transaction_id,
|
||||
@ -549,6 +537,16 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
(_, _) => {}
|
||||
}
|
||||
|
||||
kv_wrapper::<(), _, _>(
|
||||
self,
|
||||
KvOperation::Hset::<DieselPaymentAttempt>((&field, redis_value), redis_entry),
|
||||
&key,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::StorageError::KVError)?
|
||||
.try_into_hset()
|
||||
.change_context(errors::StorageError::KVError)?;
|
||||
|
||||
Ok(updated_attempt)
|
||||
}
|
||||
}
|
||||
@ -574,10 +572,20 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
// We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now
|
||||
let lookup_id = format!("conn_trans_{merchant_id}_{connector_transaction_id}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id = format!("pa_conn_trans_{merchant_id}_{connector_transaction_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
self.router_store
|
||||
.find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(
|
||||
connector_transaction_id,
|
||||
payment_id,
|
||||
merchant_id,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
|
||||
Box::pin(try_redis_get_else_try_database_get(
|
||||
@ -707,10 +715,18 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
.await
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("{merchant_id}_{connector_txn_id}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id = format!("pa_conn_trans_{merchant_id}_{connector_txn_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
self.router_store
|
||||
.find_payment_attempt_by_merchant_id_connector_txn_id(
|
||||
merchant_id,
|
||||
connector_txn_id,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
Box::pin(try_redis_get_else_try_database_get(
|
||||
@ -799,10 +815,19 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
.await
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("{merchant_id}_{attempt_id}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id = format!("pa_{merchant_id}_{attempt_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
self.router_store
|
||||
.find_payment_attempt_by_attempt_id_merchant_id(
|
||||
attempt_id,
|
||||
merchant_id,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
let key = &lookup.pk_id;
|
||||
Box::pin(try_redis_get_else_try_database_get(
|
||||
async {
|
||||
@ -846,10 +871,18 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
||||
.await
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
let lookup_id = format!("preprocessing_{merchant_id}_{preprocessing_id}");
|
||||
let lookup = self
|
||||
.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await?;
|
||||
let lookup_id = format!("pa_preprocessing_{merchant_id}_{preprocessing_id}");
|
||||
let lookup = fallback_reverse_lookup_not_found!(
|
||||
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
|
||||
.await,
|
||||
self.router_store
|
||||
.find_payment_attempt_by_preprocessing_id_merchant_id(
|
||||
preprocessing_id,
|
||||
merchant_id,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
);
|
||||
let key = &lookup.pk_id;
|
||||
|
||||
Box::pin(try_redis_get_else_try_database_get(
|
||||
@ -1757,7 +1790,7 @@ async fn add_connector_txn_id_to_reverse_lookup<T: DatabaseStore>(
|
||||
) -> CustomResult<ReverseLookup, errors::StorageError> {
|
||||
let field = format!("pa_{}", updated_attempt_attempt_id);
|
||||
let reverse_lookup_new = ReverseLookupNew {
|
||||
lookup_id: format!("conn_trans_{}_{}", merchant_id, connector_transaction_id),
|
||||
lookup_id: format!("pa_conn_trans_{}_{}", merchant_id, connector_transaction_id),
|
||||
pk_id: key.to_owned(),
|
||||
sk_id: field.clone(),
|
||||
source: "payment_attempt".to_string(),
|
||||
@ -1779,7 +1812,7 @@ async fn add_preprocessing_id_to_reverse_lookup<T: DatabaseStore>(
|
||||
) -> CustomResult<ReverseLookup, errors::StorageError> {
|
||||
let field = format!("pa_{}", updated_attempt_attempt_id);
|
||||
let reverse_lookup_new = ReverseLookupNew {
|
||||
lookup_id: format!("preprocessing_{}_{}", merchant_id, preprocessing_id),
|
||||
lookup_id: format!("pa_preprocessing_{}_{}", merchant_id, preprocessing_id),
|
||||
pk_id: key.to_owned(),
|
||||
sk_id: field.clone(),
|
||||
source: "payment_attempt".to_string(),
|
||||
|
||||
@ -494,12 +494,13 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
|
||||
.map(PaymentIntent::from_storage_model)
|
||||
.collect::<Vec<PaymentIntent>>()
|
||||
})
|
||||
.into_report()
|
||||
.map_err(|er| {
|
||||
let new_err = StorageError::DatabaseError(format!("{er:?}"));
|
||||
er.change_context(new_err)
|
||||
StorageError::DatabaseError(
|
||||
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
|
||||
.attach_printable("Error filtering payment records"),
|
||||
)
|
||||
})
|
||||
.attach_printable_lazy(|| "Error filtering records by predicate")
|
||||
.into_report()
|
||||
}
|
||||
|
||||
#[cfg(feature = "olap")]
|
||||
@ -646,12 +647,13 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.into_report()
|
||||
.map_err(|er| {
|
||||
let new_er = StorageError::DatabaseError(format!("{er:?}"));
|
||||
er.change_context(new_er)
|
||||
StorageError::DatabaseError(
|
||||
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
|
||||
.attach_printable("Error filtering payment records"),
|
||||
)
|
||||
})
|
||||
.attach_printable("Error filtering payment records")
|
||||
.into_report()
|
||||
}
|
||||
|
||||
#[cfg(feature = "olap")]
|
||||
@ -712,12 +714,13 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
|
||||
db_metrics::DatabaseOperation::Filter,
|
||||
)
|
||||
.await
|
||||
.into_report()
|
||||
.map_err(|er| {
|
||||
let new_err = StorageError::DatabaseError(format!("{er:?}"));
|
||||
er.change_context(new_err)
|
||||
StorageError::DatabaseError(
|
||||
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
|
||||
.attach_printable("Error filtering payment records"),
|
||||
)
|
||||
})
|
||||
.attach_printable_lazy(|| "Error filtering records by predicate")
|
||||
.into_report()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user