refactor(router): make error_type generic in domain_models inorder to avoid conversion of errors in storage_impl (#7537)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Jagan
2025-03-21 17:13:19 +05:30
committed by GitHub
parent aedf460c70
commit 80218d0f27
38 changed files with 380 additions and 428 deletions

View File

@ -3,9 +3,11 @@ use bb8::CustomizeConnection;
use common_utils::DbConnectionParams;
use diesel::PgConnection;
use error_stack::ResultExt;
use hyperswitch_domain_models::errors::{StorageError, StorageResult};
use crate::config::{Database, TenantConfig};
use crate::{
config::{Database, TenantConfig},
errors::{StorageError, StorageResult},
};
pub type PgPool = bb8::Pool<async_bb8_diesel::ConnectionManager<PgConnection>>;
pub type PgPooledConn = async_bb8_diesel::Connection<PgConnection>;

View File

@ -1,12 +1,13 @@
pub use common_enums::{ApiClientError, ApplicationError, ApplicationResult};
use common_utils::errors::ErrorSwitch;
use hyperswitch_domain_models::errors::StorageError as DataStorageError;
pub use redis_interface::errors::RedisError;
use crate::store::errors::DatabaseError;
pub type StorageResult<T> = error_stack::Result<T, StorageError>;
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
#[error("Initialization Error")]
InitializationError,
#[error("DatabaseError: {0:?}")]
DatabaseError(error_stack::Report<DatabaseError>),
#[error("ValueNotFound: {0}")]
@ -38,56 +39,6 @@ pub enum StorageError {
RedisError(error_stack::Report<RedisError>),
}
impl ErrorSwitch<DataStorageError> for StorageError {
fn switch(&self) -> DataStorageError {
self.into()
}
}
#[allow(clippy::from_over_into)]
impl Into<DataStorageError> for &StorageError {
fn into(self) -> DataStorageError {
match self {
StorageError::DatabaseError(i) => match i.current_context() {
DatabaseError::DatabaseConnectionError => DataStorageError::DatabaseConnectionError,
// TODO: Update this error type to encompass & propagate the missing type (instead of generic `db value not found`)
DatabaseError::NotFound => {
DataStorageError::ValueNotFound(String::from("db value not found"))
}
// TODO: Update this error type to encompass & propagate the duplicate type (instead of generic `db value not found`)
DatabaseError::UniqueViolation => DataStorageError::DuplicateValue {
entity: "db entity",
key: None,
},
err => DataStorageError::DatabaseError(error_stack::report!(*err)),
},
StorageError::ValueNotFound(i) => DataStorageError::ValueNotFound(i.clone()),
StorageError::DuplicateValue { entity, key } => DataStorageError::DuplicateValue {
entity,
key: key.clone(),
},
StorageError::DatabaseConnectionError => DataStorageError::DatabaseConnectionError,
StorageError::KVError => DataStorageError::KVError,
StorageError::SerializationFailed => DataStorageError::SerializationFailed,
StorageError::MockDbError => DataStorageError::MockDbError,
StorageError::KafkaError => DataStorageError::KafkaError,
StorageError::CustomerRedacted => DataStorageError::CustomerRedacted,
StorageError::DeserializationFailed => DataStorageError::DeserializationFailed,
StorageError::EncryptionError => DataStorageError::EncryptionError,
StorageError::DecryptionError => DataStorageError::DecryptionError,
StorageError::RedisError(i) => match i.current_context() {
// TODO: Update this error type to encompass & propagate the missing type (instead of generic `redis value not found`)
RedisError::NotFound => {
DataStorageError::ValueNotFound("redis value not found".to_string())
}
RedisError::JsonSerializationFailed => DataStorageError::SerializationFailed,
RedisError::JsonDeserializationFailed => DataStorageError::DeserializationFailed,
i => DataStorageError::RedisError(format!("{:?}", i)),
},
}
}
}
impl From<error_stack::Report<RedisError>> for StorageError {
fn from(err: error_stack::Report<RedisError>) -> Self {
Self::RedisError(err)
@ -128,22 +79,22 @@ impl StorageError {
pub trait RedisErrorExt {
#[track_caller]
fn to_redis_failed_response(self, key: &str) -> error_stack::Report<DataStorageError>;
fn to_redis_failed_response(self, key: &str) -> error_stack::Report<StorageError>;
}
impl RedisErrorExt for error_stack::Report<RedisError> {
fn to_redis_failed_response(self, key: &str) -> error_stack::Report<DataStorageError> {
fn to_redis_failed_response(self, key: &str) -> error_stack::Report<StorageError> {
match self.current_context() {
RedisError::NotFound => self.change_context(DataStorageError::ValueNotFound(format!(
RedisError::NotFound => self.change_context(StorageError::ValueNotFound(format!(
"Data does not exist for key {key}",
))),
RedisError::SetNxFailed | RedisError::SetAddMembersFailed => {
self.change_context(DataStorageError::DuplicateValue {
self.change_context(StorageError::DuplicateValue {
entity: "redis",
key: Some(key.to_string()),
})
}
_ => self.change_context(DataStorageError::KVError),
_ => self.change_context(StorageError::KVError),
}
}
}

View File

@ -6,7 +6,6 @@ use diesel_models::{errors::DatabaseError, kv};
use error_stack::ResultExt;
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::{self, StorageResult},
merchant_key_store::MerchantKeyStore,
};
#[cfg(not(feature = "payouts"))]
@ -22,7 +21,7 @@ use crate::{
config::TenantConfig,
database::store::PgPool,
diesel_error_to_data_error,
errors::RedisErrorExt,
errors::{self, RedisErrorExt, StorageResult},
lookup::ReverseLookupInterface,
metrics,
redis::kv_store::{
@ -65,6 +64,41 @@ pub struct FilterResourceParams<'a> {
pub limit: Option<i64>,
}
pub enum FindResourceBy<'a> {
Id(String, PartitionKey<'a>),
LookupId(String),
}
pub trait DomainType: Debug + Sync + Conversion {}
impl<T: Debug + Sync + Conversion> DomainType for T {}
/// Storage model with all required capabilities for KV operations
pub trait StorageModel<D: Conversion>:
de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ Send
+ ReverseConversion<D>
{
}
impl<T, D> StorageModel<D> for T
where
T: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ Send
+ ReverseConversion<D>,
D: DomainType,
{
}
#[async_trait::async_trait]
impl<T> DatabaseStore for KVRouterStore<T>
where
@ -175,17 +209,11 @@ impl<T: DatabaseStore> KVRouterStore<T> {
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
find_resource_db_fut: R,
lookup_id: String,
find_by: FindResourceBy<'_>,
) -> error_stack::Result<D, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
D: DomainType,
M: StorageModel<D>,
R: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
{
let database_call = || async {
@ -204,19 +232,26 @@ impl<T: DatabaseStore> KVRouterStore<T> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);
let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id,
let (field, key) = match find_by {
FindResourceBy::Id(field, key) => (field, key),
FindResourceBy::LookupId(lookup_id) => {
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);
(
lookup.clone().sk_id,
PartitionKey::CombinationKey {
combination: &lookup.clone().pk_id,
},
)
}
};
Box::pin(try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper(self, KvOperation::<M>::HGet(&lookup.sk_id), key))
Box::pin(kv_wrapper(self, KvOperation::<M>::HGet(&field), key))
.await?
.try_into_hget()
},
@ -237,6 +272,80 @@ impl<T: DatabaseStore> KVRouterStore<T> {
.change_context(errors::StorageError::DecryptionError)
}
pub async fn find_optional_resource_by_id<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
find_resource_db_fut: R,
find_by: FindResourceBy<'_>,
) -> error_stack::Result<Option<D>, errors::StorageError>
where
D: DomainType,
M: StorageModel<D>,
R: futures::Future<Output = error_stack::Result<Option<M>, DatabaseError>> + Send,
{
let database_call = || async {
find_resource_db_fut.await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
};
let storage_scheme = Box::pin(decide_storage_scheme::<T, M>(
self,
storage_scheme,
Op::Find,
))
.await;
let res = || async {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let (field, key) = match find_by {
FindResourceBy::Id(field, key) => (field, key),
FindResourceBy::LookupId(lookup_id) => {
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);
(
lookup.clone().sk_id,
PartitionKey::CombinationKey {
combination: &lookup.clone().pk_id,
},
)
}
};
Box::pin(try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper(self, KvOperation::<M>::HGet(&field), key))
.await?
.try_into_hget()
.map(Some)
},
database_call,
))
.await
}
}
};
match res().await? {
Some(resource) => Ok(Some(
resource
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)?,
)),
None => Ok(None),
}
}
pub async fn insert_resource<D, R, M>(
&self,
state: &KeyManagerState,
@ -254,13 +363,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
) -> error_stack::Result<D, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
M: StorageModel<D>,
R: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
{
let storage_scheme = Box::pin(decide_storage_scheme::<_, M>(
@ -336,13 +439,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
) -> error_stack::Result<D, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
M: StorageModel<D>,
R: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
{
match operation {
@ -408,13 +505,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
) -> error_stack::Result<Vec<D>, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
M: StorageModel<D>,
R: futures::Future<Output = error_stack::Result<Vec<M>, DatabaseError>> + Send,
{
let db_call = || async {

View File

@ -4,7 +4,6 @@ use diesel_models as store;
use error_stack::ResultExt;
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::{StorageError, StorageResult},
merchant_key_store::MerchantKeyStore,
};
use masking::StrongSecret;
@ -37,9 +36,9 @@ use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface};
pub use mock_db::MockDb;
use redis_interface::{errors::RedisError, RedisConnectionPool, SaddReply};
pub use crate::database::store::DatabaseStore;
#[cfg(not(feature = "payouts"))]
pub use crate::database::store::Store;
pub use crate::{database::store::DatabaseStore, errors::StorageError};
#[derive(Debug, Clone)]
pub struct RouterStore<T: DatabaseStore> {
@ -65,7 +64,7 @@ where
config: Self::Config,
tenant_config: &dyn config::TenantConfig,
test_transaction: bool,
) -> StorageResult<Self> {
) -> error_stack::Result<Self, StorageError> {
let (db_conf, cache_conf, encryption_key, cache_error_signal, inmemory_cache_stream) =
config;
if test_transaction {
@ -113,7 +112,7 @@ impl<T: DatabaseStore> RouterStore<T> {
encryption_key: StrongSecret<Vec<u8>>,
cache_store: Arc<RedisStore>,
inmemory_cache_stream: &str,
) -> StorageResult<Self> {
) -> error_stack::Result<Self, StorageError> {
let db_store = T::new(db_conf, tenant_config, false).await?;
let redis_conn = cache_store.redis_conn.clone();
let cache_store = Arc::new(RedisStore {
@ -140,7 +139,7 @@ impl<T: DatabaseStore> RouterStore<T> {
pub async fn cache_store(
cache_conf: &redis_interface::RedisSettings,
cache_error_signal: tokio::sync::oneshot::Sender<()>,
) -> StorageResult<Arc<RedisStore>> {
) -> error_stack::Result<Arc<RedisStore>, StorageError> {
let cache_store = RedisStore::new(cache_conf)
.await
.change_context(StorageError::InitializationError)
@ -225,7 +224,7 @@ impl<T: DatabaseStore> RouterStore<T> {
tenant_config: &dyn config::TenantConfig,
cache_conf: &redis_interface::RedisSettings,
encryption_key: StrongSecret<Vec<u8>>,
) -> StorageResult<Self> {
) -> error_stack::Result<Self, StorageError> {
// TODO: create an error enum and return proper error here
let db_store = T::new(db_conf, tenant_config, true).await?;
let cache_store = RedisStore::new(cache_conf)

View File

@ -6,12 +6,11 @@ use diesel_models::{
},
};
use error_stack::ResultExt;
use hyperswitch_domain_models::errors;
use redis_interface::SetnxReply;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
errors::{self, RedisErrorExt},
kv_router_store::KVRouterStore,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, try_redis_get_else_try_database_get},

View File

@ -6,13 +6,12 @@ use error_stack::ResultExt;
use futures::lock::{Mutex, MutexGuard};
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::StorageError,
merchant_key_store::MerchantKeyStore,
payments::{payment_attempt::PaymentAttempt, PaymentIntent},
};
use redis_interface::RedisSettings;
use crate::redis::RedisStore;
use crate::{errors::StorageError, redis::RedisStore};
pub mod payment_attempt;
pub mod payment_intent;

View File

@ -6,17 +6,18 @@ use diesel_models::enums as storage_enums;
use hyperswitch_domain_models::merchant_key_store::MerchantKeyStore;
#[cfg(feature = "v1")]
use hyperswitch_domain_models::payments::payment_attempt::PaymentAttemptNew;
use hyperswitch_domain_models::{
errors::StorageError,
payments::payment_attempt::{PaymentAttempt, PaymentAttemptInterface, PaymentAttemptUpdate},
use hyperswitch_domain_models::payments::payment_attempt::{
PaymentAttempt, PaymentAttemptInterface, PaymentAttemptUpdate,
};
use super::MockDb;
use crate::errors::StorageError;
#[cfg(feature = "v1")]
use crate::DataModelExt;
#[async_trait::async_trait]
impl PaymentAttemptInterface for MockDb {
type Error = StorageError;
#[cfg(feature = "v1")]
async fn find_payment_attempt_by_payment_id_merchant_id_attempt_id(
&self,

View File

@ -3,7 +3,6 @@ use diesel_models::enums as storage_enums;
use error_stack::ResultExt;
use hyperswitch_domain_models::{
behaviour::Conversion,
errors::StorageError,
merchant_key_store::MerchantKeyStore,
payments::{
payment_intent::{PaymentIntentInterface, PaymentIntentUpdate},
@ -12,9 +11,11 @@ use hyperswitch_domain_models::{
};
use super::MockDb;
use crate::errors::StorageError;
#[async_trait::async_trait]
impl PaymentIntentInterface for MockDb {
type Error = StorageError;
#[cfg(all(feature = "v1", feature = "olap"))]
async fn filter_payment_intent_by_constraints(
&self,

View File

@ -1,19 +1,18 @@
use common_utils::errors::CustomResult;
use diesel_models::enums as storage_enums;
use hyperswitch_domain_models::{
errors::StorageError,
payouts::{
payout_attempt::{
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
},
payouts::Payouts,
use hyperswitch_domain_models::payouts::{
payout_attempt::{
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
},
payouts::Payouts,
};
use super::MockDb;
use crate::errors::StorageError;
#[async_trait::async_trait]
impl PayoutAttemptInterface for MockDb {
type Error = StorageError;
async fn update_payout_attempt(
&self,
_this: &PayoutAttempt,

View File

@ -1,17 +1,15 @@
use common_utils::errors::CustomResult;
use diesel_models::enums as storage_enums;
use hyperswitch_domain_models::{
errors::StorageError,
payouts::{
payout_attempt::PayoutAttempt,
payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
},
use hyperswitch_domain_models::payouts::{
payout_attempt::PayoutAttempt,
payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
};
use super::MockDb;
use crate::{errors::StorageError, MockDb};
#[async_trait::async_trait]
impl PayoutsInterface for MockDb {
type Error = StorageError;
async fn find_payout_by_merchant_id_payout_id(
&self,
_merchant_id: &common_utils::id_type::MerchantId,

View File

@ -13,7 +13,6 @@ use diesel_models::{
use error_stack::ResultExt;
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors,
merchant_key_store::MerchantKeyStore,
payment_methods::{PaymentMethod as DomainPaymentMethod, PaymentMethodInterface},
};
@ -21,9 +20,10 @@ use router_env::{instrument, tracing};
use super::MockDb;
use crate::{
diesel_error_to_data_error,
diesel_error_to_data_error, errors,
kv_router_store::{
FilterResourceParams, InsertResourceParams, KVRouterStore, UpdateResourceParams,
FilterResourceParams, FindResourceBy, InsertResourceParams, KVRouterStore,
UpdateResourceParams,
},
redis::kv_store::{Op, PartitionKey},
utils::{pg_connection_read, pg_connection_write},
@ -32,6 +32,7 @@ use crate::{
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentMethodInterface for KVRouterStore<T> {
type Error = errors::StorageError;
#[cfg(all(
any(feature = "v1", feature = "v2"),
not(feature = "payment_methods_v2")
@ -50,7 +51,7 @@ impl<T: DatabaseStore> PaymentMethodInterface for KVRouterStore<T> {
key_store,
storage_scheme,
PaymentMethod::find_by_payment_method_id(&conn, payment_method_id),
format!("payment_method_{}", payment_method_id),
FindResourceBy::LookupId(format!("payment_method_{}", payment_method_id)),
)
.await
}
@ -70,7 +71,10 @@ impl<T: DatabaseStore> PaymentMethodInterface for KVRouterStore<T> {
key_store,
storage_scheme,
PaymentMethod::find_by_id(&conn, payment_method_id),
format!("payment_method_{}", payment_method_id.get_string_repr()),
FindResourceBy::LookupId(format!(
"payment_method_{}",
payment_method_id.get_string_repr()
)),
)
.await
}
@ -93,7 +97,7 @@ impl<T: DatabaseStore> PaymentMethodInterface for KVRouterStore<T> {
key_store,
storage_scheme,
PaymentMethod::find_by_locker_id(&conn, locker_id),
format!("payment_method_locker_{}", locker_id),
FindResourceBy::LookupId(format!("payment_method_locker_{}", locker_id)),
)
.await
}
@ -423,6 +427,7 @@ impl<T: DatabaseStore> PaymentMethodInterface for KVRouterStore<T> {
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentMethodInterface for RouterStore<T> {
type Error = errors::StorageError;
#[instrument(skip_all)]
#[cfg(all(
any(feature = "v1", feature = "v2"),
@ -751,6 +756,7 @@ impl<T: DatabaseStore> PaymentMethodInterface for RouterStore<T> {
#[async_trait::async_trait]
impl PaymentMethodInterface for MockDb {
type Error = errors::StorageError;
#[cfg(all(
any(feature = "v1", feature = "v2"),
not(feature = "payment_methods_v2")

View File

@ -25,7 +25,6 @@ use hyperswitch_domain_models::{
merchant_key_store::MerchantKeyStore,
};
use hyperswitch_domain_models::{
errors,
mandates::{MandateAmountData, MandateDataType, MandateDetails},
payments::payment_attempt::{PaymentAttempt, PaymentAttemptInterface, PaymentAttemptUpdate},
};
@ -38,7 +37,7 @@ use router_env::{instrument, tracing};
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
errors::{self, RedisErrorExt},
kv_router_store::KVRouterStore,
lookup::ReverseLookupInterface,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
@ -48,6 +47,7 @@ use crate::{
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
type Error = errors::StorageError;
#[cfg(feature = "v1")]
#[instrument(skip_all)]
async fn insert_payment_attempt(
@ -550,6 +550,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
type Error = errors::StorageError;
#[cfg(feature = "v1")]
#[instrument(skip_all)]
async fn insert_payment_attempt(

View File

@ -2,8 +2,6 @@
use api_models::payments::{AmountFilter, Order, SortBy, SortOn};
#[cfg(feature = "olap")]
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
#[cfg(feature = "olap")]
use common_utils::errors::ReportSwitchExt;
use common_utils::{
ext_traits::{AsyncExt, Encode},
types::keymanager::KeyManagerState,
@ -34,7 +32,6 @@ use hyperswitch_domain_models::payments::{
};
use hyperswitch_domain_models::{
behaviour::Conversion,
errors::StorageError,
merchant_key_store::MerchantKeyStore,
payments::{
payment_intent::{PaymentIntentInterface, PaymentIntentUpdate},
@ -50,7 +47,7 @@ use router_env::{instrument, tracing};
use crate::connection;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
errors::{RedisErrorExt, StorageError},
kv_router_store::KVRouterStore,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
@ -59,6 +56,7 @@ use crate::{
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
type Error = StorageError;
#[cfg(feature = "v1")]
async fn insert_payment_intent(
&self,
@ -528,6 +526,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
type Error = StorageError;
#[instrument(skip_all)]
async fn insert_payment_intent(
&self,
@ -730,10 +729,9 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
merchant_key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<PaymentIntent>, StorageError> {
use common_utils::errors::ReportSwitchExt;
use futures::{future::try_join_all, FutureExt};
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
//[#350]: Replace this with Boxable Expression and pass it into generic filter
@ -876,7 +874,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>,
time_range: &common_utils::types::TimeRange,
) -> error_stack::Result<Vec<(common_enums::IntentStatus, i64)>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = <DieselPaymentIntent as HasTable>::table()
@ -926,7 +924,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
use crate::DataModelExt;
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPaymentIntent::table()
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
@ -1140,7 +1138,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
use crate::DataModelExt;
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPaymentIntent::table()
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
@ -1343,7 +1341,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
constraints: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<Option<String>>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPaymentIntent::table()
.select(pi_dsl::active_attempt_id)
@ -1430,7 +1428,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
constraints: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPaymentIntent::table()
.select(pi_dsl::active_attempt_id)

View File

@ -13,21 +13,18 @@ use diesel_models::{
ReverseLookupNew,
};
use error_stack::ResultExt;
use hyperswitch_domain_models::{
errors,
payouts::{
payout_attempt::{
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
PayoutListFilters,
},
payouts::Payouts,
use hyperswitch_domain_models::payouts::{
payout_attempt::{
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
PayoutListFilters,
},
payouts::Payouts,
};
use redis_interface::HsetnxReply;
use router_env::{instrument, logger, tracing};
use crate::{
diesel_error_to_data_error,
diesel_error_to_data_error, errors,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
lookup::ReverseLookupInterface,
@ -38,6 +35,7 @@ use crate::{
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
type Error = errors::StorageError;
#[instrument(skip_all)]
async fn insert_payout_attempt(
&self,
@ -384,6 +382,7 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutAttemptInterface for crate::RouterStore<T> {
type Error = errors::StorageError;
#[instrument(skip_all)]
async fn insert_payout_attempt(
&self,

View File

@ -2,8 +2,6 @@
use api_models::enums::PayoutConnectors;
#[cfg(feature = "olap")]
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
#[cfg(feature = "olap")]
use common_utils::errors::ReportSwitchExt;
use common_utils::ext_traits::Encode;
#[cfg(all(
feature = "olap",
@ -43,12 +41,9 @@ use diesel_models::{
use error_stack::ResultExt;
#[cfg(feature = "olap")]
use hyperswitch_domain_models::payouts::PayoutFetchConstraints;
use hyperswitch_domain_models::{
errors::StorageError,
payouts::{
payout_attempt::PayoutAttempt,
payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
},
use hyperswitch_domain_models::payouts::{
payout_attempt::PayoutAttempt,
payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
};
use redis_interface::HsetnxReply;
#[cfg(feature = "olap")]
@ -68,7 +63,7 @@ use crate::store::schema::{
};
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
errors::{RedisErrorExt, StorageError},
kv_router_store::KVRouterStore,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
@ -77,6 +72,7 @@ use crate::{
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
type Error = StorageError;
#[instrument(skip_all)]
async fn insert_payout(
&self,
@ -413,6 +409,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
type Error = StorageError;
#[instrument(skip_all)]
async fn insert_payout(
&self,
@ -492,7 +489,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
filters: &PayoutFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<Payouts>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
//[#350]: Replace this with Boxable Expression and pass it into generic filter
@ -606,9 +603,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
)>,
StorageError,
> {
use common_utils::errors::ReportSwitchExt;
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPayouts::table()
.inner_join(
@ -831,7 +826,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPayouts::table()
.inner_join(

View File

@ -1,9 +1,11 @@
use bb8::PooledConnection;
use diesel::PgConnection;
use error_stack::ResultExt;
use hyperswitch_domain_models::errors::StorageError;
use crate::{errors::RedisErrorExt, metrics, DatabaseStore};
use crate::{
errors::{RedisErrorExt, StorageError},
metrics, DatabaseStore,
};
pub async fn pg_connection_read<T: DatabaseStore>(
store: &T,