refactor(pm): create new crate for payment methods (#7355)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Jagan
2025-03-12 03:10:53 +05:30
committed by GitHub
parent 5cdfe83254
commit a31d1403ee
32 changed files with 2076 additions and 2098 deletions

View File

@ -0,0 +1,462 @@
use std::{fmt::Debug, sync::Arc};
use common_enums::enums::MerchantStorageScheme;
use common_utils::{fallback_reverse_lookup_not_found, types::keymanager::KeyManagerState};
use diesel_models::{errors::DatabaseError, kv};
use error_stack::ResultExt;
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::{self, StorageResult},
merchant_key_store::MerchantKeyStore,
};
#[cfg(not(feature = "payouts"))]
use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface};
use masking::StrongSecret;
use redis_interface::{errors::RedisError, types::HsetnxReply, RedisConnectionPool};
use router_env::logger;
use serde::de;
#[cfg(not(feature = "payouts"))]
pub use crate::database::store::Store;
use crate::{
config::TenantConfig,
database::store::PgPool,
diesel_error_to_data_error,
errors::RedisErrorExt,
lookup::ReverseLookupInterface,
metrics,
redis::kv_store::{
decide_storage_scheme, kv_wrapper, KvOperation, KvStorePartition, Op, PartitionKey,
RedisConnInterface,
},
utils::{find_all_combined_kv_database, try_redis_get_else_try_database_get},
RouterStore, UniqueConstraints,
};
pub use crate::{database::store::DatabaseStore, mock_db::MockDb};
#[derive(Debug, Clone)]
pub struct KVRouterStore<T: DatabaseStore> {
pub router_store: RouterStore<T>,
drainer_stream_name: String,
drainer_num_partitions: u8,
pub ttl_for_kv: u32,
pub request_id: Option<String>,
pub soft_kill_mode: bool,
}
pub struct InsertResourceParams<'a> {
pub insertable: kv::Insertable,
pub reverse_lookups: Vec<String>,
pub key: PartitionKey<'a>,
// secondary key
pub identifier: String,
// type of resource Eg: "payment_attempt"
pub resource_type: &'static str,
}
pub struct UpdateResourceParams<'a> {
pub updateable: kv::Updateable,
pub operation: Op<'a>,
}
pub struct FilterResourceParams<'a> {
pub key: PartitionKey<'a>,
pub pattern: &'static str,
pub limit: Option<i64>,
}
#[async_trait::async_trait]
impl<T> DatabaseStore for KVRouterStore<T>
where
RouterStore<T>: DatabaseStore,
T: DatabaseStore,
{
type Config = (RouterStore<T>, String, u8, u32, Option<bool>);
async fn new(
config: Self::Config,
tenant_config: &dyn TenantConfig,
_test_transaction: bool,
) -> StorageResult<Self> {
let (router_store, _, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = config;
let drainer_stream_name = format!("{}_{}", tenant_config.get_schema(), config.1);
Ok(Self::from_store(
router_store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
soft_kill_mode,
))
}
fn get_master_pool(&self) -> &PgPool {
self.router_store.get_master_pool()
}
fn get_replica_pool(&self) -> &PgPool {
self.router_store.get_replica_pool()
}
fn get_accounts_master_pool(&self) -> &PgPool {
self.router_store.get_accounts_master_pool()
}
fn get_accounts_replica_pool(&self) -> &PgPool {
self.router_store.get_accounts_replica_pool()
}
}
impl<T: DatabaseStore> RedisConnInterface for KVRouterStore<T> {
fn get_redis_conn(&self) -> error_stack::Result<Arc<RedisConnectionPool>, RedisError> {
self.router_store.get_redis_conn()
}
}
impl<T: DatabaseStore> KVRouterStore<T> {
pub fn from_store(
store: RouterStore<T>,
drainer_stream_name: String,
drainer_num_partitions: u8,
ttl_for_kv: u32,
soft_kill: Option<bool>,
) -> Self {
let request_id = store.request_id.clone();
Self {
router_store: store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
request_id,
soft_kill_mode: soft_kill.unwrap_or(false),
}
}
pub fn master_key(&self) -> &StrongSecret<Vec<u8>> {
self.router_store.master_key()
}
pub fn get_drainer_stream_name(&self, shard_key: &str) -> String {
format!("{{{}}}_{}", shard_key, self.drainer_stream_name)
}
pub async fn push_to_drainer_stream<R>(
&self,
redis_entry: kv::TypedSql,
partition_key: PartitionKey<'_>,
) -> error_stack::Result<(), RedisError>
where
R: KvStorePartition,
{
let global_id = format!("{}", partition_key);
let request_id = self.request_id.clone().unwrap_or_default();
let shard_key = R::shard_key(partition_key, self.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.router_store
.cache_store
.redis_conn
.stream_append_entry(
&stream_name.into(),
&redis_interface::RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs(request_id, global_id)
.change_context(RedisError::JsonSerializationFailed)?,
)
.await
.map(|_| metrics::KV_PUSHED_TO_DRAINER.add(1, &[]))
.inspect_err(|error| {
metrics::KV_FAILED_TO_PUSH_TO_DRAINER.add(1, &[]);
logger::error!(?error, "Failed to add entry in drainer stream");
})
.change_context(RedisError::StreamAppendFailed)
}
pub async fn find_resource_by_id<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
find_resource_db_fut: R,
lookup_id: String,
) -> error_stack::Result<D, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
R: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
{
let database_call = || async {
find_resource_db_fut.await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
};
let storage_scheme = Box::pin(decide_storage_scheme::<T, M>(
self,
storage_scheme,
Op::Find,
))
.await;
let res = || async {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);
let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id,
};
Box::pin(try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper(self, KvOperation::<M>::HGet(&lookup.sk_id), key))
.await?
.try_into_hget()
},
database_call,
))
.await
}
}
};
res()
.await?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}
pub async fn insert_resource<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
create_resource_fut: R,
resource_new: M,
InsertResourceParams {
insertable,
reverse_lookups,
key,
identifier,
resource_type,
}: InsertResourceParams<'_>,
) -> error_stack::Result<D, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
R: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
{
let storage_scheme = Box::pin(decide_storage_scheme::<_, M>(
self,
storage_scheme,
Op::Insert,
))
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => create_resource_fut.await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
}),
MerchantStorageScheme::RedisKv => {
let key_str = key.to_string();
let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew {
sk_id: identifier.clone(),
pk_id: key_str.clone(),
lookup_id: v,
source: resource_type.to_string(),
updated_by: storage_scheme.to_string(),
};
let results = reverse_lookups
.into_iter()
.map(|v| self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme));
futures::future::try_join_all(results).await?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: Box::new(insertable),
},
};
match Box::pin(kv_wrapper::<M, _, _>(
self,
KvOperation::<M>::HSetNx(&identifier, &resource_new, redis_entry),
key.clone(),
))
.await
.map_err(|err| err.to_redis_failed_response(&key.to_string()))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: resource_type,
key: Some(key_str),
}
.into()),
Ok(HsetnxReply::KeySet) => Ok(resource_new),
Err(er) => Err(er).change_context(errors::StorageError::KVError),
}
}
}?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}
pub async fn update_resource<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
update_resource_fut: R,
updated_resource: M,
UpdateResourceParams {
updateable,
operation,
}: UpdateResourceParams<'_>,
) -> error_stack::Result<D, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
R: futures::Future<Output = error_stack::Result<M, DatabaseError>> + Send,
{
match operation {
Op::Update(key, field, updated_by) => {
let storage_scheme = Box::pin(decide_storage_scheme::<_, M>(
self,
storage_scheme,
Op::Update(key.clone(), field, updated_by),
))
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
update_resource_fut.await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
}
MerchantStorageScheme::RedisKv => {
let key_str = key.to_string();
let redis_value = serde_json::to_string(&updated_resource)
.change_context(errors::StorageError::SerializationFailed)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: Box::new(updateable),
},
};
Box::pin(kv_wrapper::<(), _, _>(
self,
KvOperation::<M>::Hset((field, redis_value), redis_entry),
key,
))
.await
.map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hset()
.change_context(errors::StorageError::KVError)?;
Ok(updated_resource)
}
}
}
_ => Err(errors::StorageError::KVError.into()),
}?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}
pub async fn filter_resources<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
filter_resource_db_fut: R,
filter_fn: impl Fn(&M) -> bool,
FilterResourceParams {
key,
pattern,
limit,
}: FilterResourceParams<'_>,
) -> error_stack::Result<Vec<D>, errors::StorageError>
where
D: Debug + Sync + Conversion,
M: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync
+ ReverseConversion<D>,
R: futures::Future<Output = error_stack::Result<Vec<M>, DatabaseError>> + Send,
{
let db_call = || async {
filter_resource_db_fut.await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
};
let resources = match storage_scheme {
MerchantStorageScheme::PostgresOnly => db_call().await,
MerchantStorageScheme::RedisKv => {
let redis_fut = async {
let kv_result = Box::pin(kv_wrapper::<M, _, _>(
self,
KvOperation::<M>::Scan(pattern),
key,
))
.await?
.try_into_scan();
kv_result.map(|records| records.into_iter().filter(filter_fn).collect())
};
Box::pin(find_all_combined_kv_database(redis_fut, db_call, limit)).await
}
}?;
let resource_futures = resources
.into_iter()
.map(|pm| async {
pm.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
})
.collect::<Vec<_>>();
futures::future::try_join_all(resource_futures).await
}
}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {}

