refactor(storage_impl): split payment attempt models to domain + diesel (#2010)

Signed-off-by: chikke srujan <121822803+srujanchikke@users.noreply.github.com>
Co-authored-by: Mani Chandra <84711804+ThisIsMani@users.noreply.github.com>
Co-authored-by: Arjun Karthik <m.arjunkarthik@gmail.com>
Co-authored-by: Sai Harsha Vardhan <56996463+sai-harsha-vardhan@users.noreply.github.com>
Co-authored-by: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com>
Co-authored-by: chikke srujan <121822803+srujanchikke@users.noreply.github.com>
Co-authored-by: Prasunna Soppa <prasunna.soppa@juspay.in>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: DEEPANSHU BANSAL <41580413+deepanshu-iiitu@users.noreply.github.com>
Co-authored-by: Arvind Patel <52006565+arvindpatel24@users.noreply.github.com>
Co-authored-by: Jagan Elavarasan <jaganelavarasan@gmail.com>
Co-authored-by: arvindpatel24 <arvind.patel@juspay.in>
Co-authored-by: anji-reddy-j <125157119+anji-reddy-j@users.noreply.github.com>
Co-authored-by: Hrithikesh <61539176+hrithikesh026@users.noreply.github.com>
Co-authored-by: Apoorv Dixit <64925866+apoorvdixit88@users.noreply.github.com>
Co-authored-by: Pa1NarK <69745008+pixincreate@users.noreply.github.com>
This commit is contained in:
Sampras Lopes
2023-09-11 17:33:47 +05:30
committed by GitHub
parent 25e82a1f7f
commit ad4b7de628
57 changed files with 2362 additions and 1546 deletions

View File

@ -1,26 +1,24 @@
use std::sync::Arc;
use common_utils::errors::CustomResult;
use data_models::{
errors::{StorageError, StorageResult},
payments::payment_intent::PaymentIntent,
};
use data_models::errors::{StorageError, StorageResult};
use diesel_models::{self as store};
use error_stack::ResultExt;
use futures::lock::Mutex;
use masking::StrongSecret;
use redis::{kv_store::RedisConnInterface, RedisStore};
pub mod config;
pub mod connection;
pub mod database;
pub mod errors;
mod lookup;
pub mod metrics;
pub mod mock_db;
pub mod payments;
pub mod redis;
pub mod refund;
mod utils;
use database::store::PgPool;
pub use mock_db::MockDb;
use redis_interface::errors::RedisError;
pub use crate::database::store::DatabaseStore;
@ -168,6 +166,7 @@ impl<T: DatabaseStore> RedisConnInterface for KVRouterStore<T> {
self.router_store.get_redis_conn()
}
}
impl<T: DatabaseStore> KVRouterStore<T> {
pub fn from_store(
store: RouterStore<T>,
@ -214,60 +213,6 @@ impl<T: DatabaseStore> KVRouterStore<T> {
}
}
#[derive(Clone)]
pub struct MockDb {
pub addresses: Arc<Mutex<Vec<store::Address>>>,
pub configs: Arc<Mutex<Vec<store::Config>>>,
pub merchant_accounts: Arc<Mutex<Vec<store::MerchantAccount>>>,
pub merchant_connector_accounts: Arc<Mutex<Vec<store::MerchantConnectorAccount>>>,
pub payment_attempts: Arc<Mutex<Vec<store::PaymentAttempt>>>,
pub payment_intents: Arc<Mutex<Vec<PaymentIntent>>>,
pub payment_methods: Arc<Mutex<Vec<store::PaymentMethod>>>,
pub customers: Arc<Mutex<Vec<store::Customer>>>,
pub refunds: Arc<Mutex<Vec<store::Refund>>>,
pub processes: Arc<Mutex<Vec<store::ProcessTracker>>>,
pub connector_response: Arc<Mutex<Vec<store::ConnectorResponse>>>,
// pub redis: Arc<redis_interface::RedisConnectionPool>,
pub api_keys: Arc<Mutex<Vec<store::ApiKey>>>,
pub ephemeral_keys: Arc<Mutex<Vec<store::EphemeralKey>>>,
pub cards_info: Arc<Mutex<Vec<store::CardInfo>>>,
pub events: Arc<Mutex<Vec<store::Event>>>,
pub disputes: Arc<Mutex<Vec<store::Dispute>>>,
pub lockers: Arc<Mutex<Vec<store::LockerMockUp>>>,
pub mandates: Arc<Mutex<Vec<store::Mandate>>>,
pub captures: Arc<Mutex<Vec<crate::store::capture::Capture>>>,
pub merchant_key_store: Arc<Mutex<Vec<crate::store::merchant_key_store::MerchantKeyStore>>>,
pub business_profiles: Arc<Mutex<Vec<crate::store::business_profile::BusinessProfile>>>,
}
impl MockDb {
pub async fn new() -> Self {
Self {
addresses: Default::default(),
configs: Default::default(),
merchant_accounts: Default::default(),
merchant_connector_accounts: Default::default(),
payment_attempts: Default::default(),
payment_intents: Default::default(),
payment_methods: Default::default(),
customers: Default::default(),
refunds: Default::default(),
processes: Default::default(),
connector_response: Default::default(),
// redis: Arc::new(crate::connection::redis_connection(&redis).await),
api_keys: Default::default(),
ephemeral_keys: Default::default(),
cards_info: Default::default(),
events: Default::default(),
disputes: Default::default(),
lockers: Default::default(),
mandates: Default::default(),
captures: Default::default(),
merchant_key_store: Default::default(),
business_profiles: Default::default(),
}
}
}
// TODO: This should not be used beyond this crate
// Remove the pub modified once StorageScheme usage is completed
pub trait DataModelExt {
@ -294,14 +239,6 @@ impl DataModelExt for data_models::MerchantStorageScheme {
}
}
impl RedisConnInterface for MockDb {
fn get_redis_conn(
&self,
) -> Result<Arc<redis_interface::RedisConnectionPool>, error_stack::Report<RedisError>> {
Err(RedisError::RedisConnectionError.into())
}
}
pub(crate) fn diesel_error_to_data_error(
diesel_error: &diesel_models::errors::DatabaseError,
) -> StorageError {

View File

@ -0,0 +1,72 @@
use common_utils::errors::CustomResult;
use data_models::errors;
use diesel_models::reverse_lookup::{
ReverseLookup as DieselReverseLookup, ReverseLookupNew as DieselReverseLookupNew,
};
use error_stack::{IntoReport, ResultExt};
use crate::{redis::cache::get_or_populate_redis, DatabaseStore, KVRouterStore, RouterStore};
#[async_trait::async_trait]
pub trait ReverseLookupInterface {
async fn insert_reverse_lookup(
&self,
_new: DieselReverseLookupNew,
) -> CustomResult<DieselReverseLookup, errors::StorageError>;
async fn get_lookup_by_lookup_id(
&self,
_id: &str,
) -> CustomResult<DieselReverseLookup, errors::StorageError>;
}
#[async_trait::async_trait]
impl<T: DatabaseStore> ReverseLookupInterface for RouterStore<T> {
async fn insert_reverse_lookup(
&self,
new: DieselReverseLookupNew,
) -> CustomResult<DieselReverseLookup, errors::StorageError> {
let conn = self
.get_master_pool()
.get()
.await
.into_report()
.change_context(errors::StorageError::DatabaseConnectionError)?;
new.insert(&conn).await.map_err(|er| {
let new_err = crate::diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
}
async fn get_lookup_by_lookup_id(
&self,
id: &str,
) -> CustomResult<DieselReverseLookup, errors::StorageError> {
let database_call = || async {
let conn = crate::utils::pg_connection_read(self).await?;
DieselReverseLookup::find_by_lookup_id(id, &conn)
.await
.map_err(|er| {
let new_err = crate::diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
};
get_or_populate_redis(self, id, database_call).await
}
}
#[async_trait::async_trait]
impl<T: DatabaseStore> ReverseLookupInterface for KVRouterStore<T> {
async fn insert_reverse_lookup(
&self,
new: DieselReverseLookupNew,
) -> CustomResult<DieselReverseLookup, errors::StorageError> {
self.router_store.insert_reverse_lookup(new).await
}
async fn get_lookup_by_lookup_id(
&self,
id: &str,
) -> CustomResult<DieselReverseLookup, errors::StorageError> {
self.router_store.get_lookup_by_lookup_id(id).await
}
}

View File

@ -0,0 +1,75 @@
use std::sync::Arc;
use data_models::{
errors::StorageError,
payments::{payment_attempt::PaymentAttempt, payment_intent::PaymentIntent},
};
use diesel_models::{self as store};
use error_stack::ResultExt;
use futures::lock::Mutex;
use redis_interface::RedisSettings;
use crate::redis::RedisStore;
pub mod payment_attempt;
pub mod payment_intent;
pub mod redis_conn;
#[derive(Clone)]
pub struct MockDb {
pub addresses: Arc<Mutex<Vec<store::Address>>>,
pub configs: Arc<Mutex<Vec<store::Config>>>,
pub merchant_accounts: Arc<Mutex<Vec<store::MerchantAccount>>>,
pub merchant_connector_accounts: Arc<Mutex<Vec<store::MerchantConnectorAccount>>>,
pub payment_attempts: Arc<Mutex<Vec<PaymentAttempt>>>,
pub payment_intents: Arc<Mutex<Vec<PaymentIntent>>>,
pub payment_methods: Arc<Mutex<Vec<store::PaymentMethod>>>,
pub customers: Arc<Mutex<Vec<store::Customer>>>,
pub refunds: Arc<Mutex<Vec<store::Refund>>>,
pub processes: Arc<Mutex<Vec<store::ProcessTracker>>>,
pub connector_response: Arc<Mutex<Vec<store::ConnectorResponse>>>,
pub redis: Arc<RedisStore>,
pub api_keys: Arc<Mutex<Vec<store::ApiKey>>>,
pub ephemeral_keys: Arc<Mutex<Vec<store::EphemeralKey>>>,
pub cards_info: Arc<Mutex<Vec<store::CardInfo>>>,
pub events: Arc<Mutex<Vec<store::Event>>>,
pub disputes: Arc<Mutex<Vec<store::Dispute>>>,
pub lockers: Arc<Mutex<Vec<store::LockerMockUp>>>,
pub mandates: Arc<Mutex<Vec<store::Mandate>>>,
pub captures: Arc<Mutex<Vec<crate::store::capture::Capture>>>,
pub merchant_key_store: Arc<Mutex<Vec<crate::store::merchant_key_store::MerchantKeyStore>>>,
pub business_profiles: Arc<Mutex<Vec<crate::store::business_profile::BusinessProfile>>>,
}
impl MockDb {
pub async fn new(redis: &RedisSettings) -> error_stack::Result<Self, StorageError> {
Ok(Self {
addresses: Default::default(),
configs: Default::default(),
merchant_accounts: Default::default(),
merchant_connector_accounts: Default::default(),
payment_attempts: Default::default(),
payment_intents: Default::default(),
payment_methods: Default::default(),
customers: Default::default(),
refunds: Default::default(),
processes: Default::default(),
connector_response: Default::default(),
redis: Arc::new(
RedisStore::new(redis)
.await
.change_context(StorageError::InitializationError)?,
),
api_keys: Default::default(),
ephemeral_keys: Default::default(),
cards_info: Default::default(),
events: Default::default(),
disputes: Default::default(),
lockers: Default::default(),
mandates: Default::default(),
captures: Default::default(),
merchant_key_store: Default::default(),
business_profiles: Default::default(),
})
}
}

View File

@ -0,0 +1,199 @@
use api_models::enums::{Connector, PaymentMethod};
use common_utils::errors::CustomResult;
use data_models::{
errors::StorageError,
payments::payment_attempt::{
PaymentAttempt, PaymentAttemptInterface, PaymentAttemptNew, PaymentAttemptUpdate,
},
MerchantStorageScheme,
};
use super::MockDb;
use crate::DataModelExt;
#[async_trait::async_trait]
impl PaymentAttemptInterface for MockDb {
async fn find_payment_attempt_by_payment_id_merchant_id_attempt_id(
&self,
_payment_id: &str,
_merchant_id: &str,
_attempt_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn get_filters_for_payments(
&self,
_pi: &[data_models::payments::payment_intent::PaymentIntent],
_merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<data_models::payments::payment_attempt::PaymentListFilters, StorageError>
{
Err(StorageError::MockDbError)?
}
async fn get_total_count_of_filtered_payment_attempts(
&self,
_merchant_id: &str,
_active_attempt_ids: &[String],
_connector: Option<Vec<Connector>>,
_payment_methods: Option<Vec<PaymentMethod>>,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<i64, StorageError> {
Err(StorageError::MockDbError)?
}
async fn find_payment_attempt_by_attempt_id_merchant_id(
&self,
_attempt_id: &str,
_merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn find_payment_attempt_by_preprocessing_id_merchant_id(
&self,
_preprocessing_id: &str,
_merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn find_payment_attempt_by_merchant_id_connector_txn_id(
&self,
_merchant_id: &str,
_connector_txn_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn find_attempts_by_merchant_id_payment_id(
&self,
_merchant_id: &str,
_payment_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<PaymentAttempt>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[allow(clippy::panic)]
async fn insert_payment_attempt(
&self,
payment_attempt: PaymentAttemptNew,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
let mut payment_attempts = self.payment_attempts.lock().await;
#[allow(clippy::as_conversions)]
let id = payment_attempts.len() as i32;
let time = common_utils::date_time::now();
let payment_attempt = PaymentAttempt {
id,
payment_id: payment_attempt.payment_id,
merchant_id: payment_attempt.merchant_id,
attempt_id: payment_attempt.attempt_id,
status: payment_attempt.status,
amount: payment_attempt.amount,
currency: payment_attempt.currency,
save_to_locker: payment_attempt.save_to_locker,
connector: payment_attempt.connector,
error_message: payment_attempt.error_message,
offer_amount: payment_attempt.offer_amount,
surcharge_amount: payment_attempt.surcharge_amount,
tax_amount: payment_attempt.tax_amount,
payment_method_id: payment_attempt.payment_method_id,
payment_method: payment_attempt.payment_method,
connector_transaction_id: None,
capture_method: payment_attempt.capture_method,
capture_on: payment_attempt.capture_on,
confirm: payment_attempt.confirm,
authentication_type: payment_attempt.authentication_type,
created_at: payment_attempt.created_at.unwrap_or(time),
modified_at: payment_attempt.modified_at.unwrap_or(time),
last_synced: payment_attempt.last_synced,
cancellation_reason: payment_attempt.cancellation_reason,
amount_to_capture: payment_attempt.amount_to_capture,
mandate_id: None,
browser_info: None,
payment_token: None,
error_code: payment_attempt.error_code,
connector_metadata: None,
payment_experience: payment_attempt.payment_experience,
payment_method_type: payment_attempt.payment_method_type,
payment_method_data: payment_attempt.payment_method_data,
business_sub_label: payment_attempt.business_sub_label,
straight_through_algorithm: payment_attempt.straight_through_algorithm,
mandate_details: payment_attempt.mandate_details,
preprocessing_step_id: payment_attempt.preprocessing_step_id,
error_reason: payment_attempt.error_reason,
multiple_capture_count: payment_attempt.multiple_capture_count,
connector_response_reference_id: None,
};
payment_attempts.push(payment_attempt.clone());
Ok(payment_attempt)
}
// safety: only used for testing
#[allow(clippy::unwrap_used)]
async fn update_payment_attempt_with_attempt_id(
&self,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
let mut payment_attempts = self.payment_attempts.lock().await;
let item = payment_attempts
.iter_mut()
.find(|item| item.attempt_id == this.attempt_id)
.unwrap();
*item = PaymentAttempt::from_storage_model(
payment_attempt
.to_storage_model()
.apply_changeset(this.to_storage_model()),
);
Ok(item.clone())
}
async fn find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(
&self,
_connector_transaction_id: &str,
_payment_id: &str,
_merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
// safety: only used for testing
#[allow(clippy::unwrap_used)]
async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, StorageError> {
let payment_attempts = self.payment_attempts.lock().await;
Ok(payment_attempts
.iter()
.find(|payment_attempt| {
payment_attempt.payment_id == payment_id
&& payment_attempt.merchant_id == merchant_id
})
.cloned()
.unwrap())
}
}

View File

@ -0,0 +1,149 @@
use common_utils::errors::CustomResult;
use data_models::{
errors::StorageError,
payments::payment_intent::{
PaymentIntent, PaymentIntentInterface, PaymentIntentNew, PaymentIntentUpdate,
},
MerchantStorageScheme,
};
use error_stack::{IntoReport, ResultExt};
use super::MockDb;
#[async_trait::async_trait]
impl PaymentIntentInterface for MockDb {
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
_merchant_id: &str,
_filters: &data_models::payments::payment_intent::PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
_merchant_id: &str,
_time_range: &api_models::payments::TimeRange,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
_merchant_id: &str,
_constraints: &data_models::payments::payment_intent::PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn get_filtered_payment_intents_attempt(
&self,
_merchant_id: &str,
_constraints: &data_models::payments::payment_intent::PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<
Vec<(
PaymentIntent,
data_models::payments::payment_attempt::PaymentAttempt,
)>,
StorageError,
> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[allow(clippy::panic)]
async fn insert_payment_intent(
&self,
new: PaymentIntentNew,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentIntent, StorageError> {
let mut payment_intents = self.payment_intents.lock().await;
let time = common_utils::date_time::now();
let payment_intent = PaymentIntent {
#[allow(clippy::as_conversions)]
id: payment_intents
.len()
.try_into()
.into_report()
.change_context(StorageError::MockDbError)?,
payment_id: new.payment_id,
merchant_id: new.merchant_id,
status: new.status,
amount: new.amount,
currency: new.currency,
amount_captured: new.amount_captured,
customer_id: new.customer_id,
description: new.description,
return_url: new.return_url,
metadata: new.metadata,
connector_id: new.connector_id,
shipping_address_id: new.shipping_address_id,
billing_address_id: new.billing_address_id,
statement_descriptor_name: new.statement_descriptor_name,
statement_descriptor_suffix: new.statement_descriptor_suffix,
created_at: new.created_at.unwrap_or(time),
modified_at: new.modified_at.unwrap_or(time),
last_synced: new.last_synced,
setup_future_usage: new.setup_future_usage,
off_session: new.off_session,
client_secret: new.client_secret,
business_country: new.business_country,
business_label: new.business_label,
active_attempt_id: new.active_attempt_id.to_owned(),
order_details: new.order_details,
allowed_payment_method_types: new.allowed_payment_method_types,
connector_metadata: new.connector_metadata,
feature_metadata: new.feature_metadata,
attempt_count: new.attempt_count,
profile_id: new.profile_id,
merchant_decision: new.merchant_decision,
payment_confirm_source: new.payment_confirm_source,
};
payment_intents.push(payment_intent.clone());
Ok(payment_intent)
}
// safety: only used for testing
#[allow(clippy::unwrap_used)]
async fn update_payment_intent(
&self,
this: PaymentIntent,
update: PaymentIntentUpdate,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentIntent, StorageError> {
let mut payment_intents = self.payment_intents.lock().await;
let payment_intent = payment_intents
.iter_mut()
.find(|item| item.id == this.id)
.unwrap();
*payment_intent = update.apply_changeset(this);
Ok(payment_intent.clone())
}
// safety: only used for testing
#[allow(clippy::unwrap_used)]
async fn find_payment_intent_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentIntent, StorageError> {
let payment_intents = self.payment_intents.lock().await;
Ok(payment_intents
.iter()
.find(|payment_intent| {
payment_intent.payment_id == payment_id && payment_intent.merchant_id == merchant_id
})
.cloned()
.unwrap())
}
}

View File

@ -0,0 +1,14 @@
use std::sync::Arc;
use redis_interface::errors::RedisError;
use super::MockDb;
use crate::redis::kv_store::RedisConnInterface;
impl RedisConnInterface for MockDb {
fn get_redis_conn(
&self,
) -> Result<Arc<redis_interface::RedisConnectionPool>, error_stack::Report<RedisError>> {
self.redis.get_redis_conn()
}
}

File diff suppressed because it is too large Load Diff

View File

@ -36,7 +36,7 @@ use router_env::logger;
use crate::{
redis::kv_store::{PartitionKey, RedisConnInterface},
utils::{pg_connection_read, pg_connection_write},
CustomResult, DataModelExt, DatabaseStore, KVRouterStore, MockDb,
DataModelExt, DatabaseStore, KVRouterStore,
};
#[async_trait::async_trait]
@ -663,138 +663,6 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
}
}
#[async_trait::async_trait]
impl PaymentIntentInterface for MockDb {
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
_merchant_id: &str,
_filters: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
_merchant_id: &str,
_time_range: &api_models::payments::TimeRange,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
_merchant_id: &str,
_constraints: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn get_filtered_payment_intents_attempt(
&self,
_merchant_id: &str,
_constraints: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(PaymentIntent, PaymentAttempt)>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[allow(clippy::panic)]
async fn insert_payment_intent(
&self,
new: PaymentIntentNew,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentIntent, StorageError> {
let mut payment_intents = self.payment_intents.lock().await;
let time = common_utils::date_time::now();
let payment_intent = PaymentIntent {
#[allow(clippy::as_conversions)]
id: payment_intents
.len()
.try_into()
.into_report()
.change_context(StorageError::MockDbError)?,
payment_id: new.payment_id,
merchant_id: new.merchant_id,
status: new.status,
amount: new.amount,
currency: new.currency,
amount_captured: new.amount_captured,
customer_id: new.customer_id,
description: new.description,
return_url: new.return_url,
metadata: new.metadata,
connector_id: new.connector_id,
shipping_address_id: new.shipping_address_id,
billing_address_id: new.billing_address_id,
statement_descriptor_name: new.statement_descriptor_name,
statement_descriptor_suffix: new.statement_descriptor_suffix,
created_at: new.created_at.unwrap_or(time),
modified_at: new.modified_at.unwrap_or(time),
last_synced: new.last_synced,
setup_future_usage: new.setup_future_usage,
off_session: new.off_session,
client_secret: new.client_secret,
business_country: new.business_country,
business_label: new.business_label,
active_attempt_id: new.active_attempt_id.to_owned(),
order_details: new.order_details,
allowed_payment_method_types: new.allowed_payment_method_types,
connector_metadata: new.connector_metadata,
feature_metadata: new.feature_metadata,
attempt_count: new.attempt_count,
profile_id: new.profile_id,
merchant_decision: new.merchant_decision,
payment_confirm_source: new.payment_confirm_source,
};
payment_intents.push(payment_intent.clone());
Ok(payment_intent)
}
// safety: only used for testing
#[allow(clippy::unwrap_used)]
async fn update_payment_intent(
&self,
this: PaymentIntent,
update: PaymentIntentUpdate,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentIntent, StorageError> {
let mut payment_intents = self.payment_intents.lock().await;
let payment_intent = payment_intents
.iter_mut()
.find(|item| item.id == this.id)
.unwrap();
*payment_intent = update.apply_changeset(this);
Ok(payment_intent.clone())
}
// safety: only used for testing
#[allow(clippy::unwrap_used)]
async fn find_payment_intent_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentIntent, StorageError> {
let payment_intents = self.payment_intents.lock().await;
Ok(payment_intents
.iter()
.find(|payment_intent| {
payment_intent.payment_id == payment_id && payment_intent.merchant_id == merchant_id
})
.cloned()
.unwrap())
}
}
impl DataModelExt for PaymentIntentNew {
type StorageModel = DieselPaymentIntentNew;

View File

@ -1,11 +1,19 @@
use std::{any::Any, borrow::Cow, sync::Arc};
use common_utils::errors;
use common_utils::{
errors::{self, CustomResult},
ext_traits::AsyncExt,
};
use data_models::errors::StorageError;
use dyn_clone::DynClone;
use error_stack::Report;
use error_stack::{Report, ResultExt};
use moka::future::Cache as MokaCache;
use once_cell::sync::Lazy;
use redis_interface::RedisValue;
use redis_interface::{errors::RedisError, RedisValue};
use super::{kv_store::RedisConnInterface, pub_sub::PubSubInterface};
pub(crate) const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate";
/// Prefix for config cache key
const CONFIG_CACHE_PREFIX: &str = "config";
@ -128,6 +136,127 @@ impl Cache {
}
}
pub async fn get_or_populate_redis<T, F, Fut>(
store: &(dyn RedisConnInterface + Send + Sync),
key: &str,
fun: F,
) -> CustomResult<T, StorageError>
where
T: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug,
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let type_name = std::any::type_name::<T>();
let redis = &store
.get_redis_conn()
.map_err(|er| {
let error = format!("{}", er);
er.change_context(StorageError::RedisError(error))
})
.attach_printable("Failed to get redis connection")?;
let redis_val = redis.get_and_deserialize_key::<T>(key, type_name).await;
let get_data_set_redis = || async {
let data = fun().await?;
redis
.serialize_and_set_key(key, &data)
.await
.change_context(StorageError::KVError)?;
Ok::<_, Report<StorageError>>(data)
};
match redis_val {
Err(err) => match err.current_context() {
RedisError::NotFound | RedisError::JsonDeserializationFailed => {
get_data_set_redis().await
}
_ => Err(err
.change_context(StorageError::KVError)
.attach_printable(format!("Error while fetching cache for {type_name}"))),
},
Ok(val) => Ok(val),
}
}
pub async fn get_or_populate_in_memory<T, F, Fut>(
store: &(dyn RedisConnInterface + Send + Sync),
key: &str,
fun: F,
cache: &Cache,
) -> CustomResult<T, StorageError>
where
T: Cacheable + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Clone,
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let cache_val = cache.get_val::<T>(key);
if let Some(val) = cache_val {
Ok(val)
} else {
let val = get_or_populate_redis(store, key, fun).await?;
cache.push(key.to_string(), val.clone()).await;
Ok(val)
}
}
pub async fn redact_cache<T, F, Fut>(
store: &dyn RedisConnInterface,
key: &str,
fun: F,
in_memory: Option<&Cache>,
) -> CustomResult<T, StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let data = fun().await?;
in_memory.async_map(|cache| cache.invalidate(key)).await;
let redis_conn = store
.get_redis_conn()
.map_err(|er| {
let error = format!("{}", er);
er.change_context(StorageError::RedisError(error))
})
.attach_printable("Failed to get redis connection")?;
redis_conn
.delete_key(key)
.await
.change_context(StorageError::KVError)?;
Ok(data)
}
pub async fn publish_into_redact_channel<'a>(
store: &dyn RedisConnInterface,
key: CacheKind<'a>,
) -> CustomResult<usize, StorageError> {
let redis_conn = store
.get_redis_conn()
.map_err(|er| {
let error = format!("{}", er);
er.change_context(StorageError::RedisError(error))
})
.attach_printable("Failed to get redis connection")?;
redis_conn
.publish(PUB_SUB_CHANNEL, key)
.await
.change_context(StorageError::KVError)
}
pub async fn publish_and_redact<'a, T, F, Fut>(
store: &dyn RedisConnInterface,
key: CacheKind<'a>,
fun: F,
) -> CustomResult<T, StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, key).await?;
Ok(data)
}
#[cfg(test)]
mod cache_tests {
use super::*;

View File

@ -68,3 +68,13 @@ where
},
}
}
/// Generates hscan field pattern. Suppose the field is pa_1234 it will generate
/// pa_*
pub fn generate_hscan_pattern_for_attempt(sk: &str) -> String {
sk.split('_')
.take(1)
.chain(["*"])
.collect::<Vec<&str>>()
.join("_")
}