View File

@ -1,8 +1,12 @@
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};
use diesel_models as store;
use error_stack::ResultExt;
use hyperswitch_domain_models::errors::{StorageError, StorageResult};
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::{StorageError, StorageResult},
merchant_key_store::MerchantKeyStore,
};
use masking::StrongSecret;
use redis::{kv_store::RedisConnInterface, pub_sub::PubSubInterface, RedisStore};
mod address;
@ -12,7 +16,8 @@ pub mod connection;
pub mod customers;
pub mod database;
pub mod errors;
mod lookup;
pub mod kv_router_store;
pub mod lookup;
pub mod mandate;
pub mod metrics;
pub mod mock_db;
@ -23,15 +28,14 @@ pub mod payouts;
pub mod redis;
pub mod refund;
mod reverse_lookup;
mod utils;
pub mod utils;
use common_utils::errors::CustomResult;
use common_utils::{errors::CustomResult, types::keymanager::KeyManagerState};
use database::store::PgPool;
#[cfg(not(feature = "payouts"))]
use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface};
pub use mock_db::MockDb;
use redis_interface::{errors::RedisError, RedisConnectionPool, SaddReply};
use router_env::logger;
pub use crate::database::store::DatabaseStore;
#[cfg(not(feature = "payouts"))]
@ -149,6 +153,70 @@ impl<T: DatabaseStore> RouterStore<T> {
&self.master_encryption_key
}
pub async fn call_database<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
execute_query: R,
) -> error_stack::Result<D, StorageError>
where
D: Debug + Sync + Conversion,
R: futures::Future<Output = error_stack::Result<M, diesel_models::errors::DatabaseError>>
+ Send,
M: ReverseConversion<D>,
{
execute_query
.await
.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})?
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)
}
pub async fn find_resources<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
execute_query: R,
) -> error_stack::Result<Vec<D>, StorageError>
where
D: Debug + Sync + Conversion,
R: futures::Future<
Output = error_stack::Result<Vec<M>, diesel_models::errors::DatabaseError>,
> + Send,
M: ReverseConversion<D>,
{
let resource_futures = execute_query
.await
.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})?
.into_iter()
.map(|resource| async {
resource
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)
})
.collect::<Vec<_>>();
let resources = futures::future::try_join_all(resource_futures).await?;
Ok(resources)
}
/// # Panics
///
/// Will panic if `CONNECTOR_AUTH_FILE_PATH` is not set
@ -173,121 +241,6 @@ impl<T: DatabaseStore> RouterStore<T> {
}
}
#[derive(Debug, Clone)]
pub struct KVRouterStore<T: DatabaseStore> {
router_store: RouterStore<T>,
drainer_stream_name: String,
drainer_num_partitions: u8,
ttl_for_kv: u32,
pub request_id: Option<String>,
soft_kill_mode: bool,
}
#[async_trait::async_trait]
impl<T> DatabaseStore for KVRouterStore<T>
where
RouterStore<T>: DatabaseStore,
T: DatabaseStore,
{
type Config = (RouterStore<T>, String, u8, u32, Option<bool>);
async fn new(
config: Self::Config,
tenant_config: &dyn config::TenantConfig,
_test_transaction: bool,
) -> StorageResult<Self> {
let (router_store, _, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = config;
let drainer_stream_name = format!("{}_{}", tenant_config.get_schema(), config.1);
Ok(Self::from_store(
router_store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
soft_kill_mode,
))
}
fn get_master_pool(&self) -> &PgPool {
self.router_store.get_master_pool()
}
fn get_replica_pool(&self) -> &PgPool {
self.router_store.get_replica_pool()
}
fn get_accounts_master_pool(&self) -> &PgPool {
self.router_store.get_accounts_master_pool()
}
fn get_accounts_replica_pool(&self) -> &PgPool {
self.router_store.get_accounts_replica_pool()
}
}
impl<T: DatabaseStore> RedisConnInterface for KVRouterStore<T> {
fn get_redis_conn(&self) -> error_stack::Result<Arc<RedisConnectionPool>, RedisError> {
self.router_store.get_redis_conn()
}
}
impl<T: DatabaseStore> KVRouterStore<T> {
pub fn from_store(
store: RouterStore<T>,
drainer_stream_name: String,
drainer_num_partitions: u8,
ttl_for_kv: u32,
soft_kill: Option<bool>,
) -> Self {
let request_id = store.request_id.clone();
Self {
router_store: store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
request_id,
soft_kill_mode: soft_kill.unwrap_or(false),
}
}
pub fn master_key(&self) -> &StrongSecret<Vec<u8>> {
self.router_store.master_key()
}
pub fn get_drainer_stream_name(&self, shard_key: &str) -> String {
format!("{{{}}}_{}", shard_key, self.drainer_stream_name)
}
pub async fn push_to_drainer_stream<R>(
&self,
redis_entry: diesel_models::kv::TypedSql,
partition_key: redis::kv_store::PartitionKey<'_>,
) -> error_stack::Result<(), RedisError>
where
R: redis::kv_store::KvStorePartition,
{
let global_id = format!("{}", partition_key);
let request_id = self.request_id.clone().unwrap_or_default();
let shard_key = R::shard_key(partition_key, self.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.router_store
.cache_store
.redis_conn
.stream_append_entry(
&stream_name.into(),
&redis_interface::RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs(request_id, global_id)
.change_context(RedisError::JsonSerializationFailed)?,
)
.await
.map(|_| metrics::KV_PUSHED_TO_DRAINER.add(1, &[]))
.inspect_err(|error| {
metrics::KV_FAILED_TO_PUSH_TO_DRAINER.add(1, &[]);
logger::error!(?error, "Failed to add entry in drainer stream");
})
.change_context(RedisError::StreamAppendFailed)
}
}
// TODO: This should not be used beyond this crate
// Remove the pub modified once StorageScheme usage is completed
pub trait DataModelExt {
@ -503,11 +456,7 @@ impl UniqueConstraints for diesel_models::Customer {
}
}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutAttemptInterface for RouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutsInterface for RouterStore<T> {}

View File

@ -12,9 +12,10 @@ use redis_interface::SetnxReply;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, try_redis_get_else_try_database_get},
DatabaseStore, KVRouterStore, RouterStore,
DatabaseStore, RouterStore,
};
#[async_trait::async_trait]

View File

@ -1,10 +1,13 @@
use std::sync::Arc;
use common_utils::{errors::CustomResult, types::keymanager::KeyManagerState};
use diesel_models as store;
use error_stack::ResultExt;
use futures::lock::Mutex;
use futures::lock::{Mutex, MutexGuard};
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
errors::StorageError,
merchant_key_store::MerchantKeyStore,
payments::{payment_attempt::PaymentAttempt, PaymentIntent},
};
use redis_interface::RedisSettings;
@ -109,6 +112,96 @@ impl MockDb {
themes: Default::default(),
})
}
pub async fn find_resource<D, R>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
resources: MutexGuard<'_, Vec<D>>,
filter_fn: impl Fn(&&D) -> bool,
error_message: String,
) -> CustomResult<R, StorageError>
where
D: Sync + ReverseConversion<R> + Clone,
R: Conversion,
{
let resource = resources.iter().find(filter_fn).cloned();
match resource {
Some(res) => Ok(res
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)?),
None => Err(StorageError::ValueNotFound(error_message).into()),
}
}
pub async fn find_resources<D, R>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
resources: MutexGuard<'_, Vec<D>>,
filter_fn: impl Fn(&&D) -> bool,
error_message: String,
) -> CustomResult<Vec<R>, StorageError>
where
D: Sync + ReverseConversion<R> + Clone,
R: Conversion,
{
let resources: Vec<_> = resources.iter().filter(filter_fn).cloned().collect();
if resources.is_empty() {
Err(StorageError::ValueNotFound(error_message).into())
} else {
let pm_futures = resources
.into_iter()
.map(|pm| async {
pm.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)
})
.collect::<Vec<_>>();
let domain_resources = futures::future::try_join_all(pm_futures).await?;
Ok(domain_resources)
}
}
pub async fn update_resource<D, R>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
mut resources: MutexGuard<'_, Vec<D>>,
resource_updated: D,
filter_fn: impl Fn(&&mut D) -> bool,
error_message: String,
) -> CustomResult<R, StorageError>
where
D: Sync + ReverseConversion<R> + Clone,
R: Conversion,
{
if let Some(pm) = resources.iter_mut().find(filter_fn) {
*pm = resource_updated.clone();
let result = resource_updated
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)?;
Ok(result)
} else {
Err(StorageError::ValueNotFound(error_message).into())
}
}
}
#[cfg(not(feature = "payouts"))]

File diff suppressed because it is too large Load Diff

View File

@ -39,10 +39,11 @@ use router_env::{instrument, tracing};
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
lookup::ReverseLookupInterface,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get},
DataModelExt, DatabaseStore, KVRouterStore, RouterStore,
DataModelExt, DatabaseStore, RouterStore,
};
#[async_trait::async_trait]

View File

@ -51,9 +51,10 @@ use crate::connection;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DatabaseStore, KVRouterStore,
DatabaseStore,
};
#[async_trait::async_trait]

View File

@ -29,10 +29,11 @@ use router_env::{instrument, logger, tracing};
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
lookup::ReverseLookupInterface,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
DataModelExt, DatabaseStore,
};
#[async_trait::async_trait]

View File

@ -69,9 +69,10 @@ use crate::store::schema::{
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
DataModelExt, DatabaseStore,
};
#[async_trait::async_trait]

View File

@ -8,7 +8,7 @@ use router_derive::TryGetEnumVariant;
use router_env::logger;
use serde::de;
use crate::{metrics, store::kv::TypedSql, KVRouterStore, UniqueConstraints};
use crate::{kv_router_store::KVRouterStore, metrics, store::kv::TypedSql, UniqueConstraints};
pub trait KvStorePartition {
fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 {

View File

@ -67,3 +67,73 @@ where
},
}
}
use std::collections::HashSet;
use crate::UniqueConstraints;
fn union_vec<T>(mut kv_rows: Vec<T>, sql_rows: Vec<T>) -> Vec<T>
where
T: UniqueConstraints,
{
let mut kv_unique_keys = HashSet::new();
kv_rows.iter().for_each(|v| {
kv_unique_keys.insert(v.unique_constraints().concat());
});
sql_rows.into_iter().for_each(|v| {
let unique_key = v.unique_constraints().concat();
if !kv_unique_keys.contains(&unique_key) {
kv_rows.push(v);
}
});
kv_rows
}
pub async fn find_all_combined_kv_database<F, RFut, DFut, T>(
redis_fut: RFut,
database_call: F,
limit: Option<i64>,
) -> error_stack::Result<Vec<T>, StorageError>
where
T: UniqueConstraints,
F: FnOnce() -> DFut,
RFut:
futures::Future<Output = error_stack::Result<Vec<T>, redis_interface::errors::RedisError>>,
DFut: futures::Future<Output = error_stack::Result<Vec<T>, StorageError>>,
{
let trunc = |v: &mut Vec<_>| {
if let Some(l) = limit.and_then(|v| TryInto::try_into(v).ok()) {
v.truncate(l);
}
};
let limit_satisfies = |len: usize, limit: i64| {
TryInto::try_into(limit)
.ok()
.map_or(true, |val: usize| len >= val)
};
let redis_output = redis_fut.await;
match (redis_output, limit) {
(Ok(mut kv_rows), Some(lim)) if limit_satisfies(kv_rows.len(), lim) => {
trunc(&mut kv_rows);
Ok(kv_rows)
}
(Ok(kv_rows), _) => database_call().await.map(|db_rows| {
let mut res = union_vec(kv_rows, db_rows);
trunc(&mut res);
res
}),
(Err(redis_error), _) => match redis_error.current_context() {
redis_interface::errors::RedisError::NotFound => {
metrics::KV_MISS.add(1, &[]);
database_call().await
}
// Keeping the key empty here since the error would never go here.
_ => Err(redis_error.to_redis_failed_response("")),
},
}
}